How do I wait for ThreadPoolExecutor.map to finish
The call to ThreadPoolExecutor.map does not block until all of its tasks are complete; it returns a generator of results right away. Use concurrent.futures.wait to block explicitly.
from concurrent.futures import wait, ALL_COMPLETED
...
futures = [pool.submit(fn, args) for args in arg_list]
wait(futures, timeout=whatever, return_when=ALL_COMPLETED) # ALL_COMPLETED is actually the default
do_other_stuff()
You could also call list(results) on the generator returned by pool.map to force the evaluation (which is what you're doing in your original example). If you're not actually using the values returned from the tasks, though, wait is the way to go.
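For completeness, here is a minimal runnable sketch of that submit-plus-wait pattern; the worker function and arguments are made up for illustration, since fn, arg_list, and pool are only placeholders above.

import time
from concurrent.futures import ALL_COMPLETED, ThreadPoolExecutor, wait

def fn(arg):
    # Stand-in worker: sleep a bit, then return a value.
    time.sleep(arg)
    return arg * 2

arg_list = [1, 2, 3]

with ThreadPoolExecutor(max_workers=2) as pool:
    futures = [pool.submit(fn, arg) for arg in arg_list]
    done, not_done = wait(futures, return_when=ALL_COMPLETED)  # blocks until every task is done
    print([f.result() for f in futures])  # all results are available at this point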
It's true that Executor.map() will not wait for all futures to finish, because it returns a lazy iterator, as @MisterMiyagi said. But we can accomplish this by using with:
import time
from concurrent.futures import ThreadPoolExecutor

def hello(i):
    time.sleep(i)
    print(i)

with ThreadPoolExecutor(max_workers=2) as executor:
    executor.map(hello, [1, 2, 3])

print("finish")
# output
# 1
# 2
# 3
# finish
As you can see, finish is printed after 1, 2, 3. It works because Executor has an __exit__() method; its code is
def __exit__(self, exc_type, exc_val, exc_tb):
    self.shutdown(wait=True)
    return False
and the shutdown method of ThreadPoolExecutor is
def shutdown(self, wait=True, *, cancel_futures=False):
    with self._shutdown_lock:
        self._shutdown = True
        if cancel_futures:
            # Drain all work items from the queue, and then cancel their
            # associated futures.
            while True:
                try:
                    work_item = self._work_queue.get_nowait()
                except queue.Empty:
                    break
                if work_item is not None:
                    work_item.future.cancel()

        # Send a wake-up to prevent threads calling
        # _work_queue.get(block=True) from permanently blocking.
        self._work_queue.put(None)
    if wait:
        for t in self._threads:
            t.join()
shutdown.__doc__ = _base.Executor.shutdown.__doc__
So by using with, we get the ability to wait until all futures finish.
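If you are not using with, calling shutdown(wait=True) yourself gives the same behaviour. A minimal sketch, reusing the hello function from the example above:

executor = ThreadPoolExecutor(max_workers=2)
executor.map(hello, [1, 2, 3])
executor.shutdown(wait=True)  # same as leaving the with block: joins the worker threads
print("finish")               # still printed after 1, 2, 3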
Executor.map submits all jobs right away and runs them in parallel; the generator it returns waits for each future as you iterate over it and yields that future's result, so consuming the generator does the waiting for you. If you set a timeout, iterating the generator raises TimeoutError once a result is not available within the timeout.
map(func, *iterables, timeout=None, chunksize=1)
- the iterables are collected immediately rather than lazily;
- func is executed asynchronously and several calls to func may be made concurrently.
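To make the timeout behaviour concrete, here is a small sketch (the sleep times and the 1.5 second timeout are arbitrary): iterating the generator blocks until each result is ready, and a result that is not available in time makes the iterator raise TimeoutError.

import time
from concurrent.futures import ThreadPoolExecutor, TimeoutError

def slow(i):
    time.sleep(i)
    return i

with ThreadPoolExecutor(max_workers=2) as executor:
    results = executor.map(slow, [1, 2, 3], timeout=1.5)
    try:
        for r in results:  # each next() waits for the corresponding future
            print(r)
    except TimeoutError:
        print("a result was not ready within the timeout")

Tasks that have already started keep running; the with block still waits for them before exiting.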
To get a list of futures and do the wait manually, you can use:
myfuturelist = [pool.submit(_exec, x) for x in range(5)]
Executor.submit returns a Future object; calling result() on a future explicitly waits for it to finish:
myfuturelist[0].result() # wait for the 1st future to finish and return its result
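Putting that together, a hedged sketch of the manual submit/result approach; the _exec worker here is only a placeholder:

import time
from concurrent.futures import ThreadPoolExecutor

def _exec(x):
    # Placeholder worker.
    time.sleep(0.1)
    return x * x

with ThreadPoolExecutor(max_workers=4) as pool:
    myfuturelist = [pool.submit(_exec, x) for x in range(5)]
    results = [f.result() for f in myfuturelist]  # blocks until every future is done

print(results)  # [0, 1, 4, 9, 16]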