Operator that skips the next emission from the source whenever another Observable emits
I can think of two solutions to this:
Using .filter(), .do() and a few side-effects. This is maybe the easier solution to understand, even though it's not really the "Rx" way:
function skipNextWhen(other) {
  let skipNext = false;

  return this.merge(other.do(() => skipNext = true).filter(() => false))
    .filter(val => {
      const doSkip = skipNext;
      skipNext = false;
      return !doSkip;
    });
}
I'm using merge() just to update skipNext; other's value is always ignored.
Using .scan(): This solution works without any external state variables or side-effects.
function skipNextWhen(other) {
  const SKIP = 'skip';

  return this.merge(other.mapTo(SKIP))
    .scan((acc, val) => {
      if (acc === SKIP) {
        return null;
      } else if (val === SKIP) {
        return SKIP;
      } else {
        return val;
      }
    }, [])
    .filter(val => Boolean(val) && val !== SKIP);
}
Basically, when SKIP arrives I return it right away, because scan() is going to pass it back in the acc parameter on the next call, and filter() ignores it later. If I receive a normal value but the previous value was SKIP, I ignore it and return just null, which is later filtered away.
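To make the accumulator logic easier to follow, here is a rough plain-JavaScript trace of it, with no RxJS involved. The step function mirrors the scan() callback; runScan is a hypothetical helper I made up that imitates scan() followed by filter() on an array that stands in for the merged stream.

```javascript
const SKIP = 'skip';

// Same decisions as the scan() callback in the answer.
function step(acc, val) {
  if (acc === SKIP) return null; // previous item was SKIP: drop this one
  if (val === SKIP) return SKIP; // remember the marker for the next call
  return val;                    // ordinary value: pass it through
}

// Hypothetical helper (not an RxJS API): scan() + filter() over an array.
function runScan(events) {
  const out = [];
  let acc = []; // same seed as in the answer
  for (const val of events) {
    acc = step(acc, val);
    if (Boolean(acc) && acc !== SKIP) out.push(acc);
  }
  return out;
}

// Assumed ordering of the merged stream: SKIP arrives before 4 and before 7.
const merged = [1, 2, 3, SKIP, 4, 5, 6, SKIP, 7, 8];
const traced = runScan(merged);
console.log(traced); // result is [1, 2, 3, 5, 6, 8]

// Falsy source values are dropped by the Boolean(val) check as well.
const withZero = runScan([0, 1]);
console.log(withZero); // result is [1]
```

The second call shows one thing to keep in mind: because the final filter uses Boolean(val), a source that emits falsy values such as 0 or '' loses them even when nothing was skipped.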
For this example, both solutions give the same result:
Observable.prototype.skipNextWhen = skipNextWhen;

const source = Observable.range(1, 10)
  .concatMap(val => Observable.of(val).delay(100));

source
  .skipNextWhen(Observable.interval(350))
  .subscribe(console.log);
This prints the following:
1
2
3
5
6
8
9
10
Just be aware that you're not in fact creating a new operator; you just have a shortcut for an operator chain. For example, this doesn't let you unsubscribe from other when the source completes.
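To illustrate what "unsubscribe from other when the source completes" could look like, here is a minimal sketch that doesn't use RxJS at all. createSubject and observerCount are hypothetical stand-ins I wrote for the demo, and this skipNextWhen takes both streams as arguments instead of living on the prototype. In RxJS itself the same idea would presumably sit inside a hand-written Observable, so treat this as a model of the behaviour and not as the real thing.

```javascript
// Tiny stand-in for a Subject: NOT the RxJS API, just enough for the demo.
function createSubject() {
  const observers = new Set();
  return {
    next(value) { [...observers].forEach(o => o.next(value)); },
    complete() {
      [...observers].forEach(o => o.complete());
      observers.clear();
    },
    // Returns an unsubscribe function.
    subscribe(observer) {
      observers.add(observer);
      return () => observers.delete(observer);
    },
    get observerCount() { return observers.size; },
  };
}

// Flag-based skipping that also releases `other` once the source is done.
function skipNextWhen(source, other) {
  return {
    subscribe(observer) {
      let skipNext = false;
      const unsubscribeOther = other.subscribe({
        next() { skipNext = true; },
        complete() {},
      });
      const unsubscribeSource = source.subscribe({
        next(value) {
          if (skipNext) { skipNext = false; return; }
          observer.next(value);
        },
        complete() {
          unsubscribeOther(); // the part the operator chain can't do
          observer.complete();
        },
      });
      return () => { unsubscribeSource(); unsubscribeOther(); };
    },
  };
}

const demoSource = createSubject();
const demoOther = createSubject();
const seen = [];
let completed = false;

skipNextWhen(demoSource, demoOther).subscribe({
  next: v => seen.push(v),
  complete: () => { completed = true; },
});

demoSource.next(1);
demoOther.next('x'); // arms the skip
demoSource.next(2);  // skipped
demoSource.next(3);
demoSource.complete();

console.log(seen, completed, demoOther.observerCount); // [1, 3], true, 0
```

Once the source completes, the result completes too and nothing is left subscribed to other, whereas the merge()-based chain keeps other alive until it completes on its own.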
I've started a (very) small library of some rxjs utils I've wanted. It happens to have a function to do exactly what you ask: skipAfter. From the docs:
source: -1-----2-----3-----4-----5-|
skip$:  ----0----------0-0----------
result: -1-----------3-----------5-|
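If it helps to read the diagram, the behaviour it describes can be replayed frame by frame in plain JavaScript. skipAfterMarbles below is a hypothetical helper of mine, not part of the library and not how skipAfter is implemented; it assumes both strings have the same length and that a skip$ emission on the same frame as a source value is handled first.

```javascript
// Hypothetical helper: replays "skip the next source value after any
// skip$ emission" over two marble strings.
function skipAfterMarbles(source, skip) {
  let skipNext = false;
  let result = '';
  for (let i = 0; i < source.length; i++) {
    const skipFrame = skip[i];
    // Any character other than '-' or '|' on skip$ counts as an emission.
    if (skipFrame !== undefined && skipFrame !== '-' && skipFrame !== '|') {
      skipNext = true;
    }
    const frame = source[i];
    if (frame === '-' || frame === '|') {
      result += frame; // idle frame or completion: copy as-is
    } else if (skipNext) {
      result += '-';   // value suppressed, flag cleared
      skipNext = false;
    } else {
      result += frame; // value passes through
    }
  }
  return result;
}

const marbleResult = skipAfterMarbles(
  '-1-----2-----3-----4-----5-|',
  '----0----------0-0----------'
);
console.log(marbleResult); // -1-----------3-----------5-|
```

Note that the two skip$ emissions before 4 suppress only that one value; 5 still gets through.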
The library is here: https://github.com/simontonsoftware/s-rxjs-utils