Observable from chained Tasks
If you are going to roll your own async Generate function, I would recommend using recursive scheduling instead of wrapping a while loop.
    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector,
        IScheduler scheduler = null)
    {
        var s = scheduler ?? Scheduler.Default;
        return Observable.Create<TResult>(async obs =>
        {
            // Await the initial state, then schedule the first iteration.
            // The returned IDisposable cancels any pending iteration.
            return s.Schedule(await initialState(), async (state, self) =>
            {
                if (!condition(state))
                {
                    obs.OnCompleted();
                    return;
                }
                obs.OnNext(resultSelector(state));
                // Re-schedule with the next state instead of looping.
                self(await iterate(state));
            });
        });
    }
This has a couple of advantages. First, it can be cancelled: with a simple while loop there is no way to cancel directly, and in fact the subscribe function does not even return until the observable has completed. Second, it lets you control the scheduling and asynchrony of each item, which makes testing a breeze and makes it a better overall fit for a library.
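To illustrate the cancellation point, here is a minimal sketch. It repeats the recursive-scheduling Generate so that it compiles on its own. The GenerateDemo class, the endless counter, the choice of TaskPoolScheduler.Default, and the delays are all hypothetical choices made for illustration, and the sketch assumes the System.Reactive package is referenced. Timing is approximate, so no exact output is claimed.

```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

public static class GenerateDemo
{
    // Recursive-scheduling Generate, repeated here so the sketch is self-contained.
    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector,
        IScheduler scheduler = null)
    {
        var s = scheduler ?? Scheduler.Default;
        return Observable.Create<TResult>(async obs =>
        {
            return s.Schedule(await initialState(), async (state, self) =>
            {
                if (!condition(state))
                {
                    obs.OnCompleted();
                    return;
                }
                obs.OnNext(resultSelector(state));
                self(await iterate(state));
            });
        });
    }

    public static async Task Main()
    {
        // An endless sequence: the condition never returns false.
        var endless = Generate(
            () => Task.FromResult(0),
            _ => true,
            async n => { await Task.Delay(100); return n + 1; },
            n => n,
            TaskPoolScheduler.Default); // the caller chooses the scheduler

        // Subscribe returns promptly instead of blocking until completion.
        var subscription = endless.Subscribe(n => Console.WriteLine(n));

        await Task.Delay(350);

        // Disposing the subscription stops further notifications.
        subscription.Dispose();
        Console.WriteLine("cancelled");

        // Wait a little longer; no further values should be printed.
        await Task.Delay(300);
    }
}
```

Because the scheduler is a parameter, a test could pass a different IScheduler instead of relying on real delays.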
After doing a good bit of testing, I think this does the job nicely using the built-in Rx operators.
    public static IObservable<TResult> Generate<TResult>(
        Func<Task<TResult>> initialState,
        Func<TResult, bool> condition,
        Func<TResult, Task<TResult>> iterate,
        Func<TResult, TResult> resultSelector,
        IScheduler scheduler = null)
    {
        return Observable.Create<TResult>(o =>
        {
            // Most recent value produced; read by the While condition and by iterate.
            var current = default(TResult);
            return
                Observable
                    .FromAsync(initialState)
                    .Select(y => resultSelector(y))
                    .Do(c => current = c)
                    .Select(x =>
                        Observable
                            .While(
                                () => condition(current),
                                Observable
                                    .FromAsync(() => iterate(current))
                                    .Select(y => resultSelector(y))
                                    .Do(c => current = c))
                            .StartWith(x))
                    .Switch()
                    // Drop any value that fails the condition, such as the final one that ends the loop.
                    .Where(x => condition(x))
                    .ObserveOn(scheduler ?? Scheduler.Default)
                    .Subscribe(o);
        });
    }
I've tested this code with the following:
    bool Continue(IEnumerable<BrokeredMessage> prev)
    {
        return prev.Any();
    }

    Task<IEnumerable<BrokeredMessage>> ProduceFirst()
    {
        return
            Task.FromResult(
                EnumerableEx.Return(
                    new BrokeredMessage()
                    {
                        SequenceNumber = 1
                    }));
    }

    Task<IEnumerable<BrokeredMessage>> ProduceNext(IEnumerable<BrokeredMessage> prev)
    {
        return Task.FromResult(
            prev.Last().SequenceNumber < 3
                ? EnumerableEx.Return(
                    new BrokeredMessage()
                    {
                        SequenceNumber = prev.Last().SequenceNumber + 1
                    })
                : Enumerable.Empty<BrokeredMessage>());
    }

    public class BrokeredMessage
    {
        public int SequenceNumber;
    }
And running this sequence:
    var ob = Generate(
        async () => await ProduceFirst(),
        prev => Continue(prev),
        async prev => await ProduceNext(prev),
        item => item);
I got this result:
My test code also used the Reactive Extensions team's Interactive Extensions (NuGet package "Ix-Main").