import Observable from 'zen-observable-ts';
INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER,
} from '@aws-amplify/core';
import { PubSubProvider, PubSubOptions, ProviderOptions } from './types';
import { AWSAppSyncRealTimeProvider } from './Providers';
const { isNode } = browserOrNode();
const logger = new Logger('PubSub');
export class PubSubClass {
private _options: PubSubOptions;
private _pluggables: PubSubProvider[];
private _awsAppSyncRealTimeProvider?: AWSAppSyncRealTimeProvider;
private get awsAppSyncRealTimeProvider() {
if (!this._awsAppSyncRealTimeProvider) {
this._awsAppSyncRealTimeProvider = new AWSAppSyncRealTimeProvider(
return this._awsAppSyncRealTimeProvider;
constructor(options?: PubSubOptions) {
this._options = options ?? {};
logger.debug('PubSub Options', this._options);
this.subscribe = this.subscribe.bind(this);
configure(options: PubSubOptions) {
const opt: Record<string, unknown> = options
? options.PubSub || options
logger.debug('configure PubSub', { opt });
this._options = Object.assign({}, this._options, opt);
this._pluggables.map(pluggable => pluggable.configure(this._options));
public async addPluggable(pluggable: PubSubProvider) {
if (pluggable && pluggable.getCategory() === 'PubSub') {
this._pluggables.push(pluggable);
const config = pluggable.configure(this._options);
removePluggable(providerName: string): void {
this._pluggables = this._pluggables.filter(
pluggable => pluggable.getProviderName() !== providerName
private getProviderByName(providerName: string | symbol) {
if (providerName === INTERNAL_AWS_APPSYNC_REALTIME_PUBSUB_PROVIDER) {
return this.awsAppSyncRealTimeProvider;
return this._pluggables.find(
pluggable => pluggable.getProviderName() === providerName
private getProviders(options: ProviderOptions = {}) {
const providerName = options.provider;
const provider = this.getProviderByName(providerName);
throw new Error(`Could not find provider named ${String(providerName)}`);
topics: string[] | string,
msg: Record<string, unknown> | string,
options?: ProviderOptions
this.getProviders(options).map(provider =>
provider.publish(topics, msg, options)
topics: string[] | string,
options?: ProviderOptions
): Observable<Record<string, unknown>> {
if (isNode && this._options && this._options.ssr) {
'Subscriptions are not supported for Server-Side Rendering (SSR)'
logger.debug('subscribe options', options);
const providers = this.getProviders(options);
return new Observable<Record<string, unknown>>(observer => {
const observables = providers.map(provider => ({
observable: provider.subscribe(topics, options),
const subscriptions = observables.map(({ provider, observable }) =>
next: (value: Record<string, unknown>) =>
observer.next({ provider, value }),
error: (error: Record<string, unknown>) =>
observer.error({ provider, error }),
subscriptions.forEach(subscription => subscription.unsubscribe());
export const PubSub = new PubSubClass();
Amplify.register(PubSub);