RxJs: lossy form of zip operator

The following will give you the desired behavior:

Observable.zip(s1.take(1), s2.take(1)).repeat()

In RxJs 5.5+ pipe syntax:

zip(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.zip(s1.take(1), s2.take(1)).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>

Explanation:

  • repeat operator (in its current implementation) resubscribes to the source observable upon the latter's completion, i.e. in this particular case it resubscribes to zip upon every mutual emission.
  • zip combines two observables and waits for both of them to emit. combineLatest will do as well, it doesn't really matter because of take(1)
  • take(1) actually takes care of memory explosion and defines lossy behavior

If you want to take the last instead of the first value from each stream upon mutual emission use this:

Observable.combineLatest(s1, s2).take(1).repeat()

In RxJs 5.5+ pipe syntax:

combineLatest(s1.pipe(take(1)), s2.pipe(take(1))).pipe(repeat());

const s1 = new Rx.Subject();
const s2 = new Rx.Subject();

Rx.Observable.combineLatest(s1,s2).take(1).repeat()
    .subscribe(console.log);

s1.next(1); s1.next(2); s2.next(10); s1.next(3); s1.next(4); s2.next(20); s1.next(5); s1.next(6); s1.next(7); s2.next(30);  
s2.next(40); s2.next(50); s2.next(60); s2.next(70); 
<script src="https://unpkg.com/@reactivex/[email protected]/dist/global/Rx.js"></script>