shareReplayLatestWhileConnected with RxJS
As you mentioned, shareReplay(1) pretty much gets you there. It will multicast the response to current subscribers and replay the last value (if there is one) to new subscribers. That seems like what you would want rather than shareBehavior (if it existed), since you are calling an API and there isn't an initial value.
You should know that shareReplay will create a subscription to the source stream but will only unsubscribe when refCount === 0 AND the source stream terminates (error or complete). This means that the interval starts on the first subscription and keeps running even when there are no subscribers left.
If you want to stop the interval when no one is subscribed then use multicast(new ReplaySubject(1)).refCount(). The multicast operator will create a single subscription to the source stream and push all values into the subject, which is provided either as an instance (multicast(new Subject())) or by a factory (multicast(() => new Subject())). All subscribers to the stream after the multicast will subscribe to the multicast subject. So when a value flows through the multicast operator, all of its subscribers will get that value. You can change the type of subject that you pass to multicast to change its behavior. In your case you probably want a ReplaySubject so that it will replay the last value to a new subscriber. You could use a BehaviorSubject too if you felt that met your need.
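To make the replay behavior concrete, here is a minimal plain-JavaScript sketch. TinyReplaySubject is a hypothetical stand-in written for this illustration, not the RxJS ReplaySubject; it only models "remember the last value and hand it to late subscribers".

```javascript
// Hypothetical stand-in for ReplaySubject(1); NOT the RxJS class.
class TinyReplaySubject {
  constructor() {
    this.observers = new Set();
    this.hasValue = false;
    this.last = undefined;
  }
  next(value) {
    // Remember the latest value, then fan it out to every current subscriber.
    this.hasValue = true;
    this.last = value;
    this.observers.forEach((observer) => observer(value));
  }
  subscribe(observer) {
    // A late subscriber immediately receives the last value, if there is one.
    if (this.hasValue) observer(this.last);
    this.observers.add(observer);
    return { unsubscribe: () => this.observers.delete(observer) };
  }
}

const log = [];
const subject = new TinyReplaySubject();
subject.subscribe((x) => log.push(`A:${x}`));
subject.next(1);
subject.next(2);
subject.subscribe((x) => log.push(`B:${x}`)); // late subscriber, gets 2 right away
subject.next(3);
console.log(log); // [ 'A:1', 'A:2', 'B:2', 'A:3', 'B:3' ]
```

A plain Subject would behave the same except that B would not see the replayed 2.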
Now, the multicast operator returns a connectable observable, meaning that you would have to call connect() on the stream to make it hot. The refCount operator basically makes a connectable observable act like an ordinary observable: it becomes hot when subscribed to and cold again when there are no subscribers. It does this by keeping an internal reference count (hence the name refCount). When refCount === 0 it will disconnect.
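The reference counting described above can be sketched in plain JavaScript. The refCounted helper and the fake source below are hypothetical names made up for this illustration; this is not how RxJS implements refCount, only a model of "connect on the first subscriber, disconnect on the last".

```javascript
// Hypothetical helper, NOT RxJS: shares one source subscription and
// tears it down when the last subscriber leaves.
function refCounted(startSource) {
  const observers = new Set();
  let stopSource = null;
  return {
    subscribe(observer) {
      observers.add(observer);
      if (observers.size === 1) {
        // 0 -> 1 subscribers: connect to the source ("hot").
        stopSource = startSource((value) => observers.forEach((o) => o(value)));
      }
      return {
        unsubscribe() {
          if (!observers.delete(observer)) return; // already unsubscribed
          if (observers.size === 0) {
            // 1 -> 0 subscribers: disconnect from the source ("cold").
            stopSource();
            stopSource = null;
          }
        },
      };
    },
  };
}

const events = [];
let emit = null;
// Fake source: records connect/disconnect and exposes a manual emit function.
const shared = refCounted((push) => {
  events.push('connect');
  emit = push;
  return () => {
    events.push('disconnect');
    emit = null;
  };
});

const a = shared.subscribe((x) => events.push(`a:${x}`));
const b = shared.subscribe((x) => events.push(`b:${x}`));
emit(1);
a.unsubscribe();
b.unsubscribe(); // last subscriber leaves, source is stopped
const c = shared.subscribe((x) => events.push(`c:${x}`)); // source is started again
emit(2);
c.unsubscribe();
console.log(events);
// [ 'connect', 'a:1', 'b:1', 'disconnect', 'connect', 'c:2', 'disconnect' ]
```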
This is the same thing as shareReplay(1) with one minor but important difference: when there are no more subscribers, it will unsubscribe from the source stream. If you are using a factory method to create a new subject when subscribing to the source (ex: multicast(() => new ReplaySubject(1))) then you will lose your value when the stream goes from hot to cold to hot, since it will create a new subject each time it goes hot. If you want to keep the same subject between source subscriptions then you can pass in a subject instead of a factory (ex: multicast(new ReplaySubject(1))) or use its alias publishReplay(1).
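The difference between passing a subject instance and passing a factory can be modelled without RxJS. Everything below (TinyReplaySubject, multicastRefCount, run) is a hypothetical sketch that mirrors the behavior described above, not the library's code.

```javascript
// Hypothetical stand-in for ReplaySubject(1); NOT the RxJS class.
class TinyReplaySubject {
  constructor() {
    this.observers = new Set();
    this.hasValue = false;
    this.last = undefined;
  }
  next(value) {
    this.hasValue = true;
    this.last = value;
    this.observers.forEach((observer) => observer(value));
  }
  subscribe(observer) {
    if (this.hasValue) observer(this.last); // replay to late subscribers
    this.observers.add(observer);
    return { unsubscribe: () => this.observers.delete(observer) };
  }
}

// Hypothetical model of multicast(subjectOrFactory).refCount().
function multicastRefCount(startSource, subjectOrFactory) {
  let subject = null;
  let stopSource = null;
  let count = 0;
  return {
    subscribe(observer) {
      if (count === 0) {
        // Going hot: a factory yields a fresh subject, an instance is reused.
        subject = typeof subjectOrFactory === 'function'
          ? subjectOrFactory()
          : subjectOrFactory;
      }
      const inner = subject.subscribe(observer);
      count += 1;
      if (count === 1) stopSource = startSource((value) => subject.next(value));
      let closed = false;
      return {
        unsubscribe() {
          if (closed) return;
          closed = true;
          inner.unsubscribe();
          count -= 1;
          if (count === 0) stopSource(); // going cold
        },
      };
    },
  };
}

// Subscribe, receive one value, go cold, then subscribe again.
function run(subjectOrFactory) {
  const seen = [];
  let emit = null;
  const shared = multicastRefCount((push) => {
    emit = push;
    return () => { emit = null; };
  }, subjectOrFactory);
  const first = shared.subscribe((x) => seen.push(x));
  emit('cached');
  first.unsubscribe();
  shared.subscribe((x) => seen.push(`late:${x}`));
  return seen;
}

const withInstance = run(new TinyReplaySubject());
const withFactory = run(() => new TinyReplaySubject());
console.log(withInstance); // [ 'cached', 'late:cached' ]
console.log(withFactory); // [ 'cached' ]
```

With the instance, the late subscriber still sees the cached value after the stream went cold; with the factory, the cache is gone.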
As for your last requirement of providing errors to your subscribers and then resubscribing: you can't call the error callback on a subscription and then continue getting values on the next callback. An unhandled error will end a subscription if it reaches it. So you have to catch it before it gets there and turn it into a normal message if you want your subscription to see it and still live. You can do this like so: catch((err) => of(err)), and just flag it somehow. If you want to mute it then return empty() instead.
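One way to "flag it somehow" is to wrap every result in a small envelope so the next callback can tell successes from failures. The sketch below is a plain-JavaScript analogy, not RxJS code; makeApiCall, safeCall and the { ok, value, error } shape are assumptions made for illustration.

```javascript
// Hypothetical API call that fails on its second attempt.
let attempt = 0;
function makeApiCall() {
  attempt += 1;
  if (attempt === 2) throw new Error('boom');
  return attempt;
}

// Turn a failure into an ordinary, flagged message. This plays the role of
// catch((err) => of(err)): the consumer keeps receiving messages afterwards.
function safeCall() {
  try {
    return { ok: true, value: makeApiCall() };
  } catch (err) {
    return { ok: false, error: err.message };
  }
}

const received = [];
for (let tick = 0; tick < 3; tick += 1) {
  received.push(safeCall()); // each iteration stands in for one timer tick
}
console.log(received);
// [ { ok: true, value: 1 }, { ok: false, error: 'boom' }, { ok: true, value: 3 } ]
```

In the RxJS pipeline the same envelope would be produced by mapping successful responses and catching failures on the inner observable, so the subscriber only ever sees next notifications.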
If you want to retry immediately then you could use the retry operator (or retryWhen to control the timing), but you probably want to put that before the sharing operator so that it applies to every subscriber. However, this also prevents your subscribers from knowing about an error. Since the root of your stream is an interval and the error comes from the inner observable returned from the switchMap, the error will not kill the source of the stream but it could kill the subscription. So as long as you handle the error on the inner observable (catch/catchError), the API call will be retried on the next interval.
Also, you may want timer(0, 5000) instead of interval so that your API call fires immediately and then fires on a 5 second interval after that.
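As a quick worked example of the difference, the hypothetical helpers below just compute the emission times (in milliseconds after subscribing) of interval(period) and timer(initialDelay, period); they do not use RxJS.

```javascript
// interval(period) first emits after one full period.
const intervalTimes = (period, n) =>
  Array.from({ length: n }, (_, i) => (i + 1) * period);

// timer(initialDelay, period) first emits after initialDelay, then every period.
const timerTimes = (initialDelay, period, n) =>
  Array.from({ length: n }, (_, i) => initialDelay + i * period);

const fromInterval = intervalTimes(5000, 3);
const fromTimer = timerTimes(0, 5000, 3);
console.log(fromInterval); // [ 5000, 10000, 15000 ]
console.log(fromTimer); // [ 0, 5000, 10000 ]
```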
So I would suggest something like the following:
let count = 0;

// Simulated API call: emits an incrementing counter after a 1 second delay.
function makeApiCall() {
  return Rx.Observable.of(count++).delay(1000);
}

// Poll immediately and then every 5 seconds. Errors from a single call are
// swallowed so the polling stream stays alive. The latest value is replayed
// to late subscribers, and the source is only subscribed while someone listens.
const obs$ = Rx.Observable.timer(0, 5000)
  .switchMap(() => makeApiCall().catch(() => Rx.Observable.empty()))
  .publishReplay(1)
  .refCount();

console.log('1 subscribe');
let firstSub = obs$.subscribe((x) => { console.log('1', x); });
let secondSub;
let thirdSub;

setTimeout(() => {
  console.log('2 subscribe');
  secondSub = obs$.subscribe((x) => { console.log('2', x); });
}, 7500);

setTimeout(() => {
  console.log('1 unsubscribe');
  firstSub.unsubscribe();
  console.log('2 unsubscribe');
  secondSub.unsubscribe();
}, 12000);

setTimeout(() => {
  console.log('3 subscribe');
  thirdSub = obs$.subscribe((x) => { console.log('3', x); });
}, 17000);

setTimeout(() => {
  console.log('3 unsubscribe');
  thirdSub.unsubscribe();
}, 30000);
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.5.10/Rx.min.js"></script>
For convenience, here are aliases for multicast:
publish() === multicast(new Subject())
publishReplay(#) === multicast(new ReplaySubject(#))
publishBehavior(value) === multicast(new BehaviorSubject(value))