RxJs: lossy form of zip operator
The following will give you the desired behavior:
Observable.zip(s1.take(1), s2.take(1)).repeat()
In RxJs 5.5+
pipe syntax:
zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>
Explanation:
repeat
operator (in its current implementation) resubscribes to the source observable upon the latter's completion, i.e. in this particular case it resubscribes tozip
upon every mutual emission.zip
combines two observables and waits for both of them to emit.combineLatest
will do as well, it doesn't really matter because oftake(1)
take(1)
actually takes care of memory explosion and defines lossy behavior
If you want to take the last instead of the first value from each stream upon mutual emission use this:
Observable.combineLatest(s1, s2).take(1).repeat()
In RxJs 5.5+
pipe syntax:
combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());
const s1 = new Rx.Subject();
const s2 = new Rx.Subject();
Rx.Observable.combineLatest(s1,s2).take(1).repeat()
.subscribe(console.log);
s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);
s2.next(40); s2.next(50); s2.next(60); s2.next(70);
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>