How to correctly queue up tasks to run in C#

As I always recommend, what you need is TPL Dataflow (to install it, run Install-Package System.Threading.Tasks.Dataflow).

You create an ActionBlock with an action to perform on each item. Set MaxDegreeOfParallelism for throttling. Start posting into it and await its completion:

var block = new ActionBlock<QueryAvailabilityMultidayRequest>(async service => 
{
    var availabilityResponse = await client.QueryAvailability(service);
    // ...
},
new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

foreach (var service in RunData.Demand)
{
    block.Post(service);
}

block.Complete();
await block.Completion;

Old question, but I would like to propose an alternative lightweight solution using the SemaphoreSlim class. Just reference System.Threading.

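SemaphoreSlim sem = new SemaphoreSlim(4, 4);
var tasks = new List<Task>();

foreach (var service in RunData.Demand)
{
    // Wait for a free slot before starting the next request.
    await sem.WaitAsync();
    var serviceCopy = service; // local copy for the closure (required before C# 5)
    tasks.Add(Task.Run(async () =>
    {
        try
        {
            var availabilityResponse = await client.QueryAvailability(serviceCopy);
            // do your other stuff here with the result of QueryAvailability
        }
        finally
        {
            sem.Release(); // free the slot even if the call throws
        }
    }));
}

// Wait for the requests that are still in flight.
await Task.WhenAll(tasks);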

The semaphore acts as a throttle. A task can enter it only by calling Wait (or WaitAsync), which subtracts one from the count; when the count is zero, the call does not complete until another task calls Release, which adds one back to the count.
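
To see the counting behaviour in isolation, here is a minimal sketch that replaces the HTTP call with Task.Delay and records the highest number of bodies running at once. ThrottleDemo, RunAsync, and the 20 ms delay are hypothetical names and values chosen for illustration; they are not part of the original question.

```csharp
using System;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;

public static class ThrottleDemo
{
    // Runs itemCount fake requests with at most `limit` in flight
    // and returns the peak concurrency that was observed.
    public static async Task<int> RunAsync(int itemCount, int limit)
    {
        var sem = new SemaphoreSlim(limit, limit);
        var gate = new object();
        int running = 0, maxObserved = 0;
        var tasks = new List<Task>();

        for (int i = 0; i < itemCount; i++)
        {
            await sem.WaitAsync(); // count - 1; waits while the count is 0
            tasks.Add(Task.Run(async () =>
            {
                try
                {
                    lock (gate)
                    {
                        running++;
                        if (running > maxObserved) maxObserved = running;
                    }
                    await Task.Delay(20); // stand-in for the async HTTP call
                }
                finally
                {
                    lock (gate) { running--; }
                    sem.Release(); // count + 1; lets the next WaitAsync complete
                }
            }));
        }

        await Task.WhenAll(tasks);
        return maxObserved;
    }
}
```

Calling await ThrottleDemo.RunAsync(20, 4) should never report a peak above 4, because each body decrements the running counter before it releases its slot.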


You're using async HTTP calls, so limiting the number of threads will not help (nor will ParallelOptions.MaxDegreeOfParallelism in Parallel.ForEach as one of the answers suggests). Even a single thread can initiate all requests and process the results as they arrive.
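
The point about a single thread can be illustrated with a small sketch. It assumes a fake request (the hypothetical FakeRequestAsync) that awaits a TaskCompletionSource instead of the network, and it counts how many requests have been started before any of them is allowed to finish.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class SingleThreadStartDemo
{
    // Returns how many fake requests were started before any of them completed.
    public static async Task<int> StartedBeforeAnyCompletedAsync(int requestCount)
    {
        var gate = new TaskCompletionSource<bool>();
        int started = 0;

        async Task FakeRequestAsync()
        {
            started++;       // runs synchronously on the calling thread
            await gate.Task; // stand-in for waiting on the network
        }

        // One thread starts every request; no call blocks while "waiting".
        var pending = new List<Task>();
        for (int i = 0; i < requestCount; i++)
            pending.Add(FakeRequestAsync());

        int startedBeforeCompletion = started;

        gate.SetResult(true); // let all the requests finish
        await Task.WhenAll(pending);
        return startedBeforeCompletion;
    }
}
```

Every request is in flight before the first one completes, which is why a cap on threads does not cap the number of concurrent requests.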

One way to solve it is to use TPL Dataflow.

Another nice solution is to divide the source IEnumerable into partitions and process items in each partition sequentially as described in this blog post:

public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
{
    return Task.WhenAll(
        from partition in Partitioner.Create(source).GetPartitions(dop)
        select Task.Run(async delegate
        {
            using (partition)
                while (partition.MoveNext())
                    await body(partition.Current);
        }));
}
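
A possible way to call it, as a sketch: the method is repeated inside a static class (extension methods must live in one), and the body is a Task.Delay stand-in rather than a real HTTP call. AsyncEnumerableExtensions, ForEachAsyncDemo, and SumAsync are hypothetical names used only for this example.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

public static class AsyncEnumerableExtensions
{
    // Same method as above, wrapped in a static class so it compiles as an extension.
    public static Task ForEachAsync<T>(this IEnumerable<T> source, int dop, Func<T, Task> body)
    {
        return Task.WhenAll(
            from partition in Partitioner.Create(source).GetPartitions(dop)
            select Task.Run(async delegate
            {
                using (partition)
                    while (partition.MoveNext())
                        await body(partition.Current);
            }));
    }
}

public static class ForEachAsyncDemo
{
    // Processes every number with at most `dop` bodies in flight and returns the sum.
    public static async Task<int> SumAsync(IEnumerable<int> numbers, int dop)
    {
        var results = new ConcurrentBag<int>(); // thread-safe: bodies run concurrently
        await numbers.ForEachAsync(dop, async n =>
        {
            await Task.Delay(10); // stand-in for an async HTTP call
            results.Add(n);
        });
        return results.Sum();
    }
}
```

Each partition awaits its items one at a time, so dop is the upper bound on concurrent bodies. In the original scenario the call would look something like RunData.Demand.ForEachAsync(4, service => client.QueryAvailability(service)).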