Observables come in two flavors, hot and cold. The difference is often unknown or misunderstood, which leads to unexpected side effects and time lost to debugging. This article explains the difference and the aspects worth paying attention to.
Cold Observables
Cold observables have the following properties:
The observable sequence is re-executed for each new subscriber
Each subscriber receives the entire sequence of values from the beginning
Cold observables produce a separate sequence of values for each subscriber
Some examples are observables created with RxJS creation functions like of or from, or HTTP requests made in Angular with HttpClient.
import { of } from 'rxjs';

// Each subscribe() call replays the whole sequence from the start.
const stream$ = of(1, 2, 3);

stream$.subscribe({
  next: value => console.log('subscriber 1:', value),
  error: error => console.log('subscriber 1 err:', error),
  complete: () => console.log('subscriber 1: completed')
});

stream$.subscribe({
  next: value => console.log('subscriber 2:', value),
  error: error => console.log('subscriber 2 err:', error),
  complete: () => console.log('subscriber 2: completed')
});
// Output:
// subscriber 1: 1
// subscriber 1: 2
// subscriber 1: 3
// subscriber 1: completed
// subscriber 2: 1
// subscriber 2: 2
// subscriber 2: 3
// subscriber 2: completed
import { Observable } from 'rxjs';

const stream$ = new Observable<number>((subscriber) => {
  // Runs once for every subscription.
  console.log('inside observable fn: Observable has been called');
  subscriber.next(1);
  subscriber.next(2);
  // The last value and the completion arrive asynchronously.
  setTimeout(() => {
    subscriber.next(3);
    subscriber.complete();
  }, 1000);
});

stream$.subscribe({
  next: value => console.log('subscriber 1:', value),
  error: error => console.log('subscriber 1 err:', error),
  complete: () => console.log('subscriber 1: completed')
});

stream$.subscribe({
  next: value => console.log('subscriber 2:', value),
  error: error => console.log('subscriber 2 err:', error),
  complete: () => console.log('subscriber 2: completed')
});
// Output:
// inside observable fn: Observable has been called
// subscriber 1: 1
// subscriber 1: 2
// inside observable fn: Observable has been called
// subscriber 2: 1
// subscriber 2: 2
// subscriber 1: 3
// subscriber 1: completed
// subscriber 2: 3
// subscriber 2: completed
From the last example, we can see that an Observable is like a wrapper around a function that calls next() with values and eventually completes. Subscribing to the Observable executes that function and sends the values to its observers.
Because the Observable creates the values it emits inside that function, and the function is executed on every subscription, each subscriber gets the same values.
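To make the re-execution visible, here is a minimal sketch that counts how often the subscribe function runs. The names executions, counter$, first and second are made up for this illustration and are not part of RxJS.

```typescript
import { Observable } from 'rxjs';

// Illustrative counter: how many times the subscribe function has run.
let executions = 0;

const counter$ = new Observable<number>((subscriber) => {
  executions += 1;
  // The producer state lives inside the function, so every execution starts fresh.
  let local = 0;
  subscriber.next(++local);
  subscriber.next(++local);
  subscriber.complete();
});

const first: number[] = [];
const second: number[] = [];
counter$.subscribe((value) => first.push(value));
counter$.subscribe((value) => second.push(value));

console.log(executions); // 2
console.log(first, second); // both arrays hold the values 1 and 2
```

Two subscriptions mean two executions, and each subscriber gets its own copy of the same sequence.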
Hot Observables
Hot observables have the following properties:
Hot observables produce values regardless of whether there are subscribers
Hot observables can be shared among subscribers
Subscribers only receive values emitted by the observable from the moment they subscribe onward, and late subscribers may miss earlier emitted values
Hot observables are typically created from sources external to RxJS, such as user events, sensor data, or web sockets
import { Subject } from 'rxjs';

// The Subject plays the role of an external producer, such as a web socket.
const fakeSocket = new Subject<number>();
const socket$ = fakeSocket.asObservable();

// Emitted before anyone has subscribed, so no subscriber ever sees it.
fakeSocket.next(0);
setTimeout(() => fakeSocket.next(1), 1000);
setTimeout(() => fakeSocket.next(2), 2000);
setTimeout(() => fakeSocket.next(3), 3000);
setTimeout(() => fakeSocket.next(4), 4000);
setTimeout(() => {
  fakeSocket.next(5);
  fakeSocket.complete();
}, 5000);

// Subscriber 1 subscribes right away.
socket$.subscribe({
  next: (value) => console.log("subscriber 1:", value),
  error: err => console.log("subscriber 1 error:", err),
  complete: () => console.log("subscriber 1 complete")
});

// Subscriber 2 subscribes after 2.5 seconds.
setTimeout(() => socket$.subscribe({
  next: (value) => console.log("subscriber 2:", value),
  error: err => console.log("subscriber 2 error:", err),
  complete: () => console.log("subscriber 2 complete")
}), 2500);

// Subscriber 3 subscribes after the socket has completed.
setTimeout(() => socket$.subscribe({
  next: (value) => console.log("subscriber 3:", value),
  error: err => console.log("subscriber 3 error:", err),
  complete: () => console.log("subscriber 3 complete")
}), 6000);
// Output:
// subscriber 1: 1
// subscriber 1: 2
// subscriber 1: 3
// subscriber 2: 3
// subscriber 1: 4
// subscriber 2: 4
// subscriber 1: 5
// subscriber 2: 5
// subscriber 1 complete
// subscriber 2 complete
// subscriber 3 complete
In this example, subscriber 1 received every value except 0. Subscriber 2 came a little late to the party and got only the values emitted after it subscribed. Subscriber 3 arrived after the socket had completed, so it received no values, only the completion notification. No subscriber received 0, since it was emitted before anyone was listening.
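The same behavior shows up with a producer that really lives outside RxJS. The sketch below assumes a Node.js environment and uses an EventEmitter as a stand-in for a sensor; the sensor object, the 'reading' event name and the seen array are hypothetical names chosen for the illustration.

```typescript
import { EventEmitter } from 'node:events';
import { fromEvent } from 'rxjs';

// Hypothetical external producer standing in for a sensor or socket.
const sensor = new EventEmitter();
const reading$ = fromEvent(sensor, 'reading');

const seen: unknown[] = [];

// Nobody is listening yet, so this value is lost.
sensor.emit('reading', 10);

const subscription = reading$.subscribe((value) => seen.push(value));
sensor.emit('reading', 20);
sensor.emit('reading', 30);

// After unsubscribing, the producer keeps working but we no longer listen.
subscription.unsubscribe();
sensor.emit('reading', 40);

console.log(seen); // holds only 20 and 30
```

The emitter produces values whether or not anyone is subscribed. Subscribing only attaches a listener to it.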
From Cold to Hot
warm observables
There is another type of observable, the warm one. It acts like a hot observable, with the difference that it is lazy: the observable won't emit values until the first subscriber arrives. After that, the emitted values are shared with all subscribers.
import { Observable, share } from 'rxjs';

const stream$ = new Observable<number>(subscriber => {
  subscriber.next(1);
  setTimeout(() => subscriber.next(2), 1000);
  setTimeout(() => subscriber.next(3), 2000);
  setTimeout(() => {
    subscriber.next(4);
    subscriber.complete();
  }, 3000);
}).pipe(share());

// Subscriber 1 arrives after 2 seconds and starts the execution.
setTimeout(() => stream$.subscribe({
  next: (value) => console.log("subscriber 1:", value),
  error: err => console.log("subscriber 1 error:", err),
  complete: () => console.log("subscriber 1 complete")
}), 2000);

// Subscriber 2 arrives after 4 seconds and joins the running execution.
setTimeout(() => stream$.subscribe({
  next: (value) => console.log("subscriber 2:", value),
  error: err => console.log("subscriber 2 error:", err),
  complete: () => console.log("subscriber 2 complete")
}), 4000);
// Output
// subscriber 1: 1
// subscriber 1: 2
// subscriber 1: 3
// subscriber 2: 3
// subscriber 1: 4
// subscriber 2: 4
// subscriber 1 complete
// subscriber 2 complete
Even though subscriber 1 arrived after 2 seconds, it still got all the values, because the Observable only started executing when subscriber 1 arrived. Had this been a hot observable producing from time zero, subscriber 2 would have arrived too late to receive anything. Because it is a warm one whose execution started only 2 seconds before subscriber 2 arrived, subscriber 2 still received the last 2 values.
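The lazy part can also be checked directly. This is a small sketch, assuming RxJS 7.2 or later (where operators can be imported from 'rxjs'); the executions counter and the warm$ name are illustrative only.

```typescript
import { Observable, share } from 'rxjs';

// Illustrative counter: how many times the source function has run.
let executions = 0;

const warm$ = new Observable<number>((subscriber) => {
  executions += 1;
  const id = setTimeout(() => {
    subscriber.next(1);
    subscriber.complete();
  }, 50);
  // Teardown: cancel the pending timer once nobody is subscribed any more.
  return () => clearTimeout(id);
}).pipe(share());

// Nothing has run yet: the shared observable waits for its first subscriber.
const beforeSubscribe = executions; // 0

const a = warm$.subscribe();
const b = warm$.subscribe();

// Both subscribers are attached to the same single execution.
const afterSubscribe = executions; // 1

a.unsubscribe();
b.unsubscribe();
```

The source runs zero times before the first subscriber and exactly once afterwards, no matter how many subscribers join while it is still active.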
beware of sharing a cold synchronous observable
import { of, share } from 'rxjs';

// of() emits and completes synchronously, inside the first subscribe() call.
const stream$ = of(1, 2, 3).pipe(share());

stream$.subscribe({
  next: (value) => console.log("subscriber 1:", value),
  error: err => console.log("subscriber 1 error:", err),
  complete: () => console.log("subscriber 1 complete")
});

stream$.subscribe({
  next: (value) => console.log("subscriber 2:", value),
  error: err => console.log("subscriber 2 error:", err),
  complete: () => console.log("subscriber 2 complete")
});
// Output
// subscriber 1: 1
// subscriber 1: 2
// subscriber 1: 3
// subscriber 1 complete
// subscriber 2: 1
// subscriber 2: 2
// subscriber 2: 3
// subscriber 2 complete
Here we see that even though the observable was shared, it still acted like a cold one: the observable was executed for each subscriber. That is because share only shares the emissions while the subscriber count is greater than 0. When an observable has a synchronous source, as in our example, the first subscriber has already completed by the time the next one arrives. The subscriber count is back to 0, so the arrival of a new subscriber re-executes the observable.
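A quick way to confirm the re-execution is to count how often the source is created. In this sketch, defer wraps the source so that the hypothetical executions counter increases on every new source subscription; it assumes the default share() configuration.

```typescript
import { defer, of, share } from 'rxjs';

// Illustrative counter: how many times the source has been subscribed to.
let executions = 0;

const shared$ = defer(() => {
  executions += 1;
  return of(1, 2, 3);
}).pipe(share());

// The source completes synchronously inside this call...
shared$.subscribe();
// ...so this subscriber finds nothing to share and triggers a new execution.
shared$.subscribe();

console.log(executions); // 2
```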
using connect to multicast synchronous observables
One way to solve the issue of sharing cold synchronous observables is the connect operator. connect multicasts the source observable and lets you define how the multicast source will be used before it connects to the real source (the shared branches are typically combined with merge).
import { connect, map, merge, of, tap } from 'rxjs';

const source$ = of(1, 2, 3).pipe(
  tap({
    subscribe: () => console.log('subscription started'),
    next: n => console.log(`source emitted ${ n }`)
  })
);

source$.pipe(
  // All three branches are set up first, then the source is subscribed once.
  connect(shared$ => merge(
    shared$.pipe(map(n => `all ${ n }`)),
    shared$.pipe(map(n => `double ${ n * 2 }`)),
    shared$.pipe(map(n => `triple ${ n * 3 }`))
  ))
)
.subscribe(console.log);
// Output:
// subscription started
// source emitted 1
// all 1
// double 2
// triple 3
// source emitted 2
// all 2
// double 4
// triple 6
// source emitted 3
// all 3
// double 6
// triple 9
connectable
Wrapping an observable with connectable multicasts it once connect() is called on the resulting observable.
import { connectable, of, tap } from 'rxjs';

const source$ = of(1, 2, 3).pipe(
  tap({
    subscribe: () => console.log('subscription started'),
    next: n => console.log(`source emitted ${ n }`)
  })
);

const stream$ = connectable(source$);

stream$.subscribe({
  next: (value) => console.log("subscriber 1:", value),
  error: err => console.log("subscriber 1 error:", err),
  complete: () => console.log("subscriber 1 complete")
});

setTimeout(() => stream$.subscribe({
  next: (value) => console.log("subscriber 2:", value),
  error: err => console.log("subscriber 2 error:", err),
  complete: () => console.log("subscriber 2 complete")
}), 1000);

// Nothing is emitted until connect() subscribes to the source.
setTimeout(() => stream$.connect(), 2000);
// Output
// subscription started
// source emitted 1
// subscriber 1: 1
// subscriber 2: 1
// source emitted 2
// subscriber 1: 2
// subscriber 2: 2
// source emitted 3
// subscriber 1: 3
// subscriber 2: 3
// subscriber 1 complete
// subscriber 2 complete
As the example shows, no matter when the subscribers arrived, they received no emissions until connect() was called. Calling connect() connects the source observable to all consumers.
Summary
// COLD: the producer is created inside the subscribe function
const cold = new Observable((observer) => {
  const producer = new Producer();
  // have observer listen to producer here
});
// HOT: the producer is created outside the subscribe function
const producer = new Producer();
const hot = new Observable((observer) => {
  // have observer listen to producer here
});
The key difference between hot and cold observables lies in where the values are produced. Subscribing to a cold observable re-executes the producer, while subscribing to a hot one just means listening to an already existing producer.
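For readers who want to run the idea, here is one possible sketch. The Producer class below is hypothetical and written only for this illustration (it is not an RxJS type); it counts how many instances were created so that the difference can be observed.

```typescript
import { Observable } from 'rxjs';

// Hypothetical producer, invented for this sketch; any event source would do.
class Producer {
  static created = 0;
  private listeners = new Set<(value: number) => void>();

  constructor() {
    Producer.created += 1;
  }

  // Registers a listener and returns a function that removes it again.
  listen(listener: (value: number) => void): () => void {
    this.listeners.add(listener);
    return () => this.listeners.delete(listener);
  }

  emit(value: number): void {
    this.listeners.forEach((listener) => listener(value));
  }
}

// COLD: a new producer is created for every subscription.
const cold$ = new Observable<number>((observer) => {
  const producer = new Producer();
  const stop = producer.listen((value) => observer.next(value));
  producer.emit(1);
  return stop;
});

// HOT: one producer exists up front and subscribers only listen to it.
const sharedProducer = new Producer();
const hot$ = new Observable<number>((observer) =>
  sharedProducer.listen((value) => observer.next(value))
);

const createdBeforeCold = Producer.created;
cold$.subscribe();
cold$.subscribe();
const createdByCold = Producer.created - createdBeforeCold; // 2

const createdBeforeHot = Producer.created;
const hotValues: number[] = [];
hot$.subscribe((value) => hotValues.push(value));
hot$.subscribe((value) => hotValues.push(value));
sharedProducer.emit(7);
const createdByHot = Producer.created - createdBeforeHot; // 0
```

Subscribing twice to cold$ creates two producers, while subscribing twice to hot$ creates none and both subscribers receive the same emitted 7.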