“Error EOF” en la salida del progtwig usando multiproceso de cola y subproceso

Tengo problemas para entender por qué este sencillo progtwig genera un EOFError al final.

Estoy utilizando una Queue() para comunicarme con un Thread() que deseo finalizar de forma automática y limpia en un atexit de mi progtwig.

 import threading import multiprocessing import atexit class MyClass: def __init__(self): self.queue = None self.thread = None def start(self): self.queue = multiprocessing.Queue() self.thread = threading.Thread(target=self.queued_writer, daemon=True) self.thread.start() # Remove this: no error self.queue.put("message") def queued_writer(self): while 1: msg = self.queue.get() print("Message:", msg) if msg is None: break def stop(self): self.queue.put(None) self.thread.join() instance = MyClass() atexit.register(instance.stop) # Put this before register: no error instance.start() 

Esto plantea:

 Traceback (most recent call last): File "/usr/lib/python3.6/threading.py", line 916, in _bootstrap_inner self.run() File "/usr/lib/python3.6/threading.py", line 864, in run self._target(*self._args, **self._kwargs) File "test.py", line 21, in queued_writer msg = self.queue.get() File "/usr/lib/python3.6/multiprocessing/queues.py", line 94, in get res = self._recv_bytes() File "/usr/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes buf = self._recv_bytes(maxlength) File "/usr/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes buf = self._recv(4) File "/usr/lib/python3.6/multiprocessing/connection.py", line 383, in _recv raise EOFError EOFError 

Además, este fragmento de self.queue.put("message") comporta de forma extraña: si self.queue.put("message") la línea self.queue.put("message") , no se self.queue.put("message") ningún error y el hilo sale correctamente. De manera similar, esto parece funcionar si la instance.start() es llamada antes de atexit.register() .

¿Alguien sabe de dónde podría venir el error por favor?

Edición: Noté que usar un SimpleQueue() parece hacer desaparecer el error.

El problema proviene de un conflicto entre varias llamadas atexit.register() .

La documentación establece que:

atexit ejecuta estas funciones en el orden inverso en el que se registraron; si registra A , B y C , en el momento de la finalización del intérprete se ejecutarán en el orden C , B , A

[…]

El supuesto es que los módulos de nivel inferior normalmente se importarán antes que los módulos de nivel superior y, por lo tanto, se deben limpiar más tarde.

Al importar primero el multiprocessing y luego llamar a atexit.register(my_stop) , usted esperaría que su función de detención se ejecute antes de cualquier procedimiento interno de terminación … Pero este no es el caso, porque atexit.register() puede llamarse dinámicamente.

En el presente caso, la biblioteca de multiprocessing hace uso de una función _exit_function que está destinada a cerrar de forma limpia los subprocesos internos y las colas. Esta función se registra en atexit en el nivel de módulo , sin embargo, el módulo solo se carga una vez que se inicializa el objeto Queue() .

En consecuencia, la función de parada MyClass se registra antes de la multiprocessing y, por lo tanto instance.stop se llama a la _exit_function . Se _exit_function después de _exit_function .

Durante su terminación, _exit_function cierra las conexiones de tuberías internas, por lo que si el subproceso más adelante intenta llamar a .get() con una conexión de lectura cerrada, se EOFError un EOFError . Esto sucede solo si Python no tuvo tiempo de matar automáticamente el subproceso del daemon al final, es decir, si una función de salida “lenta” (como time.sleep(0.1) o en este caso thread.join() ) se registra y ejecuta Después del procedimiento de cierre habitual. Por alguna razón, el cierre de la conexión de escritura se retrasa, por lo que .put() no .put() error de inmediato.

En cuanto a por qué las pequeñas modificaciones en el fragmento de SimpleQueue lo hacen funcionar: SimpleQueue no tiene Finalizer por lo que la canalización interna se cierra más tarde. El subproceso interno de la Queue no se inicia hasta que se llama al primer .put() por lo que eliminarlo significa que no hay una tubería que cerrar. También es posible forzar el registro al importar multiprocessing.queues .

Para lograrlo, puede definir __enter__ y __exit__ dentro de su clase y crear su instancia usando la instrucción:

 import threading import multiprocessing class MyClass: def __init__(self): self.queue = None self.thread = None def __enter__(self): return self def __exit__(self, type, value, traceback): self.stop() def start(self): self.queue = multiprocessing.Queue() self.thread = threading.Thread(target=self.queued_writer, daemon=True) self.thread.start() def queued_writer(self): while 1: msg = self.queue.get() print("Message:", str(msg)) if msg is None: break def put(self, msg): self.queue.put(msg) def stop(self): self.queue.put(None) self.thread.join() with MyClass() as instance: instance.start() print('Thread stopped: ' + str(instance.thread._is_stopped)) instance.put('abc') print('Thread stopped: ' + str(instance.thread._is_stopped)) 

El código anterior da como salida:

 Thread stopped: False Message: abc Message: None Thread stopped: True 

La respuesta superficial a su pregunta es bastante simple. El proceso queued_writer todavía está esperando que se escriban las entradas en la cola cuando finaliza el proceso principal, enviando un EOF a la conexión de locking abierto que self.queue.get abrió.

Eso plantea la pregunta de por qué el atexit.register no parece hacer su trabajo, pero de eso no sé por qué.