ejecutando varios comandos del sistema en paralelo

Escribo un script simple que ejecuta un comando del sistema en una secuencia de archivos. Para acelerar las cosas, me gustaría ejecutarlas en paralelo, pero no todas al mismo tiempo, necesito controlar el número máximo de comandos que se ejecutan simultáneamente. ¿Cuál sería la forma más fácil de abordar esto?

Si está llamando a subprocesos de todos modos, no veo la necesidad de usar un grupo de subprocesos. Una implementación básica utilizando el módulo de subprocess sería

 import subprocess import os import time files =  command = "/bin/touch" processes = set() max_processes = 5 for name in files: processes.add(subprocess.Popen([command, name])) if len(processes) >= max_processes: os.wait() processes.difference_update([ p for p in processes if p.poll() is not None]) 

En Windows, os.wait() no está disponible (ni ningún otro método para esperar a que finalice ningún proceso secundario). Puedes solucionar esto encuestando en ciertos intervalos:

 for name in files: processes.add(subprocess.Popen([command, name])) while len(processes) >= max_processes: time.sleep(.1) processes.difference_update([ p for p in processes if p.poll() is not None]) 

El tiempo de inactividad depende del tiempo de ejecución esperado de los subprocesos.

La respuesta de Sven Marnach es casi correcta, pero hay un problema. Si uno de los últimos procesos de max_processes finaliza, el progtwig principal intentará iniciar otro proceso y el ciclo de finalización terminará. Esto cerrará el proceso principal, que a su vez puede cerrar los procesos secundarios. Para mí, este comportamiento ocurrió con el comando de pantalla.

El código en Linux será así (y solo funcionará en python2.7):

 import subprocess import os import time files =  command = "/bin/touch" processes = set() max_processes = 5 for name in files: processes.add(subprocess.Popen([command, name])) if len(processes) >= max_processes: os.wait() processes.difference_update( [p for p in processes if p.poll() is not None]) #Check if all the child processes were closed for p in processes: if p.poll() is None: p.wait() 

Necesitas combinar un objeto Semaphore con hilos . Un semáforo es un objeto que le permite limitar el número de subprocesos que se ejecutan en una sección dada de código. En este caso, usaremos un semáforo para limitar el número de subprocesos que pueden ejecutar la llamada os.system.

Primero importamos los módulos que necesitamos:

 #!/usr/bin/python import threading import os 

A continuación creamos un objeto Semáforo. El número cuatro aquí es el número de hilos que pueden adquirir el semáforo a la vez. Esto limita el número de subprocesos que se pueden ejecutar a la vez.

 semaphore = threading.Semaphore(4) 

Esta función simplemente ajusta la llamada al subproceso en llamadas al semáforo.

 def run_command(cmd): semaphore.acquire() try: os.system(cmd) finally: semaphore.release() 

Si está utilizando Python 2.6+, esto puede volverse aún más simple, ya que puede usar la statement ‘with’ para realizar las llamadas de adquisición y liberación.

 def run_command(cmd): with semaphore: os.system(cmd) 

Finalmente, para mostrar que esto funciona como se espera, llamaremos el comando “dormir 10” ocho veces.

 for i in range(8): threading.Thread(target=run_command, args=("sleep 10", )).start() 

La ejecución de la secuencia de comandos utilizando el progtwig ‘tiempo’ muestra que solo toma 20 segundos, ya que dos lotes de cuatro horas de espera se ejecutan en paralelo.

 aw@aw-laptop:~/personal/stackoverflow$ time python 4992400.py real 0m20.032s user 0m0.020s sys 0m0.008s 

Combiné las soluciones de Sven y Thuener en una que espera los procesos finales y también se detiene si uno de los procesos falla:

 def removeFinishedProcesses(processes): """ given a list of (commandString, process), remove those that have completed and return the result """ newProcs = [] for pollCmd, pollProc in processes: retCode = pollProc.poll() if retCode==None: # still running newProcs.append((pollCmd, pollProc)) elif retCode!=0: # failed raise Exception("Command %s failed" % pollCmd) else: logging.info("Command %s completed successfully" % pollCmd) return newProcs def runCommands(commands, maxCpu): processes = [] for command in commands: logging.info("Starting process %s" % command) proc = subprocess.Popen(shlex.split(command)) procTuple = (command, proc) processes.append(procTuple) while len(processes) >= maxCpu: time.sleep(.2) processes = removeFinishedProcesses(processes) # wait for all processes while len(processes)>0: time.sleep(0.5) processes = removeFinishedProcesses(processes) logging.info("All processes completed") 

Lo que estás pidiendo es un grupo de hilos. Hay un número fijo de subprocesos que se pueden utilizar para ejecutar tareas. Cuando no está ejecutando una tarea, espera en una cola de tareas para obtener una nueva pieza de código para ejecutar.

Existe este módulo de grupo de subprocesos , pero hay un comentario que dice que aún no se considera completo. Puede haber otros paquetes por ahí, pero este fue el primero que encontré.

Si ejecuta los comandos del sistema, puede crear las instancias de proceso con el módulo de subproceso, llámelos como desee. No debería haber ninguna necesidad de subprocesar (su antitónica) y el multiprocesamiento parece un poco excesivo para esta tarea.

Esta respuesta es muy similar a otras respuestas presentes aquí, pero utiliza una lista en lugar de conjuntos. Por alguna razón, al usar esas respuestas, recibía un error de tiempo de ejecución con respecto al tamaño del cambio de conjunto.

 from subprocess import PIPE import subprocess import time def submit_job_max_len(job_list, max_processes): sleep_time = 0.1 processes = list() for command in job_list: print 'running {n} processes. Submitting {proc}.'.format(n=len(processes), proc=str(command)) processes.append(subprocess.Popen(command, shell=False, stdout=None, stdin=PIPE)) while len(processes) >= max_processes: time.sleep(sleep_time) processes = [proc for proc in processes if proc.poll() is None] while len(processes) > 0: time.sleep(sleep_time) processes = [proc for proc in processes if proc.poll() is None] cmd = '/bin/bash run_what.sh {n}' job_list = ((cmd.format(n=i)).split() for i in range(100)) submit_job_max_len(job_list, max_processes=50)