Do I have to unsubscribe from a completed observable?
Yes, you are correct.
After a stream terminates (onComplete or onError has been called), the subscriber is unsubscribed automatically. You can verify this by calling isUnsubscribed() on the Subscription object (in RxJava 2, the equivalent is isDisposed() on the Disposable).
While you do not need to manually unsubscribe from a terminated stream, you can still create a memory leak using RxJava2 if you are not careful.
Consider the following code:
repository.getData()
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(data -> myTextView.setText(data.toString()));
The lambda passed to subscribe is effectively "syntactic sugar" over an anonymous inner class:
subscribe(new Consumer<Data>() {
    @Override
    public void accept(final Data data) {
        myTextView.setText(data.toString());
    }
});
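On the JVM, an anonymous inner class holds an implicit reference to the instance of its enclosing (outer) class. The lambda above captures the outer instance too when myTextView is a field of that class.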
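The implicit reference can be observed outside Android with plain Java. The following is only a sketch: the class OuterReferenceDemo, its label field and the capturedOuter helper are made up for illustration, and the reflection lookup relies on how javac typically stores the enclosing instance (a compiler-generated field), which is an implementation detail rather than a guaranteed API.

```java
import java.lang.reflect.Field;
import java.util.function.Consumer;

public class OuterReferenceDemo {
    private String label = "outer";

    // Returns an anonymous inner class, shaped like the Consumer passed to subscribe().
    Consumer<String> makeCallback() {
        return new Consumer<String>() {
            @Override
            public void accept(String data) {
                label = data; // touches a field of the enclosing instance
            }
        };
    }

    // Hypothetical helper: returns the value of the first field on the callback
    // whose declared type is the outer class, or null if there is none.
    static Object capturedOuter(Object callback) {
        for (Field field : callback.getClass().getDeclaredFields()) {
            if (field.getType() == OuterReferenceDemo.class) {
                try {
                    field.setAccessible(true);
                    return field.get(callback);
                } catch (IllegalAccessException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
        return null;
    }

    public static void main(String[] args) {
        OuterReferenceDemo outer = new OuterReferenceDemo();
        Consumer<String> callback = outer.makeCallback();
        // Compares the captured reference with the instance that created the callback.
        System.out.println(capturedOuter(callback) == outer);
    }
}
```

Whoever holds the callback (in the RxJava case, the still-running stream) therefore keeps the outer object reachable as well.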
Assume that for the above naive code, the outer class is an Activity (the same applies to a Fragment, Service, BroadcastReceiver or any class whose lifecycle is controlled by the Android OS).
The Activity subscribes to the Observable but is then destroyed by the Android OS under low-memory conditions (you can mimic this effect by turning on Developer Options/Don't Keep Activities). If the work on Schedulers.io() is still running when the Activity is destroyed, the anonymous inner class still holds a reference to the Activity. This is a memory leak: the garbage collector cannot reclaim the Activity. If the Activity has a number of Views or, say, a Bitmap object, then the memory leak can be quite substantial.
There are a number of solutions here, but one of them is to maintain a CompositeDisposable object and to clear it in the onDestroy() lifecycle method of the Android Activity:
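import android.app.Activity;
import android.os.Bundle;
import android.view.View;
import android.widget.TextView;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.disposables.CompositeDisposable;
import io.reactivex.schedulers.Schedulers;

public class MyActivity extends Activity {
    // Assumed to be initialized elsewhere (e.g. injection, findViewById).
    DataRepository dataRepository;
    TextView myTextView;
    CompositeDisposable disposables;

    @Override
    public void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        disposables = new CompositeDisposable();
    }

    public void onButtonClick(View v) {
        // Track the Disposable returned by subscribe() so it can be cleared later.
        disposables.add(dataRepository.getData()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(data -> myTextView.setText(data.toString())));
    }

    @Override
    public void onDestroy() {
        // Dispose in-flight subscriptions so they release their reference to this Activity.
        disposables.clear();
        super.onDestroy();
    }
}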
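To make the mechanism concrete without Android or RxJava, here is a plain-Java sketch of the same idea. Source, Screen and the Runnable cancel handles are hypothetical stand-ins invented for this illustration, not RxJava classes, and the real CompositeDisposable does more than this (it is thread-safe, for one).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class ClearOnDestroyDemo {
    // Hypothetical data source that keeps its listener until it is cancelled.
    static class Source {
        private Consumer<String> listener;

        // Registers the listener and returns a handle that removes it again.
        Runnable subscribe(Consumer<String> newListener) {
            listener = newListener;
            return () -> listener = null;
        }

        void emit(String value) {
            if (listener != null) {
                listener.accept(value);
            }
        }

        boolean hasListener() {
            return listener != null;
        }
    }

    // Stand-in for an Activity: collects cancel handles and runs them on destroy.
    static class Screen {
        final List<Runnable> cancelHandles = new ArrayList<>();
        String text = "";

        void onButtonClick(Source source) {
            // The lambda writes a field, so it captures this Screen instance.
            cancelHandles.add(source.subscribe(data -> text = data));
        }

        void onDestroy() {
            cancelHandles.forEach(Runnable::run);
            cancelHandles.clear();
        }
    }

    public static void main(String[] args) {
        Source source = new Source();
        Screen screen = new Screen();
        screen.onButtonClick(source);
        source.emit("first");
        screen.onDestroy();
        source.emit("second"); // no listener is registered any more
        System.out.println(screen.text + " / listener still held: " + source.hasListener());
    }
}
```

Once onDestroy() has run, the source no longer holds the callback, so nothing in this sketch keeps the Screen reachable through it.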
For a good example of how to use RxJava in an Android app, see the official Google Android Architecture Blueprints.