How to do guaranteed message delivery with SignalR?
SignalR doesn't guarantee message delivery. Since SignalR doesn't block when you call client methods, you can invoke client methods very quickly as you've discovered. Unfortunately, the client might not always be ready to receive messages immediately once you send them, so SignalR has to buffer messages.
Generally speaking, SignalR will buffer up to 1000 messages per client. Once the client falls behind by over 1000 messages, it will start missing messages. This DefaultMessageBufferSize of 1000 can be increased, but this will increase SignalR's memory usage and it still won't guarantee message delivery.
http://www.asp.net/signalr/overview/signalr-20/performance-and-scaling/signalr-performance#tuning
If you want to guarantee message delivery, you will have to ACK them yourself. You can, as you suggested, only send a message after the previous message has been acknowledged. You can also ACK multiple messages at a time if waiting for an ACK for each message is too slow.
You'll want to resend messages until you receive an acknowledgement from the other client.
Instead of immediately sending messages, queue them up and have a background thread/timer send the messages.
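To illustrate acknowledging several messages with one ACK, here is a minimal JavaScript sketch of the sender-side bookkeeping. None of it is SignalR API: the sequence numbers, the `WindowedSender` class, the window size, and `transportSend` (a stand-in for whatever actually invokes the client method) are assumptions made for the example.

```javascript
// Hypothetical sender-side bookkeeping for cumulative ACKs (not SignalR API).
class WindowedSender {
  constructor(transportSend, windowSize = 10) {
    this.transportSend = transportSend; // assumed callback that puts a message on the wire
    this.windowSize = windowSize;       // max messages allowed in flight without an ACK
    this.nextSeq = 1;                   // sequence number for the next new message
    this.unacked = new Map();           // seq -> payload, sent but not yet acknowledged
    this.backlog = [];                  // payloads waiting for room in the window
  }

  send(payload) {
    this.backlog.push(payload);
    this.pump();
  }

  // Move backlog items onto the wire while the window has room.
  pump() {
    while (this.unacked.size < this.windowSize && this.backlog.length > 0) {
      const seq = this.nextSeq++;
      const payload = this.backlog.shift();
      this.unacked.set(seq, payload);
      this.transportSend({ seq, payload });
    }
  }

  // A single ACK confirms every message with seq <= upToSeq.
  onAck(upToSeq) {
    for (const seq of [...this.unacked.keys()]) {
      if (seq <= upToSeq) this.unacked.delete(seq);
    }
    this.pump();
  }

  // Call from a timer: resend everything that is still unacknowledged.
  resendUnacked() {
    for (const [seq, payload] of this.unacked) {
      this.transportSend({ seq, payload });
    }
  }
}

// Usage with a fake transport that just records what was sent.
const sent = [];
const windowed = new WindowedSender((m) => sent.push(m), 3);
for (let i = 0; i < 5; i++) windowed.send(`msg-${i}`); // only seq 1..3 go out
windowed.onAck(2);        // confirms seq 1 and 2, which frees room for seq 4 and 5
windowed.resendUnacked(); // resends seq 3, 4 and 5
```

For this to be safe, the receiver would have to ACK only the highest sequence number it has received without gaps, and ignore duplicates it has already processed.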
Here's a performant C# queue that would work. It sends one message at a time and resends it on a timer until it is acknowledged.
    using System;
    using System.Collections.Concurrent;
    using System.Linq;      // Enumerable.Contains on ConcurrentQueue<Guid>
    using System.Threading;

    // Sends one message at a time and resends it on every timer tick until
    // ResponseReceived is called with that message's id.
    public class MessageQueue : IDisposable
    {
        private readonly ConcurrentQueue<Message> _messages = new ConcurrentQueue<Message>();
        // Messages not yet handed to the sender (excludes the one currently in flight).
        public int InQueue => _messages.Count;
        // Milliseconds between send attempts.
        public int SendInterval { get; }
        private readonly Timer _sendTimer;
        private readonly ISendMessage _messageSender;

        public MessageQueue(ISendMessage messageSender, uint sendInterval) {
            _messageSender = messageSender ?? throw new ArgumentNullException(nameof(messageSender));
            SendInterval = (int)sendInterval;
            // The timer is created stopped; Start() schedules the first tick.
            _sendTimer = new Timer(timerTick, this, Timeout.Infinite, Timeout.Infinite);
        }

        public void Start() {
            _sendTimer.Change(SendInterval, Timeout.Infinite);
        }

        private readonly ConcurrentQueue<Guid> _recentlyReceived = new ConcurrentQueue<Guid>();

        public void ResponseReceived(Guid id) {
            if (_recentlyReceived.Contains(id)) return; // We've already received a reply for this message

            // Store current message locally
            var message = _currentSendingMessage;
            if (message == null || id != message.MessageId)
                throw new InvalidOperationException($"Received response {id}, but that message hasn't been sent.");

            // Unset to signify that the message has been successfully sent
            _currentSendingMessage = null;

            // We keep ids of recently received messages because it's possible to receive a reply
            // more than once, since we're sending the message more than once.
            _recentlyReceived.Enqueue(id);
            if (_recentlyReceived.Count > 100) {
                _recentlyReceived.TryDequeue(out var _);
            }
        }

        public void Enqueue(Message m) {
            _messages.Enqueue(m);
        }

        // We may access this field from multiple threads, but there's no need to lock.
        // The worst thing that can happen is we send the message again after we've already
        // received a reply. It is volatile so the timer thread promptly observes the
        // reset made by ResponseReceived.
        private volatile Message _currentSendingMessage;

        private void timerTick(object state) {
            try {
                var message = _currentSendingMessage;

                // Get next message to send
                if (message == null) {
                    _messages.TryDequeue(out message);
                    // Store so we don't have to peek the queue and conditionally dequeue
                    _currentSendingMessage = message;
                }

                if (message == null) return; // Nothing to send

                // Send (or resend) the message
                _messageSender.Send(message);
            } finally {
                // Only start the timer again if we're done ticking.
                try {
                    _sendTimer.Change(SendInterval, Timeout.Infinite);
                } catch (ObjectDisposedException) {
                    // The queue was disposed while this tick was running; nothing to reschedule.
                }
            }
        }

        public void Dispose() {
            _sendTimer.Dispose();
        }
    }

    public interface ISendMessage
    {
        void Send(Message message);
    }

    public class Message
    {
        public Guid MessageId { get; }
        public string MessageData { get; }

        public Message(string messageData) {
            MessageId = Guid.NewGuid();
            MessageData = messageData ?? throw new ArgumentNullException(nameof(messageData));
        }
    }
Here's some example code using the MessageQueue:
    // Requires: using System; using System.Threading;
    public class Program
    {
        static void Main(string[] args) {
            try {
                const int TotalMessageCount = 1000;
                var messageSender = new SimulatedMessageSender();

                using (var messageQueue = new MessageQueue(messageSender, 10)) {
                    messageSender.Initialize(messageQueue);

                    for (var i = 0; i < TotalMessageCount; i++) {
                        messageQueue.Enqueue(new Message(i.ToString()));
                    }

                    Console.WriteLine("Starting message queue");
                    // Stopwatch is more precise than DateTime.Now for measuring elapsed time.
                    var stopwatch = System.Diagnostics.Stopwatch.StartNew();
                    messageQueue.Start();

                    // Note: when the queue is empty, the last message may still be awaiting its reply.
                    while (messageQueue.InQueue > 0) {
                        Thread.Yield(); // Want to use Thread.Sleep or Task.Delay in the real world.
                    }

                    stopwatch.Stop();
                    var messagesPerSecond = TotalMessageCount / stopwatch.Elapsed.TotalSeconds;
                    Console.WriteLine($"Messages Per Second: {messagesPerSecond:#.##}");
                }
            } catch (Exception ex) {
                Console.Error.WriteLine($"Unhandled Exception: {ex}");
            }

            Console.WriteLine();
            Console.WriteLine("==== Done ====");
            Console.ReadLine();
        }
    }

    // Stands in for the real transport: acknowledges most messages immediately
    // and silently drops the rest so the queue has to resend them.
    public class SimulatedMessageSender : ISendMessage
    {
        private MessageQueue _queue;

        public void Initialize(MessageQueue queue) {
            if (_queue != null) throw new InvalidOperationException("Already initialized.");
            _queue = queue ?? throw new ArgumentNullException(nameof(queue));
        }

        private static readonly Random _random = new Random();

        public void Send(Message message) {
            if (_queue == null) throw new InvalidOperationException("Not initialized");

            var chanceOfFailure = _random.Next(0, 20); // 0..19

            // Drop 1 out of 20 messages
            // Most connections won't even be this bad.
            if (chanceOfFailure != 0) {
                _queue.ResponseReceived(message.MessageId);
            }
        }
    }
Extending the given answer, I did the following:
I generate a UUID for each message on the sending client, using one of the well-tested UUID generators for JS.
The UUID is sent along with the message. When the other client receives the message and its UUID, it sends a delivery confirmation containing that UUID back to the sender.
Once the sender receives the UUID it generated, it knows the message was processed successfully.
I also block sending further messages until the confirmation is received.
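A rough JavaScript sketch of this flow could look like the following. It is not my exact code and not SignalR API: `ConfirmedSender`, `newMessageId`, and `transportSend` (a stand-in for the real hub call) are names invented for the example, and the fallback id is only a placeholder for environments without `crypto.randomUUID()`.

```javascript
// Hypothetical helper: returns a unique id for each outgoing message.
function newMessageId() {
  // crypto.randomUUID() is available in modern browsers and recent Node.js versions.
  if (globalThis.crypto && typeof globalThis.crypto.randomUUID === "function") {
    return globalThis.crypto.randomUUID();
  }
  // Assumption: placeholder fallback, NOT a real UUID. Use a tested library instead.
  return `${Date.now().toString(16)}-${Math.random().toString(16).slice(2)}`;
}

// Sends one message at a time and holds the rest until the confirmation arrives.
class ConfirmedSender {
  constructor(transportSend) {
    this.transportSend = transportSend; // assumed callback that delivers { id, text }
    this.pending = null;                // message awaiting confirmation, or null
    this.waiting = [];                  // texts blocked behind the pending message
  }

  send(text) {
    this.waiting.push(text);
    this.trySendNext();
  }

  trySendNext() {
    if (this.pending !== null || this.waiting.length === 0) return;
    // Set pending before sending, in case the confirmation arrives synchronously.
    this.pending = { id: newMessageId(), text: this.waiting.shift() };
    this.transportSend(this.pending);
  }

  // Call when the other client echoes a message id back.
  // Returns true if it confirmed the pending message, false if it was stale or unknown.
  onConfirmation(id) {
    if (this.pending === null || this.pending.id !== id) return false;
    this.pending = null;
    this.trySendNext();
    return true;
  }
}

// Usage with a fake transport that records outgoing messages.
const outbox = [];
const confirmedSender = new ConfirmedSender((m) => outbox.push(m));
confirmedSender.send("first");
confirmedSender.send("second");                // held back: "first" is not confirmed yet
confirmedSender.onConfirmation(outbox[0].id);  // confirmation arrives, "second" goes out
```

Blocking on each confirmation keeps the logic simple at the cost of throughput, since only one message is ever in flight.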