multiprocessing.Pool generando un nuevo childern después de terminate () en Linux / Python2.7?

Tengo un archivo ejecutable que necesito ejecutar muy a menudo, con diferentes parámetros. Para esto escribí una pequeña envoltura de Python (2.7), usando el módulo de multiprocesamiento, siguiendo el patrón dado aquí .

Mi código se ve así:

try: logging.info("starting pool runs") pool.map(run_nlin, params) pool.close() except KeyboardInterrupt: logging.info("^C pressed") pool.terminate() except Exception, e: logging.info("exception caught: ", e) pool.terminate() finally: time.sleep(5) pool.join() logging.info("done") 

Mi función de trabajador está aquí:

 class KeyboardInterruptError(Exception): pass def run_nlin((path_config, path_log, path_nlin, update_method)): try: with open(path_log, "w") as log_: cmdline = [path_nlin, path_config] if update_method: cmdline += [update_method, ] sp.call(cmdline, stdout=log_, stderr=log_) except KeyboardInterrupt: time.sleep(5) raise KeyboardInterruptError() except: raise 

path_config es la ruta a un archivo de configuración para el progtwig binario; Ahí está, por ejemplo, la fecha para ejecutar el progtwig.

Cuando empiezo la envoltura, todo se ve bien. Sin embargo, cuando numproc ^C , la secuencia de comandos de numproc parece iniciar procesos numproc adicionales de la agrupación antes de terminar. Como ejemplo, cuando comienzo el script para los días 1-10, puedo ver en la salida ps aux que se ejecutan dos instancias del progtwig binario (generalmente para los días 1 y 3). Ahora, cuando presiono ^C , el script de envoltura se cierra, los progtwigs binarios para los días 1 y 3 se han ido, pero hay nuevos progtwigs binarios que se ejecutan para los días 5 y 7.

Entonces, para mí, parece que el Pool lanza otros procesos numproc antes de finalmente morir.

¿Alguna idea de lo que está pasando aquí, y qué puedo hacer al respecto?

En esta página , Jesse Noller, autor del módulo de multiprocesamiento, muestra que la forma correcta de manejar KeyboardInterrupt es que los subprocesos regresen, no que vuelva a subir la excepción. Esto permite que el proceso principal termine el grupo.

Sin embargo, como se muestra en el siguiente código, el proceso principal no llega al bloque except KeyboardInterrupt hasta que se hayan ejecutado todas las tareas generadas por pool.map . Esta es la razón por la que (creo) está viendo llamadas adicionales a su función de trabajador, run_nlin , después de Ctrl-C .

Una posible solución es hacer que todas las funciones de trabajo prueben si se ha establecido un multiprocessing.Event . Si se ha establecido el evento, haga que el trabajador salga temprano, de lo contrario, siga adelante con el cálculo largo.


 import logging import multiprocessing as mp import time logger = mp.log_to_stderr(logging.WARNING) def worker(x): try: if not terminating.is_set(): logger.warn("Running worker({x!r})".format(x = x)) time.sleep(3) else: logger.warn("got the message... we're terminating!") except KeyboardInterrupt: logger.warn("terminating is set") terminating.set() return x def initializer(terminating_): # This places terminating in the global namespace of the worker subprocesses. # This allows the worker function to access `terminating` even though it is # not passed as an argument to the function. global terminating terminating = terminating_ def main(): terminating = mp.Event() result = [] pool = mp.Pool(initializer=initializer, initargs=(terminating, )) params = range(12) try: logger.warn("starting pool runs") result = pool.map(worker, params) pool.close() except KeyboardInterrupt: logger.warn("^C pressed") pool.terminate() finally: pool.join() logger.warn('done: {r}'.format(r = result)) if __name__ == '__main__': main() 

Ejecutando los rendimientos del script:

 % test.py [WARNING/MainProcess] starting pool runs [WARNING/PoolWorker-1] Running worker(0) [WARNING/PoolWorker-2] Running worker(1) [WARNING/PoolWorker-3] Running worker(2) [WARNING/PoolWorker-4] Running worker(3) 

Aquí se presiona Ctrl-C; Cada uno de los trabajadores establece el evento de terminating . Realmente solo necesitamos uno para configurarlo, pero esto funciona a pesar de la pequeña ineficiencia.

  Cc Cc[WARNING/PoolWorker-4] terminating is set [WARNING/PoolWorker-2] terminating is set [WARNING/PoolWorker-3] terminating is set [WARNING/PoolWorker-1] terminating is set 

Ahora se ejecutan todas las demás tareas en cola por pool.map :

 [WARNING/PoolWorker-4] got the message... we're terminating! [WARNING/PoolWorker-2] got the message... we're terminating! [WARNING/PoolWorker-1] got the message... we're terminating! [WARNING/PoolWorker-2] got the message... we're terminating! [WARNING/PoolWorker-4] got the message... we're terminating! [WARNING/PoolWorker-2] got the message... we're terminating! [WARNING/PoolWorker-1] got the message... we're terminating! [WARNING/PoolWorker-3] got the message... we're terminating! 

Finalmente el proceso principal llega al bloque except KeyboardInterrupt .

 [WARNING/MainProcess] ^C pressed [WARNING/MainProcess] done: []