RxJS multicast operators, better known as sharing operators, are probably the most complicated topic to understand in the jungle that is RxJS. In this article, I’ll try to clarify the subject by looking at it in a different way.

The key to really comprehending them is to understand the mechanism behind them and the problem they solve.

Let’s start from the beginning with a high-level explanation of RxJS building blocks:

Observables 101

In RxJS, observables are cold by default. This means that each time we call subscribe(), we execute the underlying producer function.

To understand this better, let’s create a basic Observable implementation:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

The Observable class takes a single parameter: a subscription function. The subscription function is named as such because it’s invoked by our Observable whenever someone calls subscribe().

Sometimes people refer to the subscription function as the “producer” function, because it also produces the values for the observer.

The subscription function receives an observer. It’s simply a plain object with three optional methods: next(), error(), and complete().

The subscribe() method takes an observer and calls the subscription function with it. Note that we’re not going to discuss the unsubscribe process, but in a nutshell, each subscription should be disposable: the subscription function returns a function that is responsible for any cleanup that should be performed once the subscription is no longer needed.
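To make the cleanup idea concrete, here is a minimal sketch built on the Observable class above. The listeners set and the emit() helper are hypothetical stand-ins for a real event source; they are not part of RxJS or of the original example.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

// Hypothetical event source, used only for this illustration
const listeners = new Set();
const emit = value => listeners.forEach(listener => listener(value));

const events$ = new Observable(observer => {
  const listener = value => observer.next(value);
  listeners.add(listener);

  // The cleanup function: stop listening once the subscription is disposed
  return () => listeners.delete(listener);
});

const received = [];
const unsubscribe = events$.subscribe({ next: value => received.push(value) });

emit('a');
unsubscribe();
emit('b'); // nobody is listening anymore

console.log(received); // ['a']
```

In this naive version, subscribe() hands back the cleanup function directly, whereas RxJS wraps it in a Subscription object with an unsubscribe() method.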

It’s as simple as that. Now, let’s create an observable that wraps the native XHR API:

function http(url) {
  // This function will be called when we call http().subscribe()
  const subscriptionFn = observer => {
    log('Observable execution: http');
    const xhr = new XMLHttpRequest();
    xhr.addEventListener('load', () => {
      if (xhr.readyState === 4 && xhr.status === 200) {
        observer.next(JSON.parse(xhr.responseText));
        // complete() is optional on the observer, so call it only if it exists
        if (observer.complete) {
          observer.complete();
        }
      }
    });
    xhr.open('GET', url);
    xhr.send();

    // Cleanup function: abort the request if the subscription is disposed
    return () => xhr.abort();
  };

  return new Observable(subscriptionFn);
}

The http implementation is pretty straightforward. We create a function that takes a URL and returns a new Observable that, once subscribed to, sends an HTTP request and emits the response when it succeeds.

Now, based on our observable implementation, what do you think will happen when we subscribe to our http observable twice?

// A small observer helper
const observer = tag => ({
  next(value) {
    console.log(`${tag}:`, value);
  }
});

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-1'));

http('https://jsonplaceholder.typicode.com/users')
  .subscribe(observer('subscriber-2'));

If you said we’ll see two outgoing requests in the network panel, you got it right. When we review the implementation of our Observable class, we can clearly see why this happens: each subscriber invokes the subscription function, which in this case causes the XHR call to run twice, once for each subscriber.
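The same effect can be observed without a network panel. The sketch below swaps the XHR call for a hypothetical fakeHttp() producer that simply counts how many times it runs; the counter and the emitted objects are assumptions made for illustration.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

let executions = 0;

// Hypothetical stand-in for http(): counts how often the producer runs
const fakeHttp = () =>
  new Observable(observer => {
    executions++;
    observer.next({ requestNumber: executions });
  });

const users$ = fakeHttp();
const results = [];

// Two subscriptions to the very same observable instance
users$.subscribe({ next: value => results.push(value) });
users$.subscribe({ next: value => results.push(value) });

console.log(executions); // 2
console.log(results);    // [{ requestNumber: 1 }, { requestNumber: 2 }]
```

Even though both subscribers share one observable instance, each subscribe() call runs the producer again, which is exactly what "cold" means here.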

Operators 101

Operators are essentially observables. The only difference is that, as the name suggests, operators operate on a source observable.

Building your own operator is as simple as writing a function:


function map(fn) {
  return source => {
    return new Observable(observer => {
      log('Observable execution: map');
      return source.subscribe({
        next(value) {
          observer.next(fn(value));
        }
      });
    });
  };
}

The map() function returns a function that takes a provided source observable and returns a new observable, which is subscribed to the source.

Before we can use our operator, we need to create the pipe() method:

class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

Yes, it’s as easy as that: only one line of code! The pipe() method takes any number of operators, loops over them, and each time invokes the next operator, passing it the result of the previous one as its source.
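To see what the reduce() call does, the following sketch pipes a synchronous source through two operators and compares the result with the same chain written by hand. The filter operator and the numbers$ source are not part of the article’s code; they are assumptions that follow the same shape as map().

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

const map = fn => source =>
  new Observable(observer =>
    source.subscribe({ next: value => observer.next(fn(value)) })
  );

// Hypothetical operator, written in the same style as map()
const filter = predicate => source =>
  new Observable(observer =>
    source.subscribe({
      next: value => {
        if (predicate(value)) observer.next(value);
      }
    })
  );

const numbers$ = new Observable(observer => {
  [1, 2, 3, 4].forEach(value => observer.next(value));
});

const piped = [];
numbers$
  .pipe(filter(n => n % 2 === 0), map(n => n * 10))
  .subscribe({ next: value => piped.push(value) });

// The same chain without pipe(): each operator wraps the previous result
const manual = [];
map(n => n * 10)(filter(n => n % 2 === 0)(numbers$))
  .subscribe({ next: value => manual.push(value) });

console.log(piped, manual); // [20, 40] [20, 40]
```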

If we use this in our example, we get the following expression:


map(http): Observable
http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(observer('subscriber'));

When we call subscribe() we’re executing the map Observable, which itself subscribes to the http observable. When the http source observable emits a new value, the value first reaches the map subscription function. Then, after applying the projection function on the value, the map Observable emits the value to the last subscription. This is called the observable chain.

If we subscribe to our observable twice this time, we’ll invoke each one of the observables in the chain twice:

const firstUser$ = http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]));

firstUser$.subscribe(observer('subscriber-1'));
firstUser$.subscribe(observer('subscriber-2'));

However, what if we don’t want this behavior? What if we want to run the subscription function only once, regardless of how many subscribers it has?

For instance, what if we want to make a single HTTP call and share (i.e., multicast) the result with all the subscribers? That’s one case where we need to use a Subject.

Subjects 101

Subject might seem like an intimidating entity in RxJS, but the truth is that it’s a fairly simple concept — a Subject is both an observable and an observer.

It’s an observable because it implements the subscribe() method, and it’s also an observer because it implements the observer interface: next(), error(), and complete().

Let’s create a simple implementation of a Subject:

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }

  error(error) {
    this.observers.forEach(observer => observer.error(error));
  }

  complete() {
    this.observers.forEach(observer => observer.complete());
  }
}

A subject can act as a proxy between the source observable and many observers, making it possible for multiple observers to share the same observable execution.

Let’s modify our example to use a Subject and see how it works:

const subject = new Subject();
subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

http('https://jsonplaceholder.typicode.com/users')
  .pipe(map(res => res[0]))
  .subscribe(subject);

When we call the subject’s subscribe() method, it performs one simple operation: it pushes our observer into the observers array.

Then, because a Subject also implements the observer interface, it can subscribe to the source observable. When the source observable emits, it’ll call the subject’s next() method, which will result in a new notification for each one of the Subject’s subscribers.

So now we’ll have a single execution of the original subscription function, and therefore we’ll see only one HTTP request in the network panel.
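The single execution can be checked with a counter instead of the network panel. In this sketch, source$ is a hypothetical stand-in for the http observable, and emitResponse() lets us trigger its "response" by hand; both are assumptions for illustration only.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }
}

let executions = 0;
let emitResponse; // assigned when the producer runs

// Hypothetical async source standing in for http()
const source$ = new Observable(observer => {
  executions++;
  emitResponse = value => observer.next(value);
});

const received = [];
const subject = new Subject();
subject.subscribe({ next: value => received.push(`subscriber1: ${value}`) });
subject.subscribe({ next: value => received.push(`subscriber2: ${value}`) });

// Only the subject subscribes to the source, so the producer runs once
source$.subscribe(subject);
emitResponse('users');

console.log(executions); // 1
console.log(received);   // ['subscriber1: users', 'subscriber2: users']
```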

You Were Late to the Party: Late Subscribers

What happens if the source observable had already emitted a value before you subscribed?

We can’t demonstrate this with our previous example, as the http observable is async, and therefore we’re always subscribed to it before it emits the value.

Let’s quickly create the of observable to help us show this case:

function of(...values) {
  return new Observable(observer => {
    log('Observable execution: of');
    values.forEach(value => observer.next(value));
  });
}

The of observable takes any number of values and emits them synchronously, one by one. Now let’s subscribe to the subject after it subscribes to the of observable:


const subject = new Subject();
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Our subscribers don’t receive any value whatsoever. The reason for this is that our Subject implementation doesn’t support late subscribers. When the of observable emitted the values, the subscribers hadn’t registered yet, which caused the values to be missed.

A real-world example from Angular can be when a source observable emits, but you have a component that at the time doesn’t exist on the page because of a falsy ngIf. When it’s eventually added to the page, it subscribes to the source, but it won’t get any previously emitted value.

One way to solve it is by using a ReplaySubject. Let’s create a naive implementation for this, and see how it works:

class ReplaySubject extends Subject {
  constructor(bufferSize) {
    super();
    this.observers = [];
    this.bufferSize = bufferSize;
    this.buffer = [];
  }

  subscribe(observer) {
    this.buffer.forEach(val => observer.next(val));
    this.observers.push(observer);
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

The concept is relatively simple. As the name suggests, ReplaySubject is a special subject that “replays,” i.e., emits old values to any new subscribers.

Each notification is broadcast to all subscribers and saved for any future observers, subject to the buffer size policy.

Let’s refactor our previous example and use a ReplaySubject:

const subject = new ReplaySubject(3);
of(1, 2, 3).subscribe(subject);

subject.subscribe(observer('subscriber1'));
subject.subscribe(observer('subscriber2'));

Now the result is different. Despite subscribing after the source emitted the values, we still get all of them.

To summarize, a ReplaySubject multicasts the source values to all of its subscribers and caches the values (based on the buffer size) so that late subscribers can still receive them.
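The buffer size policy is easier to see when the buffer is smaller than the number of emitted values. The sketch below uses a standalone copy of the naive ReplaySubject (written without the Subject base class so that it runs on its own) and a buffer size of 2, which is an arbitrary choice for the demonstration.

```javascript
// Standalone version of the naive ReplaySubject shown above
class ReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }

  subscribe(observer) {
    // Replay the cached values first, then register for future ones
    this.buffer.forEach(value => observer.next(value));
    this.observers.push(observer);
  }

  next(value) {
    // Drop the oldest value once the buffer is full
    if (this.buffer.length === this.bufferSize) {
      this.buffer.shift();
    }

    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

const subject = new ReplaySubject(2);
[1, 2, 3].forEach(value => subject.next(value));

// A late subscriber only sees the last two values
const late = [];
subject.subscribe({ next: value => late.push(value) });
const replayed = [...late];

// From here on it receives new values like any other subscriber
subject.next(4);

console.log(replayed); // [2, 3]
console.log(late);     // [2, 3, 4]
```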

Before you continue, I recommend that you try to create a BehaviorSubject by yourself. I will include the solution in the complete code gist.

Now it’s time to introduce the multicast RxJS operators, and hopefully, the examples we’ve looked at will help you understand them more easily.

Multicast Operators

Let’s explain them one by one:

Multicast and Connect

The multicast() operator takes a Subject and uses it to share the source execution:

import { interval, Subject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new Subject())
)

const observer1 = connectableObservable.subscribe(log);
const observer2 = connectableObservable.subscribe(log);

const connectableSubscription = (connectableObservable as ConnectableObservable<any>)
  .connect();

The multicast() operator returns what’s known as a ConnectableObservable, which has a connect() method. Its one simple job is to subscribe to the source with the provided subject:

connect() {
  return source.subscribe(providedSubject);
}

The connect() method makes it possible to control when to start the execution of the source Observable. One crucial thing to remember is that in this case, to unsubscribe from the source, we need to unsubscribe from the connectable subscription:

 connectableSubscription.unsubscribe(); 
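To tie this back to the naive classes from earlier, here is a rough sketch of how a multicast() operator with connect() could be written on top of them. It is not the RxJS implementation: in this simplified version, connect() returns the source’s cleanup function rather than a Subscription object, and source$ is a hypothetical synchronous source.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }
}

// Naive sketch of multicast(): subscribers attach to the subject,
// and the source only starts when connect() is called
function multicast(subject) {
  return source => {
    const connectable = new Observable(observer => subject.subscribe(observer));
    connectable.connect = () => source.subscribe(subject);
    return connectable;
  };
}

let executions = 0;
let stopped = false;

// Hypothetical source with a cleanup function
const source$ = new Observable(observer => {
  executions++;
  [1, 2].forEach(value => observer.next(value));
  return () => { stopped = true; };
});

const connectable = source$.pipe(multicast(new Subject()));

const first = [];
const second = [];
connectable.subscribe({ next: value => first.push(value) });
connectable.subscribe({ next: value => second.push(value) });

const executionsBeforeConnect = executions; // still 0: nothing has run yet

const disconnect = connectable.connect();   // the source starts here
disconnect();                               // and is cleaned up here

console.log(executionsBeforeConnect, executions); // 0 1
console.log(first, second);                       // [1, 2] [1, 2]
```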

We’re not limited to a plain Subject. We can replace it with any other type of subject, such as a ReplaySubject, for example:

import { interval, ReplaySubject, ConnectableObservable } from 'rxjs';
import { multicast } from 'rxjs/operators';

const connectableObservable = interval(1000).pipe(
  multicast(new ReplaySubject(1))
)

const observer1 = connectableObservable.subscribe(log);

setTimeout(() => {
  // Late subscriber
  connectableObservable.subscribe(log);
}, 3000)

const connectable = (connectableObservable as ConnectableObservable<any>).connect();

Based on the previous example, you can infer what’s happening here under the hood.

Keep in mind that when using multicast(), we can pass either a Subject or a factory function that returns a new Subject. A subject can’t be reused once it has completed; a factory function gives us a fresh subject for each connection, so we can subscribe again after the source has completed.

interval(1000).pipe(
    multicast(() => new Subject())
)

RefCount

When we use the multicast() operator, we’re responsible for calling connect() to start the source observable execution. In addition to that, we’re also responsible for avoiding memory leaks by manually unsubscribing from the connectable subscription.

It would be more efficient, and would help reduce errors, to make this process automatic. Luckily the good people in RxJS have already thought of this, and they’ve created the refCount operator.

The refCount operator is based on reference counting: it looks at the number of current subscribers. If that number changes from zero to one, it calls connect(), i.e., subscribes to the source. If that number changes back to zero, it unsubscribes.

const source = interval(1000).pipe(
  multicast(new Subject()),
  refCount()
)

// refCount === 1 => source.subscribe();
const observer1 = source.subscribe(log);

// refCount === 2
const observer2 = source.subscribe(log);

setTimeout(() => {
  // refCount - 1
  observer1.unsubscribe();
  // refCount - 1
  observer2.unsubscribe();
  // refCount === 0 => source.unsubscribe();
}, 3000)

Note that when using refCount, we get back a normal observable, instead of a ConnectableObservable.
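The reference counting described above can be sketched with the naive classes from earlier. This is an illustration of the idea, not the RxJS source: the multicast() helper is the simplified one, the Subject here returns an unsubscribe function, and the running flag and push() helper are assumptions used to observe when the source is active.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }

  pipe(...operators) {
    return operators.reduce((source, next) => next(source), this);
  }
}

class Subject extends Observable {
  constructor() {
    super();
    this.observers = [];
  }

  subscribe(observer) {
    this.observers.push(observer);
    // Return a function that removes this observer again
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  next(value) {
    this.observers.forEach(observer => observer.next(value));
  }
}

const multicast = subject => source => {
  const connectable = new Observable(observer => subject.subscribe(observer));
  connectable.connect = () => source.subscribe(subject);
  return connectable;
};

// Naive refCount(): connect on the first subscriber, disconnect on the last
const refCount = () => connectable => {
  let count = 0;
  let disconnect;

  return new Observable(observer => {
    const unsubscribe = connectable.subscribe(observer);
    if (++count === 1) disconnect = connectable.connect();

    return () => {
      unsubscribe();
      if (--count === 0) disconnect();
    };
  });
};

let running = false;
let push;
const source$ = new Observable(observer => {
  running = true;
  push = value => observer.next(value);
  return () => { running = false; };
});

const shared$ = source$.pipe(multicast(new Subject()), refCount());
const first = [];
const second = [];
const states = [running];                                              // false

const unsubFirst = shared$.subscribe({ next: v => first.push(v) });    // count 0 -> 1
states.push(running);                                                  // true
const unsubSecond = shared$.subscribe({ next: v => second.push(v) });  // count 2
push(1);
unsubFirst();                                                          // count 1
states.push(running);                                                  // true
push(2);
unsubSecond();                                                         // count 0
states.push(running);                                                  // false

console.log(states);        // [false, true, true, false]
console.log(first, second); // [1] [1, 2]
```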

Publish and Its Variants

Because using multicast() + Subject + refCount() is a typical pattern in RxJS, the RxJS team has created shortcut operators to make it easier for us. Let’s go over the different variations we have:

publish() is a shortcut for multicast(new Subject()):

const connectableObservable = interval(1000).pipe(
  publish()
)

connectableObservable.connect();

publishBehavior(initialValue) is a shortcut for multicast(new BehaviorSubject(initialValue)):

const connectableObservable = interval(1000).pipe(
  publishBehavior(100)
)

connectableObservable.connect();

publishReplay(x) is a shortcut for multicast() with a single ReplaySubject(x) instance, which is reused on every connection:

const connectableObservable = interval(1000).pipe(
  publishReplay(3)
)

connectableObservable.connect();

publishLast() is a shortcut for multicast(new AsyncSubject()):

const connectableObservable = interval(1000).pipe(
  take(2),
  publishLast()
)

connectableObservable.connect();

share() is a shortcut for multicast(() => new Subject()) + refCount():

const source = interval(1000).pipe(
  share()
)

shareReplay(bufferSize) — is a multicasting operator that uses a ReplaySubject(). It doesn’t internally use the multicast operator itself, and as a result it always returns an observable, rather than a ConnectableObservable. It can be used either with a refCount, or without it. Here are both variations:


interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
)

interval(1000).pipe(
  shareReplay(1)
)

When shareReplay is called with { refCount: false }, it’s the same as calling shareReplay(x).

In that case, reference counting isn’t activated. This means that as long as the source hasn’t completed or errored, shareReplay stays subscribed to it, regardless of whether there are any active subscribers. So all new subscribers will receive the last X buffered values.

shareReplay vs publishReplay + refCount

At first glance, shareReplay({ refCount: true, bufferSize: X }) looks identical to publishReplay(X) + refCount(), but that’s not entirely accurate.

Let’s see what they share and where they differ:

They share the same refCount behavior: subscribing to and unsubscribing from the source based on changes in the number of subscribers. They also share the same behavior when the source completes: any new subscriber will get the last X emitted values.

However, they behave differently when the source hasn’t completed and the number of subscribers has dropped to zero. In that case, when we use publishReplay(X) + refCount(), any new subscriber will get the last X emitted values from the ReplaySubject’s buffer and will resubscribe to the source using the same ReplaySubject.

Conversely, in the same situation with shareReplay({ refCount: true, bufferSize: X }), the new subscriber won’t get the last X emitted values, since under the hood it creates a new ReplaySubject instance and uses it to resubscribe to the source. You can see it in the following examples:

const source = interval(1000).pipe(
  publishReplay(1),
  refCount()
);

const one = source.subscribe(observer('subscriber-1'));

setTimeout(() => {
  one.unsubscribe();

  // This subscriber will get the last emitted values from the source
  const two = source.subscribe(observer('subscriber-2'));
}, 3000);

const source = interval(1000).pipe(
  shareReplay({ refCount: true, bufferSize: 1 })
);

const one = source.subscribe(observer('subscriber-1'));

setTimeout(() => {
  one.unsubscribe();

  // This subscriber will NOT get the last emitted values from the source
  const two = source.subscribe(observer('subscriber-2'));
}, 3000);
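The difference comes down to whether the same ReplaySubject is reused when the source is resubscribed. The following sketch reproduces that behavior with the naive classes. The shareWith() helper and lateSubscriberValues() function are hypothetical names invented for this illustration; they mimic the behavior described above and are not how RxJS implements either operator.

```javascript
class Observable {
  constructor(subscriptionFn) {
    this.subscriptionFn = subscriptionFn;
  }

  subscribe(observer) {
    return this.subscriptionFn(observer);
  }
}

class ReplaySubject {
  constructor(bufferSize) {
    this.bufferSize = bufferSize;
    this.buffer = [];
    this.observers = [];
  }

  subscribe(observer) {
    this.buffer.forEach(value => observer.next(value));
    this.observers.push(observer);
    return () => {
      this.observers = this.observers.filter(o => o !== observer);
    };
  }

  next(value) {
    if (this.buffer.length === this.bufferSize) this.buffer.shift();
    this.buffer.push(value);
    this.observers.forEach(observer => observer.next(value));
  }
}

// Hypothetical helper: ref-counted sharing that asks the factory
// for a subject every time the count goes from zero to one
const shareWith = subjectFactory => source => {
  let subject;
  let count = 0;
  let disconnect;

  return new Observable(observer => {
    if (count === 0) subject = subjectFactory();
    count++;
    const unsubscribe = subject.subscribe(observer);
    if (count === 1) disconnect = source.subscribe(subject);

    return () => {
      unsubscribe();
      if (--count === 0) disconnect();
    };
  });
};

// Subscribe, emit two values, unsubscribe, then subscribe again
function lateSubscriberValues(subjectFactory) {
  let push;
  const source$ = new Observable(observer => {
    push = value => observer.next(value);
    return () => {};
  });

  const shared$ = shareWith(subjectFactory)(source$);
  const unsubscribe = shared$.subscribe({ next: () => {} });
  push(1);
  push(2);
  unsubscribe(); // the count drops to zero

  const second = [];
  shared$.subscribe({ next: value => second.push(value) });
  return second;
}

// Same subject on every connection, like publishReplay(1) + refCount()
const reused = new ReplaySubject(1);
const withSameSubject = lateSubscriberValues(() => reused);

// Fresh subject on every connection, like shareReplay with refCount: true
const withFreshSubject = lateSubscriberValues(() => new ReplaySubject(1));

console.log(withSameSubject);  // [2]
console.log(withFreshSubject); // []
```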

Real-World Angular Examples

As we’ve learned, RxJS offers various sharing operators for us to use. Let’s look at a couple of examples where we can employ them.

Using Share

Let’s say that we have a component that needs to get some data from a source observable. It can be an HTTP call, a store, or any other implementation you have. Additionally, it needs to perform manipulations on the data, such as filtering, sorting, and so forth:


@Component({
  template: `
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get<User[]>('https://api/users').pipe(
      // Placeholder for the real filtering/sorting logic
      map(users => users),
    );
  }
}

Now let’s say we need to add another component, one which only displays the first user. If we subscribe to the existing source as-is:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get<User[]>('https://api/users').pipe(
      // Placeholder for the real filtering/sorting logic
      map(users => users),
    );

    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

We’ll have two HTTP requests, and our operations, such as sorting and filtering, will run twice. To avoid this, we need to share the observable execution:

@Component({
  template: `
    <user [user]="firstUser$ | async"></user>
    <users-list [users]="allUsers$ | async"></users-list>
  `,
})
export class UsersPageComponent {
  allUsers$: Observable<User[]>;
  firstUser$: Observable<User>;

  constructor(private http: HttpClient) {
  }

  ngOnInit() {
    this.allUsers$ = this.http.get<User[]>('https://api/users').pipe(
      // Placeholder for the real filtering/sorting logic
      map(users => users),
      share()
    );

    this.firstUser$ = this.allUsers$.pipe(map(users => users[0]));
  }
}

As we’ve learned, this creates a new Subject that internally subscribes to the source. When the source emits, the subject will notify each one of the subscribers.

This solves the problem because now, when we subscribe to firstUser$, we’re subscribing to the internal subject, not directly to the source.

Using ShareReplay

shareReplay() is mostly used when we need the ability to share, cache, and replay the last X emitted values. A typical example is a singleton service that performs an HTTP request:


@Injectable({ providedIn: 'root' })
export class BlogService {
  posts$ = this.http.get('https://jsonplaceholder.typicode.com/posts')
    .pipe(shareReplay(1));

  constructor(private http: HttpClient) {}
}


Here, it doesn’t matter how many components or future components are interested in the data from the posts call: it will always perform a single HTTP request and return the data from the internal ReplaySubject’s buffer.

You might encounter a case where you want to cancel a request that hasn’t completed yet once there are no active subscribers. In this case, you’ll need to use refCount.

You can find the complete code here.