RxJS combineLatest without waiting for source observables to emit?

One way is prefixing all sources with startWith:

combineLatest([
  source1$.pipe(startWith(?)),
  source2$.pipe(startWith(?)),
])
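To make the difference concrete, here is a minimal sketch comparing plain combineLatest with the startWith variant. The subject names a$ and b$ and the seed value 0 are assumptions chosen for illustration; use whatever placeholder suits your data.

```typescript
import { combineLatest, Subject } from 'rxjs';
import { startWith } from 'rxjs/operators';

// Hypothetical sources, Subjects so values can be pushed in manually.
const a$ = new Subject<number>();
const b$ = new Subject<number>();

const plain: number[][] = [];
const seeded: number[][] = [];

// Plain combineLatest stays silent until BOTH sources have emitted once.
combineLatest([a$, b$]).subscribe((pair) => plain.push(pair));

// Seeding each source (0 is an arbitrary placeholder) makes it emit on subscribe.
combineLatest([a$.pipe(startWith(0)), b$.pipe(startWith(0))])
  .subscribe((pair) => seeded.push(pair));

a$.next(1); // plain: nothing yet, seeded: [1, 0]
b$.next(2); // plain: [1, 2],      seeded: [1, 2]

console.log(plain);  // [[1, 2]]
console.log(seeded); // [[0, 0], [1, 0], [1, 2]]
```

The seeded version emits once on subscription and then on every source emission, at the cost of placeholder values that downstream code has to handle.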

Is there a combineLatest variant that emits as soon as any of the source observables emits for the first time?

It sounds like you may be looking for the race(source1$, source2$) creation function, or maybe just merge(source1$, source2$).pipe(take(1)). It really depends on what you want to do.
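The two suggestions behave differently, which this small sketch tries to show: race keeps mirroring whichever source emitted first, while merge with take(1) delivers a single value and completes. The string values and the collector arrays are made up for the example.

```typescript
import { merge, race, Subject } from 'rxjs';
import { take } from 'rxjs/operators';

const source1$ = new Subject<string>();
const source2$ = new Subject<string>();

const raced: string[] = [];
const firstOnly: string[] = [];

// race: follow the first source to emit, unsubscribe from the others.
race(source1$, source2$).subscribe((v) => raced.push(v));

// merge + take(1): only the very first value from any source, then complete.
merge(source1$, source2$).pipe(take(1)).subscribe((v) => firstOnly.push(v));

source2$.next('b1'); // source2$ wins the race
source1$.next('a1'); // ignored by race; take(1) has already completed
source2$.next('b2'); // race keeps following the winner

console.log(raced);     // ['b1', 'b2']
console.log(firstOnly); // ['b1']
```

Neither of them combines values from both sources, so they only fit if a single source (or a single value) is all you need.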


If I understand correctly you want a pattern like the following diagram:

stream1$ => ------ 1 ------ 12 -----------------------
stream2$ => ------------------------- 30 -------------

result$  => ------ 1 ------ 12 ------ 42 --------------

If only one value is available, emit it. If both are available, emit their combination, a simple sum in this case (12 + 30 = 42).

First, the input streams. I've made them Subjects for the sake of this example, so we can push data in manually:

const stream1$ = new Subject();
const stream2$ = new Subject();

Next we'll combine the inputs, each first piped through the startWith operator. This makes sure that combineLatest produces an observable that emits immediately on subscription: [null, null], to be precise.

// Pass the sources as an array; the multi-argument form is deprecated in newer RxJS versions.
const combined$ = combineLatest([
  stream1$.pipe(startWith(null)),
  stream2$.pipe(startWith(null)),
]);

Now you have an observable that always emits arrays of length 2, containing any combination of your data (numbers in this example) and null, like the following diagram:

stream1$ | startWith(NULL) => NULL ----------- 1 ----------- 12 ----------------------------
stream2$ | startWith(NULL) => NULL ---------------------------------------- 30 -------------

combined$                     [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
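The diagram can be checked with a short sketch that records every emission of combined$. The emissions array is a helper added for this example, and the Subjects are typed as number on the assumption that the streams only carry numbers.

```typescript
import { combineLatest, Subject } from 'rxjs';
import { startWith } from 'rxjs/operators';

const stream1$ = new Subject<number>();
const stream2$ = new Subject<number>();

const combined$ = combineLatest([
  stream1$.pipe(startWith(null)),
  stream2$.pipe(startWith(null)),
]);

// Helper for this sketch: record each emitted pair in order.
const emissions: Array<[number | null, number | null]> = [];
combined$.subscribe((pair) => emissions.push(pair)); // emits [null, null] right away

stream1$.next(1);
stream1$.next(12);
stream2$.next(30);

console.log(emissions);
// [[null, null], [1, null], [12, null], [12, 30]]
```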

Finally you can inspect and map this output to your desired format: the sum of the two numbers if both are available, or the single available value otherwise:

const processedCombinations$ = combined$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;

    return data1 + data2;
  }),
);

Result:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------

One problem remains: the first value emitted from combined$ is [null, null], causing processedCombinations$ to emit null initially. One way to fix this is to chain another pipe using skipWhile onto processedCombinations$:

const final$ = processedCombinations$.pipe(skipWhile((input) => input === null));

Result:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
processedCombinations$     => NULL ----------- 1 ----------- 12 ----------- 42 -------------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------
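Note that skipWhile only drops the leading run of matching values, whereas filter drops every match. In the pipeline above this should make no difference, since null can only appear before either stream has emitted (assuming the sources never emit null themselves). The sketch below illustrates the distinction on a made-up input sequence.

```typescript
import { from } from 'rxjs';
import { filter, skipWhile } from 'rxjs/operators';

// Made-up input: a null appears again after the first real value.
const values$ = from([null, 1, null, 2] as Array<number | null>);

const skipped: Array<number | null> = [];
const filtered: Array<number | null> = [];

// skipWhile stops skipping for good once the predicate returns false.
values$.pipe(skipWhile((v) => v === null)).subscribe((v) => skipped.push(v));

// filter tests every value independently.
values$.pipe(filter((v) => v !== null)).subscribe((v) => filtered.push(v));

console.log(skipped);  // [1, null, 2]
console.log(filtered); // [1, 2]
```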

Another (imo better) way is to filter the combined$ stream before mapping it, so the mapped stream (previously processedCombinations$) becomes final$ directly:

const combinedFiltered$ = combined$.pipe(
  filter(([first, second]) => first !== null || second !== null),
);

const final$ = combinedFiltered$.pipe(
  map(([data1, data2]) => {
    if (data1 === null) return data2;
    if (data2 === null) return data1;

    return data1 + data2;
  }),
);

A corresponding diagram shows nicely how irrelevant values are eliminated as early in the stream hierarchy as possible:

combined$                  => [NULL, NULL] --- [1, NULL] --- [12, NULL] --- [12, 30] -------
combinedFiltered$          => ---------------- [1, NULL] --- [12, NULL] --- [12, 30] -------
final$                     => ---------------- 1 ----------- 12 ----------- 42 -------------

The above diagrams can be produced with this code:

final$.subscribe(console.log);

stream1$.next(1);
// logs: 1

stream1$.next(12);
// logs: 12

stream2$.next(30);
// logs: 42

Imports used:

import { combineLatest, Subject } from 'rxjs';
import { filter, map, skipWhile, startWith } from 'rxjs/operators';