Reactive Throttle Returning All Items Added Within The TimeSpan
As I answered in the other post, yes you can! Using the Throttle and Window methods of Observable:
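public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    // Emits once the stream has been quiet for `delay`; used as the window-closing signal.
    var closes = stream.Throttle(delay);
    // Collect each window's items into a list when the window closes.
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}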
I amended Colonel Panic's BufferUntilInactive operator by adding a Publish component, so that it works correctly with cold observables too:
/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}
For completeness I've also added an optional IScheduler parameter, which configures the scheduler on which the timer runs.
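As a rough usage sketch, the optional scheduler also makes the operator easy to try out without real waiting. The example below assumes the System.Reactive package and drives the timer with a HistoricalScheduler (virtual time); the Demo class, the Subject<int> source and the one-second quiet period are illustrative choices of mine, not part of the answers above. The operator is repeated so that the snippet compiles on its own.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class ObservableExtensions
{
    // Same operator as above, repeated here only to keep the sketch self-contained.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
    {
        scheduler ??= Scheduler.Default;
        return source.Publish(published =>
            published
                .Window(() => published.Throttle(dueTime, scheduler))
                .SelectMany(window => window.ToList()));
    }
}

public static class Demo
{
    public static void Main()
    {
        // Virtual-time scheduler: timers only fire when we advance the clock.
        var scheduler = new HistoricalScheduler();
        var subject = new Subject<int>(); // hypothetical source of items

        using var subscription = subject
            .BufferUntilInactive(TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(batch => Console.WriteLine(string.Join(", ", batch)));

        subject.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500)); // still within the quiet period
        subject.OnNext(2);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // quiet period elapses
        subject.OnNext(3);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // quiet period elapses again
    }
}
```

The intent is that 1 and 2 arrive together in one list, because they were pushed less than a second apart, and that 3 arrives in a later list of its own. In production code you would normally omit the scheduler argument and let it fall back to Scheduler.Default.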