How to Separate IObservable and IObserver

First, you must familiarize yourself with the theory of "cold" and "hot" observables. Here is the definition from Introduction to Rx:

  1. Cold are sequences that are passive and start producing notifications on request (when subscribed to).
  2. Hot are sequences that are active and produce notifications regardless of subscriptions.
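To make the distinction concrete, here is a minimal sketch. It assumes the System.Reactive NuGet package and C# top-level statements; the names log, subscribeCalls, cold and hot are made up for illustration. The cold sequence runs its subscribe delegate once per subscriber, while the Subject (used here as a simple hot source) emits whether or not anyone is listening.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var log = new List<string>();
int subscribeCalls = 0;

// Cold: the delegate passed to Observable.Create runs again for every subscription.
IObservable<int> cold = Observable.Create<int>(observer =>
{
    subscribeCalls++;
    observer.OnNext(subscribeCalls);
    observer.OnCompleted();
    return Disposable.Empty;
});

cold.Subscribe(x => log.Add($"A received #{x}"));
cold.Subscribe(x => log.Add($"B received #{x}"));

// Hot: a Subject pushes values regardless of whether anyone has subscribed yet.
var hot = new Subject<int>();
hot.OnNext(1); // nobody is subscribed, so this value is lost
hot.Subscribe(x => log.Add($"C received #{x}"));
hot.OnNext(2);

foreach (var line in log) Console.WriteLine(line);
Console.WriteLine($"Subscribe delegate ran {subscribeCalls} times");
// Prints:
// A received #1
// B received #2
// C received #2
// Subscribe delegate ran 2 times
```

Observers A and B each trigger their own run of the cold sequence, which is why they see different values. Observer C only sees what the hot source emits after it has subscribed.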

What you want is a hot observable, and the problem is that the Observable.Create method creates cold observables. But you can make any observable hot by using the Publish operator. This operator provides a way to have a single underlying subscription shared by multiple independent observers. Example:

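// Requires the System.Reactive NuGet package.
using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading.Tasks;

int index = 0;

// Cold: the delegate below runs once for every subscription.
var coldObservable = Observable.Create<int>(observer =>
{
    // Fire-and-forget producer loop that emits an incrementing number every second.
    _ = Task.Run(async () =>
    {
        while (true)
        {
            observer.OnNext(++index);
            await Task.Delay(1000);
        }
    });
    // Nothing is cleaned up here, so unsubscribing does not stop the loop.
    return Disposable.Empty;
});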

IConnectableObservable<int> hotObservable = coldObservable.Publish();
hotObservable.Connect(); // Causes the start of the loop

hotObservable.Subscribe(s => Console.WriteLine($"Observer A received #{s}"));
hotObservable.Subscribe(s => Console.WriteLine($"Observer B received #{s}"));

The coldObservable created by Observable.Create is subscribed to when the hotObservable.Connect method is invoked, and all notifications generated by that single subscription are then propagated to all subscribers of the hotObservable.

Output:

Observer A received #1
Observer B received #1
Observer A received #2
Observer B received #2
Observer A received #3
Observer B received #3
Observer A received #4
Observer B received #4
Observer A received #5
Observer B received #5
Observer A received #6
Observer B received #6
...

Important: the purpose of the example above is to demonstrate the Publish operator, not to serve as an example of good-quality Rx code. One of its problems is that, because the observers subscribe after the source has been connected, it is theoretically possible that the first notification will not be sent to some or all of the observers: it may be produced before they subscribe. In other words, there is a race condition.
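One way to avoid that race is to attach every observer before calling Connect. The sketch below is only an illustration, assuming the System.Reactive NuGet package and C# top-level statements. It swaps the timer loop for a hypothetical synchronous source (Observable.Range) so that the effect of the ordering is deterministic; the names source, late and received are made up.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

// Hypothetical source that emits 1, 2, 3 as soon as it is subscribed to.
IObservable<int> source = Observable.Range(1, 3);

// Connect first, subscribe later: the observer misses every value.
var late = new List<string>();
IConnectableObservable<int> connectedEarly = source.Publish();
connectedEarly.Connect();
connectedEarly.Subscribe(x => late.Add($"Late observer received #{x}"));

// Subscribe first, connect last: both observers see every value.
var received = new List<string>();
IConnectableObservable<int> published = source.Publish();
published.Subscribe(x => received.Add($"Observer A received #{x}"));
published.Subscribe(x => received.Add($"Observer B received #{x}"));
IDisposable connection = published.Connect();
connection.Dispose();

Console.WriteLine($"Late observer notifications: {late.Count}");
foreach (var line in received) Console.WriteLine(line);
// Prints:
// Late observer notifications: 0
// Observer A received #1
// Observer B received #1
// Observer A received #2
// Observer B received #2
// Observer A received #3
// Observer B received #3
```

With a timer-based source the window for losing a notification is much smaller, but the ordering rule is the same: subscribe first, connect last.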

There is an alternative method of managing the lifetime of an IConnectableObservable, the operator RefCount:

Returns an observable sequence that stays connected to the source as long as there is at least one subscription to the observable sequence.

var hotObservable = coldObservable.Publish().RefCount();

This way you don't need to Connect manually. The connection occurs automatically with the first subscription, and it is disposed automatically with the last unsubscription.
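The connect-on-first, disconnect-on-last behavior can be observed with a small sketch like the following. It assumes the System.Reactive NuGet package and C# top-level statements; the feed subject is a hypothetical stand-in for a real data source, and the events list exists only to record what happens.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var events = new List<string>();
var feed = new Subject<int>(); // hypothetical stand-in for the real data source

// Cold wrapper that records when the underlying subscription starts and ends.
IObservable<int> cold = Observable.Create<int>(observer =>
{
    events.Add("connected");
    IDisposable inner = feed.Subscribe(observer);
    return Disposable.Create(() =>
    {
        events.Add("disconnected");
        inner.Dispose();
    });
});

IObservable<int> shared = cold.Publish().RefCount();

IDisposable a = shared.Subscribe(x => events.Add($"A received #{x}")); // first subscriber: connects
IDisposable b = shared.Subscribe(x => events.Add($"B received #{x}")); // shares the same connection
feed.OnNext(1);
a.Dispose();
feed.OnNext(2);
b.Dispose(); // last subscriber gone: disconnects
feed.OnNext(3); // nobody is connected any more

foreach (var line in events) Console.WriteLine(line);
// Prints:
// connected
// A received #1
// B received #1
// B received #2
// disconnected
```

Note that "connected" appears only once even though there are two subscribers, which is the sharing that Publish provides.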


I've added this as an answer because I feel that the code Christian posted in his answer is dangerous: it mixes Tasks and Rx, and it has race conditions.

Here's an alternative that fixes most of these issues:

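using System;
using System.Diagnostics;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using System.Threading.Tasks;

public class UnitTest1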
{
    private string GetData() => "Hi";
    
    private IDisposable Subscriber(IObservable<string> observable, string name) =>
        observable.Subscribe(s => Debug.WriteLine($"Name: {name} Message: {s}"));
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData());
                
        var publisher = coldObservable.Publish();

        var subscriptions =
            new CompositeDisposable(
                Subscriber(publisher, "One"),
                Subscriber(publisher, "Two"),
                publisher.Connect());

        await Task.Delay(TimeSpan.FromSeconds(5.0));

        subscriptions.Dispose();
    }
}

Better yet, though, I would look at doing it this way:

using System;
using System.Diagnostics;
using System.Reactive.Linq;
using System.Threading.Tasks;

public class UnitTest1
{
    private string GetData() => "Hi";
    
    private IObservable<string> Subscriber(IObservable<string> observable, string name) =>
        observable.Select(s => $"Name: {name} Message: {s}");
    
    public async Task Messaging()
    {
        var coldObservable =
            Observable
                .Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1.0))
                .Select(_ => GetData())
                .Do(_ => Debug.WriteLine("Called GetData()"))
                .Publish(published =>
                    Observable
                        .Merge(
                            Subscriber(published, "One"),
                            Subscriber(published, "Two")))
                .TakeUntil(Observable.Timer(TimeSpan.FromSeconds(5.0)))
                .Do(x => Debug.WriteLine(x));
    
        await coldObservable;
    }
}

It's always best to use the inbuilt operators for Rx rather than hybrid approaches with tasks.