How to monitor number of RXJS subscriptions?
You are really asking three separate questions here, and I question whether you really need the full capability that you mention. Since most of the resource managment stuff you are asking for is already provided for by the library, doing custom tracking code seems to be redundant. The first two questions:
- Allocate global resource when the number of subscriptions becomes greater than 0
- Release global resource when the number of subscriptions becomes 0
Can be done with the using
+ share
operators:
class ExpensiveResource {
constructor () {
// Do construction
}
unsubscribe () {
// Do Tear down
}
}
// Creates a resource and ties its lifecycle with that of the created `Observable`
// generated by the second factory function
// Using will accept anything that is "Subscription-like" meaning it has a unsubscribe function.
const sharedStream$ = using(
// Creates an expensive resource
() => new ExpensiveResource(),
// Passes that expensive resource to an Observable factory function
er => timer(1000)
)
// Share the underlying source so that global creation and deletion are only
// processed when the subscriber count changes between 0 and 1 (or visa versa)
.pipe(share())
After that sharedStream$
can be passed around as a base stream which will manage the underlying resource (assuming you implemented your unsubscribe
correctly) so that the resource will be created and torn down as the number of subscribers transitions between 0 and 1.
Adjust the resource usage strategy based on the number of subscriptions
The third question I am most dubious on, but I'll answer it for completeness assuming you know your application better than I do (since I can't think of a reason why you would need specific handling at different usage levels other than going between 0 and 1).
Basically I would use a similar approach as above but I would encapuslate the transition logic slightly differently.
// Same as above
class ExpensiveResource {
unsubscribe() { console.log('Tear down this resource!')}
}
const usingReferenceTracking =
(onUp, onDown) => (resourceFactory, streamFactory) => {
let instance, refCount = 0
// Again manage the global resource state with using
const r$ = using(
// Unfortunately the using pattern doesn't let the resource escape the closure
// so we need to cache it for ourselves to use later
() => instance || (instance = resourceFactory()),
// Forward stream creation as normal
streamFactory
)
).pipe(
// Don't forget to clean up the stream after all is said and done
// Because its behind a share this should only happen when all subscribers unsubscribe
finalize(() => instance = null)
share()
)
// Use defer to trigger "onSubscribe" side-effects
// Note as well that these side-effects could be merged with the above for improved performance
// But I prefer them separate for easier maintenance.
return defer(() => onUp(instance, refCount += 1) || r$)
// Use finalize to handle the "onFinish" side-effects
.pipe(finalize(() => onDown(instance, refCount -= 1)))
}
const referenceTracked$ = usingReferenceTracking(
(ref, count) => console.log('Ref count increased to ' + count),
(ref, count) => console.log('Ref count decreased to ' + count)
)(
() => new ExpensiveResource(),
ref => timer(1000)
)
referenceTracked$.take(1).subscribe(x => console.log('Sub1 ' +x))
referenceTracked$.take(1).subscribe(x => console.log('Sub2 ' +x))
// Ref count increased to 1
// Ref count increased to 2
// Sub1 0
// Ref count decreased to 1
// Sub2 0
// Ref count decreased to 0
// Tear down this resource!
Warning: One side effect of this is that by definition the stream will be warm once it leaves the usingReferenceTracking
function, and it will go hot on first subscription. Make sure you take this into account during the subscription phase.
You could achieve it using defer to track subscriptions and finalize to track completions, e.g. as an operator:
// a custom operator that will count number of subscribers
function customOperator(onCountUpdate = noop) {
return function refCountOperatorFunction(source$) {
let counter = 0;
return defer(()=>{
counter++;
onCountUpdate(counter);
return source$;
})
.pipe(
finalize(()=>{
counter--;
onCountUpdate(counter);
})
);
};
}
// just a stub for `onCountUpdate`
function noop(){}
And then use it like:
const source$ = new Subject();
const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);
Heres a code snippet illustrating this:
const { Subject, of, timer, pipe, defer } = rxjs;
const { finalize, takeUntil } = rxjs.operators;
const source$ = new Subject();
const result$ = source$.pipe(
customOperator( n => console.log('Count updated: ', n) )
);
// emit events
setTimeout(()=>{
source$.next('one');
}, 250);
setTimeout(()=>{
source$.next('two');
}, 1000);
setTimeout(()=>{
source$.next('three');
}, 1250);
setTimeout(()=>{
source$.next('four');
}, 1750);
// subscribe and unsubscribe
const subscriptionA = result$
.subscribe(value => console.log('A', value));
setTimeout(()=>{
result$.subscribe(value => console.log('B', value));
}, 500);
setTimeout(()=>{
result$.subscribe(value => console.log('C', value));
}, 1000);
setTimeout(()=>{
subscriptionA.unsubscribe();
}, 1500);
// complete source
setTimeout(()=>{
source$.complete();
}, 2000);
function customOperator(onCountUpdate = noop) {
return function refCountOperatorFunction(source$) {
let counter = 0;
return defer(()=>{
counter++;
onCountUpdate(counter);
return source$;
})
.pipe(
finalize(()=>{
counter--;
onCountUpdate(counter);
})
);
};
}
function noop(){}
<script src="https://unpkg.com/[email protected]/bundles/rxjs.umd.min.js"></script>
* NOTE: if your source$ is cold — you might need to share it.
Hope it helps