Python 3 Multiprocesamiento de interlocking de la cola al llamar a unirse antes de que la cola esté vacía

Tengo una pregunta que comprende la cola en el módulo de multiprocessing en python 3

Esto es lo que dicen en las pautas de progtwigción :

Tenga en cuenta que un proceso que haya puesto los elementos en una cola esperará antes de finalizar hasta que todos los elementos almacenados en el búfer sean alimentados por el subproceso del “alimentador” a la tubería subyacente. (El proceso secundario puede llamar al método Queue.cancel_join_thread de la cola para evitar este comportamiento).

Esto significa que siempre que use una cola, debe asegurarse de que todos los elementos que se han puesto en la cola finalmente se eliminarán antes de unirse al proceso. De lo contrario, no puede estar seguro de que los procesos que han puesto elementos en la cola terminarán. Recuerde también que los procesos no demoníacos se unirán automáticamente.

Un ejemplo que va a interbloquear es el siguiente:

  del proceso de importación multiprocesamiento, Cola

 def f (q):
     q.put ('X' * 1000000)

 si __name__ == '__main__':
     queue = Queue ()
     p = Proceso (destino = f, args = (cola,))
     p.start ()
     p.join () # estos puntos muertos
     obj = queue.get ()

Una solución aquí sería cambiar las dos últimas líneas (o simplemente eliminar la línea p.join ()).

Aparentemente, no se debe llamar a queue.get() después de join() .

Sin embargo, hay ejemplos de uso de colas donde se llama a get después de una join como:

 import multiprocessing as mp import random import string # define a example function def rand_string(length, output): """ Generates a random string of numbers, lower- and uppercase chars. """ rand_str = ''.join(random.choice( string.ascii_lowercase + string.ascii_uppercase + string.digits) for i in range(length)) output.put(rand_str) if __name__ == "__main__": # Define an output queue output = mp.Queue() # Setup a list of processes that we want to run processes = [mp.Process(target=rand_string, args=(5, output)) for x in range(2)] # Run processes for p in processes: p.start() # Exit the completed processes for p in processes: p.join() # Get process results from the output queue results = [output.get() for p in processes] print(results) 

He ejecutado este progtwig y funciona (también se publicó como una solución a la pregunta de Python 3 de StackOverFlow – Multiprocesamiento – Queue.get () no responde ).

¿Podría alguien ayudarme a entender cuál es la regla para el punto muerto aquí?

La implementación de la cola en el multiprocesamiento que permite la transferencia de datos entre procesos se basa en tuberías estándar del sistema operativo.

Las canalizaciones del sistema operativo no son infinitamente largas, por lo que el proceso de colas de datos podría bloquearse en la operación put() hasta que algún otro proceso use get() para recuperar datos de la cola.

Para pequeñas cantidades de datos, como la de su ejemplo, el proceso principal puede join() todos los subprocesos generados y luego recoger los datos. Esto a menudo funciona bien, pero no se escala, y no está claro cuándo se romperá.

Pero ciertamente se romperá con grandes cantidades de datos. El subproceso se bloqueará en put() espera de que el proceso principal elimine algunos datos de la cola con get() , pero el proceso principal se bloquea en join() espera de que finalice el subproceso. Esto se traduce en un punto muerto.

Aquí hay un ejemplo donde un usuario tuvo este problema exacto . Publiqué un código en una respuesta que lo ayudó a resolver su problema.

No llame a join() en un objeto de proceso antes de recibir todos los mensajes de la cola compartida.

Utilicé la siguiente solución para permitir que los procesos salieran antes de procesar todos sus resultados:

 results = [] while True: try: result = resultQueue.get(False, 0.01) results.append(result) except queue.Empty: pass allExited = True for t in processes: if t.exitcode is None: allExited = False break if allExited & resultQueue.empty(): break 

Se puede acortar, pero lo dejé por más tiempo para ser más claro para los novatos.

Aquí resultQueue es el multiprocess.Queue que se compartió con multiprocess.Process objetos multiprocess.Process . Después de este bloque de código, obtendrá la matriz de result con todos los mensajes de la cola.

El problema es que el búfer de entrada de la tubería de la cola que recibe los mensajes puede llenarse, causando un locking infinito de los escritores hasta que haya suficiente espacio para recibir el siguiente mensaje. Así que tienes tres formas de evitar el locking:

  • Aumente el tamaño de multiprocessing.connection.BUFFER (no tan bueno)
  • Disminuir el tamaño del mensaje o su cantidad (no tan bueno)
  • Recupera los mensajes de la cola inmediatamente a medida que van llegando (buena manera)