How to make one Observable sequence wait for another to complete before emitting?
skipUntil() with last()
skipUntil : ignore emitted items until another observable has emitted
last: emit last value from a sequence (i.e. wait until it completes then emit)
Note that anything emitted from the observable passed to skipUntil
will cancel the skipping, which is why we need to add last()
- to wait for the stream to complete.
main$.skipUntil(sequence2$.pipe(last()))
Official: https://rxjs-dev.firebaseapp.com/api/operators/skipUntil
Possible issue: Note that last()
by itself will error if nothing is emitted. The last()
operator does have a default
parameter but only when used in conjunction with a predicate. I think if this situation is a problem for you (if sequence2$
may complete without emitting) then one of these should work (currently untested):
main$.skipUntil(sequence2$.pipe(defaultIfEmpty(undefined), last()))
main$.skipUntil(sequence2$.pipe(last(), catchError(() => of(undefined))
Note that undefined
is a valid item to be emitted, but could actually be any value. Also note that this is the pipe attached to sequence2$
and not the main$
pipe.
Here's a reusable way of doing it (it's typescript but you can adapt it to js):
function waitFor<T>(signal: Observable<any>) {
return (source: Observable<T>) => signal.pipe(
first(),
switchMap(_ => source),
);
}
and you can use it like any operator:
var two = someOtherObservable.pipe(waitFor(one), take(1));
It's basically an operator that defers the subscribe on the source observable until the signal observable emits the first event.
If you want to make sure that the order of execution is retained you can use flatMap as the following example
const first = Rx.Observable.of(1).delay(1000).do(i => console.log(i));
const second = Rx.Observable.of(11).delay(500).do(i => console.log(i));
const third = Rx.Observable.of(111).do(i => console.log(i));
first
.flatMap(() => second)
.flatMap(() => third)
.subscribe(()=> console.log('finished'));
The outcome would be:
"1"
"11"
"111"
"finished"
A couple ways I can think of
import {take, publish} from 'rxjs/operators'
import {concat} from 'rxjs'
//Method one
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1));
concat(one, two).subscribe(function() {/*do something */});
//Method two, if they need to be separate for some reason
var one = someObservable.pipe(take(1));
var two = someOtherObservable.pipe(take(1), publish());
two.subscribe(function(){/*do something */});
one.subscribe(function(){/*do something */}, null, two.connect.bind(two));