Does Parallel.ForEach limit the number of active threads?
On a single-core machine, Parallel.ForEach partitions (chunks) the collection it is working on between a number of threads, but that number is calculated by an algorithm that takes into account, and appears to continually monitor, the work done by the threads it allocates to the ForEach. So if the body of the ForEach calls long-running IO-bound or blocking functions that leave the thread waiting around, the algorithm will spawn more threads and repartition the collection between them. If the iterations complete quickly and don't block (for example, they simply calculate some numbers), the algorithm will ramp the number of threads up (or indeed down) to the point it considers optimum for throughput (average completion time of each iteration).
Basically, the thread pool behind all the various Parallel library functions will work out an optimum number of threads to use. The number of physical processor cores forms only part of the equation. There is NOT a simple one-to-one relationship between the number of cores and the number of threads spawned.
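One rough way to see this for yourself is to record which threads actually run the loop body. The sketch below is only an illustration: the class name ThreadCountDemo, the helper CountThreads and the item counts and sleep time are all made up for this example, and the numbers it prints will differ between machines and runs.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical helper, not part of the framework.
public static class ThreadCountDemo
{
    // Runs Parallel.ForEach over `items` dummy elements and returns how many
    // distinct managed threads executed the body. `blockMs` simulates blocking work.
    public static int CountThreads(int items, int blockMs)
    {
        var threadIds = new ConcurrentDictionary<int, bool>();
        Parallel.ForEach(new int[items], _ =>
        {
            // Remember every thread that runs at least one iteration.
            threadIds.TryAdd(Environment.CurrentManagedThreadId, true);
            if (blockMs > 0)
            {
                Thread.Sleep(blockMs);
            }
        });
        return threadIds.Count;
    }

    public static void Main()
    {
        Console.WriteLine("Cores: " + Environment.ProcessorCount);
        Console.WriteLine("Threads, non-blocking body: " + CountThreads(200, 0));
        Console.WriteLine("Threads, blocking body: " + CountThreads(200, 50));
    }
}
```

On most machines the blocking run should report more threads than the non-blocking one, but there is no guarantee of that.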
I don't find the documentation around the cancellation and handling of synchronizing threads very helpful. Hopefully MS can supply better examples in MSDN.
Don't forget that the body code must be written to run on multiple threads, with all the usual thread-safety considerations. The framework does not abstract that factor... yet.
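As a small illustration of what "thread safe" means for the body, here is a sketch that sums an array two ways. The names SafeSumDemo, SumWithInterlocked and SumWithThreadLocals are invented for this example. The first version updates the shared total atomically on every iteration; the second uses the Parallel.ForEach overload with a per-thread local value, so the shared total is only touched once per worker. A plain `total += v` inside the body would be a data race.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example class, for illustration only.
public static class SafeSumDemo
{
    // Every iteration updates the shared total atomically.
    public static long SumWithInterlocked(int[] values)
    {
        long total = 0;
        Parallel.ForEach(values, v =>
        {
            Interlocked.Add(ref total, v);
        });
        return total;
    }

    // Each worker accumulates into its own local value and merges it
    // into the shared total once, when that worker finishes.
    public static long SumWithThreadLocals(int[] values)
    {
        long total = 0;
        Parallel.ForEach<int, long>(
            values,
            () => 0L,                                // initial local value
            (v, state, local) => local + v,          // no shared state touched here
            local => { Interlocked.Add(ref total, local); });
        return total;
    }

    public static void Main()
    {
        int[] values = Enumerable.Range(1, 1000).ToArray();
        Console.WriteLine(SumWithInterlocked(values));
        Console.WriteLine(SumWithThreadLocals(values));
    }
}
```

Both calls should print the same value, the sum of 1 through 1000.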
No, it won't start 1000 threads; yes, it will limit how many threads are used. Parallel Extensions uses an appropriate number of cores, based on how many you physically have and how many are already busy. It allocates work for each core and then uses a technique called work stealing to let each thread process its own queue efficiently, only doing expensive cross-thread access when it really needs to.
Have a look at the PFX Team Blog for loads of information about how it allocates work and all kinds of other topics.
Note that in some cases you can specify the degree of parallelism you want, too.
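For example, ParallelOptions.MaxDegreeOfParallelism puts an upper bound on how many iterations run at the same time. The sketch below is an assumption-laden illustration, not a benchmark: CappedLoopDemo and PeakConcurrency are made-up names, and the item count and sleep time are arbitrary. It tracks the highest number of iterations that were in flight at once.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical example class, for illustration only.
public static class CappedLoopDemo
{
    // Returns the highest number of loop bodies observed running at once.
    public static int PeakConcurrency(int items, int maxDegree, int blockMs)
    {
        int current = 0;
        int peak = 0;
        var options = new ParallelOptions { MaxDegreeOfParallelism = maxDegree };

        Parallel.ForEach(new int[items], options, _ =>
        {
            int now = Interlocked.Increment(ref current);

            // Lock-free "peak = max(peak, now)".
            int seen;
            while (now > (seen = Volatile.Read(ref peak)) &&
                   Interlocked.CompareExchange(ref peak, now, seen) != seen)
            {
            }

            Thread.Sleep(blockMs); // simulate a blocking call
            Interlocked.Decrement(ref current);
        });

        return peak;
    }

    public static void Main()
    {
        // With a cap of 4, the reported peak should never exceed 4.
        Console.WriteLine("Peak concurrency: " + PeakConcurrency(100, 4, 20));
    }
}
```

The cap is only an upper bound; the loop is free to use fewer threads than the value you pass.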
Great question. In your example, the level of parallelization is pretty low even on a quad core processor, but with some waiting the level of parallelization can get quite high.
// Max concurrency: 5
[Test]
public void Memory_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });
    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
Now look what happens when a waiting operation is added to simulate an HTTP request.
// Max concurrency: 34
[Test]
public void Waiting_Operations()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    Parallel.ForEach<string>(arrayStrings, someString =>
    {
        monitor.Add(monitor.Count);
        System.Threading.Thread.Sleep(1000);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });
    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
I haven't made any other changes yet, and the level of concurrency/parallelization has jumped dramatically. The limit on concurrency can be raised with ParallelOptions.MaxDegreeOfParallelism.
// Max concurrency: 43
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions { MaxDegreeOfParallelism = int.MaxValue };
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);
        System.Threading.Thread.Sleep(1000);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });
    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
// Max concurrency: 391
[Test]
public void Test()
{
    ConcurrentBag<int> monitor = new ConcurrentBag<int>();
    ConcurrentBag<int> monitorOut = new ConcurrentBag<int>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions { MaxDegreeOfParallelism = int.MaxValue };
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(monitor.Count);
        System.Threading.Thread.Sleep(100000);
        monitor.TryTake(out int result);
        monitorOut.Add(result);
    });
    Console.WriteLine("Max concurrency: " + monitorOut.OrderByDescending(x => x).First());
}
I recommend setting ParallelOptions.MaxDegreeOfParallelism. It will not necessarily increase the number of threads in use, but it will ensure you only start a sane number of threads, which seems to be your concern.
Lastly, to answer your question: no, you will not get all threads to start at once. Use Parallel.Invoke if you are looking to invoke in parallel perfectly, e.g. when testing race conditions.
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623363344
// 636462943623368346
// 636462943623368346
// 636462943623373351
// 636462943623393364
// 636462943623393364
[Test]
public void Test()
{
    ConcurrentBag<string> monitor = new ConcurrentBag<string>();
    ConcurrentBag<string> monitorOut = new ConcurrentBag<string>();
    var arrayStrings = new string[1000];
    var options = new ParallelOptions { MaxDegreeOfParallelism = int.MaxValue };
    Parallel.ForEach<string>(arrayStrings, options, someString =>
    {
        monitor.Add(DateTime.UtcNow.Ticks.ToString());
        monitor.TryTake(out string result);
        monitorOut.Add(result);
    });
    var startTimes = monitorOut.OrderBy(x => x.ToString()).ToList();
    Console.WriteLine(string.Join(Environment.NewLine, startTimes.Take(10)));
}
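For the Parallel.Invoke suggestion, a minimal sketch might look like the following. InvokeDemo and StartTicks are hypothetical names used only here. Each action records the tick count at which it started, so you can compare the start times the same way as in the test above. Keep in mind that the documentation for Parallel.Invoke makes no guarantee about execution order or about whether the actions really run in parallel, so treat the output as an observation rather than a promise.

```csharp
using System;
using System.Threading.Tasks;

// Hypothetical example class, for illustration only.
public static class InvokeDemo
{
    // Runs `actions` delegates through Parallel.Invoke and returns the
    // UTC tick count each one observed when it started.
    public static long[] StartTicks(int actions)
    {
        var ticks = new long[actions];
        var work = new Action[actions];
        for (int i = 0; i < actions; i++)
        {
            int slot = i; // per-iteration copy so each delegate writes its own slot
            work[slot] = () => ticks[slot] = DateTime.UtcNow.Ticks;
        }

        Parallel.Invoke(work);
        return ticks;
    }

    public static void Main()
    {
        long[] ticks = StartTicks(10);
        Array.Sort(ticks);
        Console.WriteLine(string.Join(Environment.NewLine, ticks));
    }
}
```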