Python dynamic multiprocessing and signalling issues
Since my previous answer was python 3 only, I thought I'd also suggest a more dirty method for fun which should work on both python 2 and python 3. Not Windows though...
multiprocessing
just uses os.fork()
under the covers, so patch it to reset the signal handling in the child:
import os
from signal import SIGINT, SIG_DFL
def patch_fork():
print('Patching fork')
os_fork = os.fork
def my_fork():
print('Fork fork fork')
cpid = os_fork()
if cpid == 0:
# child
signal(SIGINT, SIG_DFL)
return cpid
os.fork = my_fork
You can call that at the start of the run method of your Worker
processes (so that you don't affect the Manager) and so be sure that any children will ignore those signals.
This might seem crazy, but if you're not too concerned about portability it might actually not be a bad idea as it's simple and probably pretty resilient over different python versions.
There is not a clear approach for tackling the issue in the way you want to proceed. I often find myself in situations where I have to run unknown code (represented as Python entry point functions which might get down into some C weirdness) in multiprocessing environments.
This is how I approach the problem.
The main loop
Usually the main loop is pretty simple, it fetches a task from some source (HTTP, Pipe, Rabbit Queue..) and submits it to a Pool of workers. I make sure the KeyboardInterrupt exception is correctly handled to shutdown the service.
try:
while 1:
task = get_next_task()
service.process(task)
except KeyboardInterrupt:
service.wait_for_pending_tasks()
logging.info("Sayonara!")
The workers
The workers are managed by a Pool of workers from either multiprocessing.Pool
or from concurrent.futures.ProcessPoolExecutor
. If I need more advanced features such as timeout support I either use billiard or pebble.
Each worker will ignore SIGINT as recommended here. SIGTERM is left as default.
The service
The service is controlled either by systemd or supervisord. In either cases, I make sure that the termination request is always delivered as a SIGINT (CTL+C).
I want to keep SIGTERM as an emergency shutdown rather than relying only on SIGKILL for that. SIGKILL is not portable and some platforms do not implement it.
"I whish it was that simple"
If things are more complex, I'd consider the use of frameworks such as Luigi or Celery.
In general, reinventing the wheel on such things is quite detrimental and gives little gratifications. Especially if someone else will have to look at that code.
The latter sentence does not apply if your aim is to learn how these things are done of course.
I was able to do this using Python 3 and set_start_method(method)
with the 'forkserver'
flavour. Another way Python 3 > Python 2!
Where by "this" I mean:
- Have a main process with its own signal handler which just joins the children.
- Have some worker processes with a signal handler which may spawn...
- further subprocesses which do not have a signal handler.
The behaviour on Ctrl-C is then:
- manager process waits for workers to exit.
- workers run their signal handlers, (an maybe set a
stop
flag and continue executing to finish their job, although I didn't bother in my example, I just joined the child I knew I had) and then exit. - all children of the workers die immediately.
Of course note that if your intention is for the children of the workers not to crash you will need to install some ignore handler or something for them in your worker process run()
method, or somewhere.
To mercilessly lift from the docs:
When the program starts and selects the forkserver start method, a server process is started. From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. The fork server process is single threaded so it is safe for it to use os.fork(). No unnecessary resources are inherited.
Available on Unix platforms which support passing file descriptors over Unix pipes.
The idea is therefore that the "server process" inherits the default signal handling behaviour before you install your new ones, so all its children also have default handling.
Code in all its glory:
from multiprocessing import Process, set_start_method
import sys
from signal import signal, SIGINT
from time import sleep
class NormalWorker(Process):
def run(self):
while True:
print('%d %s work' % (self.pid, type(self).__name__))
sleep(1)
class SpawningWorker(Process):
def handle_signal(self, signum, frame):
print('%d %s handling signal %r' % (
self.pid, type(self).__name__, signum))
def run(self):
signal(SIGINT, self.handle_signal)
sub = NormalWorker()
sub.start()
print('%d joining %d' % (self.pid, sub.pid))
sub.join()
print('%d %s joined sub worker' % (self.pid, type(self).__name__))
def main():
set_start_method('forkserver')
processes = [SpawningWorker() for ii in range(5)]
for pp in processes:
pp.start()
def sig_handler(signum, frame):
print('main handling signal %d' % signum)
for pp in processes:
pp.join()
print('main out')
sys.exit()
signal(SIGINT, sig_handler)
while True:
sleep(1.0)
if __name__ == '__main__':
main()
You can store pid of main process (when register signal handler) and use it inside signal handler to route execution flow:
if os.getpid() != main_pid:
sys.exit(128 + signum)