RxJava: chaining observables
Sure, RxJava supports .map
which does this. From the RxJava Wiki:
Basically, it'd be:
loginObservable()
.switchMap( someData -> fetchUserDataObservable(someData) )
.map( userData -> cacheUserData(userData) )
.subscribe(new Subscriber<YourResult>() {
@Override
public void onCompleted() {
// observable stream has ended - no more logins possible
}
@Override
public void onError(Throwable e) {
// do something
}
@Override
public void onNext(YourType yourType) {
displayUserData();
}
});
This is the top post when Googling RxJava chain observables so I'll just add another common case where you wouldn't want to transform the data you receive, but chain it with another action (setting the data to a database, for example). Use .flatmap()
. Here's an example:
mDataManager
.fetchQuotesFromApi(limit)
.subscribeOn(mSchedulerProvider.io())
.observeOn(mSchedulerProvider.ui())
// OnErrorResumeNext and Observable.error() would propagate the error to
// the next level. So, whatever error occurs here, would get passed to
// onError() on the UI side.
.onErrorResumeNext(Function { Observable.error<List<Quote>>(it) })
.flatMap { t: List<Quote> ->
// Chain observable as such
mDataManager.setQuotesToDb(t).subscribe(
{},
{ e { "setQuotesToDb() error occurred: ${it.localizedMessage}" } },
{ d { "Done server set" } }
)
Observable.just(t)
}
.subscribeBy(
onNext = {},
onError = { mvpView?.showError("No internet connection") },
onComplete = { d { "onComplete(): done with fetching quotes from api" } }
)
This is RxKotlin2, but the idea is the same with RxJava & RxJava2:
Quick explanation:
- we try to fetch some data (quotes in this example) from an api with
mDataManager.fetchQuotesFromApi()
- We subscribe the observable to do stuff on
.io()
thread and show results on.ui()
thread. onErrorResumeNext()
makes sure that whatever error we encounter from fetching data is caught in this method. I wanna terminate the entire chain when there is an error there, so I return anObservable.error()
.flatmap()
is the chaining part. I wanna be able to set whatever data I get from the API to my database. I'm not transforming the data I received using.map()
, I'm simply doing something else with that data without transforming it.- I subscribe to the last chain of observables. If an error occurred with fetching data (first observable), it would be handled (in this case, propagated to the subscribed
onError()
) withonErrorResumeNext()
- I am very conscious that I'm subscribing to the DB observable (inside
flatmap()
). Any error that occurs through this observable will NOT be propagated to the lastsubscribeBy()
methods, since it is handled inside thesubscribe()
method inside the.flatmap()
chain.
The code comes from this project which is located here: https://github.com/Obaied/Sohan/blob/master/app/src/main/java/com/obaied/dingerquotes/ui/start/StartPresenter.kt