Recuperar el código de salida de los procesos iniciados con multiprocessing.Pool.map

Estoy usando el módulo multiprocessing Python para paralelizar algunas tareas pesadas de computación. La opción obvia es usar un grupo de trabajadores y luego usar el método de map .

Sin embargo, los procesos pueden fallar. Por ejemplo, pueden ser asesinados silenciosamente, por ejemplo, por el oom-killer . Por lo tanto, me gustaría poder recuperar el código de salida de los procesos iniciados con map .

Además, para fines de registro, me gustaría poder conocer el PID del proceso iniciado para ejecutar cada valor en el iterable.

Si está utilizando multiprocessing.Pool.map , generalmente no está interesado en el código de salida de los subprocesos del grupo, le interesa saber qué valor devolvieron de su elemento de trabajo. Esto se debe a que, en condiciones normales, los procesos en un grupo no se close hasta que usted close / se join al grupo, por lo que no hay códigos de salida que recuperar hasta que todo el trabajo esté completo y el grupo esté a punto de ser destruido. Debido a esto, no hay una API pública para obtener los códigos de salida de esos subprocesos.

Ahora, está preocupado por las condiciones excepcionales, donde algo fuera de banda mata a uno de los subprocesos mientras se realiza el trabajo. Si te encuentras con un problema como este, es probable que te encuentres con un comportamiento extraño. De hecho, en mis pruebas en las que eliminé un proceso en un grupo mientras estaba trabajando como parte de una llamada de map , el map nunca se completó, porque el proceso eliminado no se completó. Sin embargo, Python lanzó de inmediato un nuevo proceso para reemplazar el que maté.

Dicho esto, puede obtener el pid de cada proceso en su grupo accediendo a los procesos de multiprocessing.Process dentro del grupo directamente, usando el atributo privado _pool :

 pool = multiprocessing.Pool() for proc in pool._pool: print proc.pid 

Por lo tanto, una cosa que podría hacer para tratar de detectar cuándo un proceso murió inesperadamente (suponiendo que no se quede atascado en una llamada de locking como resultado). Puede hacer esto examinando la lista de procesos en el grupo antes y después de hacer una llamada a map_async :

 before = pool._pool[:] # Make a copy of the list of Process objects in our pool result = pool.map_async(func, iterable) # Use map_async so we don't get stuck. while not result.ready(): # Wait for the call to complete if any(proc.exitcode for proc in before): # Abort if one of our original processes is dead. print "One of our processes has exited. Something probably went horribly wrong." break result.wait(timeout=1) else: # We'll enter this block if we don't reach `break` above. print result.get() # Actually fetch the result list here. 

Tenemos que hacer una copia de la lista porque cuando muere un proceso en el Pool , Python lo reemplaza inmediatamente por un proceso nuevo, y elimina el muerto de la lista.

Esto funcionó para mí en mis pruebas, pero debido a que se basa en un atributo privado del objeto Pool ( _pool ), es peligroso usarlo en el código de producción. También sugeriría que puede ser excesivo preocuparse demasiado por este escenario, ya que es muy poco probable que ocurra y complica significativamente la implementación.