Compartiendo muchas colas entre procesos en Python

Soy consciente del multiprocessing.Manager() y cómo se puede usar para crear objetos compartidos, en particular colas que pueden compartirse entre los trabajadores. Existe esta pregunta , esta pregunta , esta pregunta e incluso una de mis propias preguntas .

Sin embargo, necesito definir muchas colas, cada una de las cuales está vinculando un par específico de procesos. Digamos que cada par de procesos y su cola de enlace se identifican mediante la key variable.

Quiero usar un diccionario para acceder a mis colas cuando necesito colocar y obtener datos. No puedo hacer que esto funcione. He intentado una serie de cosas. Con multiprocessing importado como mp :

La definición de un dict como for key in all_keys: DICT[key] = mp.Queue en un archivo de configuración que es importado por el módulo de multiprocesamiento ( multi.py ) no devuelve errores, pero la cola DICT[key] no se comparte Entre los procesos, cada uno parece tener su propia copia de la cola y, por lo tanto, no ocurre ninguna comunicación.

Si bash definir el DICT al principio de la función de multiprocesamiento principal que define los procesos y los inicia, como

 DICT = mp.Manager().dict() for key in all_keys: DICT[key] = mp.Queue() 

Me sale el error

 RuntimeError: Queue objects should only be shared between processes through inheritance 

Cambiando a

 DICT = mp.Manager().dict() for key in all_keys: DICT[key] = mp.Manager().Queue() 

Solo hace que todo sea peor. Probar definiciones similares al multi.py de multi.py lugar de dentro de la función principal devuelve errores similares.

Debe haber una manera de compartir muchas colas entre procesos sin nombrar explícitamente cada uno en el código. ¿Algunas ideas?

Editar

Aquí hay un esquema básico del progtwig:

1- carga el primer módulo, que define algunas variables, importa multi , inicia multi.main() y carga otro módulo que inicia una cascada de cargas de módulos y ejecución de código. Mientras tanto…

2- multi.main ve así:

 def main(): manager = mp.Manager() pool = mp.Pool() DICT2 = manager.dict() for key in all_keys: DICT2[key] = manager.Queue() proc_1 = pool.apply_async(targ1,(DICT1[key],) ) #DICT1 is defined in the config file proc_2 = pool.apply_async(targ2,(DICT2[key], otherargs,) 

En lugar de usar pool y manager , también estaba iniciando procesos con lo siguiente:

 mp.Process(target=targ1, args=(DICT[key],)) 

3 – La función targ1 toma datos de entrada que vienen (ordenados por key ) del proceso principal. Está destinado a pasar el resultado a DICT[key] para que targ2 pueda hacer su trabajo. Esta es la parte que no está funcionando. Hay un número arbitrario de targ1 s, targ2 s, etc. y, por lo tanto, un número arbitrario de colas.

4 – Los resultados de algunos de estos procesos se enviarán a un conjunto de diferentes marcos de datos / marcos de datos pandas que también se indexan por key , y me gustaría que sea accesible desde procesos arbitrarios, incluso los lanzados en un módulo diferente. Todavía tengo que escribir esta parte y podría ser una pregunta diferente. (Lo menciono aquí porque la respuesta a 3 arriba también podría resolver 4 muy bien).

Parece que tus problemas comenzaron cuando intentaste compartir un multiprocessing.Queue() pasándolo como un argumento. Puede solucionar esto creando una cola administrada en su lugar:

 import multiprocessing manager = mutiprocessing.Manager() passable_queue = manager.Queue() 

Cuando utiliza un administrador para crearlo, está almacenando y pasando un proxy a la cola, en lugar de la cola en sí, por lo que incluso cuando el objeto que pasa a sus procesos de trabajo se copia, seguirá apuntando al mismo subyacente estructura de datos: su cola. Es muy similar (en concepto) a los punteros en C / C ++. Si crea sus colas de esta manera, podrá pasarlas cuando inicie un proceso de trabajo.

Como ahora puede pasar colas, ya no necesita administrar su diccionario. Mantenga un diccionario normal en main que almacenará todas las asignaciones, y solo proporcione a su trabajador los procesos que necesitan, para que no necesiten acceso a ninguna asignación.

He escrito un ejemplo de esto aquí. Parece que estás pasando objetos entre tus trabajadores, así que eso es lo que se hace aquí. Imaginemos que tenemos dos etapas de procesamiento, y los datos comienzan y terminan en el control de main . Observe cómo podemos crear las colas que conectan a los trabajadores como una tubería, pero al darles solo las colas que necesitan , no es necesario que conozcan ninguna asignación:

 import multiprocessing as mp def stage1(q_in, q_out): q_out.put(q_in.get()+"Stage 1 did some work.\n") return def stage2(q_in, q_out): q_out.put(q_in.get()+"Stage 2 did some work.\n") return def main(): pool = mp.Pool() manager = mp.Manager() # create managed queues q_main_to_s1 = manager.Queue() q_s1_to_s2 = manager.Queue() q_s2_to_main = manager.Queue() # launch workers, passing them the queues they need results_s1 = pool.apply_async(stage1, (q_main_to_s1, q_s1_to_s2)) results_s2 = pool.apply_async(stage2, (q_s1_to_s2, q_s2_to_main)) # Send a message into the pipeline q_main_to_s1.put("Main started the job.\n") # Wait for work to complete print(q_s2_to_main.get()+"Main finished the job.") pool.close() pool.join() return if __name__ == "__main__": main() 

El código produce esta salida:

Main comenzó el trabajo.
La etapa 1 hizo algo de trabajo.
La etapa 2 hizo algo de trabajo.
Main terminó el trabajo.

No AsyncResults un ejemplo de almacenamiento de colas u objetos AsyncResults en diccionarios, porque todavía no entiendo bien cómo se supone que funciona su progtwig. Pero ahora que puede pasar sus colas libremente, puede crear su diccionario para almacenar las colas / procesos de la cola según sea necesario.

De hecho, si realmente construye una tubería entre varios trabajadores, ni siquiera necesita mantener una referencia a las colas de “inter-worker” en main . Cree las colas, páselas a sus trabajadores, luego solo conserve las referencias a las colas que usará main . Definitivamente recomendaría intentar que las antiguas colas se recojan de basura lo más rápido posible si realmente tiene “un número arbitrario” de colas.