Reactive Extensions: Throttle/Sample with varying interval
Many hours later, and with some sleep on it, I got it.
public static IObservable<T> Sample<T>(this IObservable<T> source, Func<T, TimeSpan> intervalSelector)
{
return source.TimeInterval()
.Scan(Tuple.Create(TimeSpan.Zero, false, default(T)), (acc, v) =>
{
if(v.Interval >= acc.Item1)
{
return Tuple.Create(intervalSelector(v.Value), true, v.Value);
}
return Tuple.Create(acc.Item1 - v.Interval, false, v.Value);
})
.Where(t => t.Item2)
.Select(x => x.Item3);
}
This works as I want: each time it produces a value x
, it stops producing values until intervalSelector(x)
time passes.