Multiprocessing in a pipeline done right
For Idea 1, how about:
import multiprocessing as mp

sentinel = None

def f2(inq, outq):
    # Worker: double each value until the sentinel arrives.
    while True:
        val = inq.get()
        if val is sentinel:
            break
        outq.put(val * 2)

def f3(outq):
    # Printer: print results until the sentinel arrives.
    while True:
        val = outq.get()
        if val is sentinel:
            break
        print(val)

def f1():
    num_workers = 2
    inq = mp.Queue()
    outq = mp.Queue()

    # Feed the input queue, then one sentinel per worker.
    for i in range(5):
        inq.put(i)
    for i in range(num_workers):
        inq.put(sentinel)

    workers = [mp.Process(target=f2, args=(inq, outq)) for i in range(num_workers)]
    printer = mp.Process(target=f3, args=(outq,))
    for w in workers:
        w.start()
    printer.start()

    # Wait for the workers, then tell the printer to stop.
    for w in workers:
        w.join()
    outq.put(sentinel)
    printer.join()

if __name__ == '__main__':
    f1()
The only difference from the description of Idea 1 is that f2 breaks out of its while-loop when it receives the sentinel (thus terminating itself). f1 blocks until the workers are done (using w.join()) and then sends f3 the sentinel (signaling that it should break out of its while-loop).
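If you would rather not join the workers in the parent just to know when to signal f3, a common variant (my own sketch, not part of the code above) is to have each worker forward its sentinel downstream and let the printer count them:

import multiprocessing as mp

sentinel = None
num_workers = 2

def worker(inq, outq):
    while True:
        val = inq.get()
        if val is sentinel:
            outq.put(sentinel)   # pass the shutdown signal downstream
            break
        outq.put(val * 2)

def printer(outq):
    done = 0
    while done < num_workers:    # stop after a sentinel from every worker
        val = outq.get()
        if val is sentinel:
            done += 1
        else:
            print(val)

def main():
    inq, outq = mp.Queue(), mp.Queue()
    for i in range(5):
        inq.put(i)
    for _ in range(num_workers):
        inq.put(sentinel)
    procs = [mp.Process(target=worker, args=(inq, outq)) for _ in range(num_workers)]
    procs.append(mp.Process(target=printer, args=(outq,)))
    for p in procs:
        p.start()
    for p in procs:
        p.join()

if __name__ == '__main__':
    main()

With this pattern the parent only starts the processes and joins all of them at the end; the shutdown signal flows through the pipeline itself.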
With the MPipe module, you can simply do this:
from mpipe import OrderedStage, Pipeline

def f1(value):
    return value * 2

def f2(value):
    print(value)

s1 = OrderedStage(f1, size=2)
s2 = OrderedStage(f2)
p = Pipeline(s1.link(s2))

for task in 1, 2, 3, 4, 5, None:
    p.put(task)
The above runs 4 processes:
- two for the first stage (function f1)
- one for the second stage (function f2)
- and one more for the main program that feeds the pipeline.
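If you want to see those processes for yourself, printing the process ID inside each stage function is a quick check (a diagnostic sketch of the same program; os.getpid() is from the standard library, not part of MPipe):

import os
from mpipe import OrderedStage, Pipeline

def f1(value):
    print('f1 running in process', os.getpid())
    return value * 2

def f2(value):
    print('f2 running in process', os.getpid())
    print(value)

s1 = OrderedStage(f1, size=2)
s2 = OrderedStage(f2)
p = Pipeline(s1.link(s2))
print('main program is process', os.getpid())

for task in 1, 2, 3, 4, 5, None:
    p.put(task)

You should see two distinct PIDs reported by f1, one by f2, and one for the main program.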
The MPipe cookbook offers some explanation of how processes are shut down internally when None is sent as the last task.
To run the code, save the MPipe snippet as prog.py, then install MPipe and run it:
virtualenv venv
venv/bin/pip install mpipe
venv/bin/python prog.py
Output:
2
4
6
8
10