Módulo de multiprocesamiento de Python: unir procesos con tiempo de espera

Im haciendo una optimización de los parámetros de una simulación compleja. Estoy usando el módulo de multiprocesamiento para mejorar el rendimiento del algoritmo de optimización. Los conceptos básicos del multiprocesamiento que aprendí en http://pymotw.com/2/multiprocessing/basics.html . La simulación compleja dura diferentes tiempos dependiendo de los parámetros dados del algoritmo de optimización, alrededor de 1 a 5 minutos. Si los parámetros se eligen muy mal, la simulación puede durar 30 minutos o más y los resultados no son útiles. Así que estaba pensando en construir un tiempo de espera para el multiprocesamiento, que termine todas las simulaciones que duran más de un tiempo definido. Aquí hay una versión resumida del problema:

import numpy as np import time import multiprocessing def worker(num): time.sleep(np.random.random()*20) def main(): pnum = 10 procs = [] for i in range(pnum): p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1))) procs.append(p) p.start() print 'starting', p.name for p in procs: p.join(5) print 'stopping', p.name if __name__ == "__main__": main() 

La línea p.join(5) define el tiempo de espera de 5 segundos. Debido al for-loop for p in procs: el progtwig espera 5 segundos hasta que finaliza el primer proceso y luego nuevamente 5 segundos hasta que finaliza el segundo proceso y así sucesivamente, pero quiero que el progtwig finalice todos los procesos que duren más de 5 segundos. Además, si ninguno de los procesos dura más de 5 segundos, el progtwig no debe esperar 5 segundos.

Puede hacer esto creando un bucle que esperará un tiempo de espera de unos segundos, verificando con frecuencia si todos los procesos han finalizado. Si no todos terminan en la cantidad de tiempo asignada, finalice todos los procesos:

 TIMEOUT = 5 start = time.time() while time.time() - start <= TIMEOUT: if any(p.is_alive() for p in procs): time.sleep(.1) # Just to avoid hogging the CPU else: # All the processes are done, break now. break else: # We only enter this if we didn't 'break' above. print("timed out, killing all processes") for p in procs: p.terminate() p.join() 

Si desea eliminar todos los procesos, puede usar la agrupación a partir del multiprocesamiento, deberá definir un tiempo de espera general para toda la ejecución, a diferencia de los tiempos de espera individuales.

 import numpy as np import time from multiprocessing import Pool def worker(num): xtime = np.random.random()*20 time.sleep(xtime) return xtime def main(): pnum = 10 pool = Pool() args = range(pnum) pool_result = pool.map_async(worker, args) # wait 5 minutes for every worker to finish pool_result.wait(timeout=300) # once the timeout has finished we can try to get the results if pool_result.ready(): print pool_result.get(timeout=1) if __name__ == "__main__": main() 

Esto le dará una lista con los valores de retorno para todos sus trabajadores en orden.
Más información aquí: https://docs.python.org/2/library/multiprocessing.html#module-multiprocessing.pool

Gracias a la ayuda de dano encontré una solución:

 import numpy as np import time import multiprocessing def worker(num): time.sleep(np.random.random()*20) def main(): pnum = 10 TIMEOUT = 5 procs = [] bool_list = [True]*pnum for i in range(pnum): p = multiprocessing.Process(target=worker, args=(i,), name = ('process_' + str(i+1))) procs.append(p) p.start() print 'starting', p.name start = time.time() while time.time() - start <= TIMEOUT: for i in range(pnum): bool_list[i] = procs[i].is_alive() print bool_list if np.any(bool_list): time.sleep(.1) else: break else: print("timed out, killing all processes") for p in procs: p.terminate() for p in procs: print 'stopping', p.name,'=', p.is_alive() p.join() if __name__ == "__main__": main() 

No es la forma más elegante, estoy seguro de que hay una manera mejor que usar bool_list . Los procesos que aún están vivos después del tiempo de espera de 5 segundos serán eliminados. Si está configurando tiempos más cortos en la función de trabajo que el tiempo de espera, verá que el progtwig se detiene antes de que se scope el tiempo de espera de 5 segundos. Todavía estoy abierto para soluciones más elegantes si hay 🙂