Calling TaskCompletionSource.SetResult in a non-blocking manner
I've discovered that TaskCompletionSource.SetResult() invokes the code awaiting the task before returning. In my case, that results in a deadlock.
Yes, I have a blog post documenting this (AFAIK it's not documented on MSDN). The deadlock happens because of two things:
1. There's a mixture of async and blocking code (i.e., an async method is calling Wait).
2. Task continuations are scheduled using TaskContinuationOptions.ExecuteSynchronously.
I recommend starting with the simplest possible solution: removing the first thing (1). I.e., don't mix async and Wait calls:
await SendAwaitResponse("first message");
SendAwaitResponse("second message").Wait();
Instead, use await consistently:
await SendAwaitResponse("first message");
await SendAwaitResponse("second message");
If you need to, you can Wait at an alternative point further up the call stack (not in an async method).
That's my most-recommended solution. However, if you want to try removing the second thing (2), you can do a couple of tricks: either wrap the SetResult in a Task.Run to force it onto a separate thread (my AsyncEx library has *WithBackgroundContinuations extension methods that do exactly this), or give your thread an actual context (such as my AsyncContext type) and specify ConfigureAwait(false), which will cause the continuation to ignore the ExecuteSynchronously flag.
But those solutions are much more complex than just separating the async and blocking code.
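For illustration only, here is a minimal sketch of the Task.Run trick. It also shows TaskCreationOptions.RunContinuationsAsynchronously, which is not mentioned above; it is a framework flag (available since .NET Framework 4.6) that asks the TaskCompletionSource itself to queue continuations instead of running them inline. The class and method names are hypothetical and are not taken from the question's code.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class BackgroundContinuationDemo
{
    // Returns the id of the thread that runs the code after the await.
    static async Task<int> AwaitThenGetThreadIdAsync(Task<string> response)
    {
        await response;
        return Thread.CurrentThread.ManagedThreadId;
    }

    // Completes the source from a dedicated "receiver" thread and reports
    // whether the awaiting code resumed on a different thread.
    internal static bool ResumedOffReceiverThread(TaskCompletionSource<string> tcs, bool wrapInTaskRun)
    {
        Task<int> continuation = AwaitThenGetThreadIdAsync(tcs.Task);
        int receiverThreadId = 0;

        var receiver = new Thread(() =>
        {
            receiverThreadId = Thread.CurrentThread.ManagedThreadId;
            if (wrapInTaskRun)
                Task.Run(() => tcs.SetResult("reply")); // SetResult runs on a pool thread
            else
                tcs.SetResult("reply");                 // relies on the creation flag instead
        });
        receiver.Start();
        receiver.Join();

        return continuation.Result != receiverThreadId;
    }

    static void Main()
    {
        // Option 1: wrap SetResult in Task.Run.
        var plain = new TaskCompletionSource<string>();
        Console.WriteLine(ResumedOffReceiverThread(plain, wrapInTaskRun: true));

        // Option 2 (an assumption about your target framework): let the source
        // schedule its continuations asynchronously.
        var asyncContinuations = new TaskCompletionSource<string>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        Console.WriteLine(ResumedOffReceiverThread(asyncContinuations, wrapInTaskRun: false));
    }
}
```

Either way the receiver thread gets control back from SetResult without running the awaiting code itself, at the cost of one extra thread-pool hop per message.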
As a side note, take a look at TPL Dataflow; it sounds like you may find it useful.
Because your app is a console app, it runs on the default synchronization context, where the await continuation is invoked on the same thread that completed the awaited task. If you want to switch threads after await SendAwaitResponse, you can do so with await Task.Yield():
await SendAwaitResponse("first message");
await Task.Yield();
// will be continued on a pool thread
// ...
SendAwaitResponse("second message").Wait(); // so no deadlock
You could further improve this by storing Thread.CurrentThread.ManagedThreadId inside Task.Result and comparing it to the current thread's id after the await. If you're still on the same thread, do await Task.Yield().
While I understand that SendAwaitResponse is a simplified version of your actual code, it's still completely synchronous inside (the way you showed it in your question). Why would you expect any thread switch in there?
Anyway, you should probably redesign your logic so that it doesn't make assumptions about which thread you are currently on. Avoid mixing await and Task.Wait(), and make all of your code asynchronous. Usually, it's possible to stick with just one Wait() somewhere at the top level (e.g., inside Main).
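As a rough sketch of that shape (the names here are hypothetical, and SendAwaitResponseAsync is only a stand-in that fakes a reply with Task.Delay), everything below Main uses await and the single blocking call sits in Main itself:

```csharp
using System;
using System.Threading.Tasks;

static class TopLevelWaitDemo
{
    // Stand-in for the real request/response call; assumed to be truly asynchronous.
    internal static async Task<string> SendAwaitResponseAsync(string message)
    {
        await Task.Delay(10);
        return "reply to " + message;
    }

    // All message processing is asynchronous from top to bottom.
    static async Task MainAsync()
    {
        Console.WriteLine(await SendAwaitResponseAsync("first message"));
        Console.WriteLine(await SendAwaitResponseAsync("second message"));
    }

    static void Main()
    {
        // The only blocking call in the program, outside of any async method.
        MainAsync().Wait();
    }
}
```

Because Main is not an async method and is not a continuation of any task, blocking there cannot hijack the thread that completes the responses.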
[EDITED] Calling task.SetResult(msg) from ReceiverRun actually transfers the control flow to the point where you await the task, without a thread switch, because of the default synchronization context's behavior. So the code that does the actual message processing takes over the ReceiverRun thread. Eventually, SendAwaitResponse("second message").Wait() is called on that same thread, causing the deadlock.
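To see this effect in isolation, here is a small sketch, separate from the question's code and using hypothetical names. It awaits a TaskCompletionSource task from a console app's main thread and then completes it from a dedicated thread, comparing thread ids afterwards:

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

static class InlineContinuationDemo
{
    // Returns the id of the thread that runs the code after the await.
    static async Task<int> AwaitThenGetThreadIdAsync(Task<string> response)
    {
        await response;
        return Thread.CurrentThread.ManagedThreadId;
    }

    // Reports whether the awaiting code was resumed on the thread that called SetResult.
    internal static bool ContinuationRanOnSetterThread()
    {
        var tcs = new TaskCompletionSource<string>();
        Task<int> continuation = AwaitThenGetThreadIdAsync(tcs.Task);
        int setterThreadId = 0;

        var setter = new Thread(() =>
        {
            setterThreadId = Thread.CurrentThread.ManagedThreadId;
            tcs.SetResult("reply"); // the awaiting code is resumed from inside this call
        });
        setter.Start();
        setter.Join();

        return continuation.Result == setterThreadId;
    }

    static void Main()
    {
        Console.WriteLine("Continuation ran on the SetResult thread: "
            + ContinuationRanOnSetterThread());
    }
}
```

If the code after the await went on to block on a second response, the setter thread would be the one blocked, and nothing would be left to deliver that response.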
Below is a console app, modeled after your sample. It uses await Task.Yield() inside ProcessAsync to schedule the continuation on a separate thread, so the control flow returns to ReceiverRun and there's no deadlock.
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    class Program
    {
        class Worker
        {
            public struct Response
            {
                public string message;
                public int threadId;
            }

            CancellationToken _token;
            readonly ConcurrentQueue<string> _messages = new ConcurrentQueue<string>();
            readonly ConcurrentDictionary<string, TaskCompletionSource<Response>> _requests = new ConcurrentDictionary<string, TaskCompletionSource<Response>>();

            public Worker(CancellationToken token)
            {
                _token = token;
            }

            string ReadNextMessage()
            {
                // using Thread.Sleep(100) for test purposes here,
                // should be using ManualResetEvent (or similar synchronization primitive),
                // depending on how messages arrive
                string message;
                while (!_messages.TryDequeue(out message))
                {
                    Thread.Sleep(100);
                    _token.ThrowIfCancellationRequested();
                }
                return message;
            }

            public void ReceiverRun()
            {
                LogThread("Enter ReceiverRun");
                while (true)
                {
                    var msg = ReadNextMessage();
                    LogThread("ReadNextMessage: " + msg);
                    var tcs = _requests[msg];
                    tcs.SetResult(new Response { message = msg, threadId = Thread.CurrentThread.ManagedThreadId });
                    _token.ThrowIfCancellationRequested(); // this is how we terminate the loop
                }
            }

            Task<Response> SendAwaitResponse(string msg)
            {
                LogThread("SendAwaitResponse: " + msg);
                var tcs = new TaskCompletionSource<Response>();
                _requests.TryAdd(msg, tcs);
                _messages.Enqueue(msg);
                return tcs.Task;
            }

            public async Task ProcessAsync()
            {
                LogThread("Enter Worker.ProcessAsync");

                var task1 = SendAwaitResponse("first message");
                await task1;
                LogThread("result1: " + task1.Result.message);
                // avoid deadlock for task2.Wait() with Task.Yield()
                // comment this out and task2.Wait() will dead-lock
                if (task1.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task2 = SendAwaitResponse("second message");
                task2.Wait();
                LogThread("result2: " + task2.Result.message);

                var task3 = SendAwaitResponse("third message");
                // still on the same thread as with result 2, no deadlock for task3.Wait()
                task3.Wait();
                LogThread("result3: " + task3.Result.message);

                var task4 = SendAwaitResponse("fourth message");
                await task4;
                LogThread("result4: " + task4.Result.message);
                // avoid deadlock for task5.Wait() with Task.Yield()
                // comment this out and task5.Wait() will dead-lock
                if (task4.Result.threadId == Thread.CurrentThread.ManagedThreadId)
                    await Task.Yield();

                var task5 = SendAwaitResponse("fifth message");
                task5.Wait();
                LogThread("result5: " + task5.Result.message);

                LogThread("Leave Worker.ProcessAsync");
            }

            public static void LogThread(string message)
            {
                Console.WriteLine("{0}, thread: {1}", message, Thread.CurrentThread.ManagedThreadId);
            }
        }

        static void Main(string[] args)
        {
            Worker.LogThread("Enter Main");
            var cts = new CancellationTokenSource(5000); // cancel after 5s
            var worker = new Worker(cts.Token);
            Task receiver = Task.Run(() => worker.ReceiverRun());
            Task main = worker.ProcessAsync();
            try
            {
                Task.WaitAll(main, receiver);
            }
            catch (Exception e)
            {
                Console.WriteLine("Exception: " + e.Message);
            }
            Worker.LogThread("Leave Main");
            Console.ReadLine();
        }
    }
}
This is not much different from doing Task.Run(() => task.SetResult(msg)) inside ReceiverRun. The only advantage I can think of is that you have explicit control over when to switch threads. This way, you can stay on the same thread for as long as possible (e.g., for task2, task3, and task4), but you still need another thread switch after task4 to avoid a deadlock on task5.Wait().
Both solutions would eventually make the thread pool grow, which is bad in terms of performance and scalability.
Now, if we replace task.Wait() with await task everywhere inside ProcessAsync in the above code, we will not have to use await Task.Yield and there still will be no deadlocks. However, the whole chain of await calls after the first await task1 inside ProcessAsync will actually be executed on the ReceiverRun thread. As long as we don't block this thread with other Wait()-style calls and don't do a lot of CPU-bound work as we're processing messages, this approach might work OK (asynchronous IO-bound await-style calls still should be OK, and they may actually trigger an implicit thread switch).
That said, I think you'd need a separate thread with a serializing synchronization context installed on it for processing messages (similar to WindowsFormsSynchronizationContext). That's where your asynchronous code containing awaits should run. You'd still need to avoid using Task.Wait on that thread. And if processing an individual message takes a lot of CPU-bound work, you should use Task.Run for such work. For async IO-bound calls, you could stay on the same thread.
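To make the idea more concrete, here is a bare-bones sketch of such a context. SingleThreadSynchronizationContext and MessageLoopDemo are hypothetical names for this illustration, not types from the framework or from any library mentioned here, and a production version would need error handling and a proper Send implementation.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

// Queues every posted callback and runs the callbacks one at a time on a single thread.
sealed class SingleThreadSynchronizationContext : SynchronizationContext
{
    private readonly BlockingCollection<KeyValuePair<SendOrPostCallback, object>> _queue =
        new BlockingCollection<KeyValuePair<SendOrPostCallback, object>>();

    public override void Post(SendOrPostCallback d, object state)
    {
        _queue.Add(new KeyValuePair<SendOrPostCallback, object>(d, state));
    }

    public override void Send(SendOrPostCallback d, object state)
    {
        // Synchronous sends are deliberately left out of this sketch.
        throw new NotSupportedException();
    }

    // Runs queued callbacks on the calling thread until Complete() is called.
    public void RunOnCurrentThread()
    {
        foreach (var item in _queue.GetConsumingEnumerable())
            item.Key(item.Value);
    }

    public void Complete()
    {
        _queue.CompleteAdding();
    }
}

static class MessageLoopDemo
{
    // True when the code after the await resumed on the thread it started on.
    static async Task<bool> ProcessAsync()
    {
        int before = Thread.CurrentThread.ManagedThreadId;
        await Task.Delay(50); // the continuation is posted back to the captured context
        return before == Thread.CurrentThread.ManagedThreadId;
    }

    internal static bool RunOnMessageLoopThread()
    {
        var context = new SingleThreadSynchronizationContext();
        Task<bool> processing = null;

        var loopThread = new Thread(() =>
        {
            SynchronizationContext.SetSynchronizationContext(context);
            processing = ProcessAsync();
            // Stop the loop once processing has finished.
            processing.ContinueWith(_ => context.Complete(), TaskScheduler.Default);
            context.RunOnCurrentThread();
        });
        loopThread.Start();
        loopThread.Join();

        return processing.Result;
    }

    static void Main()
    {
        Console.WriteLine("Resumed on the message-loop thread: " + RunOnMessageLoopThread());
    }
}
```

With a context like this installed, calling SetResult from the receiver thread only posts the continuation to the queue, so the receiver is never taken over by message-processing code. Blocking with Wait on the loop thread would still stall the queue, which is why the advice to avoid Task.Wait there still applies.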
You may want to look at ActionDispatcher/ActionDispatcherSynchronizationContext from @StephenCleary's Nito Asynchronous Library for your asynchronous message processing logic. Hopefully, Stephen jumps in and provides a better answer.