RxJava `Completable.andThen` is not executing serially?
The issue is not with andThen
but with the statement Observable.just(mUser.name)
inside andThen
. The just
operator will try to create the observable immediately though it will emit only after Completable.fromAction
.
Problem here is , while trying to create the Observable
using just , the mUser
is null.
Solution : You need to defer the creation of the String Observable till a subscription happens , till the upstream of andThen
starts emission.
Instead of andThen(Observable.just(mUser.name));
use
andThen(Observable.defer(() -> Observable.just(mUser.name)));
Or
andThen(Observable.fromCallable(() -> mUser.name));
I don't think @Sarath Kn's answer is 100% correct. Yes just
will create observable as soon as it's called, but andThen
is still calling just
at an unexpected time.
We can compare andThen
with flatMap
to get some better understanding. Here is a fully runnable test:
package com.example;
import org.junit.Test;
import io.reactivex.Completable;
import io.reactivex.Observable;
import io.reactivex.observers.TestObserver;
import io.reactivex.schedulers.Schedulers;
public class ExampleTest {
@Test
public void createsIntermediateObservable_AfterSubscribing() {
Observable<String> coldObservable = getObservableSource()
.flatMap(integer -> getIntermediateObservable())
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline());
System.out.println("Cold obs created... subscribing");
TestObserver<String> testObserver = coldObservable.test();
testObserver.awaitTerminalEvent();
/*
Resulting logs:
Creating observable source
Cold obs created... subscribing
Emitting 1,2,3
Creating intermediate observable
Creating intermediate observable
Creating intermediate observable
Emitting complete notification
IMPORTANT: see that intermediate observables are created AFTER subscribing
*/
}
@Test
public void createsIntermediateObservable_BeforeSubscribing() {
Observable<String> coldObservable = getCompletableSource()
.andThen(getIntermediateObservable())
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline());
System.out.println("Cold obs created... subscribing");
TestObserver<String> testObserver = coldObservable.test();
testObserver.awaitTerminalEvent();
/*
Resulting logs:
Creating completable source
Creating intermediate observable
Cold obs created... subscribing
Emitting complete notification
IMPORTANT: see that intermediate observable is created BEFORE subscribing =(
*/
}
private Observable<Integer> getObservableSource() {
System.out.println("Creating observable source");
return Observable.create(emitter -> {
System.out.println("Emitting 1,2,3");
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
System.out.println("Emitting complete notification");
emitter.onComplete();
});
}
private Observable<String> getIntermediateObservable() {
System.out.println("Creating intermediate observable");
return Observable.just("A");
}
private Completable getCompletableSource() {
System.out.println("Creating completable source");
return Completable.create(emitter -> {
System.out.println("Emitting complete notification");
emitter.onComplete();
});
}
}
You can see that when we use flatmap
, the just
is called after subscribing, which makes sense. If the intermediate observable depended on the items emitted to the flatmap
then of course the system can't create the intermediate observable before subscription. It would not yet have any values. You can imagine this wouldn't work if flatmap
called just
before subscribing:
.flatMap(integer -> getIntermediateObservable(integer))
What is weird is that andThen
is able to create it's inner observable (i.e. call just
) before subscribing. It makes sense that it can do this. The only thing andThen
is going to receive is a complete notification, so there is no reason NOT to create the intermediate observable early. The only problem is that it's not the expected behavior.
@Sarath Kn's solution is correct, but for the wrong reason. If we use defer
we can see things working as expected:
@Test
public void usingDefer_CreatesIntermediateObservable_AfterSubscribing() {
Observable<String> coldObservable = getCompletableSource()
.andThen(Observable.defer(this::getIntermediateObservable))
.subscribeOn(Schedulers.trampoline())
.observeOn(Schedulers.trampoline());
System.out.println("Cold obs created... subscribing");
TestObserver<String> testObserver = coldObservable.test();
testObserver.awaitTerminalEvent();
/*
Resulting logs:
Creating completable source
Cold obs created... subscribing
Emitting complete notification
Creating intermediate observable
IMPORTANT: see that intermediate observable is created AFTER subscribing =) YEAY!!
*/
}