ReactiveX concat doesn't produce onNext from first observable if second fails immediately
Default behavior of observeOn
is that onError
events can jump in front of the queue, here is the quote from the docs:
Note that
onError
notifications will cut ahead ofonNext
notifications on the emission thread ifScheduler
is truly asynchronous.
Here is small test to illustrate the thing:
Scheduler newThreadScheduler = Schedulers.newThread();
Observable<Integer> stream = Observable.create(integerEmitter -> {
integerEmitter.onNext(1);
integerEmitter.onNext(2);
integerEmitter.onNext(3);
integerEmitter.onNext(4);
integerEmitter.onNext(5);
integerEmitter.onError(new RuntimeException());
}, Emitter.BackpressureMode.NONE);
TestSubscriber<Integer> subscriber = new TestSubscriber<>();
stream.subscribeOn(Schedulers.computation())
.observeOn(newThreadScheduler).subscribe(subscriber);
subscriber.awaitTerminalEvent();
subscriber.assertValues(1, 2, 3, 4, 5);
subscriber.assertError(RuntimeException.class);
Normally consumer would expect the following sequence: 1 > 2 > 3 > 4 > 5 > Error
. But using just observeOn
may put error ahead and test will fail.
This behavior was implemented long time ago here https://github.com/ReactiveX/RxJava/issues/1680, check for the motivation why it was done like that. To avoid such behavior one can use overloaded observeOn
with delayError
parameter:
indicates if the
onError
notification may not cut ahead ofonNext
notification on the other side of the scheduling boundary. Iftrue
a sequence ending inonError
will be replayed in the same order as was received from upstream
This is what you normally expect, so changing observeOn(newThreadScheduler)
to observeOn(newThreadScheduler, true)
will fix the test.
Then to the question of @Neil: why solution proposed by @Rostyslav is working? It is working, because there is no thread switch for the final sequence.
In the proposed solution final sequence is crafted from two sequences on the same thread: 1st sequence is data from cache, 2nd sequence is just error from network. They are crafted together on the same thread and after there is no thread switch - subscriber observes on the AndroidSchedulers.mainThread()
. If you try to change final Scheduler
to some other, it will fail again.
Operators in RxJava are designed to short-circuit onError notifications in general. Because the observables being concatenated are asynchronous sources then you are experiencing the short-circuit. If you don't want the short-circuit then you could do a concat on materialized observables and then perform the processing you desire:
Observable.concat(
getContentFromCache.materialize().subscribeOn(dbScheduler),
getContentFromNetwork.materialize().subscribeOn(networkScheduler)
)
Another approach would be to use onErrorResumeNext
:
Observable.concat(
getContentFromCache.subscribeOn(dbScheduler),
getContentFromNetwork.onErrorResumeNext(something)
.subscibeOn(networkScheduler)
)