A forkJoin alternative for uncompleted observables?
There are several options:
Use take(1) with forkJoin() to complete each source Observable: forkJoin(o1$.pipe(take(1)), o2$.pipe(take(1)))
Use zip() and take(1) to emit only when all source Observables have emitted the same number of items: zip(o1$, o2$).pipe(take(1))
Use combineLatest() to emit whenever any of the source Observables emits, once every source has emitted at least one value: combineLatest(o1$, o2$)
Jan 2019: Updated for RxJS 6
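To see how the three options differ, here is a minimal sketch in RxJS 6 style. It assumes two hypothetical sources, o1$ and o2$, modelled as Subjects that never complete on their own; the result arrays are names made up for illustration.

```typescript
import { Subject, forkJoin, zip, combineLatest } from "rxjs";
import { take } from "rxjs/operators";

// Hypothetical sources that never complete by themselves
const o1$ = new Subject<number>();
const o2$ = new Subject<string>();

// Collected emissions, one array per approach (illustrative names)
const forkJoinResults: unknown[] = [];
const zipResults: unknown[] = [];
const combineLatestResults: unknown[] = [];

// Option 1: take(1) completes each source, so forkJoin can emit
forkJoin([o1$.pipe(take(1)), o2$.pipe(take(1))]).subscribe((pair) =>
  forkJoinResults.push(pair)
);

// Option 2: zip pairs values by index; take(1) keeps only the first pair
zip(o1$, o2$)
  .pipe(take(1))
  .subscribe((pair) => zipResults.push(pair));

// Option 3: combineLatest keeps emitting the latest value of every source
combineLatest([o1$, o2$]).subscribe((pair) => combineLatestResults.push(pair));

o1$.next(1);
o1$.next(2);
o2$.next("a");
o2$.next("b");

console.log(forkJoinResults);      // [[1, "a"]]
console.log(zipResults);           // [[1, "a"]]
console.log(combineLatestResults); // [[2, "a"], [2, "b"]]
```

forkJoin and zip both produce a single pair here, while combineLatest starts from the latest value of o1$ and stays subscribed, so it needs its own unsubscribe or take() if only one result is wanted.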
A small trick to keep forkJoin from failing when any one of the observables errors: catch the error on that observable and replace it with a fallback value.
import { throwError, of, forkJoin } from "rxjs";
import { catchError, take } from "rxjs/operators";
// Completes after its first value because of take(1)
const observableThatWillComplete$ = of(1, 2, 3, 4, 5).pipe(take(1));
// Emits an error on subscription; catchError swaps the error for a fallback
// value, so forkJoin still gets a value and a completion from this source
const observableThatWillFail$ = throwError(
  "This is an error hence breaking the stream"
).pipe(catchError((error) => of(`Error caught: ${error}`)));
const observables$ = [observableThatWillComplete$, observableThatWillFail$];
forkJoin(observables$).subscribe((responses) => {
  console.log("Subscribed");
  // [1, "Error caught: This is an error hence breaking the stream"]
  console.log(responses);
});
In addition to the other answer, consider using Observable.concat(),
which subscribes to each observable in sequence, moving on to the next only once the previous one has completed. Here's an example:
const getPostOne$ = Rx.Observable.timer(3000).mapTo({id: 1});
const getPostTwo$ = Rx.Observable.timer(1000).mapTo({id: 2});
Rx.Observable.concat(getPostOne$, getPostTwo$).subscribe(res => console.log(res));
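The snippet above uses RxJS 5 syntax. A rough equivalent in RxJS 6 pipeable style might look like the sketch below; the postsInSequence helper and the Post interface are hypothetical names introduced only for illustration, and the requests are still simulated with timers.

```typescript
import { Observable, concat, timer } from "rxjs";
import { map } from "rxjs/operators";

// Hypothetical shape of a post, for illustration only
interface Post {
  id: number;
}

// Hypothetical helper: two fake requests simulated with timers
function postsInSequence(
  firstDelayMs: number,
  secondDelayMs: number
): Observable<Post> {
  // timer(n) emits once after n milliseconds and then completes
  const getPostOne$ = timer(firstDelayMs).pipe(map((): Post => ({ id: 1 })));
  const getPostTwo$ = timer(secondDelayMs).pipe(map((): Post => ({ id: 2 })));
  // concat subscribes to getPostTwo$ only after getPostOne$ has completed
  return concat(getPostOne$, getPostTwo$);
}
```

With the delays from the example above, postsInSequence(3000, 1000).subscribe(res => console.log(res)) would log { id: 1 } after about three seconds and { id: 2 } roughly one second later, since the second timer only starts once the first has completed.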
A good article outlining 6 operators you should know.