RxJava 2 overriding IO scheduler in unit test

Figured it out! It had to do with the fact that in this code:

validate(data)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .flatMap { ... }

validate(data) was returning an Observable, which was emitting the following: emitter.onNext(null). Since RxJava 2 no longer accepts null values, flatMap was not getting called. I changed validate to return a Completable and updated the scheduler override to the following:

RxJavaPlugins.setIoSchedulerHandler { Schedulers.trampoline() }

Now the tests pass!


I suggest you take a different approach and add a layer of abstraction to your schedulers. This guy has a nice article about it.

It would look something like this in Kotlin

interface SchedulerProvider {
    fun ui(): Scheduler
    fun computation(): Scheduler
    fun trampoline(): Scheduler
    fun newThread(): Scheduler
    fun io(): Scheduler 
}

And then you override that with your own implementation of SchedulerProvider:

class AppSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return AndroidSchedulers.mainThread()
    }

    override fun computation(): Scheduler {
        return Schedulers.computation()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.newThread()
    }

    override fun io(): Scheduler {
        return Schedulers.io()
    }
}

And one for testing classes:

class TestSchedulerProvider : SchedulerProvider {
    override fun ui(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun computation(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun trampoline(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun newThread(): Scheduler {
        return Schedulers.trampoline()
    }

    override fun io(): Scheduler {
        return Schedulers.trampoline()
    }
}

Your code would look like this where you call RxJava:

mCompositeDisposable.add(mDataManager.getQuote()
        .subscribeOn(mSchedulerProvider.io())
        .observeOn(mSchedulerProvider.ui())
        .subscribe(Consumer<Quote> {
...

And you'll just override your implementation of SchedulerProvider based on where you test it. Here's a sample project for reference, I am linking the test file that would use the testable-version of SchedulerProvider: https://github.com/Obaied/DingerQuotes/blob/master/app/src/test/java/com/obaied/dingerquotes/QuotePresenterTest.kt#L31