Reactive Extensions: Process events in batches + add delay between every batch
There's a specific Buffer method overload just for this: https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx
observable.Buffer(TimeSpan.FromSeconds(5), 50);
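To make the behavior of this overload concrete, here is a minimal sketch that runs on virtual time. It assumes the System.Reactive NuGet package; the HistoricalScheduler, the Subject standing in for the event source, and the BufferOverloadDemo class are illustrative choices and not part of the snippet above. The overload closes a buffer as soon as it holds 50 items or 5 seconds have elapsed, whichever comes first.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferOverloadDemo
{
    // Returns (virtual seconds since start, batch size) for every emitted batch.
    public static List<(double Seconds, int Count)> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, no real waiting
        var start = scheduler.Now;
        var source = new Subject<int>();           // hypothetical event source
        var batches = new List<(double Seconds, int Count)>();

        using var subscription = source
            .Buffer(TimeSpan.FromSeconds(5), 50, scheduler)
            .Subscribe(b => batches.Add(((scheduler.Now - start).TotalSeconds, b.Count)));

        for (var i = 0; i < 50; i++) source.OnNext(i); // full buffer: closed by count
        for (var i = 0; i < 3; i++) source.OnNext(i);  // partial buffer: closed by the timer
        scheduler.AdvanceBy(TimeSpan.FromSeconds(5));
        return batches;
    }

    public static void Main()
    {
        foreach (var (seconds, count) in Run())
            Console.WriteLine($"t={seconds}s: {count} items");
    }
}
```

Under these assumptions the run should report a 50-item batch at t=0 (closed by the count) and a 3-item batch at t=5 (closed by the timer).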
If you'd prefer not to sleep threads, you can do this:
var tick = Observable.Interval(TimeSpan.FromSeconds(5));
eventAsObservable
    .Buffer(50)
    .Zip(tick, (res, _) => res)
    .Subscribe(DoProcessing);
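As a rough illustration of how this pairing paces batches when the source runs ahead of the timer, the sketch below replays the same pipeline on virtual time. It assumes the System.Reactive NuGet package; Observable.Range is only a stand-in for eventAsObservable, and the HistoricalScheduler and ZipPacingDemo names are hypothetical.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class ZipPacingDemo
{
    // Returns (virtual seconds since start, batch size) for every processed batch.
    public static List<(double Seconds, int Count)> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, no real waiting
        var start = scheduler.Now;
        var tick = Observable.Interval(TimeSpan.FromSeconds(5), scheduler);
        var batches = new List<(double Seconds, int Count)>();

        using var subscription = Observable.Range(1, 120) // stand-in for eventAsObservable
            .Buffer(50)                                   // 50 + 50 + 20 items
            .Zip(tick, (res, _) => res)                   // each batch waits for a tick
            .Subscribe(b => batches.Add(((scheduler.Now - start).TotalSeconds, b.Count)));

        scheduler.AdvanceBy(TimeSpan.FromSeconds(15));    // lets three ticks fire
        return batches;
    }

    public static void Main()
    {
        foreach (var (seconds, count) in Run())
            Console.WriteLine($"t={seconds}s: {count} items");
    }
}
```

In this setup all three batches are produced immediately, and each one should be released on its own tick, at 5, 10 and 15 virtual seconds.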
This is a surprisingly difficult problem to solve, all the more so because the enticing idea of using the Zip operator to align the observable with an Observable.Interval is buggy and dangerously inefficient. The main problem with the Zip operator, when used with asymmetric observables, is that it buffers the elements of the faster-producing observable, potentially resulting in massive memory allocation during a long-lived subscription. IMHO the use of this operator should be limited to pairs of observables that are expected to produce an equal (or close to equal) number of elements in the long run.
The buggy behavior of the Zip+Observable.Interval combo emerges when the Observable.Interval emits values faster than the source observable. In that case the superfluous values emitted by the Observable.Interval are buffered, so when the source observable emits the next element there is already a buffered Interval value to form a pair, resulting in a violation of the "minimum interval between elements" policy.
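The failure mode described above can be reproduced with a small sketch on virtual time. It assumes the System.Reactive NuGet package; the Subject acting as a slow source, the HistoricalScheduler, and the ZipIdleSourceDemo class are hypothetical names introduced only for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ZipIdleSourceDemo
{
    // Returns the virtual time (in seconds) at which each element reached the subscriber.
    public static List<double> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, no real waiting
        var start = scheduler.Now;
        var source = new Subject<string>();        // hypothetical slow event source
        var tick = Observable.Interval(TimeSpan.FromSeconds(5), scheduler);
        var emittedAt = new List<double>();

        using var subscription = source
            .Zip(tick, (item, _) => item)
            .Subscribe(_ => emittedAt.Add((scheduler.Now - start).TotalSeconds));

        // The source stays idle for 20 s, so the ticks pile up inside Zip.
        scheduler.AdvanceBy(TimeSpan.FromSeconds(20));

        // Each element now finds a buffered tick waiting and passes straight through.
        source.OnNext("a");
        source.OnNext("b");
        source.OnNext("c");
        return emittedAt;
    }

    public static void Main()
    {
        foreach (var seconds in Run())
            Console.WriteLine($"emitted at t={seconds}s");
    }
}
```

All three elements should carry the same timestamp (t=20), that is, with no interval between them at all.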
Below is an implementation of a custom WithInterval operator that imposes a minimum interval between consecutive elements of an observable sequence. This operator is then used to solve the specific problem of this question, which involves buffers instead of individual elements:
// Requires: using System; using System.Reactive.Concurrency; using System.Reactive.Linq;
/// <summary>Imposes a minimum interval between consecutive elements of an
/// observable sequence.</summary>
public static IObservable<T> WithInterval<T>(this IObservable<T> source,
    TimeSpan interval, IScheduler scheduler = null)
{
    return source
        // State: (timer belonging to the latest element, that element gated by the previous timer).
        .Scan((Observable.Return(0L), (IObservable<T>)null), (state, x) =>
        {
            var (previousTimer, _) = state;
            var timer = (scheduler != null ? Observable.Timer(interval, scheduler)
                : Observable.Timer(interval)).PublishLast();
            // Emit x once the previous timer has fired, then start this element's timer.
            var delayed = previousTimer.Select(_ => x).Finally(() => timer.Connect());
            return (timer, delayed);
        })
        .Select(e => e.Item2)
        .Concat();
}
This implementation places an Observable.Timer between consecutive elements. The tricky part is how to activate each timer at exactly the right moment. It is achieved by Publishing the timers, and having each timer warm (Connect) the next one when it completes.
With this operator in place, implementing a custom BatchWithInterval operator is trivial:
/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers which are produced based on element count information,
/// imposing a minimum interval between consecutive buffers.</summary>
public static IObservable<IList<T>> BatchWithInterval<T>(this IObservable<T> source,
    int count, TimeSpan interval, IScheduler scheduler = null)
{
    return source.Buffer(count).WithInterval(interval, scheduler);
}
Usage example:
var subscription = eventAsObservable
    .BatchWithInterval(50, TimeSpan.FromSeconds(10))
    .Subscribe(DoProcessing);
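For a quick check without real waiting, the operators can be exercised on virtual time, as in the sketch below. It assumes the System.Reactive NuGet package; the two operators are copied from above, while Observable.Range (a stand-in for eventAsObservable), the HistoricalScheduler, and the BatchingExtensions and BatchWithIntervalDemo class names are assumptions made for this illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

public static class BatchingExtensions
{
    // Copied from the answer above.
    public static IObservable<T> WithInterval<T>(this IObservable<T> source,
        TimeSpan interval, IScheduler scheduler = null)
    {
        return source
            .Scan((Observable.Return(0L), (IObservable<T>)null), (state, x) =>
            {
                var (previousTimer, _) = state;
                var timer = (scheduler != null ? Observable.Timer(interval, scheduler)
                    : Observable.Timer(interval)).PublishLast();
                var delayed = previousTimer.Select(_ => x).Finally(() => timer.Connect());
                return (timer, delayed);
            })
            .Select(e => e.Item2)
            .Concat();
    }

    // Copied from the answer above.
    public static IObservable<IList<T>> BatchWithInterval<T>(this IObservable<T> source,
        int count, TimeSpan interval, IScheduler scheduler = null)
    {
        return source.Buffer(count).WithInterval(interval, scheduler);
    }
}

public static class BatchWithIntervalDemo
{
    // Returns (virtual seconds since start, batch size) for every processed batch.
    public static List<(double Seconds, int Count)> Run()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, no real waiting
        var start = scheduler.Now;
        var batches = new List<(double Seconds, int Count)>();

        using var subscription = Observable.Range(1, 120) // stand-in for eventAsObservable
            .BatchWithInterval(50, TimeSpan.FromSeconds(10), scheduler)
            .Subscribe(b => batches.Add(((scheduler.Now - start).TotalSeconds, b.Count)));

        scheduler.AdvanceBy(TimeSpan.FromSeconds(30));
        return batches;
    }

    public static void Main()
    {
        foreach (var (seconds, count) in Run())
            Console.WriteLine($"t={seconds}s: {count} items");
    }
}
```

With 120 items arriving at once, the batches of 50, 50 and 20 items should be delivered at 0, 10 and 20 virtual seconds: the first one immediately, and each later one no sooner than 10 seconds after its predecessor.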