TransformBlock never completes
A TransformBlock needs an ITargetBlock to which it can post the transformed items.
var writeCustomerBlock = new ActionBlock<int>(c => Console.WriteLine(c));
transformBlock.LinkTo(
    writeCustomerBlock,
    new DataflowLinkOptions
    {
        PropagateCompletion = true
    });
After this it completes.
I think I understand it now. An instance of TransformBlock is not considered "complete" until the following conditions are met:
- TransformBlock.Complete() has been called
- InputCount == 0: the block has applied its transformation to every incoming element
- OutputCount == 0: all transformed elements have left the output buffer
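These conditions can be observed directly. The following is only a minimal sketch, not taken from the original program: the class name CompletionConditionsDemo, the three-item input, and the timeout values are assumptions chosen for illustration. It uses bounded waits so that it reports the state instead of hanging.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks.Dataflow;

class CompletionConditionsDemo
{
    // Hypothetical helper: returns the counts seen after Complete() and
    // whether Completion finished before and after draining the output.
    public static (int inputCount, int outputCount, bool completedBeforeDrain, bool completedAfterDrain) Run()
    {
        // No target is linked, so transformed items stay in the output buffer.
        var block = new TransformBlock<int, int>(i => i * i);
        for (int n = 1; n <= 3; n++)
        {
            block.Post(n);
        }
        block.Complete();

        // Bounded wait until all three items have been transformed.
        SpinWait.SpinUntil(() => block.OutputCount == 3, TimeSpan.FromSeconds(5));

        int inputCount = block.InputCount;   // nothing left to transform
        int outputCount = block.OutputCount; // results are still buffered

        // Wait(TimeSpan) returns false if the task did not finish in time.
        bool completedBeforeDrain = block.Completion.Wait(TimeSpan.FromMilliseconds(200));

        // Emptying the output buffer satisfies the last condition.
        block.TryReceiveAll(out IList<int> drained);
        bool completedAfterDrain = block.Completion.Wait(TimeSpan.FromSeconds(5));

        return (inputCount, outputCount, completedBeforeDrain, completedAfterDrain);
    }

    static void Main()
    {
        var r = Run();
        Console.WriteLine("InputCount = {0}, OutputCount = {1}", r.inputCount, r.outputCount);
        Console.WriteLine("Completed before draining: {0}", r.completedBeforeDrain);
        Console.WriteLine("Completed after draining: {0}", r.completedAfterDrain);
    }
}
```

With Complete() called and InputCount at 0, the block should still report as not completed until the buffered results are received.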
In my program, there is no target block linked to the source TransformBlock, so the source block never gets to flush its output buffer.
As a workaround, I added a second BufferBlock that is used to store the transformed elements:
static void Main(string[] args)
{
    var inputBufferBlock = new BufferBlock<int>();
    var calculatorBlock = new TransformBlock<int, int>(i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        return (int)Math.Pow(i, 2);
    }, new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
    var outputBufferBlock = new BufferBlock<int>();
    using (inputBufferBlock.LinkTo(calculatorBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    using (calculatorBlock.LinkTo(outputBufferBlock, new DataflowLinkOptions { PropagateCompletion = true }))
    {
        foreach (var number in Enumerable.Range(1, 1000))
        {
            inputBufferBlock.Post(number);
        }
        inputBufferBlock.Complete();
        calculatorBlock.Completion.Wait();
        IList<int> results;
        if (outputBufferBlock.TryReceiveAll(out results))
        {
            foreach (var result in results)
            {
                Console.WriteLine("x² = {0}", result);
            }
        }
    }
}
The reason your pipeline hangs is that both BufferBlock and TransformBlock evidently don't complete until they have emptied themselves of items (I guess that's the desired behavior of IPropagatorBlocks, although I haven't found documentation on it).
This can be verified with a more minimal example:
var bufferBlock = new BufferBlock<int>();
bufferBlock.Post(0);
bufferBlock.Complete();
bufferBlock.Completion.Wait();
This blocks indefinitely unless you add bufferBlock.Receive(); before waiting on Completion.
If you remove the items from your pipeline before blocking, whether with your TryReceiveAll code block, by connecting another ActionBlock to the pipeline, by converting your TransformBlock to an ActionBlock, or in any other way, it will no longer block.
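As one illustration of "any other way", and only as a sketch that was not part of the original question: if the results are not needed at all, the block can be linked to DataflowBlock.NullTarget<int>(), which accepts and discards everything it is offered. The class name NullTargetDemo, the ten-item input, and the timeout are assumptions made for the example.

```csharp
using System;
using System.Threading.Tasks.Dataflow;

class NullTargetDemo
{
    // Hypothetical helper: returns true if the block completed within the timeout.
    public static bool Run()
    {
        var calculatorBlock = new TransformBlock<int, int>(i => i * i);

        // NullTarget consumes every output, so the output buffer is always drained.
        calculatorBlock.LinkTo(DataflowBlock.NullTarget<int>());

        for (int n = 1; n <= 10; n++)
        {
            calculatorBlock.Post(n);
        }
        calculatorBlock.Complete();

        // Bounded wait so the sketch cannot hang if the assumption is wrong.
        return calculatorBlock.Completion.Wait(TimeSpan.FromSeconds(5));
    }

    static void Main()
    {
        Console.WriteLine("Completed: {0}", Run());
    }
}
```

This only makes sense when the transformed values really are disposable; otherwise a real consumer such as an ActionBlock is the better target.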
As for your specific solution, it seems that you don't need a BufferBlock or a TransformBlock at all, since every block has its own input queue and you don't use the return value of the TransformBlock. The same result can be achieved with just an ActionBlock:
var block = new ActionBlock<int>(
    i =>
    {
        Console.WriteLine("Calculating {0}²", i);
        Console.WriteLine("x² = {0}", (int)Math.Pow(i, 2));
    },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 8 });
foreach (var number in Enumerable.Range(1, 1000))
{
    block.Post(number);
}
block.Complete();
block.Completion.Wait();