Is this a job for TPL Dataflow?
EDIT: Turns out I was very wrong. TransformBlock does return items in the same order they came in, even if it is configured for parallelism. Because of that, the code in my original answer is completely useless and a normal TransformBlock can be used instead.
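To illustrate the point of the edit, here is a minimal sketch of a plain TransformBlock running with parallelism and still emitting results in input order. The class name, the squaring workload, and the sleep durations are made up for the demonstration; only the Dataflow types come from the library.

```csharp
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class OrderedTransformDemo
{
    // Squares 0..count-1 in parallel. Later items sleep for less time, so
    // they tend to finish first, yet the block emits them in input order.
    public static async Task<List<int>> RunAsync(int count)
    {
        var block = new TransformBlock<int, int>(
            i =>
            {
                Thread.Sleep((count - i) * 20); // hypothetical uneven workload
                return i * i;
            },
            new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

        for (int i = 0; i < count; i++)
            block.Post(i);
        block.Complete();

        var results = new List<int>();
        // OutputAvailableAsync returns false once the block is done and empty.
        while (await block.OutputAvailableAsync())
            results.Add(await block.ReceiveAsync());
        return results;
    }
}
```

Calling `await OrderedTransformDemo.RunAsync(5)` should give 0, 1, 4, 9, 16, because the block keeps output ordered by default.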
Original answer:
As far as I know, only one parallelism construct in .NET supports returning processed items in the order they came in: PLINQ with AsOrdered(). But it seems to me that PLINQ doesn't fit what you want well.
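For reference, ordered PLINQ looks roughly like this. It is only a sketch: the class and method names are invented, and squaring stands in for whatever the real per-item work is.

```csharp
using System.Linq;

public static class PlinqOrderedDemo
{
    // Processes the items in parallel but returns the results in source order.
    public static int[] SquareInOrder(int[] input)
    {
        return input
            .AsParallel()
            .AsOrdered()          // ask PLINQ to preserve the source order
            .Select(x => x * x)   // stand-in for the real work
            .ToArray();
    }
}
```

For example, `PlinqOrderedDemo.SquareInOrder(new[] { 3, 1, 2 })` returns 9, 1, 4.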
TPL Dataflow, on the other hand, fits well, I think, but it doesn't have a block that would support parallelism and returning items in order at the same time (TransformBlock supports both of them, but not at the same time). Fortunately, Dataflow blocks were designed with composability in mind, so we can build our own block that does that.
But first, we have to figure out how to order the results. Using a concurrent dictionary, like you suggested, along with some synchronization mechanism, would certainly work. But I think there is a simpler solution: use a queue of Tasks. In the output task, you dequeue a Task, wait for it to complete (asynchronously) and when it does, you send its result along. We still need some synchronization for the case when the queue is empty, but we can get that for free if we choose which queue to use cleverly.
So, the general idea is like this: what we're writing will be an IPropagatorBlock, with some input and some output. The easiest way to create a custom IPropagatorBlock is to create one block that processes the input, another block that produces the results and treat them as one using DataflowBlock.Encapsulate().
The input block will have to process the incoming items in the correct order, so no parallelization there. It will create a new Task (actually, a TaskCompletionSource, so that we can set the result of the Task later), add it to the queue and then send the item for processing, along with some way to set the result of the correct Task. Because we don't need to link this block to anything, we can use an ActionBlock.
The output block will have to take Tasks from the queue, asynchronously wait for them, and then send their results along. But since all blocks have a queue embedded in them, and blocks that take delegates have asynchronous waiting built-in, this will be very simple: new TransformBlock<Task<TOutput>, TOutput>(t => t). This block will work both as the queue and as the output block. Because of this, we don't have to deal with any synchronization.
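The behavior of that one-line block can be seen in isolation with a small sketch like the following. The class name and the string payloads are invented for the demonstration. Two tasks are queued and then completed in reverse order, yet the results come out in the order the tasks were queued.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class TaskQueueDemo
{
    public static async Task<List<string>> RunAsync()
    {
        // The delegate returns the incoming task itself, so the block
        // awaits it and emits its result. With the default degree of
        // parallelism (1), it waits for one task before taking the next.
        var queue = new TransformBlock<Task<string>, string>(t => t);

        var first = new TaskCompletionSource<string>();
        var second = new TaskCompletionSource<string>();
        queue.Post(first.Task);
        queue.Post(second.Task);
        queue.Complete();

        // Complete the tasks in the opposite order to how they were queued.
        second.SetResult("second");
        first.SetResult("first");

        var results = new List<string>();
        while (await queue.OutputAvailableAsync())
            results.Add(await queue.ReceiveAsync());
        return results;
    }
}
```

Here `await TaskQueueDemo.RunAsync()` yields "first" followed by "second", which is the property the rest of the design relies on.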
The last piece of the puzzle is actually processing the items in parallel. For this, we can use another ActionBlock, this time with MaxDegreeOfParallelism set. It will take the input, process it, and set the result of the correct Task in the queue.
Put together, it could look like this:
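    using System;
    using System.Threading.Tasks;
    using System.Threading.Tasks.Dataflow;

    public static IPropagatorBlock<TInput, TOutput>
        CreateConcurrentOrderedTransformBlock<TInput, TOutput>(
        Func<TInput, TOutput> transform)
    {
        // Holds the pending tasks in arrival order and emits each result
        // once its task completes; doubles as the output block.
        var queue = new TransformBlock<Task<TOutput>, TOutput>(t => t);

        // Runs the transform in parallel and completes the matching task.
        var processor = new ActionBlock<Tuple<TInput, Action<TOutput>>>(
            tuple => tuple.Item2(transform(tuple.Item1)),
            new ExecutionDataflowBlockOptions
            {
                MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
            });

        // Sequential input block: creates a task per item, hands the item
        // to the processor and puts the task into the queue.
        var enqueuer = new ActionBlock<TInput>(
            async item =>
            {
                var tcs = new TaskCompletionSource<TOutput>();
                await processor.SendAsync(
                    new Tuple<TInput, Action<TOutput>>(item, tcs.SetResult));
                await queue.SendAsync(tcs.Task);
            });

        // Once all input has been enqueued, nothing more will arrive at
        // the inner blocks, so complete them too.
        enqueuer.Completion.ContinueWith(
            _ =>
            {
                queue.Complete();
                processor.Complete();
            });

        return DataflowBlock.Encapsulate(enqueuer, queue);
    }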
After so much talk, that's quite a small amount of code, I think.
It seems you care about performance a lot, so you might need to fine-tune this code. For example, it might make sense to set MaxDegreeOfParallelism of the processor block to something like Environment.ProcessorCount, to avoid oversubscription. Also, if latency is more important than throughput to you, it might make sense to set MaxMessagesPerTask of the same block to 1 (or another small number) so that when processing of an item is finished, it's sent to the output immediately.
Also, if you want to throttle incoming items, you could set BoundedCapacity of enqueuer.
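Since the edit at the top says a plain TransformBlock is enough, the same tuning knobs can be sketched on it directly. The class name and the capacity of 100 are arbitrary choices for illustration, not recommendations; suitable values depend on your workload.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

public static class TunedBlockDemo
{
    public static TransformBlock<TInput, TOutput> Create<TInput, TOutput>(
        Func<TInput, TOutput> transform)
    {
        return new TransformBlock<TInput, TOutput>(
            transform,
            new ExecutionDataflowBlockOptions
            {
                // One worker per core, to avoid oversubscription.
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                // Favor latency: each worker task handles a single message.
                MaxMessagesPerTask = 1,
                // Throttle producers; 100 is an arbitrary example value.
                BoundedCapacity = 100
            });
    }
}
```

With BoundedCapacity set, Post() returns false when the block is full, so producers would normally use `await block.SendAsync(item)` instead, which waits until there is room.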