multiprocessing.Pipe is even slower than multiprocessing.Queue?
On my system Pipe(duplex=False)
is slower (twice the time, or half the rate) than Pipe(duplex=True)
. For anyone looking for performance here is a side-by-side comparison:
from time import time
from multiprocessing import Process, Queue, Pipe
n = 1000
buffer = b'\0' * (1000*1000) # 1 megabyte
def print_elapsed(name, start):
elapsed = time() - start
spi = elapsed / n
ips = n / elapsed
print(f'{name}: {spi*1000:.3f} ms/item, {ips:.0f} item/sec')
def producer(q):
start = time()
for i in range(n):
q.put(buffer)
print_elapsed('producer', start)
def consumer(q):
start = time()
for i in range(n):
out = q.get()
print_elapsed('consumer', start)
class PipeQueue():
def __init__(self, **kwargs):
self.out_pipe, self.in_pipe = Pipe(**kwargs)
def put(self, item):
self.in_pipe.send_bytes(item)
def get(self):
return self.out_pipe.recv_bytes()
def close(self):
self.out_pipe.close()
self.in_pipe.close()
print('duplex=True')
q = PipeQueue(duplex=True)
producer_process = Process(target=producer, args=(q,))
consumer_process = Process(target=consumer, args=(q,))
consumer_process.start()
producer_process.start()
consumer_process.join()
producer_process.join()
q.close()
print('duplex=False')
q = PipeQueue(duplex=False)
producer_process = Process(target=producer, args=(q,))
consumer_process = Process(target=consumer, args=(q,))
consumer_process.start()
producer_process.start()
consumer_process.join()
producer_process.join()
q.close()
Results:
duplex=True
consumer: 0.301 ms/item, 3317 item/sec
producer: 0.298 ms/item, 3358 item/sec
duplex=False
consumer: 0.673 ms/item, 1486 item/sec
producer: 0.669 ms/item, 1494 item/sec
I think this must come down to CPython using os.pipe
vs socket.socketpair
, but I'm not sure.
I assume by your print command you are using Python2. However the strange behavior cannot be replicated with Python3, where Pipe
is actually faster than Queue
.
import sys
import time
from multiprocessing import Process, Pipe, Queue
import numpy as np
NUM = 20000
def worker_pipe(conn):
for task_nbr in range(NUM):
conn.send(np.random.rand(40, 40, 3))
sys.exit(1)
def main_pipe():
parent_conn, child_conn = Pipe(duplex=False)
Process(target=worker_pipe, args=(child_conn,)).start()
for num in range(NUM):
message = parent_conn.recv()
def pipe_test():
start_time = time.time()
main_pipe()
end_time = time.time()
duration = end_time - start_time
msg_per_sec = NUM / duration
print("Pipe")
print("Duration: " + str(duration))
print("Messages Per Second: " + str(msg_per_sec))
def worker_queue(q):
for task_nbr in range(NUM):
q.put(np.random.rand(40, 40, 3))
sys.exit(1)
def main_queue():
recv_q = Queue()
Process(target=worker_queue, args=(recv_q,)).start()
for num in range(NUM):
message = recv_q.get()
def queue_test():
start_time = time.time()
main_queue()
end_time = time.time()
duration = end_time - start_time
msg_per_sec = NUM / duration
print("Queue")
print("Duration: " + str(duration))
print("Messages Per Second: " + str(msg_per_sec))
if __name__ == "__main__":
for i in range(2):
queue_test()
pipe_test()
Results in:
Queue
Duration: 3.44321894646
Messages Per Second: 5808.51822408
Pipe
Duration: 2.69065594673
Messages Per Second: 7433.13169575
Queue
Duration: 3.45295906067
Messages Per Second: 5792.13354361
Pipe
Duration: 2.78426194191
Messages Per Second: 7183.23218766
------------------
(program exited with code: 0)
Press return to continue
You can do an experiment and put the following into your Pipe code above..
def worker(conn):
for task_nbr in range(NUM):
data = np.random.rand(400, 400, 3)
sys.exit(1)
def main():
parent_conn, child_conn = Pipe(duplex=False)
p = Process(target=worker, args=(child_conn,))
p.start()
p.join()
This gives you the time that it takes to create the data for your test. On my system this takes about 2.9 seconds.
Under the hood the queue
object implements a buffer and a threaded send. The thread is still in the same process but by using it, the data creation doesn't have to wait for the system IO to complete. It effectively parallelizes the operations. Try your Pipe code modified with some simple threading implemented (disclaimer, code here is for test only and is not production ready)..
import sys
import time
import threading
from multiprocessing import Process, Pipe, Lock
import numpy as np
import copy
NUM = 1000
def worker(conn):
_conn = conn
_buf = []
_wlock = Lock()
_sentinel = object() # signal that we're done
def thread_worker():
while 1:
if _buf:
_wlock.acquire()
obj = _buf.pop(0)
if obj is _sentinel: return
_conn.send(data)
_wlock.release()
t = threading.Thread(target=thread_worker)
t.start()
for task_nbr in range(NUM):
data = np.random.rand(400, 400, 3)
data[0][0][0] = task_nbr # just for integrity check
_wlock.acquire()
_buf.append(data)
_wlock.release()
_wlock.acquire()
_buf.append(_sentinel)
_wlock.release()
t.join()
sys.exit(1)
def main():
parent_conn, child_conn = Pipe(duplex=False)
Process(target=worker, args=(child_conn,)).start()
for num in range(NUM):
message = parent_conn.recv()
assert num == message[0][0][0], 'Data was corrupted'
if __name__ == "__main__":
start_time = time.time()
main()
end_time = time.time()
duration = end_time - start_time
msg_per_sec = NUM / duration
print "Duration: %s" % duration
print "Messages Per Second: %s" % msg_per_sec
On my machine this takes 3.4 seconds to run which is almost exactly the same as your Queue code above.
From https://docs.python.org/2/library/threading.html
In Cython, due to due to the Global Interpreter Lock, only one thread can execute Python code at once... however, threading is still an appropriate model if you want to run multiple I/O-bound tasks simultaneously.
The queue
and pipe
differences are definitely an odd implementation detail until you dig into it a bit.