RxJava Observing on calling/subscribing thread
Here's a simplified example updated for RxJava 2. It's the same concept as Marek's answer: an Executor that adds the runnables to a BlockingQueue that's being consumed on the caller's thread.
public class ThreadTest {
@Test
public void test() throws InterruptedException {
final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
System.out.println("Caller thread: " + Thread.currentThread().getName());
Observable.fromCallable(new Callable<Integer>() {
@Override
public Integer call() throws Exception {
System.out.println("Observable thread: " + Thread.currentThread().getName());
return 1;
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.from(new Executor() {
@Override
public void execute(@NonNull Runnable runnable) {
tasks.add(runnable);
}
}))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
System.out.println("Observer thread: " + Thread.currentThread().getName());
}
});
tasks.take().run();
}
}
// Output:
// Caller thread: main
// Observable thread: RxCachedThreadScheduler-1
// Observer thread: main
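The hand-off itself does not depend on RxJava. As a rough sketch using only the JDK, the following shows the same idea: a background thread posts a callback through an Executor that merely enqueues it, and the caller's thread takes the callback from the queue and runs it. The class name QueueHandOff and the method runCallbackOnCaller are hypothetical and exist only for this illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical plain-JDK illustration of the queue hand-off; not part of RxJava.
class QueueHandOff {

    // Starts a background thread that posts a callback through a queue-backed
    // Executor, then runs that callback on the calling thread.
    // Returns the name of the thread that executed the callback.
    static String runCallbackOnCaller() {
        BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
        Executor callerExecutor = tasks::add; // only enqueues, never runs the task itself
        String[] observedOn = new String[1];

        Thread producer = new Thread(
                () -> callerExecutor.execute(
                        () -> observedOn[0] = Thread.currentThread().getName()),
                "producer");
        producer.start();

        try {
            // Blocks until the callback arrives, then runs it on this thread.
            tasks.take().run();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return observedOn[0];
    }

    public static void main(String[] args) {
        System.out.println("Caller thread: " + Thread.currentThread().getName());
        System.out.println("Callback thread: " + runCallbackOnCaller());
    }
}
```

Both lines print the same thread name, because the producer thread only enqueues the callback and the caller is the one that runs it.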
To answer your question, let me start from the beginning so that other readers can follow what you already know.
Schedulers
Schedulers play the same role in RxJava as Executors do in Java. In brief, they decide on which thread actions are executed.
Usually an Observable and its operators execute on the current thread. Sometimes you can pass a Scheduler to an Observable or operator as a parameter (e.g. Observable.timer()).
Additionally, RxJava provides two operators to specify a Scheduler:
- subscribeOn - specifies the Scheduler on which an Observable will operate
- observeOn - specifies the Scheduler on which an observer will observe this Observable
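For readers less familiar with the Executor side of that analogy, here is a minimal plain-JDK sketch showing that the executor, not the task, decides which thread the work runs on. The class ExecutorAnalogy, the method runOnWorker, and the thread name "worker-1" are hypothetical names chosen for illustration.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical illustration: the executor chooses the thread a task runs on.
class ExecutorAnalogy {

    // Runs a task on a single-thread executor whose thread has the given name
    // and returns the name of the thread that actually executed the task.
    static String runOnWorker(String threadName) {
        ExecutorService worker =
                Executors.newSingleThreadExecutor(r -> new Thread(r, threadName));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            return worker.submit(task).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            worker.shutdown(); // let the worker thread exit once the task is done
        }
    }

    public static void main(String[] args) {
        System.out.println("Caller thread: " + Thread.currentThread().getName());
        System.out.println("Task thread: " + runOnWorker("worker-1"));
    }
}
```

The task is identical in both cases; only the executor it is handed to changes where it runs, which is roughly what subscribeOn and observeOn do with a Scheduler.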
To understand them quickly, I use the example code below.
In all samples, I use the helper createObservable, which emits the name of the thread on which the Observable operates:
public static Observable<String> createObservable(){
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(Thread.currentThread().getName());
subscriber.onCompleted();
}
);
}
Without schedulers:
createObservable().subscribe(message -> {
System.out.println("Case 1 Observable thread " + message);
System.out.println("Case 1 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 1 Observable thread main
//Case 1 Observer thread main
SubscribeOn:
createObservable()
.subscribeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 2 Observable thread " + message);
System.out.println("Case 2 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 2 Observable thread RxNewThreadScheduler-1
//Case 2 Observer thread RxNewThreadScheduler-1
SubscribeOn and ObserveOn:
createObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 3 Observable thread " + message);
System.out.println("Case 3 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 3 Observable thread RxNewThreadScheduler-2
//Case 3 Observer thread RxNewThreadScheduler-1
ObserveOn:
createObservable()
.observeOn(Schedulers.newThread())
.subscribe(message -> {
System.out.println("Case 4 Observable thread " + message);
System.out.println("Case 4 Observer thread " + Thread.currentThread().getName());
});
//will print:
//Case 4 Observable thread main
//Case 4 Observer thread RxNewThreadScheduler-1
Answer:
AndroidSchedulers.mainThread() returns a scheduler that delegates work to the MessageQueue associated with the main thread.
For this purpose it uses android.os.Looper.getMainLooper() and android.os.Handler.
In other words, if you would like to specify a particular thread, you must provide a means to schedule and perform tasks on that thread.
Underneath, it may use any kind of message queue for storing tasks, plus logic that loops over the queue and executes the tasks.
In Java, we have Executor, which is designed for such tasks. RxJava can easily create a Scheduler from such an Executor.
Below is an example that shows how you can observe on the main thread (not particularly useful, but it shows all the required parts).
public class RunCurrentThread implements Executor {
private BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();
public static void main(String[] args) throws InterruptedException {
RunCurrentThread sample = new RunCurrentThread();
sample.observerOnMain();
sample.runLoop();
}
private void observerOnMain() {
createObservable()
.subscribeOn(Schedulers.newThread())
.observeOn(Schedulers.from(this))
.subscribe(message -> {
System.out.println("Observable thread " + message);
System.out.println("Observer thread " + Thread.currentThread().getName());
});
}
public Observable<String> createObservable() {
return Observable.create((Subscriber<? super String> subscriber) -> {
subscriber.onNext(Thread.currentThread().getName());
subscriber.onCompleted();
}
);
}
private void runLoop() throws InterruptedException {
while(!Thread.interrupted()){
tasks.take().run();
}
}
@Override
public void execute(Runnable command) {
tasks.add(command);
}
}
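Note that runLoop in the example above only returns when the thread is interrupted, so the program keeps waiting on the queue. One possible way to let the loop finish, sketched here with the JDK only, is to enqueue a sentinel task that tells the loop to return. The class StoppableLoop, the STOP sentinel, and the stop() and demo() methods are assumptions made for this sketch, not part of the original code or of RxJava.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;

// Hypothetical variant of the queue-backed Executor with a way to end the loop.
class StoppableLoop implements Executor {
    // Sentinel task: when the loop takes it, the loop returns instead of running it.
    private static final Runnable STOP = () -> { };

    private final BlockingQueue<Runnable> tasks = new LinkedBlockingQueue<>();

    @Override
    public void execute(Runnable command) {
        tasks.add(command);
    }

    // Asks the loop to return once the tasks queued before this call have run.
    void stop() {
        tasks.add(STOP);
    }

    // Runs queued tasks on the calling thread until the STOP sentinel is taken.
    void runLoop() throws InterruptedException {
        for (Runnable task = tasks.take(); task != STOP; task = tasks.take()) {
            task.run();
        }
    }

    // Posts two tasks and a stop request from another thread, then drains them here.
    static List<String> demo() {
        StoppableLoop loop = new StoppableLoop();
        List<String> log = new ArrayList<>(); // only touched by the looping thread
        Thread producer = new Thread(() -> {
            loop.execute(() -> log.add("first"));
            loop.execute(() -> log.add("second"));
            loop.stop();
        });
        producer.start();
        try {
            loop.runLoop();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [first, second]
    }
}
```

In the RxJava example, a call like stop() could plausibly be made from the subscriber's completion callback, so that the loop ends after the last item has been observed.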
And the last question: why does your code not terminate?
ThreadPoolExecutor uses non-daemon threads by default, so your program does not end until they exit. You should call the shutdown method to stop the threads.
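As a minimal sketch of that advice, assuming a pool created with Executors.newFixedThreadPool (which returns a ThreadPoolExecutor), the following submits a task, calls shutdown, and waits for the pool to terminate. The class ShutdownExample, the method runAndShutdown, and the 5-second timeout are hypothetical choices for illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration of shutting down a thread pool so the JVM can exit.
class ShutdownExample {

    // Submits one task, shuts the pool down, and waits for it to finish.
    // Returns true if the pool terminated within the timeout.
    static boolean runAndShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.execute(() ->
                System.out.println("Task thread: " + Thread.currentThread().getName()));

        // Stop accepting new tasks; tasks already submitted still run.
        pool.shutdown();
        try {
            return pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println("Terminated: " + runAndShutdown());
    }
}
```

Without the shutdown call, the pool's idle worker threads would stay alive and keep the JVM running after main returns.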