El proceso de multiprocesamiento de Python se bloquea silenciosamente

Estoy usando Python 2.7.3. He paralelizado algún código usando objetos multiprocessing.Process subclasificado. Si no hay errores en el código en mis objetos de proceso subclasificados, todo funciona bien. Pero si hay errores en el código en mis objetos de proceso subclasificados, aparentemente se bloquearán silenciosamente (no se imprimirá un seguimiento de stack en el shell primario) y el uso de la CPU se reducirá a cero. El código principal nunca falla, dando la impresión de que la ejecución simplemente se bloquea. Mientras tanto, es realmente difícil localizar dónde está el error en el código porque no se da ninguna indicación de dónde está el error.

No puedo encontrar ninguna otra pregunta en stackoverflow que trate el mismo problema.

Supongo que los objetos de proceso subclasificados parecen bloquearse silenciosamente porque no pueden imprimir un mensaje de error al shell de los padres, pero me gustaría saber qué puedo hacer al respecto para poder al menos depurar más eficientemente (y para que otros los usuarios de mi código pueden decirme cuándo tienen problemas también).

EDITAR: mi código real es demasiado complejo, pero un ejemplo trivial de un objeto de proceso subclasificado con un error sería algo como esto:

 from multiprocessing import Process, Queue class Worker(Process): def __init__(self, inputQueue, outputQueue): super(Worker, self).__init__() self.inputQueue = inputQueue self.outputQueue = outputQueue def run(self): for i in iter(self.inputQueue.get, 'STOP'): # (code that does stuff) 1 / 0 # Dumb error # (more code that does stuff) self.outputQueue.put(result) 

Lo que realmente desea es alguna forma de pasar las excepciones al proceso principal, ¿verdad? Entonces puedes manejarlos como quieras.

Si usa concurrent.futures.ProcessPoolExecutor , esto es automático. Si usas multiprocessing.Pool , es trivial. Si usa Process y Queue explícitos, tiene que hacer un poco de trabajo, pero no es tanto.

Por ejemplo:

 def run(self): try: for i in iter(self.inputQueue.get, 'STOP'): # (code that does stuff) 1 / 0 # Dumb error # (more code that does stuff) self.outputQueue.put(result) except Exception as e: self.outputQueue.put(e) 

Luego, su código de llamada puede leer Exception fuera de la cola como cualquier otra cosa. En lugar de esto:

 yield outq.pop() 

hacer esto:

 result = outq.pop() if isinstance(result, Exception): raise result yield result 

(No sé qué hace el código de lectura de la cola del proceso padre real, porque su muestra mínima simplemente ignora la cola. Pero espero que esto explique la idea, aunque su código real en realidad no funciona así).

Esto supone que desea abortar en cualquier excepción no controlada que lo haga run . Si desea devolver la excepción y continuar a la siguiente i in iter , simplemente mueva el try al for , en lugar de a su alrededor.

Esto también supone que las Exception no son valores válidos. Si eso es un problema, la solución más simple es simplemente insertar tuplas (result, exception) :

 def run(self): try: for i in iter(self.inputQueue.get, 'STOP'): # (code that does stuff) 1 / 0 # Dumb error # (more code that does stuff) self.outputQueue.put((result, None)) except Exception as e: self.outputQueue.put((None, e)) 

Entonces, su código de popping hace esto:

 result, exception = outq.pop() if exception: raise exception yield result 

Puede observar que esto es similar al estilo de callback node.js, donde pasa (err, result) a cada callback. Sí, es molesto, y vas a desordenar el código en ese estilo. Pero en realidad no estás usando eso en ninguna parte excepto en el envoltorio; todo su código de “nivel de aplicación” que obtiene valores de la cola o recibe una llamada dentro de la run solo ve rendimientos / rendimientos normales y excepciones generadas.

Es posible que incluso desee considerar la posibilidad de construir un Future a la especificación de concurrent.futures (o usar esa clase como está), incluso si está haciendo su trabajo en cola y ejecutándose manualmente. No es tan difícil, y te da una API muy buena, especialmente para la depuración.

Finalmente, vale la pena señalar que la mayoría del código creado alrededor de los trabajadores y las colas se puede simplificar mucho más con un diseño de ejecutor / grupo, incluso si está absolutamente seguro de que solo desea un trabajador por cola. Simplemente Worker.run la Worker.run y gire el bucle en el método Worker.run en una función (que simplemente return s o raise s como de costumbre, en lugar de agregarse a una cola). En el lado de la llamada, una vez más descarte toda la placa de soporte y simplemente submit o map la función de trabajo con sus parámetros.

Todo su ejemplo puede reducirse a:

 def job(i): # (code that does stuff) 1 / 0 # Dumb error # (more code that does stuff) return result with concurrent.futures.ProcessPoolExecutor(max_workers=1) as executor: results = executor.map(job, range(10)) 

Y automáticamente manejará las excepciones correctamente.


Como mencionó en los comentarios, el rastreo de una excepción no se remonta al proceso secundario; solo llega hasta la llamada de raise result manual (o, si está utilizando un grupo o ejecutor, las agallas del grupo o ejecutor).

La razón es que el multiprocessing.Queue se construye sobre el pickle , y las excepciones de escabeche no escurren sus tracebacks. Y la razón de esto es que no se pueden escudriñar las trazas. Y la razón de ello es que las trazas están llenas de referencias al contexto de ejecución local, por lo que hacer que funcionen en otro proceso sería muy difícil.

Entonces … ¿qué puedes hacer al respecto? No vayas buscando una solución totalmente general. En su lugar, piensa en lo que realmente necesitas. El 90% de las veces, lo que desea es “registrar la excepción, con rastreo y continuar” o “imprimir la excepción, con rastreo, a stderr y exit(1) como el controlador de excepciones no manejado predeterminado”. Para cualquiera de estos, no es necesario que pase una excepción; simplemente formatéalo en el lado del niño y pasa una cadena. Si necesita algo más sofisticado, calcule exactamente lo que necesita, y pase la información suficiente para juntarlo manualmente. Si no sabe cómo dar formato a las trazas y excepciones, consulte el módulo de traceback . Es bastante simple Y esto significa que no necesitas meterte en la maquinaria de salmuera. (No es que sea muy difícil copyreg un pickler o escribir una clase de titular con un método __reduce__ o algo así, pero si no es necesario, ¿por qué aprender todo eso?)

Sugiero tal solución para mostrar las excepciones del proceso.

 from multiprocessing import Queue, Process, RawValue, Semaphore, Lock, Pool import traceback run_old = Process.run def run_new(*args, **kwargs): try: run_old(*args, **kwargs) except (KeyboardInterrupt, SystemExit): raise except: traceback.print_exc(file=sys.stdout) Process.run = run_new 

Esto no es una respuesta, solo un comentario extendido. Ejecute este progtwig y díganos qué salida (si corresponde) obtiene:

 from multiprocessing import Process, Queue class Worker(Process): def __init__(self, inputQueue, outputQueue): super(Worker, self).__init__() self.inputQueue = inputQueue self.outputQueue = outputQueue def run(self): for i in iter(self.inputQueue.get, 'STOP'): # (code that does stuff) 1 / 0 # Dumb error # (more code that does stuff) self.outputQueue.put(result) if __name__ == '__main__': inq, outq = Queue(), Queue() inq.put(1) inq.put('STOP') w = Worker(inq, outq) w.start() 

Yo obtengo:

 % test.py Process Worker-1: Traceback (most recent call last): File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap self.run() File "/home/unutbu/pybin/test.py", line 21, in run 1 / 0 # Dumb error ZeroDivisionError: integer division or modulo by zero 

Estoy sorprendido (si) no consigues nada.