How to call TriggerBatch automagically after a timeout if the number of queued items is less than the BatchSize?
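Yes, you can accomplish this rather elegantly by chaining blocks together. In this case you want to set up a `TransformBlock` that you link "before" the `BatchBlock` and that restarts a one-shot timer every time an item passes through it. When the timer finally elapses, its callback calls `TriggerBatch`. That would look something like this: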
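```csharp
// Requires: using System.Threading; using System.Threading.Tasks.Dataflow;
// TimerCallback receives a state object, so the lambda takes one (unused) parameter.
Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch());

TransformBlock<T, T> timeoutTransformBlock = new TransformBlock<T, T>((value) =>
{
    // Restart the countdown each time an item flows through.
    triggerBatchTimer.Change(5000, Timeout.Infinite);
    return value;
});

timeoutTransformBlock.LinkTo(yourBatchBlock);
yourBufferBlock.LinkTo(timeoutTransformBlock);
```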
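Here is a self-contained sketch of the same idea that can be run as-is. It assumes `int` items, a batch size of 10 and a 200 ms timeout; those values, the hypothetical `DebounceBatchDemo` class and the collecting `ActionBlock` are illustrative choices, not part of the answer. Because only three items are posted, the batch never fills, so the only batch observed should be the one flushed by the timer.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class DebounceBatchDemo // hypothetical name, for illustration only
{
    // Posts fewer items than the batch size and returns the batches that came out.
    public static async Task<int[][]> RunAsync()
    {
        var batches = new ConcurrentQueue<int[]>();
        var bufferBlock = new BufferBlock<int>();
        var batchBlock = new BatchBlock<int>(10); // hypothetical batch size

        // One-shot timer: flushes whatever is queued when it elapses.
        var timer = new Timer(_ => batchBlock.TriggerBatch());

        var timeoutBlock = new TransformBlock<int, int>(value =>
        {
            // Push the deadline out every time an item passes through (200 ms, hypothetical).
            timer.Change(200, Timeout.Infinite);
            return value;
        });
        var sink = new ActionBlock<int[]>(batch => batches.Enqueue(batch));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        bufferBlock.LinkTo(timeoutBlock, link);
        timeoutBlock.LinkTo(batchBlock, link);
        batchBlock.LinkTo(sink, link);

        for (int i = 0; i < 3; i++) bufferBlock.Post(i);

        await Task.Delay(1000); // comfortably longer than the 200 ms timeout
        bufferBlock.Complete();
        await sink.Completion;
        timer.Dispose(); // dispose only after the pipeline has drained
        return batches.ToArray();
    }
}
```

Calling `DebounceBatchDemo.RunAsync()` should yield a single batch holding the three posted items, produced by the timer rather than by the batch size.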
Here is a polished version of Drew Marsh's excellent solution. This one uses the `DataflowBlock.Encapsulate` method to create a dataflow block that encapsulates the timer+batch functionality. Beyond the new `timeout` argument, the `CreateBatchBlock` method also supports all the options available to the normal `BatchBlock` constructor.
```csharp
// Requires: using System.Threading; using System.Threading.Tasks.Dataflow;
public static IPropagatorBlock<T, T[]> CreateBatchBlock<T>(int batchSize,
    int timeout, GroupingDataflowBlockOptions dataflowBlockOptions = null)
{
    dataflowBlockOptions = dataflowBlockOptions ?? new GroupingDataflowBlockOptions();
    var batchBlock = new BatchBlock<T>(batchSize, dataflowBlockOptions);

    // One-shot timer that flushes a partial batch when it elapses.
    var timer = new System.Threading.Timer(_ => batchBlock.TriggerBatch());

    // Every incoming item restarts the countdown before being forwarded.
    var transformBlock = new TransformBlock<T, T>((T value) =>
    {
        timer.Change(timeout, Timeout.Infinite);
        return value;
    }, new ExecutionDataflowBlockOptions()
    {
        // Mirror the grouping options that also apply to an execution block.
        BoundedCapacity = dataflowBlockOptions.BoundedCapacity,
        CancellationToken = dataflowBlockOptions.CancellationToken,
        EnsureOrdered = dataflowBlockOptions.EnsureOrdered,
        MaxMessagesPerTask = dataflowBlockOptions.MaxMessagesPerTask,
        NameFormat = dataflowBlockOptions.NameFormat,
        TaskScheduler = dataflowBlockOptions.TaskScheduler
    });

    // Completing the encapsulated block completes the inner BatchBlock too.
    transformBlock.LinkTo(batchBlock, new DataflowLinkOptions()
    {
        PropagateCompletion = true
    });

    // Callers see one block: input goes to transformBlock, batches come out of batchBlock.
    return DataflowBlock.Encapsulate(transformBlock, batchBlock);
}
```
Thanks to Drew Marsh for the idea of using a `TransformBlock`, which greatly helped me with a recent solution. However, I believe the timer needs to be reset AFTER the batch block, i.e. after a batch has been emitted, either because the batch size was reached or because `TriggerBatch` was called explicitly from the timer callback. If you reset the timer every time a single item arrives, it can keep being reset without ever firing, since each reset pushes the timer's `dueTime` further away. As long as items keep arriving more often than the timeout, the timer never flushes a partial batch.
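The effect described above can be reproduced with nothing but a one-shot `System.Threading.Timer`. The sketch simulates an item arriving every 50 ms and resets a 300 ms timer on each one, which is what the per-item reset does. The class name `DebounceStarvationDemo` and all timings are hypothetical, chosen only for illustration, and the results assume a lightly loaded machine. While the resets continue the callback should never run; once they stop it should fire exactly once.

```csharp
using System.Threading;

public static class DebounceStarvationDemo // hypothetical name, for illustration only
{
    // Returns how often the timer fired while being reset, and how often after the resets stopped.
    public static (int whileResetting, int afterStopping) Run()
    {
        int fired = 0;
        using var timer = new Timer(_ => Interlocked.Increment(ref fired));

        // Simulate an item every 50 ms; each one pushes the 300 ms due time out again.
        for (int i = 0; i < 20; i++)
        {
            timer.Change(300, Timeout.Infinite);
            Thread.Sleep(50);
        }
        int whileResetting = Volatile.Read(ref fired);

        // Items stop arriving, so the timer is finally allowed to elapse.
        Thread.Sleep(600);
        int afterStopping = Volatile.Read(ref fired) - whileResetting;
        return (whileResetting, afterStopping);
    }
}
```

Roughly a second passes with no callback at all, even though that is more than three times the timeout.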
Resetting the timer after the batch block instead makes the code snippet look like the following:
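```csharp
// Requires: using System.Threading; using System.Threading.Tasks.Dataflow;
// Armed immediately; re-armed below only after a batch has been emitted.
Timer triggerBatchTimer = new Timer(_ => yourBatchBlock.TriggerBatch(), null, 5000, Timeout.Infinite);

TransformBlock<T[], T[]> timeoutTransformBlock = new TransformBlock<T[], T[]>((value) =>
{
    // A batch just went out, so restart the countdown for the next one.
    triggerBatchTimer.Change(5000, Timeout.Infinite);
    return value;
});

yourBufferBlock.LinkTo(yourBatchBlock);
yourBatchBlock.LinkTo(timeoutTransformBlock);
timeoutTransformBlock.LinkTo(yourActionBlock);

// Start the producer which is populating the BufferBlock etc.
```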
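Below is a runnable sketch of this reset-after-batch variant, assuming `int` items, a batch size of 3 and a 200 ms timeout (illustrative values; `ResetAfterBatchDemo` is a hypothetical name). One case worth checking with this design: `BatchBlock` does not emit an empty batch when `TriggerBatch` is called with nothing queued. So if the timer fires during a quiet period, no batch passes through `timeoutTransformBlock` and the timer is not re-armed. The sketch reproduces that case: an item posted after such an empty tick stays queued until the block is completed, which flushes it as a final batch.

```csharp
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class ResetAfterBatchDemo // hypothetical name, for illustration only
{
    // Returns the number of batches seen before completion, and the total after completion.
    public static async Task<(int beforeComplete, int total)> RunAsync()
    {
        var batches = new ConcurrentQueue<int[]>();
        var bufferBlock = new BufferBlock<int>();
        var batchBlock = new BatchBlock<int>(3); // hypothetical batch size

        // Armed immediately (200 ms, hypothetical); re-armed only when a batch passes below.
        var timer = new Timer(_ => batchBlock.TriggerBatch(), null, 200, Timeout.Infinite);
        var timeoutBlock = new TransformBlock<int[], int[]>(batch =>
        {
            timer.Change(200, Timeout.Infinite);
            return batch;
        });
        var sink = new ActionBlock<int[]>(batch => batches.Enqueue(batch));

        var link = new DataflowLinkOptions { PropagateCompletion = true };
        bufferBlock.LinkTo(batchBlock, link);
        batchBlock.LinkTo(timeoutBlock, link);
        timeoutBlock.LinkTo(sink, link);

        bufferBlock.Post(0);
        bufferBlock.Post(1);
        // ~200 ms: the timer flushes [0, 1] and is re-armed by timeoutBlock.
        // ~400 ms: the timer fires with nothing queued, so no batch and no re-arm.
        await Task.Delay(700);
        bufferBlock.Post(2);
        await Task.Delay(600); // item 2 stays queued: nothing re-arms the timer
        int beforeComplete = batches.Count;

        bufferBlock.Complete(); // completion flushes the leftover [2] as a final batch
        await sink.Completion;
        timer.Dispose(); // dispose only after the last Change call has run
        return (beforeComplete, batches.Count);
    }
}
```

If that gap matters for your workload, one option (not part of the answer above) is to also re-arm the timer from its own callback, so that an empty tick does not leave it disarmed.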