Python – Unir múltiples hilos con tiempo de espera

Tengo varios subprocesos de proceso en ejecución y me gustaría unirlos a todos con un parámetro de tiempo de espera. Entiendo que si no fuera necesario el tiempo de espera, podría escribir:

for thread in threads: thread.join() 

Una solución que pensé fue usar un hilo maestro que uniera todos los hilos e intentara unirse a ese hilo. Sin embargo, recibí el siguiente error en Python:

 AssertionError: can only join a child process 

El código que tengo está abajo.

 def join_all(threads): for thread in threads: thread.join() if __name__ == '__main__': for thread in threads: thread.start() master = multiprocessing.Process(target=join_all, args=(threads,)) master.start() master.join(timeout=60) 

Podría pasar por cada hilo repetidamente, haciendo verificaciones no bloqueantes para ver si el hilo está listo:

 import time def timed_join_all(threads, timeout): start = cur_time = time.time() while cur_time <= (start + timeout): for thread in threads: if not thread.is_alive(): thread.join() time.sleep(1) cur_time = time.time() if __name__ == '__main__': for thread in threads: thread.start() timed_join_all(threads, 60) 

Esta respuesta se basa inicialmente en eso por dano pero tiene una serie de cambios.

join_all toma una lista de subprocesos y un tiempo de espera (en segundos) e intenta unir todos los subprocesos. Para ello, realiza una llamada no bloqueante a Thread.join (estableciendo el tiempo de espera en 0 , ya que la join sin argumentos nunca expirará ).

Una vez que todos los subprocesos hayan finalizado (al marcar is_alive() en cada uno de ellos), el bucle saldrá prematuramente.

Si algunos subprocesos todavía se están ejecutando en el momento en que se produce el tiempo de espera, la función genera un RuntimeError con información sobre los subprocesos restantes.

 import time def join_all(threads, timeout): """ Args: threads: a list of thread objects to join timeout: the maximum time to wait for the threads to finish Raises: RuntimeError: is not all the threads have finished by the timeout """ start = cur_time = time.time() while cur_time <= (start + timeout): for thread in threads: if thread.is_alive(): thread.join(timeout=0) if all(not t.is_alive() for t in threads): break time.sleep(0.1) cur_time = time.time() else: still_running = [t for t in threads if t.is_alive()] num = len(still_running) names = [t.name for t in still_running] raise RuntimeError('Timeout on {0} threads: {1}'.format(num, names)) if __name__ == '__main__': for thread in threads: thread.start() join_all(threads, 60) 

En mi uso de esto, fue dentro de una serie de pruebas donde los subprocesos fueron versiones desmonizadas de ExcThread, de modo que si los subprocesos nunca terminaran de ejecutarse, no importaría.

El siguiente código se join a cada proceso, esperando un cierto tiempo. Si el proceso vuelve lo suficientemente rápido, el tiempo de espera se reduce y, a continuación, se une el siguiente proceso. Si se produce un tiempo de espera, se muestra un mensaje de error y todo el sistema sale a la persona que llama.

fuente

 import multiprocessing, sys, time # start three procs that run for differing lengths of time procs = [ multiprocessing.Process( target=time.sleep, args=[num], name='%d sec'%num, ) for num in [1,2,5] ] for p in procs: p.start() print p timeleft = 3.0 print 'Join, timeout after {} seconds'.format(timeleft) for p in procs: orig = time.time() print '{}: join, {:.3f} sec left...'.format(p, timeleft) p.join(timeleft) timeleft -= time.time() - orig if timeleft <= 0.: sys.exit('timed out!') 

ejemplo con timeout

 We start three procs: one waits for 1 sec, another for 3 sec, the last for 5 seconds. Then we `join` them, timing out after 3 seconds -- the last proc will be *interrupted*.    Join, timeout after 3.0 seconds : join, 3.000 sec left... : join, 1.982 sec left... : join, 0.965 sec left... timed out! 

Estoy escribiendo esto aquí, solo para asegurarme de que no lo olvide. El principio de la respuesta es el mismo que el de dano. También el fragmento de código es un poco más pythonic:

 threads = [] timeout = ... # create and start the threads for work in ...: thread = threading.Thread(target=worker) thread.daemon = True # without this the thread might outlive its parent thread.start() threads.append(thread) # Wait for workers to finish or for timeout stop_time = time.time() + timeout while any(t.isAlive for t in threads) and (time.time() < stop_time): time.sleep(0.1)