python concurrent.futures.ProcessPoolExecutor: Performance of .submit() vs .map()
Overview:
There are 2 parts to my answer:
- Part 1 shows how to gain more speed-up from @niemmi's
ProcessPoolExecutor.map()
solution. - Part 2 shows when the
ProcessPoolExecutor
's subclasses.submit()
and.map()
yield non-equivalent compute times.
=======================================================================
Part 1: More Speed-up for ProcessPoolExecutor.map()
Background:
This section builds on @niemmi's .map()
solution, which by itself is excellent. While doing some research on his discretization scheme to better understand how that interact with .map() chunksizes arguement, I found this interesting solution.
I regard @niemmi's definition of chunk = nmax // workers
to be a definition for chunksize, i.e. a smaller size of actual number range (given task) to be tackled by each worker in the worker pool. Now, this definition is premised on the assumption that if a computer has x number of workers, dividing the task equally among each worker will result in optimum use of each worker and hence the total task will be completed fastest. Therefore, the number of chunks to break up a given task into should always equal the number of pool workers. However, is this assumption correct?
Proposition: Here, I propose that the above assumption does not always lead to the fastest compute time when used with ProcessPoolExecutor.map()
. Rather, discretising a task to an amount greater than the number of pool workers can lead to speed-up, i.e. faster completion of a given task.
Experiment: I have modified @niemmi's code to allow the number of discretized tasks to exceed the number of pool workers. This code is given below and used to fin the number of times the number 5 appears in the number range of 0 to 1E8. I have executed this code using 1, 2, 4, and 6 pool workers and for various ratio of number of discretized tasks vs the number of pool workers. For each scenario, 3 runs were made and the compute times were tabulated. "Speed-up" is defined here as the average compute time using equal number of chunks and pool workers over the average compute time of when the number of discretized tasks is greater than the number of pool workers.
Findings:
Figure on left shows the compute time taken by all the scenarios mentioned in the experiment section. It shows that the compute time taken by number of chunks / number of workers = 1 is always greater than the compute time taken by number of chunks > number of workers. That is, the former case is always less efficient than the latter.
Figure on right shows that a speed-up of 1.2 times or more was gained when the number of chunks / number of workers reach a threshold value of 14 or more. It is interesting to observe that the speed-up trend also occurred when
ProcessPoolExecutor.map()
was executed with 1 worker.
Conclusion: When customizing the number of discrete tasks that ProcessPoolExecutor.map()` should use to solve a given task, it is prudent to ensure that this number is greater than the number pool workers as this practice shortens compute time.
concurrent.futures.ProcessPoolExecutor.map() code. (revised parts only)
def _concurrent_map(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunksize * i for i in range(num_of_chunks))
cstop = (chunksize * i if i != num_of_chunks else nmax
for i in range(1, num_of_chunks + 1))
futures = executor.map(_findmatch, cstart, cstop,
itertools.repeat(number))
# 2.2. Consolidate result as a list and return this list.
for future in futures:
#print('type(future)=',type(future))
for f in future:
if f:
try:
found.append(f)
except:
print_exc()
foundsize = len(found)
end = time() - start
print('\n within statement of def _concurrent(nmax, number):')
print("found {0} in {1:.4f}sec".format(foundsize, end))
return found
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 4 # Pool of workers
chunks_vs_workers = 14 # A factor of =>14 can provide optimum performance
num_of_chunks = chunks_vs_workers * workers
start = time()
a = _concurrent_map(nmax, number, workers, num_of_chunks)
end = time() - start
print('\n main')
print('nmax={}, workers={}, num_of_chunks={}'.format(
nmax, workers, num_of_chunks))
print('workers = ', workers)
print("found {0} in {1:.4f}sec".format(len(a),end))
=======================================================================
Part 2: Total compute time from using ProcessPoolExecutor subclasses .submit() and .map() can be dissimilar when returning a sorted/ordered result list.
Background: I have amended both the .submit()
and .map()
codes to allow an "apple-to-apple" comparison of their compute time and the ability to visualize the compute time of the main code, the compute time of the _concurrent method called by the main code to performs the concurrent operations, and the compute time for each discretized task/worker called by the _concurrent method. Furthermore, the concurrent method in these codes was structured to return an unordered and ordered list of the result directly from the future object of .submit()
and the iterator of .map()
. Source code is provided below (Hope it helps you.).
Experiments These two newly improved codes were used to perform the same experiment described in Part 1, save that only 6 pool workers were considered and the python built-in list
and sorted
methods were used to return an unordered and ordered list of the results to the main section of the code, respectively.
Findings:
- From the _concurrent method's result, we can see the compute times of the _concurrent method used to create all the Future objects of
ProcessPoolExecutor.submit()
, and to create the iterator ofProcessPoolExecutor.map()
, as a function of the number of discretized task over the number of pool workers, are equivalent. This result simply means that theProcessPoolExecutor
sub-classes.submit()
and.map()
are equally efficient/fast. - Comparing the compute times from main and it's _concurrent method, we can see that main ran longer than it's _concurrent method. This is to be expected as their time difference reflects the amount of compute times of the
list
andsorted
methods (and that of the other methods encased within these methods). Clearly seen, thelist
method took less compute time to return a result list than thesorted
method. The average compute times of thelist
method for both the .submit() and .map() codes were similar, at ~0.47sec. The average compute time of the sorted method for the .submit() and .map() codes was 1.23sec and 1.01sec, respectively. In other words, thelist
method performed 2.62 times and 2.15 times faster thansorted
method for the .submit() and .map() codes, respectively. - It is not clear why the
sorted
method generated an ordered list from.map()
faster than from.submit()
, as the number of discretized tasks increased more than the number of pool workers, save when the number of discretized tasks equaled the number of pool workers. That said, these findings shows that the decision to use the equally fast.submit()
or.map()
sub-classes can be encumbered by the sorted method. For example, if the intent is to generate an ordered list in the shortest time possible, the use of ProcessPoolExecutor.map() should be preferred overProcessPoolExecutor.submit()
as.map()
can allow the shortest total compute time. - The discretization scheme mentioned in Part 1 of my answer is shown here to speed-up the performance of both the
.submit()
and.map()
sub-classes. The amount of speed-up can be as much as 20% over the case when the number of discretized tasks equaled the number of pool workers.
Improved .map() code
#!/usr/bin/python3.5
# -*- coding: utf-8 -*-
import concurrent.futures as cf
from time import time
from itertools import repeat, chain
def _findmatch(nmin, nmax, number):
'''Function to find the occurence of number in range nmin to nmax and return
the found occurences in a list.'''
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
#print("\n def _findmatch {0:<10} {1:<10} {2:<3} found {3:8} in {4:.4f}sec".
# format(nmin, nmax, number, len(match),end))
return match
def _concurrent(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a concurrent
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunksize * i for i in range(num_of_chunks))
cstop = (chunksize * i if i != num_of_chunks else nmax
for i in range(1, num_of_chunks + 1))
futures = executor.map(_findmatch, cstart, cstop, repeat(number))
end = time() - start
print('\n within statement of def _concurrent_map(nmax, number, workers, num_of_chunks):')
print("found in {0:.4f}sec".format(end))
return list(chain.from_iterable(futures)) #Return an unordered result list
#return sorted(chain.from_iterable(futures)) #Return an ordered result list
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 6 # Pool of workers
chunks_vs_workers = 30 # A factor of =>14 can provide optimum performance
num_of_chunks = chunks_vs_workers * workers
start = time()
found = _concurrent(nmax, number, workers, num_of_chunks)
end = time() - start
print('\n main')
print('nmax={}, workers={}, num_of_chunks={}'.format(
nmax, workers, num_of_chunks))
#print('found = ', found)
print("found {0} in {1:.4f}sec".format(len(found),end))
Improved .submit() code.
This code is same as .map code except you replace the _concurrent method with the following:
def _concurrent(nmax, number, workers, num_of_chunks):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.submit to
find the occurrences of a given number in a number range in a concurrent
manner.'''
# 1. Local variables
start = time()
chunksize = nmax // num_of_chunks
futures = []
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
for i in range(num_of_chunks):
cstart = chunksize * i
cstop = chunksize * (i + 1) if i != num_of_chunks - 1 else nmax
futures.append(executor.submit(_findmatch, cstart, cstop, number))
end = time() - start
print('\n within statement of def _concurrent_submit(nmax, number, workers, num_of_chunks):')
print("found in {0:.4f}sec".format(end))
return list(chain.from_iterable(f.result() for f in cf.as_completed(
futures))) #Return an unordered list
#return list(chain.from_iterable(f.result() for f in cf.as_completed(
# futures))) #Return an ordered list
=======================================================================
You're comparing apples to oranges here. When using map
you produce all the 1E8
numbers and transfer them to worker processes. This takes a lot of time compared to actual execution. When using submit
you just create 6 sets of parameters that get transferred.
If you change map
to operate with the same principle you'll get numbers that are close to each other:
def _findmatch(nmin, nmax, number):
'''Function to find the occurrence of number in range nmin to nmax and return
the found occurrences in a list.'''
print('\n def _findmatch', nmin, nmax, number)
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
print("found {0} in {1:.4f}sec".format(len(match),end))
return match
def _concurrent_map(nmax, number, workers):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
start = time()
chunk = nmax // workers
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
cstart = (chunk * i for i in range(workers))
cstop = (chunk * i if i != workers else nmax for i in range(1, workers + 1))
futures = executor.map(_findmatch, cstart, cstop, itertools.repeat(number))
# 2.3. Consolidate result as a list and return this list.
for future in futures:
for f in future:
try:
found.append(f)
except:
print_exc()
foundsize = len(found)
end = time() - start
print('within statement of def _concurrent(nmax, number):')
print("found {0} in {1:.4f}sec".format(foundsize, end))
return found
You could improve the performance of submit by using as_completed
correctly. For given iterable of futures it will return an iterator that will yield
futures in the order they complete.
You could also skip the copying of the data to another array and use itertools.chain.from_iterable
to combine the results from futures to single iterable:
import concurrent.futures as cf
import itertools
from time import time
from traceback import print_exc
from itertools import chain
def _findmatch(nmin, nmax, number):
'''Function to find the occurrence of number in range nmin to nmax and return
the found occurrences in a list.'''
print('\n def _findmatch', nmin, nmax, number)
start = time()
match=[]
for n in range(nmin, nmax):
if number in str(n):
match.append(n)
end = time() - start
print("found {0} in {1:.4f}sec".format(len(match),end))
return match
def _concurrent_map(nmax, number, workers):
'''Function that utilises concurrent.futures.ProcessPoolExecutor.map to
find the occurrences of a given number in a number range in a parallelised
manner.'''
# 1. Local variables
chunk = nmax // workers
futures = []
found =[]
#2. Parallelization
with cf.ProcessPoolExecutor(max_workers=workers) as executor:
# 2.1. Discretise workload and submit to worker pool
for i in range(workers):
cstart = chunk * i
cstop = chunk * (i + 1) if i != workers - 1 else nmax
futures.append(executor.submit(_findmatch, cstart, cstop, number))
return chain.from_iterable(f.result() for f in cf.as_completed(futures))
if __name__ == '__main__':
nmax = int(1E8) # Number range maximum.
number = str(5) # Number to be found in number range.
workers = 6 # Pool of workers
start = time()
a = _concurrent_map(nmax, number, workers)
end = time() - start
print('\n main')
print('workers = ', workers)
print("found {0} in {1:.4f}sec".format(sum(1 for x in a),end))