¿Cómo puedo alimentar una entrada estándar de un subproceso desde un iterador de Python?

Estoy tratando de usar el módulo de subprocess en Python para comunicarme con un proceso que lee la entrada estándar y escribe la salida estándar de manera continua. Quiero que el subproceso lea las líneas de un iterador que produce la entrada y luego lea las líneas de salida del subproceso. Puede que no haya una correspondencia de uno a uno entre las líneas de entrada y salida. ¿Cómo puedo alimentar un subproceso desde un iterador arbitrario que devuelve cadenas?

Aquí hay un código de ejemplo que da un caso de prueba simple, y algunos métodos que he intentado que no funcionan por alguna razón u otra:

 #!/usr/bin/python from subprocess import * # A really big iterator input_iterator = ("hello %s\n" % x for x in xrange(100000000)) # I thought that stdin could be any iterable, but it actually wants a # filehandle, so this fails with an error. subproc = Popen("cat", stdin=input_iterator, stdout=PIPE) # This works, but it first sends *all* the input at once, then returns # *all* the output as a string, rather than giving me an iterator over # the output. This uses up all my memory, because the input is several # hundred million lines. subproc = Popen("cat", stdin=PIPE, stdout=PIPE) output, error = subproc.communicate("".join(input_iterator)) output_lines = output.split("\n") 

Entonces, ¿cómo puedo hacer que mi subproceso lea de un iterador línea por línea mientras que leo de su línea estándar por línea?

La forma más fácil parece ser bifurcar y alimentar el identificador de entrada desde el proceso hijo. ¿Alguien puede dar más detalles sobre las posibles desventajas de hacer esto? ¿O hay módulos de python que lo hacen más fácil y seguro?

 #!/usr/bin/python from subprocess import * import os def fork_and_input(input, handle): """Send input to handle in a child process.""" # Make sure input is iterable before forking input = iter(input) if os.fork(): # Parent handle.close() else: # Child try: handle.writelines(input) handle.close() # An IOError here means some *other* part of the program # crashed, so don't complain here. except IOError: pass os._exit() # A really big iterator input_iterator = ("hello %s\n" % x for x in xrange(100000000)) subproc = Popen("cat", stdin=PIPE, stdout=PIPE) fork_and_input(input_iterator, subproc.stdin) for line in subproc.stdout: print line, 

Para alimentar la entrada estándar de un subproceso desde un iterador de Python:

 #!/usr/bin/env python3 from subprocess import Popen, PIPE with Popen("sink", stdin=PIPE, bufsize=-1) as process: for chunk in input_iterator: process.stdin.write(chunk) 

Si desea leer la salida al mismo tiempo, necesita subprocesos o async.io:

 #!/usr/bin/env python3 import asyncio import sys from asyncio.subprocess import PIPE from contextlib import closing async def writelines(writer, lines): # NOTE: can't use writer.writelines(lines) here because it tries to write # all at once with closing(writer): for line in lines: writer.write(line) await writer.drain() async def main(): input_iterator = (b"hello %d\n" % x for x in range(100000000)) process = await asyncio.create_subprocess_exec("cat", stdin=PIPE, stdout=PIPE) asyncio.ensure_future(writelines(process.stdin, input_iterator)) async for line in process.stdout: sys.stdout.buffer.write(line) return await process.wait() if sys.platform == 'win32': loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() with closing(loop): sys.exit(loop.run_until_complete(main())) 

Siga esta receta. Es un complemento para el subproceso que admite E / S asíncrona. Sin embargo, esto requiere que su subproceso responda a cada línea de entrada o grupo de líneas con una parte de su salida.