canalice una gran cantidad de datos a stdin mientras usa subprocess.Popen

Estoy luchando para entender cuál es la forma en que Python resuelve este problema simple.

Mi problema es bastante simple. Si usas el siguiente código se colgará. Esto está bien documentado en el módulo de subproceso doc.

import subprocess proc = subprocess.Popen(['cat','-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) for i in range(100000): proc.stdin.write('%d\n' % i) output = proc.communicate()[0] print output 

Buscando una solución (hay un hilo muy perspicaz, pero lo he perdido ahora) Encontré esta solución (entre otras) que usa un enlace explícito:

 import os import sys from subprocess import Popen, PIPE def produce(to_sed): for i in range(100000): to_sed.write("%d\n" % i) to_sed.flush() #this would happen implicitly, anyway, but is here for the example to_sed.close() def consume(from_sed): while 1: res = from_sed.readline() if not res: sys.exit(0) #sys.exit(proc.poll()) print 'received: ', [res] def main(): proc = Popen(['cat','-'],stdin=PIPE,stdout=PIPE) to_sed = proc.stdin from_sed = proc.stdout pid = os.fork() if pid == 0 : from_sed.close() produce(to_sed) return else : to_sed.close() consume(from_sed) if __name__ == '__main__': main() 

Si bien esta solución es conceptualmente muy fácil de entender, utiliza un proceso más y está bloqueado como un nivel demasiado bajo en comparación con el módulo de subproceso (que está ahí solo para ocultar este tipo de cosas …).

Me pregunto: ¿existe una solución simple y limpia que use el módulo de subproceso que no se cuelgue o para implementar este patrón tengo que retroceder e implementar un bucle de selección de estilo antiguo o una bifurcación explícita?

Gracias

Si desea una solución Python pura, debe colocar el lector o el escritor en un hilo separado. El paquete de threading es una forma liviana de hacer esto, con un acceso conveniente a objetos comunes y sin bifurcaciones sucias.

 import subprocess import threading import sys proc = subprocess.Popen(['cat','-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) def writer(): for i in range(100000): proc.stdin.write('%d\n' % i) proc.stdin.close() thread = threading.Thread(target=writer) thread.start() for line in proc.stdout: sys.stdout.write(line) thread.join() proc.wait() 

Podría ser agradable ver el módulo de subprocess modernizado para soportar flujos y coroutines, lo que permitiría que las tuberías que mezclan piezas de Python y piezas de shell se construyan de forma más elegante.

Si no desea mantener todos los datos en la memoria, debe utilizar seleccionar. Por ejemplo, algo como:

 import subprocess from select import select import os proc = subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) i = 0; while True: rlist, wlist, xlist = [proc.stdout], [], [] if i < 100000: wlist.append(proc.stdin) rlist, wlist, xlist = select(rlist, wlist, xlist) if proc.stdout in rlist: out = os.read(proc.stdout.fileno(), 10) print out, if not out: break if proc.stdin in wlist: proc.stdin.write('%d\n' % i) i += 1 if i >= 100000: proc.stdin.close() 

Aquí hay algo que solía cargar cargas de archivos de volcado de MySQL 6G a través de subproceso. Mantente alejado de la concha = Verdadero. No es seguro y comienza fuera del proceso desperdiciando recursos.

 import subprocess fhandle = None cmd = [mysql_path, "-u", mysql_user, "-p" + mysql_pass], "-h", host, database] fhandle = open(dump_file, 'r') p = subprocess.Popen(cmd, stdin=fhandle, stdout=subprocess.PIPE, stderr=subprocess.PIPE) (stdout,stderr) = p.communicate() fhandle.close() 

Para este tipo de cosas, el shell funciona mucho mejor que el subproceso.

Escribe aplicaciones Python muy simples que leen desde sys.stdin y escribe en sys.stdout .

Conectar las aplicaciones simples juntas mediante una tubería de shell.

Si lo desea, inicie la canalización utilizando un subprocess o simplemente escriba un script de shell de una línea.

 python part1.py | python part2.py 

Esto es muy, muy eficiente. También es portátil para todo Linux (y Windows) siempre que lo mantengas muy simple.

Su código se interbloqueará tan pronto como la memoria intermedia de tuberías del sistema operativo del cat esté llena. Si utiliza stdout=PIPE ; tiene que consumirlo a tiempo, de lo contrario puede ocurrir un punto muerto, como en su caso.

Si no necesita la salida mientras el proceso se está ejecutando; podría redirigirlo a un archivo temporal:

 #!/usr/bin/env python3 import subprocess import tempfile with tempfile.TemporaryFile('r+') as output_file: with subprocess.Popen(['cat'], stdin=subprocess.PIPE, stdout=output_file, universal_newlines=True) as process: for i in range(100000): print(i, file=process.stdin) output_file.seek(0) # rewind (and sync with the disk) print(output_file.readline(), end='') # get the first line of the output 

Si la entrada / salida es pequeña (caben en la memoria); podría pasar la entrada de una vez y obtener la salida de una vez utilizando .communicate() que lee / escribe simultáneamente para usted:

 #!/usr/bin/env python3 import subprocess cp = subprocess.run(['cat'], input='\n'.join(['%d' % i for i in range(100000)]), stdout=subprocess.PIPE, universal_newlines=True) print(cp.stdout.splitlines()[-1]) # print the last line 

Para leer / escribir simultáneamente de forma manual, puede utilizar subprocesos, asyncio, fcntl, etc. @Jed proporcionó una solución simple basada en subprocesos . Aquí está la solución basada en asyncio :

 #!/usr/bin/env python3 import asyncio import sys from subprocess import PIPE async def pump_input(writer): try: for i in range(100000): writer.write(b'%d\n' % i) await writer.drain() finally: writer.close() async def run(): # start child process # NOTE: universal_newlines parameter is not supported process = await asyncio.create_subprocess_exec('cat', stdin=PIPE, stdout=PIPE) asyncio.ensure_future(pump_input(process.stdin)) # write input async for line in process.stdout: # consume output print(int(line)**2) # print squares return await process.wait() # wait for the child process to exit if sys.platform.startswith('win'): loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() loop.run_until_complete(run()) loop.close() 

En Unix, podría usar fcntl solución basada en fcntl :

 #!/usr/bin/env python3 import sys from fcntl import fcntl, F_GETFL, F_SETFL from os import O_NONBLOCK from shutil import copyfileobj from subprocess import Popen, PIPE, _PIPE_BUF as PIPE_BUF def make_blocking(pipe, blocking=True): fd = pipe.fileno() if not blocking: fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) | O_NONBLOCK) # set O_NONBLOCK else: fcntl(fd, F_SETFL, fcntl(fd, F_GETFL) & ~O_NONBLOCK) # clear it with Popen(['cat'], stdin=PIPE, stdout=PIPE) as process: make_blocking(process.stdout, blocking=False) with process.stdin: for i in range(100000): #NOTE: the mode is block-buffered (default) and therefore # `cat` won't see it immidiately process.stdin.write(b'%d\n' % i) # a deadblock may happen here with a *blocking* pipe output = process.stdout.read(PIPE_BUF) if output is not None: sys.stdout.buffer.write(output) # read the rest make_blocking(process.stdout) copyfileobj(process.stdout, sys.stdout.buffer) 

Aquí hay un ejemplo (Python 3) de leer un registro a la vez desde gzip usando una tubería:

 cmd = 'gzip -dc compressed_file.gz' pipe = Popen(cmd, stdout=PIPE).stdout for line in pipe: print(":", line.decode(), end="") 

Sé que hay un módulo estándar para eso, es solo un ejemplo. Puede leer toda la salida de una sola vez (como las marcas de la shell) utilizando el método de comunicación, pero obviamente debe tener cuidado con el tamaño de la memoria.

Aquí hay un ejemplo (nuevamente Python 3) de escribir registros al progtwig lp (1) en Linux:

 cmd = 'lp -' proc = Popen(cmd, stdin=PIPE) proc.communicate(some_data.encode()) 

Ahora sé que esto no va a satisfacer completamente al purista en ti, ya que la entrada tendrá que caber en la memoria, y no tienes ninguna opción para trabajar interactivamente con entrada-salida, pero al menos esto funciona bien en tu ejemplo. El método de comunicación opcionalmente toma la entrada como un argumento, y si alimenta su proceso de esta manera, funcionará.

 import subprocess proc = subprocess.Popen(['cat','-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE, ) input = "".join('{0:d}\n'.format(i) for i in range(100000)) output = proc.communicate(input)[0] print output 

En cuanto al problema mayor, puede crear una subclase de Popen, reescribir __init__ para aceptar objetos de tipo stream como argumentos para stdin, stdout, stderr, y reescribir el método _communicate (hairy for crossplatform, necesita hacerlo dos veces, ver el subprocess.py fuente) para llamar a read () en la secuencia estándar y escriba () la salida a las secuencias stdout y stderr. Lo que me molesta de este enfoque es que, por lo que sé, aún no se ha hecho. Cuando no se han hecho cosas obvias antes, generalmente hay una razón (no funciona como se esperaba), pero no puedo ver por qué no debería hacerlo, aparte del hecho de que necesita que las transmisiones sean seguras para subprocesos en Windows. .

Usando los aiofiles y asyncio en python 3.5:

Un poco complicado, ¡pero solo necesitas 1024 Bytes de memoria para escribir en una entrada estándar!

 import asyncio import aiofiles import sys from os.path import dirname, join, abspath import subprocess as sb THIS_DIR = abspath(dirname(__file__)) SAMPLE_FILE = join(THIS_DIR, '../src/hazelnut/tests/stuff/sample.mp4') DEST_PATH = '/home/vahid/Desktop/sample.mp4' async def async_file_reader(f, buffer): async for l in f: if l: buffer.append(l) else: break print('reader done') async def async_file_writer(source_file, target_file): length = 0 while True: input_chunk = await source_file.read(1024) if input_chunk: length += len(input_chunk) target_file.write(input_chunk) await target_file.drain() else: target_file.write_eof() break print('writer done: %s' % length) async def main(): dir_name = dirname(DEST_PATH) remote_cmd = 'ssh localhost mkdir -p %s && cat - > %s' % (dir_name, DEST_PATH) stdout, stderr = [], [] async with aiofiles.open(SAMPLE_FILE, mode='rb') as f: cmd = await asyncio.create_subprocess_shell( remote_cmd, stdin=sb.PIPE, stdout=sb.PIPE, stderr=sb.PIPE, ) await asyncio.gather(*( async_file_reader(cmd.stdout, stdout), async_file_reader(cmd.stderr, stderr), async_file_writer(f, cmd.stdin) )) print('EXIT STATUS: %s' % await cmd.wait()) stdout, stderr = '\n'.join(stdout), '\n'.join(stderr) if stdout: print(stdout) if stderr: print(stderr, file=sys.stderr) if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) 

Resultado:

 writer done: 383631 reader done reader done EXIT STATUS: 0 

La solución más simple que se me ocurre:

 from subprocess import Popen, PIPE from threading import Thread s = map(str,xrange(10000)) # a large string p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=1) Thread(target=lambda: any((p.stdin.write(b) for b in s)) or p.stdin.close()).start() print (p.stdout.read()) 

Versión con buffer:

 from subprocess import Popen, PIPE from threading import Thread s = map(str,xrange(10000)) # a large string n = 1024 # buffer size p = Popen(['cat'], stdin=PIPE, stdout=PIPE, bufsize=n) Thread(target=lambda: any((p.stdin.write(c) for c in (s[i:i+n] for i in xrange(0, len(s), n)))) or p.stdin.close()).start() print (p.stdout.read()) 

Estaba buscando un código de ejemplo para iterar sobre la salida del proceso de forma incremental, ya que este proceso consume su entrada del iterador proporcionado (también de manera incremental). Básicamente:

 import string import random # That's what I consider a very useful function, though didn't # find any existing implementations. def process_line_reader(args, stdin_lines): # args - command to run, same as subprocess.Popen # stdin_lines - iterable with lines to send to process stdin # returns - iterable with lines received from process stdout pass # Returns iterable over n random strings. n is assumed to be infinity if negative. # Just an example of function that returns potentially unlimited number of lines. def random_lines(n, M=8): while 0 != n: yield "".join(random.choice(string.letters) for _ in range(M)) if 0 < n: n -= 1 # That's what I consider to be a very convenient use case for # function proposed above. def print_many_uniq_numbered_random_lines(): i = 0 for line in process_line_reader(["uniq", "-i"], random_lines(100500 * 9000)): # Key idea here is that `process_line_reader` will feed random lines into # `uniq` process stdin as lines are consumed from returned iterable. print "#%i: %s" % (i, line) i += 1 

Algunas de las soluciones sugeridas aquí permiten hacerlo con subprocesos (pero no siempre es conveniente) o con asyncio (que no está disponible en Python 2.x). A continuación se muestra un ejemplo de implementación de trabajo que permite hacerlo.

 import subprocess import os import fcntl import select class nonblocking_io(object): def __init__(self, f): self._fd = -1 if type(f) is int: self._fd = os.dup(f) os.close(f) elif type(f) is file: self._fd = os.dup(f.fileno()) f.close() else: raise TypeError("Only accept file objects or interger file descriptors") flag = fcntl.fcntl(self._fd, fcntl.F_GETFL) fcntl.fcntl(self._fd, fcntl.F_SETFL, flag | os.O_NONBLOCK) def __enter__(self): return self def __exit__(self, type, value, traceback): self.close() return False def fileno(self): return self._fd def close(self): if 0 <= self._fd: os.close(self._fd) self._fd = -1 class nonblocking_line_writer(nonblocking_io): def __init__(self, f, lines, autoclose=True, buffer_size=16*1024, encoding="utf-8", linesep=os.linesep): super(nonblocking_line_writer, self).__init__(f) self._lines = iter(lines) self._lines_ended = False self._autoclose = autoclose self._buffer_size = buffer_size self._buffer_offset = 0 self._buffer = bytearray() self._encoding = encoding self._linesep = bytearray(linesep, encoding) # Returns False when `lines` iterable is exhausted and all pending data is written def continue_writing(self): while True: if self._buffer_offset < len(self._buffer): n = os.write(self._fd, self._buffer[self._buffer_offset:]) self._buffer_offset += n if self._buffer_offset < len(self._buffer): return True if self._lines_ended: if self._autoclose: self.close() return False self._buffer[:] = [] self._buffer_offset = 0 while len(self._buffer) < self._buffer_size: line = next(self._lines, None) if line is None: self._lines_ended = True break self._buffer.extend(bytearray(line, self._encoding)) self._buffer.extend(self._linesep) class nonblocking_line_reader(nonblocking_io): def __init__(self, f, autoclose=True, buffer_size=16*1024, encoding="utf-8"): super(nonblocking_line_reader, self).__init__(f) self._autoclose = autoclose self._buffer_size = buffer_size self._encoding = encoding self._file_ended = False self._line_part = "" # Returns (lines, more) tuple, where lines is iterable with lines read and more will # be set to False after EOF. def continue_reading(self): lines = [] while not self._file_ended: data = os.read(self._fd, self._buffer_size) if 0 == len(data): self._file_ended = True if self._autoclose: self.close() if 0 < len(self._line_part): lines.append(self._line_part.decode(self._encoding)) self._line_part = "" break for line in data.splitlines(True): self._line_part += line if self._line_part.endswith(("\n", "\r")): lines.append(self._line_part.decode(self._encoding).rstrip("\n\r")) self._line_part = "" if len(data) < self._buffer_size: break return (lines, not self._file_ended) class process_line_reader(object): def __init__(self, args, stdin_lines): self._p = subprocess.Popen(args, stdin=subprocess.PIPE, stdout=subprocess.PIPE) self._reader = nonblocking_line_reader(self._p.stdout) self._writer = nonblocking_line_writer(self._p.stdin, stdin_lines) self._iterator = self._communicate() def __iter__(self): return self._iterator def __enter__(self): return self._iterator def __exit__(self, type, value, traceback): self.close() return False def _communicate(self): read_set = [self._reader] write_set = [self._writer] while read_set or write_set: try: rlist, wlist, xlist = select.select(read_set, write_set, []) except select.error, e: if e.args[0] == errno.EINTR: continue raise if self._reader in rlist: stdout_lines, more = self._reader.continue_reading() for line in stdout_lines: yield line if not more: read_set.remove(self._reader) if self._writer in wlist: if not self._writer.continue_writing(): write_set.remove(self._writer) self.close() def lines(self): return self._iterator def close(self): if self._iterator is not None: self._reader.close() self._writer.close() self._p.wait() self._iterator = None