RxJS Debounce with priority
You have to store and update the item with the highest priority and map each emission to this highest value, which you then pass to debounceTime.
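// highest-priority item seen since the last debounced emission
let highest = null;
source$.pipe(
  // replace each item with the highest-priority item seen so far
  map(v => highest = highest && highest.priority > v.priority ? highest : v),
  debounceTime(2000),
  // start over once the debounced value has been emitted
  tap(() => highest = null)
);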
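To make the intended behaviour concrete, here is a library-free sketch that replays a list of timestamped items and reports what a priority-aware debounce would emit. It only models the logic and does not use RxJS. The names `TimedEvent` and `simulateDebounceHighest` are hypothetical, and the sketch assumes the source stays silent after the last item.

```typescript
interface Prioritized { priority: number; }
interface TimedEvent<T> { time: number; value: T; }

// Models "debounce, but keep the highest priority of each burst".
// Assumes `events` is sorted by time and the source goes quiet after the last one.
function simulateDebounceHighest<T extends Prioritized>(
  events: TimedEvent<T>[],
  dueTime: number
): TimedEvent<T>[] {
  const emitted: TimedEvent<T>[] = [];
  let highest: T | null = null;
  let lastTime = 0;
  for (const event of events) {
    // A quiet gap of at least dueTime means the previous burst was emitted.
    if (highest !== null && event.time - lastTime >= dueTime) {
      emitted.push({ time: lastTime + dueTime, value: highest });
      highest = null;
    }
    // Same comparison as in the snippet above: on ties the newer item wins.
    highest = highest !== null && highest.priority > event.value.priority
      ? highest
      : event.value;
    lastTime = event.time;
  }
  // The final burst is emitted dueTime after its last item.
  if (highest !== null) {
    emitted.push({ time: lastTime + dueTime, value: highest });
  }
  return emitted;
}

const simulated = simulateDebounceHighest(
  [
    { time: 0, value: { priority: 1 } },
    { time: 500, value: { priority: 5 } },
    { time: 1000, value: { priority: 2 } },
    { time: 4000, value: { priority: 3 } },
  ],
  2000
);
// First burst (0 to 1000 ms) yields priority 5 at 3000 ms,
// second burst yields priority 3 at 6000 ms.
```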
You can create your own operator that does this with the help of defer. defer makes sure that every subscriber gets its own highest variable, as every subscriber will get its own new Observable created by calling the given factory function.
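import { defer, MonoTypeOperatorFunction, Observable } from 'rxjs';
import { debounceTime, map, tap } from 'rxjs/operators';

function debounceTimeHighest<T>(
  dueTime: number,
  getHighest: (curr: T, high: T) => T
): MonoTypeOperatorFunction<T> {
  return (source: Observable<T>) => defer(() => {
    // created once per subscription, so subscribers never share state
    let highest: T | null = null;
    return source.pipe(
      map(item => highest = highest !== null ? getHighest(item, highest) : item),
      debounceTime(dueTime),
      tap(() => highest = null)
    );
  });
}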
// Usage
source$.pipe(
  debounceTimeHighest(2000, (v1, v2) => v1.priority >= v2.priority ? v1 : v2)
)
The code above is TypeScript. If you want plain JavaScript, just remove all the types.
https://stackblitz.com/edit/rxjs-hitqxk
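The reason for wrapping the state in a factory can be shown without RxJS. The sketch below is only an analogy: `trackShared` and `createTracker` are hypothetical names, with `createTracker` standing in for the factory function that defer calls once per subscriber.

```typescript
interface Prioritized { priority: number; }

// Shared state: every caller reads and writes the same variable.
let sharedHighest: Prioritized | null = null;
function trackShared(item: Prioritized): Prioritized {
  const next = sharedHighest !== null && sharedHighest.priority > item.priority
    ? sharedHighest
    : item;
  sharedHighest = next;
  return next;
}

// Factory: each call creates a fresh closure with its own `highest`,
// which mirrors what the factory passed to defer does per subscription.
function createTracker(): (item: Prioritized) => Prioritized {
  let highest: Prioritized | null = null;
  return (item) => {
    const next = highest !== null && highest.priority > item.priority
      ? highest
      : item;
    highest = next;
    return next;
  };
}

const subscriberA = createTracker();
const subscriberB = createTracker();
const seenByA = subscriberA({ priority: 9 });
const seenByB = subscriberB({ priority: 1 }); // not affected by subscriberA

trackShared({ priority: 9 });
const leaked = trackShared({ priority: 1 }); // still reports the earlier item
```

With the factory, the second tracker starts from scratch. With the shared variable, the earlier high-priority item leaks into later calls, which is the problem a single outer `highest` would cause when there are several subscribers.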
You can combine the debounceTime, buffer, and filter operators to achieve what you need. I have developed this small example for it.
https://stackblitz.com/edit/typescript-lwzt4k
/*
  Collect clicks as they occur; once no click has arrived for 1000 ms,
  emit the collected clicks as an array
*/
clicks$.pipe(
  buffer(clicks$.pipe(debounceTime(1000))),
  // pick the highest-priority item from each buffered array
  map((clickArray) => {
    document.querySelector('#emittedObjects').innerHTML = `<div>${JSON.stringify(clickArray)}</div>`;
    // sort a copy in descending priority order (assumes numeric priorities)
    const sortedArray = [...clickArray].sort((a, b) => b.priority - a.priority);
    const output = sortedArray.length > 0 ? sortedArray[0] : null;
    document.querySelector('#mappedOutput').innerHTML = JSON.stringify(output);
    return output;
  })
)
.subscribe((obj) => {
  const str = obj ? JSON.stringify(obj) : 'NULL';
  document.querySelector('#throttledOutput').innerHTML = `<div>THROTTLED: ${str}</div>`;
});
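Sorting the whole buffer is not strictly necessary just to find one item. As a possible alternative, a single pass with reduce picks the highest-priority element without mutating or copying the array. `pickHighest` is a hypothetical helper, and it assumes numeric priorities; on ties it keeps the earliest item.

```typescript
interface Prioritized { priority: number; }

// Returns the item with the highest priority, or null for an empty buffer.
function pickHighest<T extends Prioritized>(items: readonly T[]): T | null {
  return items.reduce<T | null>(
    (best, item) => (best === null || item.priority > best.priority ? item : best),
    null
  );
}

const buffered = [
  { id: 'a', priority: 1 },
  { id: 'b', priority: 3 },
  { id: 'c', priority: 3 },
];
const picked = pickHighest(buffered); // the item with id 'b'
const nothing = pickHighest([]);      // null
```

Inside the pipeline above, this could replace the sort step in the map callback.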
I'll offer the following solution, based around using scan to offer up the highest-priority emission so far for consideration by debounceTime(). Note that scan needs to start over with new data after every successful debounce, so I use the window() operator to split up the emissions, starting a new observable window after every emission by debounceTime().
Here is the CodeSandbox
And here is some simplified code from the CodeSandbox showing the important bits:
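// emits (with no value) whenever the scan should start over
const resetScan$ = new Subject<void>();
source$.pipe(
  window(resetScan$),
  mergeMap(win$ => win$.pipe(
    // running highest-priority item within the current window
    scan((acc, cur) => acc.priority >= cur.priority ? acc : cur)
  )),
  debounceTime(debounceDelay),
  // open a new window after each debounced emission
  tap(() => resetScan$.next())
);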
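To illustrate what the scan step produces inside one window, here is a plain TypeScript sketch of the same accumulator applied to an array. `runningHighest` is a hypothetical stand-in for an unseeded scan; it is not part of RxJS.

```typescript
interface Prioritized { priority: number; }

// Mirrors scan((acc, cur) => acc.priority >= cur.priority ? acc : cur):
// the first item is passed through, later items only win with a higher priority.
function runningHighest<T extends Prioritized>(items: readonly T[]): T[] {
  const out: T[] = [];
  let acc: T | undefined;
  for (const cur of items) {
    acc = acc !== undefined && acc.priority >= cur.priority ? acc : cur;
    out.push(acc);
  }
  return out;
}

// One window: the accumulator climbs to 5 and stays there.
const firstWindow = runningHighest([{ priority: 1 }, { priority: 5 }, { priority: 2 }])
  .map(item => item.priority); // [1, 5, 5]

// A new window starts from scratch, so a lower priority can win again.
const secondWindow = runningHighest([{ priority: 3 }])
  .map(item => item.priority); // [3]
```

Without the reset, the priority 5 item from the first burst would keep shadowing the priority 3 item, which is why each debounced emission opens a new window.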