Understanding Observable - Hot vs Cold

Observables come in two flavors, hot and cold. The difference is often overlooked or misunderstood, which leads to unexpected side effects and time lost to debugging. This article explains the difference and points out the aspects worth paying attention to.

Cold Observables

Cold observables have the following properties:

  • The observable sequence is re-executed for each new subscriber

  • Each subscriber receives the entire sequence of values from the beginning

  • Cold observables produce a separate sequence of values for each subscriber

  • Examples include observables created with RxJS creation functions such as of or from, and HTTP requests made with Angular's HttpClient

import { of } from 'rxjs';

const stream$ = of(1, 2, 3);

stream$.subscribe({
    next: value => console.log('subscriber 1: ', value),
    error: error => console.log('subscriber 1 err: ', error),
    complete: () => console.log('subscriber 1: completed')
});

stream$.subscribe({
    next: value => console.log('subscriber 2: ', value),
    error: error => console.log('subscriber 2 err: ', error),
    complete: () => console.log('subscriber 2: completed')
});

// Output:
// subscriber 1: 1
// subscriber 1: 2
// subscriber 1: 3
// subscriber 1: completed
// subscriber 2: 1
// subscriber 2: 2
// subscriber 2: 3
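// subscriber 2: completed

The same behavior shows up when the observable is created by hand:

import { Observable } from 'rxjs';

const stream$ = new Observable<number>((subscriber) => {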
    console.log('inside observable fn: Observable has been called');
    subscriber.next(1);
    subscriber.next(2);
    setTimeout(() => {
        subscriber.next(3);
        subscriber.complete();
    }, 1000);
});

stream$.subscribe({
    next: value => console.log('subscriber 1: ', value),
    error: error => console.log('subscriber 1 err: ', error),
    complete: () => console.log('subscriber 1: completed')
});

stream$.subscribe({
    next: value => console.log('subscriber 2: ', value),
    error: error => console.log('subscriber 2 err: ', error),
    complete: () => console.log('subscriber 2: completed')
});

// Output:
// inside observable fn: Observable has been called
// subscriber 1: 1
// subscriber 1: 2
// inside observable fn: Observable has been called
// subscriber 2: 1
// subscriber 2: 2
// subscriber 1: 3
// subscriber 1: completed
// subscriber 2: 3
// subscriber 2: completed

From the last example, we can see that an Observable is like a wrapper around a function that calls next() with values and eventually completes. Subscribing to the Observable executes that function and sends the values to the subscribing Observer.

Because the Observable creates the values it emits inside that function, and the function is executed on every subscription, each subscriber gets its own copy of the same values.
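To make the "wrapper over a function" idea concrete, here is a minimal sketch of a cold observable written without RxJS. TinyObservable and TinyObserver are hypothetical names invented for this illustration. The class deliberately leaves out most of what the real RxJS Observable does (teardown, error handling, operators), so treat it as a mental model rather than the library's implementation.

```typescript
// Hypothetical, simplified types for illustration only (not the RxJS API).
interface TinyObserver<T> {
    next: (value: T) => void;
    complete: () => void;
}

class TinyObservable<T> {
    private readonly producer: (observer: TinyObserver<T>) => void;

    // The producer function is only stored here, it is not executed yet.
    constructor(producer: (observer: TinyObserver<T>) => void) {
        this.producer = producer;
    }

    // Every subscription runs the producer again for that observer.
    subscribe(observer: TinyObserver<T>): void {
        this.producer(observer);
    }
}

let executions = 0;
const tiny$ = new TinyObservable<number>((observer) => {
    executions++;
    observer.next(1);
    observer.next(2);
    observer.complete();
});

const received1: number[] = [];
const received2: number[] = [];

tiny$.subscribe({ next: (v) => received1.push(v), complete: () => {} });
tiny$.subscribe({ next: (v) => received2.push(v), complete: () => {} });

// executions is now 2: the producer ran once per subscriber,
// and each subscriber collected its own 1, 2.
console.log(executions, received1, received2);
```

Since the values are created inside the producer function, running it once per subscription is what makes each subscriber see the full sequence.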

Hot Observables

Hot observables have the following properties:

  • Hot observables produce values regardless of whether there are subscribers

  • Hot observables can be shared among subscribers

  • Subscribers only receive values emitted by the observable from the moment they subscribe onward, and late subscribers may miss earlier emitted values

  • They are typically created from sources external to RxJS, such as user events, sensor data, or web sockets

import { Subject } from 'rxjs';

const fakeSocket = new Subject<number>();
const socket$ = fakeSocket.asObservable();

fakeSocket.next(0);
setTimeout(() => fakeSocket.next(1), 1000);
setTimeout(() => fakeSocket.next(2), 2000);
setTimeout(() => fakeSocket.next(3), 3000);
setTimeout(() => fakeSocket.next(4), 4000);
setTimeout(() => {fakeSocket.next(5); fakeSocket.complete();}, 5000);

socket$.subscribe({
    next: (value) => console.log("subscriber 1: ", value),
    error: err => console.log("subscriber 1 error: ", err),
    complete: () => console.log("subscriber 1 complete")
});

setTimeout(() => socket$.subscribe({
    next: (value) => console.log("subscriber 2: ", value),
    error: err => console.log("subscriber 2 error: ", err),
    complete: () => console.log("subscriber 2 complete")
}), 2500);

setTimeout(() => socket$.subscribe({
    next: (value) => console.log("subscriber 3: ", value),
    error: err => console.log("subscriber 3 error: ", err),
    complete: () => console.log("subscriber 3 complete")
}), 6000);

// Output:
// subscriber 1: 1
// subscriber 1: 2
// subscriber 1: 3
// subscriber 2: 3
// subscriber 1: 4
// subscriber 2: 4
// subscriber 1: 5
// subscriber 2: 5
// subscriber 1 complete
// subscriber 2 complete
// subscriber 3 complete

In this example, subscriber 1 received every value except 0. Subscriber 2 came a little late to the party and got only the values emitted after it subscribed. Subscriber 3 subscribed after the socket had completed, so it received no values, only the completion notification. No subscriber received 0, because it was emitted before anyone was listening.
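Why late subscribers miss values becomes clearer with a stripped-down model of a subject. The sketch below is not RxJS code: TinySubject and Listener are hypothetical names, and the class ignores completion, errors, and unsubscription. It only shows that a hot source forwards each value to whoever is registered at the moment of emission.

```typescript
// Hypothetical, simplified stand-in for an RxJS Subject (illustration only).
type Listener<T> = (value: T) => void;

class TinySubject<T> {
    private listeners: Listener<T>[] = [];

    // Subscribing only registers the listener; nothing is replayed.
    subscribe(listener: Listener<T>): void {
        this.listeners.push(listener);
    }

    // A value is delivered to whoever is subscribed right now.
    next(value: T): void {
        for (const listener of this.listeners) {
            listener(value);
        }
    }
}

const ticker = new TinySubject<number>();
const early: number[] = [];
const late: number[] = [];

ticker.next(0);                         // nobody is listening yet, so 0 is lost
ticker.subscribe((v) => early.push(v));
ticker.next(1);                         // only the early subscriber sees 1
ticker.subscribe((v) => late.push(v));  // joins after 1 was emitted
ticker.next(2);                         // both subscribers see 2

console.log(early, late);
```

The early subscriber ends up with 1 and 2, the late one with 2 only, and 0 reaches nobody, which mirrors the socket example.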

From Cold to Hot

warm observables

Between cold and hot there is a third kind, the warm observable. It behaves like a hot one except that it is lazy: it won't emit values until the first subscriber arrives. From then on, the emitted values are shared with all subscribers.

import { Observable, share } from 'rxjs';

const stream$ = new Observable<number>(subscriber => {
    subscriber.next(1);
    setTimeout(() => subscriber.next(2), 1000);
    setTimeout(() => subscriber.next(3), 2000);
    setTimeout(() => {subscriber.next(4); subscriber.complete()}, 3000);
}).pipe(share());

setTimeout(() => stream$.subscribe({
    next: (value) => console.log("subscriber 1: ", value),
    error: err => console.log("subscriber 1 error: ", err),
    complete: () => console.log("subscriber 1 complete")
}), 2000);

setTimeout(() => stream$.subscribe({
    next: (value) => console.log("subscriber 2: ", value),
    error: err => console.log("subscriber 2 error: ", err),
    complete: () => console.log("subscriber 2 complete")
}), 4000);

// Output
// subscriber 1: 1
// subscriber 1: 2
// subscriber 1: 3
// subscriber 2: 3
// subscriber 1: 4
// subscriber 2: 4
// subscriber 1 complete
// subscriber 2 complete

Even though subscriber 1 arrived after 2 seconds, it still got all the values, because the Observable only started executing when subscriber 1 arrived. With a hot observable, subscriber 2 would have arrived too late to the party. Because this one is warm and started executing 2 seconds before subscriber 2 arrived, subscriber 2 still received the last 2 values.
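The lazy start and the shared execution can also be checked without timers. The following sketch assumes RxJS 7.2 or later (where share can be imported from 'rxjs'). The push function and the executions counter are hypothetical helpers added for this illustration; they let the example emit values by hand and count how often the producer runs.

```typescript
import { Observable, share } from 'rxjs';

let executions = 0;
// Hypothetical hook: lets the example emit values by hand instead of using timers.
let push: (value: number) => void = () => {};

const shared$ = new Observable<number>((subscriber) => {
    executions++;
    push = (value) => subscriber.next(value);
}).pipe(share());

push(0); // no subscriber yet: the producer has not started, so nothing happens

const first: number[] = [];
const second: number[] = [];

shared$.subscribe((v) => first.push(v));  // first subscriber starts the producer
push(1);
shared$.subscribe((v) => second.push(v)); // second subscriber joins the running execution
push(2);

// The producer ran once; first saw 1 and 2, second saw only 2.
console.log(executions, first, second);
```

The producer runs exactly once, and the second subscriber only sees what was emitted after it joined.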

beware of sharing a cold synchronous observable

import { of, share } from 'rxjs';

const stream$ = of(1, 2, 3).pipe(share());

stream$.subscribe({
    next: (value) => console.log("subscriber 1: ", value),
    error: err => console.log("subscriber 1 error: ", err),
    complete: () => console.log("subscriber 1 complete")
});

stream$.subscribe({
    next: (value) => console.log("subscriber 2: ", value),
    error: err => console.log("subscriber 2 error: ", err),
    complete: () => console.log("subscriber 2 complete")
});

// Output
// subscriber 1: 1
// subscriber 1: 2
// subscriber 1: 3
// subscriber 1 complete
// subscriber 2: 1
// subscriber 2: 2
// subscriber 2: 3
// subscriber 2 complete

Here we see that even though the observable was shared, it still acted like a cold one: it was executed once for each subscriber. That is because share only shares an execution while it is in progress. When the source emits synchronously, as in our example, it has already completed by the time the second subscriber arrives. The subscriber count is back to 0 and share, with its default settings, has reset, so the new subscriber re-executes the observable.
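A counter inside the producer makes the re-execution visible. This is a small sketch assuming RxJS 7.2 or later and the default share() configuration; the executions counter is a hypothetical helper added for the illustration.

```typescript
import { Observable, share } from 'rxjs';

let executions = 0;

const sync$ = new Observable<number>((subscriber) => {
    executions++;
    subscriber.next(1);
    subscriber.next(2);
    subscriber.complete(); // completes synchronously, before subscribe() returns
}).pipe(share());

const a: number[] = [];
const b: number[] = [];

sync$.subscribe((v) => a.push(v));
// By now the source has completed, so there is no running execution left to share.
sync$.subscribe((v) => b.push(v));

// executions is 2: share() could not prevent the second run.
console.log(executions, a, b);
```

Both subscribers receive 1 and 2, but the producer ran twice, which is exactly the cold behavior share was supposed to avoid.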

using connect to multicast synchronous observables

One way to solve the problem above is the connect operator. connect multicasts the source observable and lets you define, in a selector function, how the shared source will be used before it is connected. The shared streams are typically combined with merge.

import { connect, map, merge, of, tap } from 'rxjs';

const source$ = of(1, 2, 3).pipe(
    tap({
        subscribe: () => console.log('subscription started'),
        next: n => console.log(`source emitted ${ n }`)
    })
);

source$.pipe(
    connect(shared$ => merge(
        shared$.pipe(map(n => `all ${ n }`)),
        shared$.pipe(map(n => `double ${ n * 2 }`)),
        shared$.pipe(map(n => `triple ${ n * 3 }`))
    ))
)
.subscribe(console.log);

// Output:
// subscription started
// source emitted 1
// all 1
// double 2
// triple 3
// source emitted 2
// all 2
// double 4
// triple 6
// source emitted 3
// all 3
// double 6
// triple 9

connectable

connectable wraps an observable so that it is multicast to its subscribers, but the source only starts once connect() is called on the result.

import { connectable, of, tap } from 'rxjs';

const source$ = of(1, 2, 3).pipe(
    tap({
        subscribe: () => console.log('subscription started'),
        next: n => console.log(`source emitted ${ n }`)
    })
);
const stream$ = connectable(source$);

stream$.subscribe({
    next: (value) => console.log("subscriber 1: ", value),
    error: err => console.log("subscriber 1 error: ", err),
    complete: () => console.log("subscriber 1 complete")
});

setTimeout(() => stream$.subscribe({
    next: (value) => console.log("subscriber 2: ", value),
    error: err => console.log("subscriber 2 error: ", err),
    complete: () => console.log("subscriber 2 complete")
}), 1000);

setTimeout(() => stream$.connect(), 2000);

// Output
// subscription started
// source emitted 1
// subscriber 1: 1
// subscriber 2: 1
// source emitted 2
// subscriber 1: 2
// subscriber 2: 2
// source emitted 3
// subscriber 1: 3
// subscriber 2: 3
// subscriber 1 complete
// subscriber 2 complete

As the example shows, no matter when the subscribers arrived, they received no emissions until connect() was called. Calling connect() subscribes to the source observable and multicasts its emissions to all subscribers.

Summary

// COLD
const cold = new Observable((observer) => {
  const producer = new Producer();
  // have observer listen to producer here
});

// HOT
const producer = new Producer();
const hot = new Observable((observer) => {
  // have observer listen to producer here
});

The key difference between hot and cold observables lies in where the values are produced. Subscribing to a cold observable re-executes the producer; subscribing to a hot one just attaches a listener to a producer that already exists.
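The pattern can be turned into something runnable with a stand-in producer. In the sketch below the Producer class, its listen and emit methods, and the producersCreated counter are all hypothetical, invented only so that producer instances can be told apart. The observables also skip teardown logic for brevity.

```typescript
import { Observable } from 'rxjs';

// Hypothetical producer for this sketch: each instance gets its own id
// and forwards emitted values to its listeners.
let producersCreated = 0;

class Producer {
    readonly id: number = ++producersCreated;
    private listeners: Array<(value: string) => void> = [];

    listen(listener: (value: string) => void): void {
        this.listeners.push(listener);
    }

    emit(value: string): void {
        for (const listener of this.listeners) {
            listener(`producer ${this.id}: ${value}`);
        }
    }
}

// COLD: every subscription creates and starts its own producer.
const cold$ = new Observable<string>((observer) => {
    const producer = new Producer();
    producer.listen((value) => observer.next(value));
    producer.emit('hello');
});

// HOT: one producer exists up front; subscribing only attaches a listener.
const liveProducer = new Producer(); // this is producer 1
const hot$ = new Observable<string>((observer) => {
    liveProducer.listen((value) => observer.next(value));
});

const coldLog: string[] = [];
const hotLog: string[] = [];

cold$.subscribe((v) => coldLog.push(`A got ${v}`)); // creates producer 2
cold$.subscribe((v) => coldLog.push(`B got ${v}`)); // creates producer 3

hot$.subscribe((v) => hotLog.push(`A got ${v}`));
hot$.subscribe((v) => hotLog.push(`B got ${v}`));
liveProducer.emit('hello'); // one emission reaches both hot subscribers

console.log(coldLog, hotLog);
```

The cold subscribers each talk to a different producer (2 and 3), while both hot subscribers hear the same emission from producer 1.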