How to use Rx.NET extension ForEachAsync with async action
Would it be possible to write an extension which would accept an async action?
Not directly.
Rx subscriptions are necessarily synchronous because Rx is a push-based system. When a data item arrives, it travels through your query until it reaches the final subscription, which in this case executes an Action.
The awaitable methods provided by Rx await the sequence itself. ForEachAsync, for example, is asynchronous in terms of the sequence (you are asynchronously waiting for the sequence to complete), but the subscription within ForEachAsync (the action taken for each element) must still be synchronous.
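To see why an async action cannot simply be passed where a synchronous callback is expected, here is a minimal sketch that does not use Rx at all. The hypothetical PushAll function stands in for a push-based source and invokes a plain Action<int> for each item; the TaskCompletionSource named gate is an assumed stand-in for slow asynchronous I/O. Because the async lambda compiles to an async void method, each call returns to the producer at its first incomplete await, so the producer finishes before any of the asynchronous work does.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Hypothetical stand-in for a push-based source such as an Rx observable:
// it hands each item to a synchronous callback and moves on when the callback returns.
static void PushAll(int count, Action<int> onNext)
{
    for (int i = 0; i < count; i++)
        onNext(i);
}

var completed = new ConcurrentBag<int>();
// Stands in for slow asynchronous I/O; it stays incomplete until we release it.
var gate = new TaskCompletionSource<bool>();

// Assigning an async lambda to Action<int> produces an "async void" method:
// each call returns to the caller at its first incomplete await.
Action<int> asyncHandler = async item =>
{
    await gate.Task;
    completed.Add(item);
};

PushAll(5, asyncHandler);

// PushAll has returned, yet none of the asynchronous work has finished.
int finishedWhenPushReturned = completed.Count;
Console.WriteLine($"Finished when PushAll returned: {finishedWhenPushReturned}"); // prints 0

// Releasing the gate lets the handlers finish eventually, but nothing can await
// them: an async void call does not give the caller a Task.
gate.SetResult(true);
```

The same thing happens if an async lambda is passed to Subscribe or ForEachAsync: it becomes async void, nothing awaits it, and an exception it throws does not reach the sequence's error handling or the Task returned by ForEachAsync.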
To make a sync-to-async transition in your data pipeline, you need a buffer. An Rx subscription can (synchronously) add to the buffer as a producer while an asynchronous consumer retrieves items and processes them. So you need a producer/consumer queue that supports both synchronous and asynchronous operations.
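A minimal sketch of such a producer/consumer queue, using System.Threading.Channels as a substitute for TPL Dataflow (an illustrative swap, not the approach this answer uses). The for loop plays the role of the Rx subscription: ChannelWriter<T>.TryWrite is synchronous, while the consumer awaits items with ReadAllAsync and processes each one asynchronously. The Task.Delay call is a hypothetical stand-in for real asynchronous work such as a database write.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

// Unbounded buffer between a synchronous producer and an asynchronous consumer.
var channel = Channel.CreateUnbounded<int>();
var processed = new List<int>();

// Asynchronous consumer: awaits each item, then does (simulated) async work on it.
Task consumer = Task.Run(async () =>
{
    await foreach (int item in channel.Reader.ReadAllAsync())
    {
        await Task.Delay(1);   // hypothetical stand-in for e.g. an async database write
        processed.Add(item);   // only the consumer touches this list until it finishes
    }
});

// Synchronous producer: this is the role an Rx OnNext callback would play.
for (int i = 0; i < 10; i++)
{
    channel.Writer.TryWrite(i);   // never blocks; succeeds for an unbounded, uncompleted channel
}
channel.Writer.Complete();        // the equivalent of OnCompleted: no more items

await consumer;                   // finishes after every buffered item is processed
Console.WriteLine(string.Join(",", processed)); // prints 0,1,2,3,4,5,6,7,8,9
```

With Rx, the synchronous side would presumably be a subscription along the lines of obs.Subscribe(x => channel.Writer.TryWrite(x), ex => channel.Writer.Complete(ex), () => channel.Writer.Complete()), assuming obs is the observable from the question.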
The various block types in TPL Dataflow can satisfy this need. Something like this should suffice:
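```csharp
using System.Reactive.Linq;             // Buffer
using System.Threading.Tasks.Dataflow;  // ActionBlock, AsObserver
var obs = StreamDataFromSql().Buffer(BatchSize);
// Batches are written one at a time by default (MaxDegreeOfParallelism = 1).
var buffer = new ActionBlock<IList<T>>(batch => WriteDataAsync(batch));
// AsObserver(): OnNext offers a batch, OnCompleted/OnError complete/fault the block.
using (var subscription = obs.Subscribe(buffer.AsObserver()))
    await buffer.Completion; // completes once the sequence ends and every queued batch is written
```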
Note that there is no backpressure: however quickly StreamDataFromSql can push data, it will be buffered in the ActionBlock's input queue. Depending on the size and type of the data, this can quickly use a lot of memory.
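For contrast, here is a minimal sketch of what backpressure looks like with a bounded buffer, again using System.Threading.Channels as an assumed substitute for the ActionBlock. With FullMode = BoundedChannelFullMode.Wait, a write does not complete while Capacity items are queued, so a producer that blocks on that write is slowed to the consumer's pace and the queue never holds more than Capacity items. Capacity = 4 and the 5 ms delay are arbitrary illustration values.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Channels;
using System.Threading.Tasks;

const int Capacity = 4; // arbitrary illustration value
// Bounded buffer: writers wait while Capacity items are already queued.
var channel = Channel.CreateBounded<int>(new BoundedChannelOptions(Capacity)
{
    FullMode = BoundedChannelFullMode.Wait
});
var processed = new List<int>();
int maxQueued = 0;

// Slow asynchronous consumer.
Task consumer = Task.Run(async () =>
{
    await foreach (int item in channel.Reader.ReadAllAsync())
    {
        await Task.Delay(5);   // hypothetical slow async work
        processed.Add(item);
    }
});

// Synchronous producer that blocks until the buffer has room.
for (int i = 0; i < 20; i++)
{
    channel.Writer.WriteAsync(i).AsTask().GetAwaiter().GetResult();
    maxQueued = Math.Max(maxQueued, channel.Reader.Count); // never exceeds Capacity
}
channel.Writer.Complete();

await consumer;
Console.WriteLine($"Processed {processed.Count} items; at most {maxQueued} were queued.");
```

Blocking inside a synchronous callback is only acceptable when the producer runs on a thread that may block: blocking an Rx OnNext stalls whatever thread is pushing the data.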