Python multiproceso espera hasta que todos los hilos terminaron

Esto puede haber sido solicitado en un contexto similar, pero no pude encontrar una respuesta después de aproximadamente 20 minutos de búsqueda, así que preguntaré.

He escrito un script en Python (digamos: scriptA.py) y un script (digamos scriptB.py)

En scriptB quiero llamar a scriptA varias veces con diferentes argumentos, cada vez que tarda aproximadamente una hora en ejecutarse, (es un gran script, hace muchas cosas … no te preocupes por eso) y quiero poder ejecutar el scriptA con todos los diferentes argumentos simultáneamente, pero debo esperar hasta que TODOS se terminen antes de continuar; mi código:

import subprocess #setup do_setup() #run scriptA subprocess.call(scriptA + argumentsA) subprocess.call(scriptA + argumentsB) subprocess.call(scriptA + argumentsC) #finish do_finish() 

Quiero hacer ejecutar todo el subprocess.call() al mismo tiempo, y luego esperar hasta que todos hayan terminado, ¿cómo debo hacer esto?

Intenté usar hilos como el ejemplo aquí :

 from threading import Thread import subprocess def call_script(args) subprocess.call(args) #run scriptA t1 = Thread(target=call_script, args=(scriptA + argumentsA)) t2 = Thread(target=call_script, args=(scriptA + argumentsB)) t3 = Thread(target=call_script, args=(scriptA + argumentsC)) t1.start() t2.start() t3.start() 

Pero no creo que esto sea correcto.

¿Cómo sé que todos han terminado de ejecutarse antes de ir a mi do_finish() ?

Debe utilizar el método de unión del objeto Thread al final de la secuencia de comandos.

 t1 = Thread(target=call_script, args=(scriptA + argumentsA)) t2 = Thread(target=call_script, args=(scriptA + argumentsB)) t3 = Thread(target=call_script, args=(scriptA + argumentsC)) t1.start() t2.start() t3.start() t1.join() t2.join() t3.join() 

Por lo tanto, el hilo principal esperará hasta que t1 , t2 y t3 terminen la ejecución.

Ponga los hilos en una lista y luego use el método Unirse

  threads = [] t = Thread(...) threads.append(t) ...repeat as often as necessary... # Start all threads for x in threads: x.start() # Wait for all of them to finish for x in threads: x.join() 

Prefiero usar la comprensión de lista basada en una lista de entrada:

 inputs = [scriptA + argumentsA, scriptA + argumentsB, ...] threads = [Thread(target=call_script, args=(i)) for i in inputs] [t.start() for t in threads] [t.join() for t in threads] 

En Python3, desde Python 3.2 hay un nuevo enfoque para alcanzar el mismo resultado, que yo personalmente prefiero a la creación / inicio / unión del hilo tradicional, paquete concurrent.futures : https://docs.python.org/3/library/ concurrent.futures.html

Usando un ThreadPoolExecutor el código sería:

 from concurrent.futures.thread import ThreadPoolExecutor import time def call_script(ordinal, arg): print('Thread', ordinal, 'argument:', arg) time.sleep(2) print('Thread', ordinal, 'Finished') args = ['argumentsA', 'argumentsB', 'argumentsC'] with ThreadPoolExecutor(max_workers=2) as executor: ordinal = 1 for arg in args: executor.submit(call_script, ordinal, arg) ordinal += 1 print('All tasks has been finished') 

La salida del código anterior es algo como:

 Thread 1 argument: argumentsA Thread 2 argument: argumentsB Thread 1 Finished Thread 2 Finished Thread 3 argument: argumentsC Thread 3 Finished All tasks has been finished 

Una de las ventajas es que puede controlar el rendimiento estableciendo el máximo de trabajadores concurrentes.

Puede tener una clase similar a la siguiente, desde la cual puede agregar ‘n’ número de funciones o archivos de consola que desea ejecutar en paralelo, iniciar la ejecución y esperar a que se completen todos los trabajos.

 from multiprocessing import Process class ProcessParallel(object): """ To Process the functions parallely """ def __init__(self, *jobs): """ """ self.jobs = jobs self.processes = [] def fork_processes(self): """ Creates the process objects for given function deligates """ for job in self.jobs: proc = Process(target=job) self.processes.append(proc) def start_all(self): """ Starts the functions process all together. """ for proc in self.processes: proc.start() def join_all(self): """ Waits untill all the functions executed. """ for proc in self.processes: proc.join() def two_sum(a=2, b=2): return a + b def multiply(a=2, b=2): return a * b #How to run: if __name__ == '__main__': #note: two_sum, multiply can be replace with any python console scripts which #you wanted to run parallel.. procs = ProcessParallel(two_sum, multiply) #Add all the process in list procs.fork_processes() #starts process execution procs.start_all() #wait until all the process got executed procs.join_all() 

Tal vez, algo como

 for t in threading.enumerate(): if t.daemon: t.join() 

Acabo de encontrar el mismo problema en el que tenía que esperar a todos los subprocesos que se crearon utilizando el bucle for. Acabo de probar el siguiente fragmento de código. Puede que no sea la solución perfecta, pero pensé que sería una solución simple. Probar:

 for t in threading.enumerate(): try: t.join() except RuntimeError as err: if 'cannot join current thread' in err: continue else: raise 

Desde la documentación del módulo de threading

Hay un objeto “hilo principal”; esto corresponde al hilo de control inicial en el progtwig Python. No es un hilo de demonio.

Existe la posibilidad de que se creen “objetos de hilo ficticios”. Estos son objetos de hilos que corresponden a “hilos extraños”, que son hilos de control que se inician fuera del módulo de subprocesos, como directamente desde el código C. Los objetos de hilos ficticios tienen una funcionalidad limitada; siempre se consideran vivos y demoníacos, y no se pueden join() ed. Nunca se eliminan, ya que es imposible detectar la terminación de hilos ajenos.

Por lo tanto, para detectar esos dos casos cuando no está interesado en mantener una lista de los hilos que crea:

 import threading as thrd def alter_data(data, index): data[index] *= 2 data = [0, 2, 6, 20] for i, value in enumerate(data): thrd.Thread(target=alter_data, args=[data, i]).start() for thread in thrd.enumerate(): if thread.daemon: continue try: thread.join() except RuntimeError as err: if 'cannot join current thread' in err.args[0]: # catchs main thread continue else: raise 

Después de lo cual:

 >>> print(data) [0, 4, 12, 40]