DisposableObserver vs. (Regular) Observer

It's mainly to avoid memory leaks, as it lets you cancel the subscription at any time. It can happen that an object that owns a subscription object reaches the end of its life cycle while the subscription is doing some heavy work that requires more time. In this case, the subscription will remain in memory, thus leaking the object that owns it. To avoid this, you can store the return value of the subscription (the disposable) and call dispose later at the end of the owner object's life cycle. Usually people end up adding each disposable to a CompositeDisposable (which is basically a bag of disposables) and then clearing them all together with CompositeDisposable.clear(), so that you don't have to keep track of each individual disposable.

Let's say you have a class called ItemPublisher that provides an infinite stream of events:

class ItemPublisher {
  // ...

  public Flowable<Item> getItemsStream() {
    // ...
  }
}

Now, imagine that you have a class called ItemHandler that subscribes to that stream:

class ItemHandler {
  // ...

  public void observeItems() {

    itemPublisher.getItemsStream()
      .subscribe(
        // handle onNext events,
        // handle onError events
        // infinite stream, so we don't need to handle onComplete
      );
  }
}

So, your software is running, and everything's great. At a certain moment, your ItemHandler instance reaches the end of its lifetime. The instance is supposed to be destroyed. However, since we're dealing with the Observer pattern here, ItemPublisher retains an implicit reference to the ItemHandler instance, that was passed to it when you called the subscribe method. Since the stream is infinite, that reference will never be deleted, thus stoping the GC from cleaning up the ItemHandler instance, causing a memory leak. This does not happen with infinite streams only: if ItemPublisher has a longer life than ItemHandler, the same will happen.

That is why we have the Disposable interface. When you call subscribe, you can use the overloaded version of the method that returns a Disposable. When you don't need the subscription anymore, you can call dispose() on it. So, in our example:

class ItemHandler {
  private Disposable subscriber;  // this gets initialized somewhere

  // ...

  public void observeItems() {

    itemPublisher.getItemsStream()
      .subscribe(this.getSubscriber());
  }


  // later, when you know that this ItemHandler instance is not needed anymore
  public void wrapItUp() {
    // ...
    subscriber.dispose();
  }
}

As I mentioned before, you also have the CompositeDisposable, that comes in handy when you have a lot of subscriptions. By using it, you can gather all subscriptions into one place, and then get rid of them all at once. For instance:

class ItemHandler {
  private CompositeDisposable disposables;

  // ...

  public void observeItems() {

    disposables.add(itemPublisher.getItemsStream()
      .subscribe(
        // ...
      )
    );
  }


  public void observeSomethingElse() {
     disposables.add(somethingElse.getStreamOfSomethingElse()
      .subscribe(
        // ...
      )
    );
  }

  // later, when you know that this ItemHandler instance is not needed anymore
  public void wrapItUp() {
    // ...
    disposables.clear();
  }
}

The key difference is in the fact that the implementation can be changed at any time, and the change causes the disposing of the current disposable.

"A MutableDisposable is a disposable that can have its implementation changed at any time. The act of changing the disposable implementation causes the current disposable to dispose."

Tags:

Java

Rx Java