How do I prevent "maxing out" of CPU: Synchronous method calling multiple workers asynchronously & throttling using SemaphoreSlim?
You didn't explain how you want to limit the concurrent calls. Do you want 30 concurrent worker tasks running, or 30 WCF calls, each of which has all its worker tasks running concurrently, or do you want each concurrent WCF call to have its own limit of concurrent worker tasks? Given that you said each WCF call has only 4 worker tasks, and looking at your sample code, I assume you want a global limit of 30 concurrent worker tasks.
Firstly, as @mjwills implied, you need to use the SemaphoreSlim to limit calls to workerService.DoWorkAsync(). Your code currently starts all of them and only tries to throttle how many you'll wait on to finish. I assume this is why you max out the CPU: the number of worker tasks started remains unbounded. Note, however, that you'll also need to await the worker task while you hold the semaphore; otherwise you'll only throttle how fast you create tasks, not how many run concurrently.
Secondly, you're creating a new SemaphoreSlim for each WCF request. Hence my question from my first paragraph. The only way this will throttle anything is if you have more worker services than the initial count, which in your sample is 30, but you said there are only 4 workers. To have a "global" limit, you need to use a singleton SemaphoreSlim.
Thirdly, you never call .Release() on the SemaphoreSlim, so if you did make it a singleton, your code would hang once it had started 30 workers over the lifetime of the process. Make sure to release in a try-finally block, so that the semaphore is still released if the worker throws.
Here's some hastily written sample code:
public async Task ProcessAllPendingWork()
{
var workerTasks = new List<Task<bool>>();
foreach(var workerService in _workerServices)
{
var workerTask = RunWorker(workerService);
workerTasks.Add(workerTask);
}
await Task.WhenAll(workerTasks);
}
// WorkerService is assumed to be the element type of _workerServices.
private async Task<bool> RunWorker(WorkerService workerService)
{
// use singleton semaphore.
await _semaphore.WaitAsync();
try
{
return await workerService.DoWorkAsync();
}
catch (System.Exception ex)
{
// assume Log is a predefined logging service
Log.Error(ex);
return false; // ??
}
finally
{
_semaphore.Release();
}
}
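To see the throttle in action without the real workers, here is a minimal self-contained sketch of the same pattern. It is not the asker's code: `ThrottleDemo`, `FakeWorkAsync` and the limit of 3 are made up for illustration, and the fake worker just awaits a delay while recording how many copies run at once.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // One semaphore shared by every caller in the process (the "singleton").
    // The limit of 3 is an arbitrary value chosen for this sketch.
    private static readonly SemaphoreSlim Semaphore = new SemaphoreSlim(3);

    private static int _running;
    private static int _peak;

    // Highest number of fake workers observed running at the same time.
    public static int Peak => Volatile.Read(ref _peak);

    // Hypothetical stand-in for workerService.DoWorkAsync().
    private static async Task<bool> FakeWorkAsync()
    {
        int now = Interlocked.Increment(ref _running);
        int seen;
        // Lock-free "peak = max(peak, now)".
        while (now > (seen = Volatile.Read(ref _peak)))
            Interlocked.CompareExchange(ref _peak, now, seen);

        await Task.Delay(50);
        Interlocked.Decrement(ref _running);
        return true;
    }

    // Acquire a slot, await the worker while holding it, always release.
    public static async Task<bool> RunThrottledAsync()
    {
        await Semaphore.WaitAsync();
        try
        {
            return await FakeWorkAsync();
        }
        finally
        {
            Semaphore.Release();
        }
    }

    // Start 'count' throttled workers and wait for all of them.
    public static Task<bool[]> RunManyAsync(int count) =>
        Task.WhenAll(Enumerable.Range(0, count).Select(_ => RunThrottledAsync()));
}
```

Calling `ThrottleDemo.RunManyAsync(10)` starts ten calls, but `ThrottleDemo.Peak` should never exceed the semaphore's initial count, because each call holds its slot for the whole duration of the awaited work.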
The Task abstraction provided by the TPL (Task Parallel Library) is an abstraction over threads: tasks are queued to a thread pool and executed when a pool thread is available to handle them.
In other words, depending on several factors (your traffic, CPU-bound vs. IO-bound work, and your deployment model), executing a managed Task in your worker function may bring no benefit at all (or in some cases may even be slower).
That said, I suggest you use Task.WaitAll (available since .NET 4.0), which uses very high-level abstractions to manage concurrency. In particular, this piece of code could be useful for you:
- it creates the workers and waits for all of them
- it takes 10 seconds to execute (the duration of the longest worker)
- it catches exceptions and gives you the opportunity to handle them
- [last but not least] it is a declarative API that focuses your attention on what to do rather than how to do it.
public class Q57572902
{
public void ProcessAllPendingWork()
{
var workers = new Action[] {Worker1, Worker2, Worker3};
try
{
Task.WaitAll(workers.Select(Task.Factory.StartNew).ToArray());
// ok
}
catch (AggregateException exceptions)
{
foreach (var ex in exceptions.InnerExceptions)
{
Log.Error(ex);
}
// ko
}
}
public void Worker1() => Thread.Sleep(TimeSpan.FromSeconds(5)); // do something
public void Worker2() => Thread.Sleep(TimeSpan.FromSeconds(10)); // do something
public void Worker3() => throw new NotImplementedException("error to manage"); // something wrong
}
I have seen from the comments that you require a maximum of 3 workers running at the same time; in this case you can simply copy-paste a LimitedConcurrencyLevelTaskScheduler from the TaskScheduler documentation.
After that, you have to create a singleton TaskScheduler instance with its own TaskFactory, like this:
public static class WorkerScheduler
{
public static readonly TaskFactory Factory;
static WorkerScheduler()
{
var scheduler = new LimitedConcurrencyLevelTaskScheduler(3);
Factory = new TaskFactory(scheduler);
}
}
The previous ProcessAllPendingWork() code remains the same except for
...workers.Select(Task.Factory.StartNew)...
which becomes
...workers.Select(WorkerScheduler.Factory.StartNew)...
because you have to use the TaskFactory associated with your custom WorkerScheduler.
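If you would rather not copy the scheduler from the documentation, the framework's own ConcurrentExclusiveSchedulerPair can play a similar role. This is a swapped-in technique, not the LimitedConcurrencyLevelTaskScheduler described above. In the sketch below, `BuiltInScheduler`, `Work` and `RunAll` are names made up for illustration, and the fake worker just sleeps while recording how many copies run at once.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class BuiltInScheduler
{
    // The ConcurrentScheduler of the pair runs at most 3 tasks at a time
    // on top of the default (thread pool) scheduler.
    public static readonly TaskFactory Factory = new TaskFactory(
        new ConcurrentExclusiveSchedulerPair(TaskScheduler.Default, 3).ConcurrentScheduler);

    private static int _running;
    private static int _peak;

    // Highest number of workers observed running at the same time.
    public static int Peak => Volatile.Read(ref _peak);

    // Hypothetical synchronous worker used only for this sketch.
    private static void Work()
    {
        int now = Interlocked.Increment(ref _running);
        int seen;
        // Lock-free "peak = max(peak, now)".
        while (now > (seen = Volatile.Read(ref _peak)))
            Interlocked.CompareExchange(ref _peak, now, seen);

        Thread.Sleep(50);
        Interlocked.Decrement(ref _running);
    }

    // Queue 'count' workers on the limited scheduler and block until all finish.
    public static void RunAll(int count)
    {
        Task[] tasks = Enumerable.Range(0, count)
            .Select(_ => Factory.StartNew(Work))
            .ToArray();
        Task.WaitAll(tasks);
    }
}
```

Keep in mind that a scheduler-based limit only counts work that is actually executing on it. It suits synchronous workers like the ones above; an async worker gives its slot back at its first await, so for those a semaphore is the more reliable throttle.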
If your workers need to return some data for the response, errors and data need to be handled differently, as follows:
public void ProcessAllPendingWork()
{
var workers = new Func<bool>[] {Worker1, Worker2, Worker3};
var tasks = workers.Select(WorkerScheduler.Factory.StartNew).ToArray();
bool[] results = null;
Task
.WhenAll(tasks)
.ContinueWith(x =>
{
if (x.Status == TaskStatus.Faulted)
{
foreach (var exception in x.Exception.InnerExceptions)
Log.Error(exception);
return;
}
results = x.Result; // save data in outer scope
})
.Wait();
// continue execution
// results is now filled: if results is null, some errors occurred
}
Unless I am missing something, your sample code runs ALL workers in parallel. As soon as 'workerService.DoWorkAsync()' is called, the worker starts its job; 'RunWorkerTasks' only waits for the worker Task to complete. 'DoWorkAsync()' kicks off the async operation, while 'await' suspends the calling method until the awaited Task completes.
The high CPU usage is most likely due to your workerService's activity and not due to the way you call it. To verify that, try replacing workerService.DoWorkAsync() with Thread.Sleep(..) or Task.Delay(..). If your CPU usage drops, the workers are to blame. Depending on what workerService does, it might be fine or even expected that CPU consumption increases once you run the workers in parallel.
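The check described above can be sketched roughly as follows. Everything here is hypothetical: `CpuCheck`, `BusyWorkAsync` (a worker that just spins) and `DelayWorkAsync` (the Task.Delay stand-in) are invented for the demonstration, and the process's own TotalProcessorTime is used as a rough CPU measurement.

```csharp
using System;
using System.Diagnostics;
using System.Linq;
using System.Threading.Tasks;

public static class CpuCheck
{
    // Hypothetical CPU-bound worker: spins on a thread pool thread for 'ms' milliseconds.
    public static Task<bool> BusyWorkAsync(int ms) => Task.Run(() =>
    {
        var sw = Stopwatch.StartNew();
        while (sw.ElapsedMilliseconds < ms) { }
        return true;
    });

    // Stand-in that takes the same wall-clock time but only waits.
    public static async Task<bool> DelayWorkAsync(int ms)
    {
        await Task.Delay(ms);
        return true;
    }

    // Runs 'count' workers concurrently and returns the CPU time
    // this process consumed while they were running.
    public static TimeSpan MeasureCpu(Func<Task<bool>> worker, int count)
    {
        var process = Process.GetCurrentProcess();
        process.Refresh();
        TimeSpan before = process.TotalProcessorTime;

        Task<bool>[] tasks = Enumerable.Range(0, count).Select(_ => worker()).ToArray();
        Task.WhenAll(tasks).GetAwaiter().GetResult();

        process.Refresh();
        return process.TotalProcessorTime - before;
    }
}
```

Comparing `CpuCheck.MeasureCpu(() => CpuCheck.BusyWorkAsync(200), 4)` with `CpuCheck.MeasureCpu(() => CpuCheck.DelayWorkAsync(200), 4)` should show far more CPU time for the spinning workers, even though both versions are called in exactly the same way.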
Coming to your question of how to limit parallel execution: note that the following sample does not use exactly 3 threads, but at most 3 threads.
Parallel.ForEach(
_workerServices,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
workerService => workerService.DoWorkAsync()
.ContinueWith(res =>
{
// Handle your result or possible exceptions by consulting res.
})
.Wait());
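The "at most 3" behaviour can be checked with a small sketch like the one below. `ParallelLimitDemo` and `MeasurePeak` are names made up for illustration; the loop body only sleeps and records how many iterations are executing at the same moment.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

public static class ParallelLimitDemo
{
    // Runs 'items' iterations with the given MaxDegreeOfParallelism and
    // returns the highest number of iterations seen running at once.
    public static int MeasurePeak(int items, int maxDegree)
    {
        int running = 0;
        int peak = 0;

        Parallel.ForEach(
            Enumerable.Range(0, items),
            new ParallelOptions { MaxDegreeOfParallelism = maxDegree },
            _ =>
            {
                int now = Interlocked.Increment(ref running);
                int seen;
                // Lock-free "peak = max(peak, now)".
                while (now > (seen = Volatile.Read(ref peak)))
                    Interlocked.CompareExchange(ref peak, now, seen);

                Thread.Sleep(20); // stand-in for synchronous work
                Interlocked.Decrement(ref running);
            });

        return peak;
    }
}
```

`ParallelLimitDemo.MeasurePeak(12, 3)` should return a value between 1 and 3: the option is an upper bound, and the runtime is free to use fewer threads.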
As you mentioned that your code previously executed sequentially, I assume that the workers also have a non-async equivalent. It is probably easier to use those, because calling an async method synchronously is mostly a hassle. I've even run into deadlocks just by calling DoWorkAsync().Wait(). There has been much discussion of this in "How would I run an async Task<T> method synchronously?". In essence, I try to avoid it. If that is not possible, I use either ContinueWith, which increases the complexity, or the AsyncHelper from that SO discussion.
var results = new ConcurrentDictionary<WorkerService, bool>();
Parallel.ForEach(
_workerServices,
new ParallelOptions { MaxDegreeOfParallelism = 3 },
workerService =>
{
// Handle possible exceptions via try-catch.
results.TryAdd(workerService, workerService.DoWork());
});
// evaluate results
Parallel.ForEach takes advantage of the thread pool: it dispatches executions of the given Action<TSource> body onto pool threads (and may also use the calling thread), so different iterations can run on different threads. You can easily verify that with the following code. Since Parallel.ForEach already dispatches the work onto different threads, you can simply execute your 'expensive' operation synchronously. Wrapping it in async operations is unnecessary and may even hurt runtime performance.
Parallel.ForEach(
Enumerable.Range(1, 4),
m => Console.WriteLine(Thread.CurrentThread.ManagedThreadId));
This is the demo project I used for testing which does not rely on your workerService.
private static bool DoWork()
{
Thread.Sleep(5000);
Console.WriteLine($"done by {Thread.CurrentThread.ManagedThreadId}.");
return DateTime.Now.Millisecond % 2 == 0;
}
private static Task<bool> DoWorkAsync() => Task.Run(DoWork);
private static void Main(string[] args)
{
var sw = new Stopwatch();
sw.Start();
// define a thread-safe dict to store the results of the async operation
var results = new ConcurrentDictionary<int, bool>();
Parallel.ForEach(
Enumerable.Range(1, 4), // this replaces the list of workers
new ParallelOptions { MaxDegreeOfParallelism = 3 },
// m => results.TryAdd(m, DoWork()), // this is the alternative synchronous call
m => DoWorkAsync().ContinueWith(res => results.TryAdd(m, res.Result)).Wait());
sw.Stop();
// print results
foreach (var item in results)
{
Console.WriteLine($"{item.Key}={item.Value}");
}
Console.WriteLine(sw.Elapsed.ToString());
Console.ReadLine();
}