RxJS combineLatest without waiting for source observables to emit?
One way is prefixing all sources with startWith:
combineLatest([
  source1$.pipe(startWith(?)),
  source2$.pipe(startWith(?)),
])
that emits as soon as any of the source observables emits for the first time?
This looks like you might be looking for the race(source1$, source2$) Observable creation method, or maybe just merge(source1$, source2$).pipe(take(1)). But it really depends on what you want to do.
If I understand correctly you want a pattern like the following diagram:
stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------
result$ => ------ 1 ------ 12 ------ 42 --------------
If one value is available, emit that. If both are available, emit the combination of both, a simple sum in this case (12 + 30 = 42).
First, the input streams. I've made them subjects for the sake of this example, so we can push data in manually:
const stream1$ = new Subject();
const stream2$ = new Subject();
Next we'll combine the inputs, each first piped through the startWith operator. This makes sure that combineLatest produces an observable that emits immediately - [null, null] to be precise.
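// Array form, matching the snippet above; the multi-argument form is deprecated in newer RxJS versions.
const combined$ = combineLatest([
  stream1$.pipe(startWith(null)),
  stream2$.pipe(startWith(null)),
]);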
Now you have an observable that always emits arrays of length 2, containing any combination of your data (numbers in this example) and null, like the following diagram:
stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------
combined$ [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
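To make the diagram concrete without pulling in RxJS, here is a small sketch that replays the same events by hand. Note that replayCombined is a hypothetical helper made up for illustration, not part of RxJS; it only mimics what combineLatest would emit when every source is prefixed with startWith(null).

```typescript
type Slot = number | null;

// Hypothetical helper (not RxJS API): replays [sourceIndex, value] events and
// records what combineLatest over startWith(null)-prefixed sources would emit.
function replayCombined(sourceCount: number, events: Array<[number, number]>): Slot[][] {
  const latest: Slot[] = [];
  for (let i = 0; i < sourceCount; i++) latest.push(null);

  // Every source already has its startWith(null) value, so the first
  // emission happens immediately and contains only nulls.
  const emissions: Slot[][] = [latest.slice()];

  for (const [index, value] of events) {
    latest[index] = value;          // remember the newest value of that source
    emissions.push(latest.slice()); // emit a snapshot of all latest values
  }
  return emissions;
}

// Same sequence as in the diagram: stream1$ emits 1 and 12, then stream2$ emits 30.
const emitted = replayCombined(2, [[0, 1], [0, 12], [1, 30]]);
console.log(JSON.stringify(emitted));
// [[null,null],[1,null],[12,null],[12,30]]
```

Each snapshot is a copy, so later events don't change arrays that were already emitted.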
Finally you can inspect and map this output to your desired format: the sum of 2 numbers if both are available, or the first value to be available:
const processedCombinations$ = combined$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;
    return data1 + data2;
  }),
);
Result:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
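If you have more than two sources, the if-chain in the map callback gets unwieldy. As a rough sketch, the projection could be pulled out into a plain function that sums whatever is present. The name sumAvailable is my own invention here, and it assumes null always means "no value yet":

```typescript
// Hypothetical helper: sums the values that are present.
// null is treated as "this source has not emitted yet".
function sumAvailable(values: Array<number | null>): number | null {
  let sum: number | null = null;
  for (const value of values) {
    if (value !== null) {
      // Start from 0 the first time a real value shows up.
      sum = (sum === null ? 0 : sum) + value;
    }
  }
  return sum; // stays null when no source has emitted a real value
}

console.log(sumAvailable([null, null])); // null
console.log(sumAvailable([1, null]));    // 1
console.log(sumAvailable([12, 30]));     // 42
```

Since it takes the whole array, it should be usable as the callback of map in place of the inline arrow function, and being a pure function it can be checked without any observables.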
One problem remains: the first value emitted from combined$ is [null, null], causing processedCombinations$ to emit null initially. One way to fix this is to chain another pipe using skipWhile onto processedCombinations$:
const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));
Result:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$ => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$ => ---------------- 1 ----------- 12 ----------- 42 -------------
Another - imo better - way is to filter the combined$ stream before processedCombinations$ (now actually final$) is created from it:
const combinedFiltered$ = combined$.pipe(
  filter(([first, second]) => first !== null || second !== null),
);
const final$ = combinedFiltered$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;
    return data1 + data2;
  }),
);
A corresponding diagram shows nicely how irrelevant values are eliminated as early in the stream hierarchy as possible:
combined$ => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$ => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$ => ---------------- 1 ----------- 12 ----------- 42 -------------
The above diagrams can be produced with this code:
final$.subscribe(console.log);
stream1$.next(1);
// logs: 1
stream1$.next(12);
// logs: 12
stream2$.next(30);
// logs: 42
Imports used:
import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';
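One caveat with null as the placeholder: if a source can legitimately emit null, you can no longer tell "nothing yet" apart from a real null. A possible workaround, sketched below under that assumption, is a dedicated sentinel object instead of null. NO_VALUE, hasAnyValue and presentValues are hypothetical names, not RxJS API; the sentinel would be what you pass to startWith, and the two helpers would take the place of the null checks in filter and map.

```typescript
// Hypothetical sentinel: a frozen object compared by reference, so it cannot
// collide with anything a source emits (including null or undefined).
const NO_VALUE = Object.freeze({ noValue: true });
type Slot<T> = T | typeof NO_VALUE;

// In the spirit of the filter step: true once any source has a real value.
function hasAnyValue<T>(slots: Array<Slot<T>>): boolean {
  return slots.some((slot) => slot !== NO_VALUE);
}

// Drops the placeholders and keeps only real values, nulls included.
function presentValues<T>(slots: Array<Slot<T>>): T[] {
  return slots.filter((slot): slot is T => slot !== NO_VALUE);
}

console.log(hasAnyValue<number | null>([NO_VALUE, NO_VALUE])); // false
console.log(hasAnyValue<number | null>([null, NO_VALUE]));     // true
console.log(presentValues<number | null>([null, NO_VALUE, 5])); // [ null, 5 ]
```

If your streams never emit null, the plain null placeholder from the answer is simpler and perfectly fine.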