Observable, retry on error and cache only if completed

This is the solution we ended up with, after extending akarnokd's solution:

import java.util.concurrent.Semaphore;

import rx.Observable;

public class OnErrorRetryCache<T> {

    public static <T> Observable<T> from(Observable<T> source) {
        return new OnErrorRetryCache<>(source).deferred;
    }

    private final Observable<T> deferred;
    private final Semaphore singlePermit = new Semaphore(1);

    private Observable<T> cache = null;
    private Observable<T> inProgress = null;

    private OnErrorRetryCache(Observable<T> source) {
        deferred = Observable.defer(() -> createWhenObserverSubscribes(source));
    }

    private Observable<T> createWhenObserverSubscribes(Observable<T> source) {
        singlePermit.acquireUninterruptibly();

        Observable<T> cached = cache;
        if (cached != null) {
            singlePermit.release();
            return cached;
        }

        // Nothing cached yet: start a new attempt. The permit stays held until it terminates.
        inProgress = source
                .doOnCompleted(this::onSuccess)
                .doOnTerminate(this::onTermination)
                .replay()
                .autoConnect();

        return inProgress;
    }

    private void onSuccess() {
        cache = inProgress;
    }

    private void onTermination() {
        inProgress = null;
        singlePermit.release();
    }
}

We needed to cache the result of an HTTP request made through Retrofit, so this was written with an observable that emits a single item in mind.

If an observer subscribes while the HTTP request is still executing, we want it to wait rather than execute the request a second time, unless the in-progress request fails. To achieve that, the semaphore allows only one subscriber at a time into the block that creates or returns the cached observable. If a new observable is created, the permit is held until that observable terminates, so later subscribers wait for it. Tests for the above can be found here.
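For readers who want to see the gating idea in isolation, here is a minimal blocking sketch that uses only the JDK. It is an analogy to the class above, not the RxJava implementation: `GatedCache` and its `Supplier` source are hypothetical names introduced for illustration, and the caller blocks instead of subscribing.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper (not part of RxJava): caches only a successful result.
final class GatedCache<T> {
    private final Supplier<T> source;
    private final Semaphore singlePermit = new Semaphore(1);
    private T cached;          // guarded by singlePermit
    private boolean hasValue;  // guarded by singlePermit

    GatedCache(Supplier<T> source) {
        this.source = source;
    }

    T get() {
        // Only one caller at a time may inspect the cache or run the source;
        // everyone else waits here until the current attempt finishes.
        singlePermit.acquireUninterruptibly();
        try {
            if (hasValue) {
                return cached;
            }
            // If the source throws, nothing is cached and the next caller retries.
            T value = source.get();
            cached = value;
            hasValue = true;
            return value;
        } finally {
            singlePermit.release();
        }
    }
}
```

A caller that arrives during a running attempt waits on the permit, then either reads the cached value or, if that attempt failed, runs the source itself.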


Well, for anyone still interested, I think I have a nicer way to achieve it with rx.

The key is to use onErrorResumeNext, which lets you replace the Observable when an error occurs. It should look something like this:

Observable<Object> apiCall = createApiCallObservable().cache(1);
// future call: falls back to a fresh request if the cached one failed
Observable<Object> futureCall = apiCall.onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
    @Override
    public Observable<? extends Object> call(Throwable throwable) {
        return createApiCallObservable();
    }
});

That way, if the first call failed, the future call simply re-issues the request (once only).

However, every other caller that uses the first observable will still hit the error and make a separate request of its own.

Since we hold a reference to the original observable, we can simply replace it.

First, a lazy getter:

Observable<Object> apiCall;
private Observable<Object> getCachedApiCall() {
    if (apiCall == null) {
        apiCall = createApiCallObservable().cache(1);
    }
    return apiCall;
}

Now, a getter that retries if the previous call failed:

private Observable<Object> getRetryableCachedApiCall() {
    return getCachedApiCall().onErrorResumeNext(new Func1<Throwable, Observable<? extends Object>>() {
        public Observable<? extends Object> call(Throwable throwable) {
            apiCall = null;
            return getCachedApiCall();
        }
    });
}

Please note that it retries only once each time it is called.

So now your code will look something like this:

---------------------------------------------
// the first time we need it (without a retry, if you prefer)
getCachedApiCall().andSomeOtherStuff()
               .subscribe(subscriberA);

---------------------------------------------
// later, when we need it again: any other call goes through the retrying getter
getRetryableCachedApiCall().andSomeDifferentStuff()
               .subscribe(subscriberB);

You have to do some state-handling. Here is how I'd do this:

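import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable;

public class CachedRetry {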

    public static final class OnErrorRetryCache<T> {
        final AtomicReference<Observable<T>> cached = 
                new AtomicReference<>();

        final Observable<T> result;

        public OnErrorRetryCache(Observable<T> source) {
            result = Observable.defer(() -> {
                for (;;) {
                    Observable<T> conn = cached.get();
                    if (conn != null) {
                        return conn;
                    }
                    // Nothing cached: build a replayed source that evicts itself on error.
                    Observable<T> next = source
                            .doOnError(e -> cached.set(null))
                            .replay()
                            .autoConnect();

                    if (cached.compareAndSet(null, next)) {
                        return next;
                    }
                }
            });
        }

        public Observable<T> get() {
            return result;
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        Observable<Integer> source = Observable
                .just(1)
                .doOnSubscribe(() -> 
                    System.out.println("Subscriptions: " + (1 + calls.get())))
                .flatMap(v -> {
                    if (calls.getAndIncrement() == 0) {
                        return Observable.error(new RuntimeException());
                    }
                    return Observable.just(42);
                });

        Observable<Integer> o = new OnErrorRetryCache<>(source).get();

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));

        o.subscribe(System.out::println, 
                Throwable::printStackTrace, 
                () -> System.out.println("Done"));
    }
}

It works by caching a fully successful source and returning it to everyone. Otherwise, a (partially) failed source clears the cache, and the next observer triggers a resubscription.
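The same compare-and-set idea can be tried without RxJava. The sketch below is a JDK-only analogy, not the code above: `RetryingFutureCache` is a hypothetical name, and it swaps the replayed Observable for a `CompletableFuture`. A placeholder future is installed first, so each attempt starts the source at most once, and a failure evicts that attempt before being reported.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

// Hypothetical helper using only the JDK; it mirrors the idea, not the RxJava operators.
final class RetryingFutureCache<T> {
    private final AtomicReference<CompletableFuture<T>> cached = new AtomicReference<>();
    private final Supplier<CompletableFuture<T>> source;

    RetryingFutureCache(Supplier<CompletableFuture<T>> source) {
        this.source = source;
    }

    CompletableFuture<T> get() {
        for (;;) {
            CompletableFuture<T> current = cached.get();
            if (current != null) {
                return current; // in flight, or already completed successfully
            }
            CompletableFuture<T> next = new CompletableFuture<>();
            if (!cached.compareAndSet(null, next)) {
                continue; // another caller won the race; loop and reuse its future
            }
            try {
                source.get().whenComplete((value, error) -> {
                    if (error != null) {
                        // Evict before failing so that later callers start a fresh attempt.
                        cached.compareAndSet(next, null);
                        next.completeExceptionally(error);
                    } else {
                        next.complete(value);
                    }
                });
            } catch (RuntimeException e) {
                // The supplier itself threw: treat it like a failed attempt.
                cached.compareAndSet(next, null);
                next.completeExceptionally(e);
            }
            return next;
        }
    }
}
```

Callers that arrive while an attempt is in flight share its future. Callers that arrive after a failure find an empty slot and start a new attempt, and a success stays cached from then on.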