Pipe a large amount of data to stdin while using subprocess.Popen
Your code deadlocks as soon as cat's stdout OS pipe buffer is full. If you use stdout=PIPE, you have to consume the output in time, otherwise a deadlock such as the one in your case may happen.
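For reference, a minimal sketch of the pattern that deadlocks (assuming the question's code writes everything before reading anything; the exact point where it hangs depends on the OS pipe buffer sizes):

#!/usr/bin/env python3
import subprocess

process = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE)
for i in range(100000):
    # once cat's stdout pipe buffer is full, cat stops reading its stdin;
    # the stdin pipe buffer then fills up too and this write blocks forever
    process.stdin.write(b'%d\n' % i)
process.stdin.close()
print(process.stdout.read())  # never reached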
If you don't need the output while the process is running, you could redirect it to a temporary file:
#!/usr/bin/env python3
import subprocess
import tempfile

with tempfile.TemporaryFile('r+') as output_file:
    with subprocess.Popen(['cat'],
                          stdin=subprocess.PIPE,
                          stdout=output_file,
                          universal_newlines=True) as process:
        for i in range(100000):
            print(i, file=process.stdin)
    output_file.seek(0)  # rewind (and sync with the disk)
    print(output_file.readline(), end='')  # get the first line of the output
If the input/output are small (fit in memory), you could pass the input all at once and get the output all at once using .communicate(), which reads/writes concurrently for you:
#!/usr/bin/env python3
import subprocess

cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]),
                    stdout=subprocess.PIPE, universal_newlines=True)
print(cp.stdout.splitlines()[-1])  # print the last line
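On Pythons older than 3.5 (no subprocess.run()), roughly the same thing can be done with Popen.communicate() directly; a sketch:

#!/usr/bin/env python3
from subprocess import Popen, PIPE

with Popen(['cat'], stdin=PIPE, stdout=PIPE, universal_newlines=True) as process:
    output, _ = process.communicate('\n'.join('%d' % i for i in range(100000)))
print(output.splitlines()[-1])  # print the last line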
To read/write concurrently manually, you could use threads, asyncio, fcntl, etc. @Jed provided a simple thread-based solution. Here's an asyncio-based solution:
#!/usr/bin/env python3
import asyncio
import sys
from subprocess import PIPE

async def pump_input(writer):
    try:
        for i in range(100000):
            writer.write(b'%d\n' % i)
            await writer.drain()
    finally:
        writer.close()

async def run():
    # start the child process
    # NOTE: the universal_newlines parameter is not supported
    process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE)
    asyncio.ensure_future(pump_input(process.stdin))  # write input
    async for line in process.stdout:  # consume output
        print(int(line)**2)  # print squares
    return await process.wait()  # wait for the child process to exit

if sys.platform.startswith('win'):
    loop = asyncio.ProactorEventLoop()  # for subprocess' pipes on Windows
    asyncio.set_event_loop(loop)
else:
    loop = asyncio.get_event_loop()
loop.run_until_complete(run())
loop.close()
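On Python 3.7+, the event-loop setup at the bottom can be replaced with a single call (and on Windows, Python 3.8+ already uses the proactor event loop by default); a sketch, keeping the run() coroutine defined above:

import asyncio
asyncio.run(run())  # creates, runs, and closes the event loop for you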
On Unix, you could use an fcntl-based solution:
#!/usr/bin/env python3
import sys
from fcntl import fcntl, F_GETFL, F_SETFL
from os import O_NONBLOCK
from shutil import copyfileobj
from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF

def make_blocking(pipe, blocking=True):
    fd = pipe.fileno()
    if not blocking:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK)  # set O_NONBLOCK
    else:
        fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK)  # clear it

with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process:
    make_blocking(process.stdout, blocking=False)
    with process.stdin:
        for i in range(100000):
            # NOTE: the stream is block-buffered (the default) and therefore
            # `cat` won't see the data immediately
            process.stdin.write(b'%d\n' % i)
            # a deadlock may happen here with a *blocking* pipe
            output = process.stdout.read(PIPE_BUF)
            if output is not None:
                sys.stdout.buffer.write(output)
    # read the rest
    make_blocking(process.stdout)
    copyfileobj(process.stdout, sys.stdout.buffer)
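As an aside, on Python 3.5+ (Unix) the fcntl calls can be replaced with os.set_blocking(); a sketch of just the helper, keeping the rest of the code above unchanged:

import os

def make_blocking(pipe, blocking=True):
    os.set_blocking(pipe.fileno(), blocking)  # toggles O_NONBLOCK on the underlying fd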
Here's something I used to load a 6 GB MySQL dump file via subprocess. Stay away from shell=True: it's not secure, and it starts an extra process, wasting resources.
import subprocess

cmd = [mysql_path,
       "-u", mysql_user, "-p" + mysql_pass,
       "-h", host, database]
fhandle = open(dump_file, 'r')
p = subprocess.Popen(cmd, stdin=fhandle, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
(stdout, stderr) = p.communicate()
fhandle.close()
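On Python 3.5+ roughly the same thing can be written with subprocess.run() and a with block, so the dump file gets closed even if the command fails (a sketch; mysql_path, mysql_user, etc. are the same placeholder variables as above):

import subprocess

cmd = [mysql_path, "-u", mysql_user, "-p" + mysql_pass, "-h", host, database]
with open(dump_file, 'rb') as fhandle:
    result = subprocess.run(cmd, stdin=fhandle,
                            stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = result.stdout, result.stderr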
If you want a pure Python solution, you need to put either the reader or the writer in a separate thread. The threading package is a lightweight way to do this, with convenient access to common objects and no messy forking.
import subprocess
import threading
import sys

proc = subprocess.Popen(['cat', '-'],
                        stdin=subprocess.PIPE,
                        stdout=subprocess.PIPE,
                        )

def writer():
    for i in range(100000):
        proc.stdin.write(b'%d\n' % i)
    proc.stdin.close()

thread = threading.Thread(target=writer)
thread.start()
for line in proc.stdout:
    sys.stdout.write(line.decode())
thread.join()
proc.wait()
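One caveat: if you also pass stderr=subprocess.PIPE, that pipe has to be drained as well (for example in another thread), or the same kind of deadlock can come back. A sketch of such a drain thread, assuming stderr=subprocess.PIPE had been added to the Popen call above:

def drain_stderr():
    # read until EOF so the child never blocks on a full stderr pipe
    for chunk in iter(lambda: proc.stderr.read(8192), b''):
        sys.stderr.buffer.write(chunk)

stderr_thread = threading.Thread(target=drain_stderr)
stderr_thread.start()
# ... later, after the child exits:
stderr_thread.join()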
It might be neat to see the subprocess module modernized to support streams and coroutines, which would allow pipelines that mix Python pieces and shell pieces to be constructed more elegantly.
If you don't want to keep all the data in memory, you have to use select. E.g. something like:
import subprocess
from select import select
import os
import sys

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE,
                        bufsize=0)  # unbuffered, so writes go straight to the pipe

i = 0
while True:
    rlist, wlist, xlist = [proc.stdout], [], []
    if i < 100000:
        wlist.append(proc.stdin)
    rlist, wlist, xlist = select(rlist, wlist, xlist)
    if proc.stdout in rlist:
        out = os.read(proc.stdout.fileno(), 10)
        sys.stdout.write(out.decode())
        if not out:  # EOF: cat closed its stdout
            break
    if proc.stdin in wlist:
        proc.stdin.write(b'%d\n' % i)
        i += 1
        if i >= 100000:
            proc.stdin.close()
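The selectors module (Python 3.4+) offers a slightly higher-level wrapper around the same idea; a minimal sketch of an equivalent loop:

import os
import sys
import subprocess
import selectors

proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=0)
sel = selectors.DefaultSelector()
sel.register(proc.stdout, selectors.EVENT_READ)
sel.register(proc.stdin, selectors.EVENT_WRITE)

i = 0
eof = False
while not eof:
    for key, _ in sel.select():
        if key.fileobj is proc.stdout:
            out = os.read(proc.stdout.fileno(), 4096)
            if not out:  # EOF: cat closed its stdout
                eof = True
                break
            sys.stdout.write(out.decode())
        else:  # proc.stdin is ready for writing
            proc.stdin.write(b'%d\n' % i)
            i += 1
            if i >= 100000:
                sel.unregister(proc.stdin)
                proc.stdin.close()
sel.close()
proc.wait()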