How can I queue a task to Celery from C#?
I don't know whether the question is still relevant, but hopefully the answer will help others.
Here is how I succeeded in queening a task to Celery example worker.
You'll need to establish connection between your producer(client) to RabbitMQ as described here.
ConnectionFactory factory = new ConnectionFactory(); factory.UserName = username; factory.Password = password; factory.VirtualHost = virtualhost; factory.HostName = hostname; factory.Port = port; IConnection connection = factory.CreateConnection(); IModel channel = connection.CreateModel();
In default RabbitMQ configuration there is only Guest user which can only be used for local connections (from 127.0.0.1). An answer to this question explains how to define users in RabbitMQ.
Next - creating a callback to get results. This example is using Direct reply-to, so an answer listener will look like:
var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => { var ansBody = ea.Body; var ansMessage = Encoding.UTF8.GetString(ansBody); Console.WriteLine(" [x] Received {0}", ansMessage); Console.WriteLine(" [x] Done"); }; channel.BasicConsume(queue: "amq.rabbitmq.reply-to", noAck: true, consumer: consumer);
Creating a task message that Celery will consume:
IDictionary<string, object> headers = new Dictionary<string, object>(); headers.Add("task", "tasks.add"); Guid id = Guid.NewGuid(); headers.Add("id", id.ToString()); IBasicProperties props = channel.CreateBasicProperties(); props.Headers = headers; props.CorrelationId = (string)headers["id"]; props.ContentEncoding = "utf-8"; props.ContentType = "application/json"; props.ReplyTo = "amq.rabbitmq.reply-to"; object[] taskArgs = new object[] { 1, 200 }; object[] arguments = new object[] { taskArgs, new object(), new object()}; MemoryStream stream = new MemoryStream(); DataContractJsonSerializer ser = new DataContractJsonSerializer(typeof(object[])); ser.WriteObject(stream, arguments); stream.Position = 0; StreamReader sr = new StreamReader(stream); string message = sr.ReadToEnd(); var body = Encoding.UTF8.GetBytes(message);
And finally, publishing the message to RabbitMQ:
channel.BasicPublish(exchange: "", routingKey: "celery", basicProperties: props, body: body);
Celery comes with Flower. Flower provides a REST API to managing tasks. https://flower.readthedocs.io/en/latest/api.html#post--api-task-async-apply-(.+) In most cases this will be much simpler and robust to use than creating tasks manually and inserting them on the MQ.