Observable.onSubscribe equivalent in RxJs
Another example that builds on Mark's answer is to use the factory functions:
Rx.Observable.prototype.doOnSubscribe = function(onSubscribe) {
const source = this;
return Rx.Observable.defer(() => {
onSubscribe();
return source;
});
}
Really like Mark's answer, but if you're using rxjs
version 5 or later, I'd prefer using pure functions instead of patching the prototype.
Update:
Implemented without empty
concat
hack using defer
as suggested by Ray
import {defer} from 'rxjs/observable/defer';
import {Observable} from 'rxjs/Observable';
/** Example
import {from} from 'rxjs/observable/from';
from([1, 2, 3])
.pipe(doOnSubscribe(() => console.log('subscribed to stream')))
.subscribe(x => console.log(x), null, () => console.log('completed'));
*/
export function doOnSubscribe<T>(onSubscribe: () => void): (source: Observable<T>) => Observable<T> {
return function inner(source: Observable<T>): Observable<T> {
return defer(() => {
onSubscribe();
return source;
});
};
}
https://gist.github.com/evxn/750702f7c8e8d5a32c7b53167fe14d8d
Original Answer:
import {empty} from 'rxjs/observable/empty';
import {concat, tap} from 'rxjs/operators';
import {Observable} from 'rxjs/Observable';
/** Example
import {from} from 'rxjs/observable/from';
from([1, 2, 3])
.pipe(doOnSubscribe(() => console.log('subscribed to stream')))
.subscribe(x => console.log(x), null, () => console.log('completed'));
*/
export function doOnSubscribe<T>(callback: () => void): (source: Observable<T>) => Observable<T> {
return function inner(source: Observable<T>): Observable<T> {
return empty().pipe(
tap(null, null, callback),
concat<T>(source)
);
};
}
This function is easily implemented by composition of the existing functions of RxJs5.
Rx.Observable.prototype.doOnSubscribe = function (onSubscribe) {
return Rx.Observable.empty()
.do(null,null, onSubscribe)
.concat(this);
};
Rx.Observable.from([1,2,3])
.doOnSubscribe(() => console.log('subscribed to stream'))
.subscribe(console.log, null, () => console.log('completed'))
<script src="https://cdnjs.cloudflare.com/ajax/libs/rxjs/5.0.3/Rx.js"></script>
When downstream subscribes to the doOnSubscribe
we first emit an empty observable and use the .do()
onComplete callback as doOnSubscribe
. Then we .concat()
the upstream to this and return this modified stream.