Rx Observable.TakeWhile checks the condition BEFORE each element, but I need to perform the check after
There are no built-in operators that do what you're asking, but here's one that uses Publish
to run two queries while subscribing to the underlying observable only once:
// Emits matching values, but includes the value that failed the filter
public static IObservable<T> TakeWhileInclusive<T>(
    this IObservable<T> source, Func<T, bool> predicate)
{
    return source.Publish(co => co.TakeWhile(predicate)
        .Merge(co.SkipWhile(predicate).Take(1)));
}
And then:
var obs = listOfCommands.ToObservable()
    .TakeWhileInclusive(c => c.CurrentIndex != c.TotalCount);
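As a quick check of the operator above, here is a minimal sketch, assuming the System.Reactive NuGet package is referenced. The Demo class, the Observable.Range source, and the x < 4 predicate are made up for illustration and are not from the question.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ObservableExtensions
{
    // Same operator as above: TakeWhile relays the matching prefix, and
    // SkipWhile(...).Take(1) adds the first value that fails the predicate.
    public static IObservable<T> TakeWhileInclusive<T>(
        this IObservable<T> source, Func<T, bool> predicate)
    {
        return source.Publish(co => co.TakeWhile(predicate)
            .Merge(co.SkipWhile(predicate).Take(1)));
    }
}

public static class Demo
{
    // Hypothetical helper: collects the result of a small finite source
    // into a list so it can be inspected.
    public static IList<int> Run()
    {
        return Observable.Range(1, 10)
            .TakeWhileInclusive(x => x < 4)
            .ToList()
            .Wait(); // returns [1, 2, 3, 4]
    }
}
```

Note that the predicate runs twice per element here (once inside TakeWhile and once inside SkipWhile), so it should be cheap and free of side effects.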
Final edit:
I based my solution on Sergey's TakeWhileInclusive implementation in this thread, "How to complete a Rx Observable depending on a condition in a event":
public static IObservable<TSource> TakeUntil<TSource>(
    this IObservable<TSource> source, Func<TSource, bool> predicate)
{
    return Observable
        .Create<TSource>(o => source.Subscribe(x =>
            {
                o.OnNext(x);
                if (predicate(x))
                    o.OnCompleted();
            },
            o.OnError,
            o.OnCompleted
        ));
}
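One thing to watch with this version is that the predicate has the opposite polarity to TakeWhileInclusive: it says when to stop, not when to continue. The sketch below assumes the System.Reactive NuGet package; the operator body is the same as above, but it is renamed to TakeUntilInclusive (a hypothetical name) because, as far as I know, newer System.Reactive releases ship their own predicate-based TakeUntil overload, and an identically named extension could make the call ambiguous. The StopPredicateDemo class and the Range source are also just for illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class StopPredicateDemo
{
    // Same body as the TakeUntil above, under a hypothetical name.
    // Each value is relayed first and the stop condition is checked afterwards,
    // so the value that triggers the stop is included in the output.
    public static IObservable<T> TakeUntilInclusive<T>(
        this IObservable<T> source, Func<T, bool> stopPredicate)
    {
        return Observable.Create<T>(o => source.Subscribe(
            x =>
            {
                o.OnNext(x);
                if (stopPredicate(x))
                    o.OnCompleted();
            },
            o.OnError,
            o.OnCompleted));
    }

    // Stops once a value >= 4 has been emitted; this is the negation of
    // the "x < 4" condition one would pass to TakeWhileInclusive.
    public static IList<int> Run()
    {
        return Observable.Range(1, 10)
            .TakeUntilInclusive(x => x >= 4)
            .ToList()
            .Wait(); // returns [1, 2, 3, 4]
    }
}
```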
You can use the TakeUntil operator to take every item until a secondary source produces a value. In this case the secondary stream is the source itself with everything up to and including the first value that fails the predicate skipped, so it fires on the first value after the failing one:
public static IObservable<TSource> TakeWhileInclusive<TSource>(
    this IObservable<TSource> source,
    Func<TSource, bool> predicate)
{
    return source.TakeUntil(source.SkipWhile(x => predicate(x)).Skip(1));
}
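One caveat with this version is that source appears twice in the expression, so a cold observable is subscribed to twice. The sketch below, assuming the System.Reactive NuGet package, counts subscriptions with Observable.Defer and compares the expression above with a variant wrapped in Publish. The SubscriptionCountDemo class and its helper names are hypothetical, and the x < 4 predicate is only an example.

```csharp
using System;
using System.Reactive.Linq;

public static class SubscriptionCountDemo
{
    // Hypothetical helper: applies op to a cold source and reports how many
    // times that source was subscribed to while the query ran.
    public static int CountSubscriptions(
        Func<IObservable<int>, IObservable<int>> op)
    {
        var subscriptions = 0;
        var cold = Observable.Defer(() =>
        {
            subscriptions++; // the factory runs once per subscription
            return Observable.Range(1, 10);
        });

        op(cold).ToList().Wait(); // run the query to completion
        return subscriptions;
    }

    // The expression from above: source is used twice.
    public static int Direct()
    {
        return CountSubscriptions(
            s => s.TakeUntil(s.SkipWhile(x => x < 4).Skip(1))); // returns 2
    }

    // Same query, but both branches share one subscription via Publish.
    public static int Published()
    {
        return CountSubscriptions(
            s => s.Publish(p => p.TakeUntil(p.SkipWhile(x => x < 4).Skip(1)))); // returns 1
    }
}
```

For a hot source the double subscription is usually harmless, but in both forms the sequence only completes when the next value after the failing one arrives.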