Python: ejecuta subproceso cat en paralelo

Estoy corriendo varios cat | zgrep cat | zgrep comandos cat | zgrep en un servidor remoto y recostackn su salida individualmente para su posterior procesamiento:

 class MainProcessor(mp.Process): def __init__(self, peaks_array): super(MainProcessor, self).__init__() self.peaks_array = peaks_array def run(self): for peak_arr in self.peaks_array: peak_processor = PeakProcessor(peak_arr) peak_processor.start() class PeakProcessor(mp.Process): def __init__(self, peak_arr): super(PeakProcessor, self).__init__() self.peak_arr = peak_arr def run(self): command = 'ssh remote_host cat files_to_process | zgrep --mmap "regex" ' log_lines = (subprocess.check_output(command, shell=True)).split('\n') process_data(log_lines) 

Esto, sin embargo, da como resultado la ejecución secuencial de los comandos del subproceso (‘ssh … cat …’). El segundo pico espera a que el primero termine, y así sucesivamente.

¿Cómo puedo modificar este código para que las llamadas de subproceso se ejecuten en paralelo, al mismo tiempo que puedo recostackr la salida para cada una individualmente?

Otro enfoque (en lugar de la otra sugerencia de poner procesos de shell en segundo plano) es usar subprocesos múltiples.

El método de run que tienes entonces haría algo como esto:

 thread.start_new_thread ( myFuncThatDoesZGrep) 

Para recostackr resultados, puedes hacer algo como esto:

 class MyThread(threading.Thread): def run(self): self.finished = False # Your code to run the command here. blahBlah() # When finished.... self.finished = True self.results = [] 

Ejecute el subproceso como se indica anteriormente en el enlace de subprocesos múltiples. Cuando su objeto de hilo tiene myThread.finished == Verdadero, entonces puede recostackr los resultados a través de myThread.results.

No necesita multiprocessing ni threading para ejecutar subprocesos en paralelo, por ejemplo:

 #!/usr/bin/env python from subprocess import Popen # run commands in parallel processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True) for i in range(5)] # collect statuses exitcodes = [p.wait() for p in processes] 

ejecuta 5 comandos de shell simultáneamente. Nota: aquí no se utilizan subprocesos ni módulo de multiprocessing . No tiene sentido agregar ampersand & a los comandos de shell: Popen no espera a que se complete el comando. Necesitas llamar a .wait() explícitamente.

Es conveniente, pero no es necesario utilizar subprocesos para recostackr resultados de subprocesos:

 #!/usr/bin/env python from multiprocessing.dummy import Pool # thread pool from subprocess import Popen, PIPE, STDOUT # run commands in parallel processes = [Popen("echo {i:d}; sleep 2; echo {i:d}".format(i=i), shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT, close_fds=True) for i in range(5)] # collect output in parallel def get_lines(process): return process.communicate()[0].splitlines() outputs = Pool(len(processes)).map(get_lines, processes) 

Relacionados: ¿ Python subprocesando múltiples subprocesos de bash? .

Este es un ejemplo de código que obtiene resultados de varios subprocesos simultáneamente en el mismo hilo:

 #!/usr/bin/env python3 import asyncio import sys from asyncio.subprocess import PIPE, STDOUT @asyncio.coroutine def get_lines(shell_command): p = yield from asyncio.create_subprocess_shell(shell_command, stdin=PIPE, stdout=PIPE, stderr=STDOUT) return (yield from p.communicate())[0].splitlines() if sys.platform.startswith('win'): loop = asyncio.ProactorEventLoop() # for subprocess' pipes on Windows asyncio.set_event_loop(loop) else: loop = asyncio.get_event_loop() # get commands output in parallel coros = [get_lines('"{e}" -c "print({i:d}); import time; time.sleep({i:d})"' .format(i=i, e=sys.executable)) for i in range(5)] print(loop.run_until_complete(asyncio.gather(*coros))) loop.close()