gRPC keeping response streams open for subscriptions
The problem is that MessengerServer.SubscribeForMessages returns immediately.
Once that method returns, the call completes and the stream is closed.
You'll need an implementation similar to this to keep the stream alive:
using System.Collections.Concurrent;
using System.Threading.Tasks;
using Grpc.Core;

public class MessengerService : MessengerServiceBase
{
    // Maps each subscribed user to the stream used to push messages to that user
    private static readonly ConcurrentDictionary<User, IServerStreamWriter<Message>> MessageSubscriptions =
        new ConcurrentDictionary<User, IServerStreamWriter<Message>>();

    public override async Task SubscribeForMessages(User request, IServerStreamWriter<Message> responseStream, ServerCallContext context)
    {
        if (!MessageSubscriptions.TryAdd(request, responseStream))
        {
            // User is already subscribed
            return;
        }

        // Keep the stream open so we can continue writing new Messages as they are pushed
        while (!context.CancellationToken.IsCancellationRequested)
        {
            // Avoid pegging CPU
            await Task.Delay(100);
        }

        // Cancellation was requested, remove the stream from the stream map
        MessageSubscriptions.TryRemove(request, out _);
    }
}
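With the stream held open, some other part of the server still has to write to it. Here is a rough sketch of how that could look. None of this is from the code above: MessageBroadcaster and BroadcastAsync are made-up names, and IStreamWriter<T> is only a stand-in for Grpc.Core's IServerStreamWriter<T> so the sketch compiles without the gRPC package. In real code you would pass the MessageSubscriptions map directly.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

// Stand-in for Grpc.Core's IServerStreamWriter<T> (assumption, for illustration only)
public interface IStreamWriter<T>
{
    Task WriteAsync(T message);
}

// Hypothetical helper, not part of gRPC or of the service above
public static class MessageBroadcaster
{
    // Writes the message to every subscribed stream, one stream at a time,
    // and returns how many writes succeeded.
    public static async Task<int> BroadcastAsync<TKey, TMessage>(
        ConcurrentDictionary<TKey, IStreamWriter<TMessage>> subscriptions,
        TMessage message)
    {
        var delivered = 0;
        foreach (var subscription in subscriptions)
        {
            try
            {
                await subscription.Value.WriteAsync(message);
                delivered++;
            }
            catch (Exception)
            {
                // The write failed, most likely because the stream is already closed;
                // drop the stale entry so later broadcasts skip it
                subscriptions.TryRemove(subscription.Key, out _);
            }
        }
        return delivered;
    }
}
```

One caveat: as far as I know, gRPC allows only one pending write per stream, so if several threads can broadcast at once you would probably want to serialize writes per subscriber (for example with a SemaphoreSlim or a per-user queue).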
As far as unsubscribing / cancellation goes, there are two possible approaches:
- The client can hold onto a CancellationTokenSource, pass its token to the call, and call Cancel() when it wants to disconnect.
- The server can hold onto a CancellationTokenSource, which you would then store along with the IServerStreamWriter in the MessageSubscriptions dictionary via a Tuple or similar. Then, you could introduce an Unsubscribe method on the server which looks up the CancellationTokenSource by User and calls Cancel on it server-side.
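The second (server-side) approach could be sketched roughly like this. Treat it as an illustration under a few assumptions rather than a drop-in implementation: SubscriptionRegistry, HoldOpenAsync and Unsubscribe are hypothetical names, TStream stands in for IServerStreamWriter<Message>, and the stored object is a CancellationTokenSource, since that is the type that actually exposes Cancel(). The source is linked to the call's own token, so a client disconnect and a server-side Unsubscribe both end the wait. It also swaps the polling loop for Task.Delay(Timeout.Infinite, token), which waits until the token is cancelled.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of gRPC
public class SubscriptionRegistry<TKey, TStream>
{
    private readonly ConcurrentDictionary<TKey, (TStream Stream, CancellationTokenSource Cts)> _subscriptions =
        new ConcurrentDictionary<TKey, (TStream Stream, CancellationTokenSource Cts)>();

    // Intended to be awaited from SubscribeForMessages, passing context.CancellationToken.
    // Completes when the client cancels the call or Unsubscribe is called for this key.
    public async Task HoldOpenAsync(TKey key, TStream stream, CancellationToken callToken)
    {
        using (var cts = CancellationTokenSource.CreateLinkedTokenSource(callToken))
        {
            if (!_subscriptions.TryAdd(key, (stream, cts)))
            {
                return; // already subscribed
            }

            try
            {
                // Waits without polling; throws once the linked token is cancelled
                await Task.Delay(Timeout.Infinite, cts.Token);
            }
            catch (OperationCanceledException)
            {
                // Expected on unsubscribe or client disconnect
            }
            finally
            {
                _subscriptions.TryRemove(key, out _);
            }
        }
    }

    // Intended to be called from a (hypothetical) Unsubscribe RPC handler.
    // Returns true if an active subscription was found and cancelled.
    public bool Unsubscribe(TKey key)
    {
        if (!_subscriptions.TryGetValue(key, out var entry))
        {
            return false;
        }

        try
        {
            entry.Cts.Cancel();
            return true;
        }
        catch (ObjectDisposedException)
        {
            // The call ended on its own between the lookup and the Cancel
            return false;
        }
    }
}
```

In the service, SubscribeForMessages would then reduce to something like `await registry.HoldOpenAsync(request, responseStream, context.CancellationToken);`, with `registry` being a shared instance of this class.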