Exception thrown in multiprocessing Pool not detected
I have a reasonable solution for the problem, at least for debugging purposes. I do not currently have a solution that will raise the exception back in the main processes. My first thought was to use a decorator, but you can only pickle functions defined at the top level of a module, so that's right out.
Instead, a simple wrapping class and a Pool subclass that uses this for apply_async
(and hence apply
). I'll leave map_async
as an exercise for the reader.
import traceback
from multiprocessing.pool import Pool
import multiprocessing
# Shortcut to multiprocessing's logger
def error(msg, *args):
return multiprocessing.get_logger().error(msg, *args)
class LogExceptions(object):
def __init__(self, callable):
self.__callable = callable
def __call__(self, *args, **kwargs):
try:
result = self.__callable(*args, **kwargs)
except Exception as e:
# Here we add some debugging help. If multiprocessing's
# debugging is on, it will arrange to log the traceback
error(traceback.format_exc())
# Re-raise the original exception so the Pool worker can
# clean up
raise
# It was fine, give a normal answer
return result
class LoggingPool(Pool):
def apply_async(self, func, args=(), kwds={}, callback=None):
return Pool.apply_async(self, LogExceptions(func), args, kwds, callback)
def go():
print(1)
raise Exception()
print(2)
multiprocessing.log_to_stderr()
p = LoggingPool(processes=1)
p.apply_async(go)
p.close()
p.join()
This gives me:
1
[ERROR/PoolWorker-1] Traceback (most recent call last):
File "mpdebug.py", line 24, in __call__
result = self.__callable(*args, **kwargs)
File "mpdebug.py", line 44, in go
raise Exception()
Exception
Maybe I'm missing something, but isn't that what the get
method of the Result object returns? See Process Pools.
class multiprocessing.pool.AsyncResult
The class of the result returned by Pool.apply_async() and Pool.map_async().get([timeout])
Return the result when it arrives. If timeout is not None and the result does not arrive within timeout seconds then multiprocessing.TimeoutError is raised. If the remote call raised an exception then that exception will be reraised by get().
So, slightly modifying your example, one can do
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get()
p.close()
p.join()
Which gives as result
1
Traceback (most recent call last):
File "rob.py", line 10, in <module>
x.get()
File "/usr/lib/python2.6/multiprocessing/pool.py", line 422, in get
raise self._value
Exception: foobar
This is not completely satisfactory, since it does not print the traceback, but is better than nothing.
UPDATE: This bug has been fixed in Python 3.4, courtesy of Richard Oudkerk. See the issue get method of multiprocessing.pool.Async should return full traceback.
The solution with the most votes at the time of writing has a problem:
from multiprocessing import Pool
def go():
print(1)
raise Exception("foobar")
print(2)
p = Pool()
x = p.apply_async(go)
x.get() ## waiting here for go() to complete...
p.close()
p.join()
As @dfrankow noted, it will wait on x.get()
, which ruins the point of running a task asynchronously. So, for better efficiency (in particular if your worker function go
takes a long time) I would change it to:
from multiprocessing import Pool
def go(x):
print(1)
# task_that_takes_a_long_time()
raise Exception("Can't go anywhere.")
print(2)
return x**2
p = Pool()
results = []
for x in range(1000):
results.append( p.apply_async(go, [x]) )
p.close()
for r in results:
r.get()
Advantages: the worker function is run asynchronously, so if for example you are running many tasks on several cores, it will be a lot more efficient than the original solution.
Disadvantages: if there is an exception in the worker function, it will only be raised after the pool has completed all the tasks. This may or may not be the desirable behaviour. EDITED according to @colinfang's comment, which fixed this.