How to handle exceptions in OnNext when using ObserveOn?

We're addressing this issue in Rx v2.0, starting with the RC release. You can read all about it on our blog at http://blogs.msdn.com/rxteam. It basically boils down to more disciplined error handling in the pipeline itself, combined with a SubscribeSafe extension method (to redirect errors during subscription into the OnError channel), and a Catch extension method on IScheduler (to wrap a scheduler with exception handling logic around scheduled actions).
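To make the scheduler part a bit more concrete, here is a minimal sketch of wrapping a scheduler with Catch and handing it to ObserveOn. It assumes the System.Reactive package and the `Catch<TException>` extension in System.Reactive.Concurrency; the class name `CatchSchedulerSketch` and its `Run` method are made up for this illustration. It only shows the exception reaching the scheduler-level handler, not what happens to the rest of the sequence afterwards.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class CatchSchedulerSketch
{
    // Returns the message of the exception seen by the scheduler-level handler,
    // or null if nothing arrived within the timeout.
    public static string Run()
    {
        string caught = null;
        var handled = new ManualResetEventSlim();

        // Wrap the scheduler so that exceptions escaping scheduled work
        // are passed to this handler instead of going unhandled.
        IScheduler safeScheduler = Scheduler.Default.Catch<Exception>(ex =>
        {
            caught = ex.Message;
            handled.Set();
            return true; // true marks the exception as handled
        });

        var xs = new Subject<int>();
        var subscription = xs
            .ObserveOn(safeScheduler)
            .Subscribe(x =>
            {
                // The handler throws on the scheduler's thread, not on the caller's.
                if (x % 3 == 0) throw new InvalidOperationException("Error in subscription");
                Console.WriteLine(x);
            });

        xs.OnNext(1);
        xs.OnNext(2);
        xs.OnNext(3);

        // Wait (with a timeout) for the asynchronous handler to observe the exception.
        handled.Wait(TimeSpan.FromSeconds(5));
        subscription.Dispose();
        return caught;
    }
}
```

Returning false from the handler would let the exception keep propagating, so the handler is the place to decide which exceptions you are prepared to swallow.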

Concerning the ExceptionToError method proposed here, it has one flaw. The IDisposable subscription object can still be null when the callbacks run; there's a fundamental race condition. To work around this, you'd have to use a SingleAssignmentDisposable.
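The sketch below illustrates that race in its simplest form, using a source that emits synchronously from inside Subscribe. It is not the ExceptionToError method itself; `SubscriptionRaceSketch` and its members are hypothetical names, and the System.Reactive package is assumed.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class SubscriptionRaceSketch
{
    // A source that emits synchronously, inside the Subscribe call itself.
    private static IObservable<int> SynchronousSource()
    {
        return Observable.Create<int>(o =>
        {
            o.OnNext(42);
            return Disposable.Empty;
        });
    }

    // Naive capture: the variable is only assigned after Subscribe returns,
    // so a callback that runs during Subscribe still sees null.
    public static bool NaiveHandleIsNullInCallback()
    {
        bool wasNull = false;
        IDisposable subscription = null;
        subscription = SynchronousSource().Subscribe(_ => wasNull = subscription == null);
        return wasNull;
    }

    // SingleAssignmentDisposable exists before Subscribe is called, so the callback
    // can dispose it; the real subscription is disposed as soon as it is assigned.
    public static bool SafeHandleCanBeDisposedInCallback()
    {
        var subscription = new SingleAssignmentDisposable();
        subscription.Disposable = SynchronousSource().Subscribe(_ => subscription.Dispose());
        return subscription.IsDisposed;
    }
}
```

The same reasoning applies when the callback runs on another thread before the assignment completes; the synchronous source just makes the problem deterministic.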


I looked at the native SubscribeSafe method that is supposed to solve this problem, but I can't make it work. This method has a single overload that accepts an IObserver<T>:

// Subscribes to the specified source, re-routing synchronous exceptions during
// invocation of the IObservable<T>.Subscribe(IObserver<T>) method to the
// observer's IObserver<T>.OnError(Exception) channel. This method is typically
// used when writing query operators.
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    IObserver<T> observer);

I tried passing an observer created by the Observer.Create factory method, but exceptions in the onNext handler continue to crash the process¹, just as they do with the normal Subscribe. So I ended up writing my own version of SubscribeSafe. It accepts three handlers as arguments and funnels any exceptions thrown by the onNext and onCompleted handlers to the onError handler.

/// <summary>Subscribes an element handler, an error handler, and a completion
/// handler to an observable sequence. Any exceptions thrown by the element or
/// the completion handler are propagated through the error handler.</summary>
public static IDisposable SubscribeSafe<T>(this IObservable<T> source,
    Action<T> onNext, Action<Exception> onError, Action onCompleted)
{
    // Argument validation omitted
    var disposable = new SingleAssignmentDisposable();
    disposable.Disposable = source.Subscribe(
        value =>
        {
            try { onNext(value); } catch (Exception ex) { onError(ex); disposable.Dispose(); }
        }, onError, () =>
        {
            try { onCompleted(); } catch (Exception ex) { onError(ex); }
        }
    );
    return disposable;
}

Beware, an unhandled exception in the onError handler will still crash the process!

¹ This applies only to exceptions thrown when the handler is invoked asynchronously on the ThreadPool.


There's a difference between errors in subscription and errors in the observable. A quick test:

var xs = new Subject<int>();

xs.Subscribe(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); }, 
             ex => Console.WriteLine("Error in source: " + ex.Message));

Run with this and you'll get a nice handled error in the source:

xs.OnNext(1);
xs.OnNext(2);
xs.OnError(new Exception("from source"));

Run with this and you'll get an unhandled error in the subscription:

xs.OnNext(1);
xs.OnNext(2);
xs.OnNext(3);

What your solution has done is take errors in the subscription and make them errors in the source. And you've done this on the original stream, rather than on a per subscription basis. You may or may not have intended to do this, but it's almost certainly wrong.

The 'right' way to do it is to add the error handling you need directly to the subscribing action, which is where it belongs. If you don't want to modify your subscription functions directly, you can use a little helper:

public static Action<T> ActionAndCatch<T>(Action<T> action, Action<Exception> catchAction)
{
    return item =>
    {
        try { action(item); }
        catch (System.Exception e) { catchAction(e); }
    };
}

And now to use it, again showing the difference between the different errors:

xs.Subscribe(ActionAndCatch<int>(x => { Console.WriteLine(x); if (x % 3 == 0) throw new System.Exception("Error in subscription"); },
                                 ex => Console.WriteLine("Caught error in subscription: " + ex.Message)),
             ex => Console.WriteLine("Error in source: " + ex.Message));

Now we can handle errors in the source and errors in the subscription separately. Of course, any of these actions can be defined in a method, potentially making the above code as simple as:

xs.Subscribe(ActionAndCatch(Handler, ExceptionHandler), SourceExceptionHandler);

Edit

In the comments we then started discussing the fact that errors in the subscription are pointing to errors in the stream itself, and you wouldn't want other subscribers on that stream. This is a completely different type of issue. I would be inclined to write an observable Validate extension to handle this scenario:

public static IObservable<T> Validate<T>(this IObservable<T> source, Predicate<T> valid)
{
    return Observable.Create<T>(o => {
        return source.Subscribe(
            x => {
                if (valid(x)) o.OnNext(x);
                else       o.OnError(new Exception("Could not validate: " + x));
            }, e => o.OnError(e), () => o.OnCompleted()
        );
    });
}

It is then simple to use, without mixing metaphors (errors only in the source):

xs
.Validate(x => x != 3)
.Subscribe(x => Console.WriteLine(x),
             ex => Console.WriteLine("Error in source: " + ex.Message));

If you still want to suppress exceptions in Subscribe, you should use one of the other methods discussed.


Your current solution is not ideal. As stated by one of the Rx people here:

Rx operators do not catch exceptions that occur in a call to OnNext, OnError, or OnCompleted. This is because we expect that (1) the observer implementor knows best how to handle those exceptions and we can't do anything reasonable with them and (2) if an exception occurs then we want that to bubble out and not be handled by Rx.

Your current solution gets the IObservable to handle errors thrown by the IObserver, which doesn't make sense as semantically the IObservable should have no knowledge of the things observing it. Consider the following example:

var errorFreeSource = new Subject<int>();
var sourceWithExceptionToError = errorFreeSource.ExceptionToError();
var observerThatThrows = Observer.Create<int>(x =>
  {
      if (x % 5 == 0)
          throw new Exception();
  },
  ex => Console.WriteLine("There's an argument that this should be called"),
  () => Console.WriteLine("OnCompleted"));
var observerThatWorks = Observer.Create<int>(
    x => Console.WriteLine("All good"),
    ex => Console.WriteLine("But definitely not this"),
    () => Console.WriteLine("OnCompleted"));
sourceWithExceptionToError.Subscribe(observerThatThrows);
sourceWithExceptionToError.Subscribe(observerThatWorks);
errorFreeSource.OnNext(1);
errorFreeSource.OnNext(2);
errorFreeSource.OnNext(3);
errorFreeSource.OnNext(4);
errorFreeSource.OnNext(5);
Console.ReadLine();

Here there is no issue with the source, or the observerThatWorks, but its OnError will be called due to an unrelated error with another Observer. To stop exceptions in a different thread from ending the process, you'll have to catch them in that thread, so put a try/catch block in your observers.