RabbitMQ asynchronous support
There is no async/await support built in to the RabbitMQ .NET client at this point. There is an open ticket for this on the RabbitMQ .NET Client repository.
RabbitMQ supports dispatching to asynchronous message handlers using the AsyncEventingBasicConsumer class. It works similarly to EventingBasicConsumer, but allows you to register a callback which returns a Task. The callback is dispatched to, and the returned Task is awaited by, the RabbitMQ client.
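    // Requires: using RabbitMQ.Client; using RabbitMQ.Client.Events;
    var factory = new ConnectionFactory
    {
        HostName = "localhost",
        // Needed so that handlers of AsyncEventingBasicConsumer are dispatched
        DispatchConsumersAsync = true
    };
    using (var connection = factory.CreateConnection())
    using (var channel = connection.CreateModel())
    {
        channel.QueueDeclare("testqueue", true, false, false, null);
        var consumer = new AsyncEventingBasicConsumer(channel);
        consumer.Received += async (o, a) =>
        {
            Console.WriteLine("Message Get" + a.DeliveryTag);
            await Task.Yield();
        };
        // Start consuming; autoAck = true keeps the example short
        channel.BasicConsume("testqueue", true, consumer);
        // Keep the channel open until Enter is pressed
        Console.ReadLine();
    }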
There is AsyncEventingBasicConsumer, and all that it does is await your async "event handlers" when a message is received. That's the only thing that is made asynchronous here. Typically you don't gain anything from this, because you have only one "handler". Messages are still processed one by one. They are processed sequentially! Also, you lose control of exception handling because the awaiting is done inside the consumer.
I'm guessing that by asynchronous message processing you mean some degree of parallelism.
What I ended up using is ActionBlock from TPL Dataflow. ActionBlock runs as many tasks as you configure it to, managing awaits and parallelism. Since it operates on Tasks, not Threads, it can get by with fewer resources, as long as the tasks are truly asynchronous.
- Regular EventingBasicConsumer calls actionBlock.Post(something).
- For parallel processing you need to tell RMQ to send you N messages before you ack them: model.BasicQos(0, N, true);
- ActionBlock has options with MaxDegreeOfParallelism property which also needs to be set to N.
- ActionBlock runs async Tasks which receive data posted earlier by Consumer. Tasks should not throw because ActionBlock stops all processing on exceptions.
- Be careful to pass CancellationToken around and correctly wait for ActionBlock to finish all running Tasks: actionBlock.Complete(); await actionBlock.Completion;
To summarize current async/TPL support:
- As @paul-turner mentioned, there is now an AsyncEventingBasicConsumer which you can register events for and return a Task.
- There is also an AsyncDefaultBasicConsumer for which you can override virtual methods such as HandleBasicDeliver and return a Task. Original PR here (looks like it was also introduced in 5.0?)
- Per the final comments on the above PR and this issue, it looks like they are working on a new, from-scratch .NET client which would more fully support async operations, but I don't see any specific links to that effort.