Best way in .NET to manage queue of tasks on a separate (single) thread
To create an asynchronous single degree of parallelism queue of work you can simply create a SemaphoreSlim
, initialized to one, and then have the enqueing method await
on the acquisition of that semaphore before starting the requested work.
public class TaskQueue
{
private SemaphoreSlim semaphore;
public TaskQueue()
{
semaphore = new SemaphoreSlim(1);
}
public async Task<T> Enqueue<T>(Func<Task<T>> taskGenerator)
{
await semaphore.WaitAsync();
try
{
return await taskGenerator();
}
finally
{
semaphore.Release();
}
}
public async Task Enqueue(Func<Task> taskGenerator)
{
await semaphore.WaitAsync();
try
{
await taskGenerator();
}
finally
{
semaphore.Release();
}
}
}
Of course, to have a fixed degree of parallelism other than one simply initialize the semaphore to some other number.
Your best option as I see it is using TPL Dataflow
's ActionBlock
:
var actionBlock = new ActionBlock<string>(address =>
{
if (!IsDuplicate(address))
{
LocateAddress(address);
}
});
actionBlock.Post(context.Request.UserHostAddress);
TPL Dataflow
is robust, thread-safe, async
-ready and very configurable actor-based framework (available as a nuget)
Here's a simple example for a more complicated case. Let's assume you want to:
- Enable concurrency (limited to the available cores).
- Limit the queue size (so you won't run out of memory).
- Have both
LocateAddress
and the queue insertion beasync
. - Cancel everything after an hour.
var actionBlock = new ActionBlock<string>(async address =>
{
if (!IsDuplicate(address))
{
await LocateAddressAsync(address);
}
}, new ExecutionDataflowBlockOptions
{
BoundedCapacity = 10000,
MaxDegreeOfParallelism = Environment.ProcessorCount,
CancellationToken = new CancellationTokenSource(TimeSpan.FromHours(1)).Token
});
await actionBlock.SendAsync(context.Request.UserHostAddress);