Awaitable Task-based queue
I don't know of a lock-free solution, but you can take a look at the TPL Dataflow library (originally part of the Async CTP, now available as the System.Threading.Tasks.Dataflow package). A simple BufferBlock<T> should suffice, e.g.:
BufferBlock<int> buffer = new BufferBlock<int>();
Production and consumption are most easily done via extension methods on the dataflow block types.
Production is as simple as:
buffer.Post(13);
and consumption is async-ready:
int item = await buffer.ReceiveAsync();
I do recommend you use Dataflow if possible; making such a buffer both efficient and correct is more difficult than it first appears.
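To show how the pieces above fit together, here is a minimal sketch of one producer and one consumer sharing a BufferBlock<int>. The class and method names (BufferBlockDemo, Produce, ConsumeAsync, RunAsync) are hypothetical and only for illustration; the sketch assumes a single consumer and that the System.Threading.Tasks.Dataflow package is referenced.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public static class BufferBlockDemo
{
    // Hypothetical helper: posts `count` integers, then marks the block complete.
    public static void Produce(BufferBlock<int> buffer, int count)
    {
        for (int i = 0; i < count; i++)
            buffer.Post(i);

        // Signal that no more items will arrive.
        buffer.Complete();
    }

    // Hypothetical helper: receives items until the block is completed and drained.
    // Assumes this is the only consumer of the block.
    public static async Task<List<int>> ConsumeAsync(BufferBlock<int> buffer)
    {
        var received = new List<int>();

        // OutputAvailableAsync yields false once the block is completed and empty.
        while (await buffer.OutputAvailableAsync())
            received.Add(await buffer.ReceiveAsync());

        return received;
    }

    public static async Task<List<int>> RunAsync()
    {
        var buffer = new BufferBlock<int>();

        // Start the consumer first; it waits asynchronously on the empty buffer.
        Task<List<int>> consumer = ConsumeAsync(buffer);

        Produce(buffer, 3);
        return await consumer;
    }
}
```

Calling Complete() is what lets the consumer loop end; without it, the consumer would keep waiting for further items. With a single consumer, the items should come back in the order they were posted.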
Simple approach with C# 8.0 IAsyncEnumerable and Dataflow library
// Instantiate an async queue
var queue = new AsyncQueue<int>();
// Then, loop through the elements of the queue.
// This loop won't stop until it is canceled or broken out of
// (for that, use queue.WithCancellation(..) or break;)
await foreach (int i in queue) {
    // Writes a line as soon as some other Task calls queue.Enqueue(..)
    Console.WriteLine(i);
}
With an implementation of AsyncQueue as follows:
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) =>
        _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        // We lock this so we only ever enumerate once at a time.
        // That way we ensure all items are returned in a continuous
        // fashion with no 'holes' in the data when two foreach compete.
        // Passing the token lets a waiting enumerator be canceled too.
        await _enumerationSemaphore.WaitAsync(token);
        try {
            // Return new elements until the token is canceled.
            while (true) {
                // Make sure to throw on cancellation so the Task will transfer into a canceled state
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        } finally {
            _enumerationSemaphore.Release();
        }
    }
}
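Since the loop over the queue never ends on its own, it may help to see the WithCancellation route spelled out. The following is a sketch only: AsyncQueueDemo, DrainUntilCanceledAsync, RunAsync and the 200 ms timeout are hypothetical choices for illustration, and a compact copy of AsyncQueue<T> is repeated so the block compiles by itself.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Compact copy of the AsyncQueue<T> above so this sketch compiles on its own.
public class AsyncQueue<T> : IAsyncEnumerable<T>
{
    private readonly SemaphoreSlim _enumerationSemaphore = new SemaphoreSlim(1);
    private readonly BufferBlock<T> _bufferBlock = new BufferBlock<T>();

    public void Enqueue(T item) => _bufferBlock.Post(item);

    public async IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token = default)
    {
        await _enumerationSemaphore.WaitAsync(token);
        try
        {
            while (true)
            {
                token.ThrowIfCancellationRequested();
                yield return await _bufferBlock.ReceiveAsync(token);
            }
        }
        finally
        {
            _enumerationSemaphore.Release();
        }
    }
}

public static class AsyncQueueDemo
{
    // Hypothetical helper: reads items until the token is canceled,
    // then returns everything it saw.
    public static async Task<List<int>> DrainUntilCanceledAsync(
        AsyncQueue<int> queue, CancellationToken token)
    {
        var seen = new List<int>();
        try
        {
            // WithCancellation hands the token to GetAsyncEnumerator.
            await foreach (int i in queue.WithCancellation(token))
                seen.Add(i);
        }
        catch (OperationCanceledException)
        {
            // Expected here: cancellation is how this otherwise endless loop ends.
        }
        return seen;
    }

    public static async Task<List<int>> RunAsync()
    {
        var queue = new AsyncQueue<int>();
        using var cts = new CancellationTokenSource();

        queue.Enqueue(1);
        queue.Enqueue(2);

        // Arbitrary timeout for the demo; a real caller would cancel on shutdown.
        cts.CancelAfter(TimeSpan.FromMilliseconds(200));
        return await DrainUntilCanceledAsync(queue, cts.Token);
    }
}
```

Cancellation surfaces as an OperationCanceledException out of the await foreach, so the caller decides whether that is an error or simply the normal way to stop. The finally block in the enumerator still runs, which releases the semaphore for the next enumeration.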