¿Cómo enlazar múltiples instancias de subproceso en Python 2.7?

Tengo tres comandos que de otra manera serían fácilmente encadenados juntos en la línea de comandos así:

$ echo foo | firstCommand - | secondCommand - | thirdCommand - > finalOutput 

En otras palabras, firstCommand procesa foo desde la entrada estándar y canaliza el resultado a secondCommand , que a su vez procesa esa entrada y canaliza su salida a thirdCommand , que procesa y redirige su salida al archivo finalOutput .

He estado tratando de recapitular esto en un script de Python, usando subprocesos. Me gustaría usar Python para manipular la salida de firstCommand antes de pasarla a secondCommand , y nuevamente entre thirdCommand y thirdCommand .

Aquí hay un extracto de código que no parece funcionar:

 first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout) first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin)) second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin)) third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin)) first_thread.start() second_thread.start() third_thread.start() first_thread.join() second_thread.join() third_thread.join() first_process.communicate() second_process.communicate() third_process.communicate() # read 1K chunks from standard input def consumeOutputFromStdin(from_stream, to_stream): chunk = from_stream.read(1024) while chunk: to_stream.write(chunk) to_stream.flush() chunk = from_stream.read(1024) def consumeOutputFromFirstCommand(from_stream, to_stream): while True: unprocessed_line = from_stream.readline() if not unprocessed_line: break processed_line = some_python_function_that_processes_line(unprocessed_line) to_stream.write(processed_line) to_stream.flush() def consumeOutputFromSecondCommand(from_stream, to_stream): while True: unprocessed_line = from_stream.readline() if not unprocessed_line: break processed_line = a_different_python_function_that_processes_line(unprocessed_line) to_stream.write(processed_line) to_stream.flush() 

Cuando ejecuto esto, el script cuelga:

 $ echo foo | ./myConversionScript.py ** hangs here... ** 

Si third_thread.join() Ctrl-C para terminar el script, el código se atasca en la línea third_thread.join() :

  Cc Cc Traceback (most recent call last): File "./myConversionScript.py", line 786, in  sys.exit(main(*sys.argv)) File "./myConversionScript.py", line 556, in main third_thread.join() File "/home/foo/proj/tools/lib/python2.7/threading.py", line 949, in join self.__block.wait() File "/home/foo/proj/tools/lib/python2.7/threading.py", line 339, in wait waiter.acquire() KeyboardInterrupt 

Si no uso un third_process y third_thread , en su lugar, solo se pasan los datos de la salida del primer hilo a la entrada del segundo hilo, no hay locking.

Algo sobre el tercer hilo parece hacer que las cosas se rompan, pero no sé por qué.

Pensé que el punto de communicate() es que manejará la E / S para los tres procesos, así que no estoy seguro de por qué hay un locking de E / S.

¿Cómo consigo tres o más comandos / procesos trabajando juntos, donde un subproceso consume la salida de otro subproceso / proceso?

ACTUALIZAR

Bien, hice algunos cambios que parecen ayudar, en base a algunos comentarios aquí y en otros sitios. Los procesos están hechos para wait() para completarse, y dentro de los métodos de subprocesos, close() los conductos una vez que el subproceso ha procesado todos los datos que puede. Mi preocupación es que el uso de memoria será muy alto para grandes conjuntos de datos, pero al menos las cosas están funcionando:

 first_process = subprocess.Popen(['firstCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) second_process = subprocess.Popen(['secondCommand', '-'], stdin=subprocess.PIPE, stdout=subprocess.PIPE) third_process = subprocess.Popen(['thirdCommand', '-'], stdin=subprocess.PIPE, stdout=sys.stdout) first_thread = threading.Thread(target=consumeOutputFromStdin, args=(sys.stdin, first_process.stdin)) second_thread = threading.Thread(target=consumeOutputFromFirstCommand, args=(first_process.stdout, second_process.stdin)) third_thread = threading.Thread(target=consumeOutputFromSecondCommand, args=(second_process.stdout, third_process.stdin)) first_thread.start() second_thread.start() third_thread.start() first_thread.join() second_thread.join() third_thread.join() first_process.wait() second_process.wait() third_process.wait() # read 1K chunks from standard input def consumeOutputFromStdin(from_stream, to_stream): chunk = from_stream.read(1024) while chunk: to_stream.write(chunk) to_stream.flush() chunk = from_stream.read(1024) def consumeOutputFromFirstCommand(from_stream, to_stream): while True: unprocessed_line = from_stream.readline() if not unprocessed_line: from_stream.close() to_stream.close() break processed_line = some_python_function_that_processes_line(unprocessed_line) to_stream.write(processed_line) to_stream.flush() def consumeOutputFromSecondCommand(from_stream, to_stream): while True: unprocessed_line = from_stream.readline() if not unprocessed_line: from_stream.close() to_stream.close() break processed_line = a_different_python_function_that_processes_line(unprocessed_line) to_stream.write(processed_line) to_stream.flush() 

Emular:

 echo foo | firstCommand - | somePythonRoutine - | secondCommand - | anotherPythonRoutine - | thirdCommand - > finalOutput 

Su enfoque actual con hilos funciona:

 from subprocess import Popen, PIPE first = Popen(["firstCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1) second = Popen(["secondCommand", "-"], stdin=PIPE, stdout=PIPE, bufsize=1) bind(first.stdout, second.stdin, somePythonRoutine) with open("finalOutput", "wb") as file: third = Popen(["thirdCommand", "-"], stdin=PIPE, stdout=file, bufsize=1) bind(second.stdout, third.stdin, anotherPythonRoutine) # provide input for the pipeline first.stdin.write(b"foo") first.stdin.close() # wait for it to complete pipestatus = [p.wait() for p in [first, second, third]] 

donde cada bind() comienza un nuevo hilo:

 from threading import Thread def bind(input_pipe, output_pipe, line_filter): def f(): try: for line in iter(input_pipe.readline, b''): line = line_filter(line) if line: output_pipe.write(line) # no flush unless newline present finally: try: output_pipe.close() finally: input_pipe.close() t = Thread(target=f) t.daemon = True # die if the program exits t.start() 

y somePythonRoutine , anotherPythonRoutine aceptan una sola línea y la devuelven (posiblemente modificada).

El punto de communicate() es que devuelve la salida del proceso. Esto choca con la configuración de su tubería.

Solo debes llamarlo una vez en el tercer proceso; Todos los demás están conectados a través de tuberías y saben cómo comunicarse entre sí, no es necesaria ninguna otra intervención manual.