How to implement time expiry hot observable in RxJS (or general in Reactive Extensions)
All you are missing is to schedule a task to replace your cachedData
with a new AsyncSubject
after a time period. Here's how to do it as a new Rx.Observable
method:
Rx.Observable.prototype.cacheWithExpiration = function(expirationMs, scheduler) {
var source = this,
cachedData = undefined;
// Use timeout scheduler if scheduler not supplied
scheduler = scheduler || Rx.Scheduler.timeout;
return Rx.Observable.create(function (observer) {
if (!cachedData) {
// The data is not cached.
// create a subject to hold the result
cachedData = new Rx.AsyncSubject();
// subscribe to the query
source.subscribe(cachedData);
// when the query completes, start a timer which will expire the cache
cachedData.subscribe(function () {
scheduler.scheduleWithRelative(expirationMs, function () {
// clear the cache
cachedData = undefined;
});
});
}
// subscribe the observer to the cached data
return cachedData.subscribe(observer);
});
};
Usage:
// a *cold* observable the issues a slow query each time it is subscribed
var data = Rx.Observable.return(42).delay(5000);
// the cached query
var cachedData = data.cacheWithExpiration(15000);
// first observer must wait
cachedData.subscribe();
// wait 3 seconds
// second observer gets result instantly
cachedData.subscribe();
// wait 15 seconds
// observer must wait again
cachedData.subscribe();