How can I abort a task in a multiprocessing.Pool after a timeout?
we can use gevent.Timeout to set time of worker running . gevent tutorial
from multiprocessing.dummy import Pool
#you should install gevent.
from gevent import Timeout
from gevent import monkey
monkey.patch_all()
import time
def worker(sleep_time):
try:
seconds = 5 # max time the worker may run
timeout = Timeout(seconds)
timeout.start()
time.sleep(sleep_time)
print "%s is a early bird"%sleep_time
except:
print "%s is late(time out)"%sleep_time
pool = Pool(4)
pool.map(worker, range(10))
output:
0 is a early bird
1 is a early bird
2 is a early bird
3 is a early bird
4 is a early bird
8 is late(time out)
5 is late(time out)
6 is late(time out)
7 is late(time out)
9 is late(time out)
Here's a way you can do this without needing to change your worker
function. There are two steps required:
- Use the
maxtasksperchild
option you can pass tomultiprocessing.Pool
to ensure the worker processes in the pool are restarted after every task execution. - Wrap your existing worker function in another function, which will call
worker
in a daemon thread, and then wait for a result from that thread fortimeout
seconds. Using a daemon thread is important because processes won't wait for daemon threads to finish before exiting.
If the timeout expires, you exit (or abort - it's up to you) the wrapper function, which will end the task, and because you've set maxtasksperchild=1
, cause the Pool
to terminate the worker process and start a new one. This will mean that the background thread doing your real work also gets aborted, because it's a daemon thread, and the process it's living got shut down.
import multiprocessing
from multiprocessing.dummy import Pool as ThreadPool
from functools import partial
def worker(x, y, z):
pass # Do whatever here
def collectMyResult(result):
print("Got result {}".format(result))
def abortable_worker(func, *args, **kwargs):
timeout = kwargs.get('timeout', None)
p = ThreadPool(1)
res = p.apply_async(func, args=args)
try:
out = res.get(timeout) # Wait timeout seconds for func to complete.
return out
except multiprocessing.TimeoutError:
print("Aborting due to timeout")
raise
if __name__ == "__main__":
pool = multiprocessing.Pool(maxtasksperchild=1)
featureClass = [[1000,k,1] for k in range(start,end,step)] #list of arguments
for f in featureClass:
abortable_func = partial(abortable_worker, worker, timeout=3)
pool.apply_async(abortable_func, args=f,callback=collectMyResult)
pool.close()
pool.join()
Any function that timeouts will raise multiprocessing.TimeoutError
. Note that this means your callback won't execute when a timeout occurs. If this isn't acceptable, just change the except
block of abortable_worker
to return something instead of calling raise
.
Also keep in mind that restarting worker processes after every task execution will have a negative impact on the performance of the Pool
, due to the increased overhead. You should measure that for your use-case and see if the trade-off is worth it to have the ability to abort the work. If it's a problem, you may need to try another approach, like co-operatively interrupting worker
if it has run too long, rather than trying to kill it from the outside. There are many questions on SO that cover this topic.