Observable from chained Tasks

If you are going to roll your own async Generate function, I would recommend recursive scheduling instead of wrapping a while loop.

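public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null)
{
  var s = scheduler ?? Scheduler.Default;

  return Observable.Create<TResult>(async obs => {
    // The IDisposable returned by Schedule becomes the subscription,
    // so disposing it stops any further iterations.
    return s.Schedule(await initialState(), async (state, self) =>
    {
      if (!condition(state))
      {
        obs.OnCompleted();
        return;
      }

      obs.OnNext(resultSelector(state));

      TResult next;
      try
      {
        next = await iterate(state);
      }
      catch (Exception ex)
      {
        // This lambda compiles to async void, so a faulted task
        // has to be routed to the observer explicitly.
        obs.OnError(ex);
        return;
      }

      // Queue the next step on the scheduler instead of looping.
      self(next);
    });
  });
}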

This has a couple of advantages. First, you can cancel it: with a simple while loop there is no way to cancel directly, and in fact you don't even return from the subscribe function until the observable has completed. Second, it lets you control the scheduling/asynchrony of each item (which makes testing a breeze), which also makes it a better overall fit for a library.
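To illustrate the cancellation point, here is a minimal sketch of the same recursive scheduling pattern with the async parts left out. CountFrom is a hypothetical helper made up for this example (it is not part of Rx), and it assumes the System.Reactive package is referenced.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading;

public static class RecursiveSchedulingSketch
{
    // Hypothetical helper: emits start, start + 1, ... forever,
    // using one scheduler call per item rather than a while loop.
    public static IObservable<int> CountFrom(int start, IScheduler scheduler)
    {
        return Observable.Create<int>(obs =>
            // The IDisposable returned here is what the subscriber disposes;
            // once disposed, no further recursive steps are run.
            scheduler.Schedule(start, (state, self) =>
            {
                obs.OnNext(state);
                self(state + 1); // queue the next step on the scheduler
            }));
    }

    public static void Main()
    {
        // Subscribe returns right away even though the sequence never completes,
        // because the work is queued on the scheduler.
        IDisposable subscription = CountFrom(0, Scheduler.Default)
            .Subscribe(n => Console.WriteLine(n));

        Thread.Sleep(10);       // let some items through
        subscription.Dispose(); // cancels the pending recursive work
    }
}
```

Because the scheduler is a parameter, a test should be able to pass in a TestScheduler (from the Microsoft.Reactive.Testing package) instead of Scheduler.Default and drive each step itself.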


After doing a good bit of testing, I think this does the job nicely using the built-in Rx operators.

public static IObservable<TResult> Generate<TResult>(
    Func<Task<TResult>> initialState,
    Func<TResult, bool> condition,
    Func<TResult, Task<TResult>> iterate,
    Func<TResult, TResult> resultSelector,
    IScheduler scheduler = null) 
{
    return Observable.Create<TResult>(o =>
    {
        var current = default(TResult);
        return
            Observable
                .FromAsync(initialState)
                .Select(y => resultSelector(y))
                .Do(c => current = c)
                .Select(x =>
                    Observable
                        .While(
                            () => condition(current),
                            Observable
                                .FromAsync(() => iterate(current))
                                .Select(y => resultSelector(y))
                                .Do(c => current = c))
                        .StartWith(x))
                .Switch()
                .Where(x => condition(x))
                .ObserveOn(scheduler ?? Scheduler.Default)
                .Subscribe(o);
    });
}

I've tested this code with the following:

bool Continue(IEnumerable<BrokeredMessage> prev)
{
    return prev.Any();
}

Task<IEnumerable<BrokeredMessage>> ProduceFirst()
{
    return
        Task.FromResult(
            EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = 1
                }));
}

Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev) 
{
    return Task.FromResult(
        prev.Last().SequenceNumber < 3
            ? EnumerableEx.Return(
                new BrokeredMessage()
                {
                    SequenceNumber = prev.Last().SequenceNumber + 1 
                })
            : Enumerable.Empty<BrokeredMessage>());
}

public class BrokeredMessage
{
    public int SequenceNumber;
}

And running this sequence:

var ob = Generate(
    async () => await ProduceFirst(),
    prev => Continue(prev),
    async prev => await ProduceNext(prev),
    item => item);

I got this result:

[Image in the original post: result]

My test code also used the Reactive Extensions team's Interactive Extensions (NuGet package "Ix-Main"), which provides EnumerableEx.Return.