Reactive Throttle Returning All Items Added Within The TimeSpan

As I answered in the other post, yes you can, by combining the Throttle and Window methods of Observable:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
{
    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());
}

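I amended Colonel Panic's BufferUntilInactive operator by wrapping it in Publish, so that Window and Throttle share a single subscription to the source instead of each subscribing separately. That way it works correctly with cold observables too: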

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
{
    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );
}

For completeness I've also added an optional IScheduler parameter, which sets the scheduler on which the Throttle timers run. When it is omitted, Scheduler.Default is used.
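
To see the operator in action, here is a rough usage sketch. It assumes the System.Reactive package and C# 8 or later, and it repeats the operator so the snippet is self-contained. The class name BufferUntilInactiveDemo and the RunDemo helper are made up for illustration. HistoricalScheduler (from System.Reactive.Concurrency) stands in for real time so the result does not depend on actual timers.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class BufferUntilInactiveDemo
{
    // Same operator as above, repeated so this sketch compiles on its own.
    public static IObservable<IList<T>> BufferUntilInactive<T>(
        this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
    {
        scheduler ??= Scheduler.Default;
        return source.Publish(published =>
            published
                .Window(() => published.Throttle(dueTime, scheduler))
                .SelectMany(window => window.ToList()));
    }

    // Hypothetical helper: pushes four values with pauses in virtual time
    // and returns each emitted batch as a comma-separated string.
    public static List<string> RunDemo()
    {
        var scheduler = new HistoricalScheduler(); // virtual clock, advanced manually
        var subject = new Subject<int>();
        var batches = new List<string>();

        using var subscription = subject
            .BufferUntilInactive(TimeSpan.FromSeconds(1), scheduler)
            .Subscribe(batch => batches.Add(string.Join(",", batch)));

        subject.OnNext(1);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500)); // still active
        subject.OnNext(2);
        scheduler.AdvanceBy(TimeSpan.FromMilliseconds(500)); // still active
        subject.OnNext(3);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // quiet for more than 1 s
        subject.OnNext(4);
        scheduler.AdvanceBy(TimeSpan.FromSeconds(2));        // quiet again

        return batches;
    }

    public static void Main()
    {
        foreach (var batch in RunDemo())
            Console.WriteLine(batch);
    }
}
```

With a one-second dueTime, the values 1, 2 and 3 arrive less than a second apart, so I would expect them to come out together as one list once the stream goes quiet, followed by 4 in a list of its own. Note that if the source completes, the final (possibly empty) window is emitted as a list as well.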