Multiprocesamiento de Python con una cola de actualización y una cola de salida

¿Cómo puedo crear un script para un multiproceso de Python que use dos colas como estas ?:

  1. uno como una cola de trabajo que comienza con algunos datos y que, dependiendo de las condiciones de las funciones a ser paralelizadas, recibe más tareas sobre la marcha,
  2. otro que recostack los resultados y se utiliza para anotar el resultado después de que finalice el procesamiento.

Básicamente, necesito poner algunas tareas más en la cola de trabajo en función de lo que encontré en sus elementos iniciales. El ejemplo que publico a continuación es tonto (podría transformar el elemento como me gusta y ponerlo directamente en la Cola de salida), pero su mecánica es clara y refleja parte del concepto que necesito desarrollar.

Por este medio mi bash:

import multiprocessing as mp def worker(working_queue, output_queue): item = working_queue.get() #I take an item from the working queue if item % 2 == 0: output_queue.put(item**2) # If I like it, I do something with it and conserve the result. else: working_queue.put(item+1) # If there is something missing, I do something with it and leave the result in the working queue if __name__ == '__main__': static_input = range(100) working_q = mp.Queue() output_q = mp.Queue() for i in static_input: working_q.put(i) processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] #I am running as many processes as CPU my machine has (is this wise?). for proc in processes: proc.start() for proc in processes: proc.join() for result in iter(output_q.get, None): print result #alternatively, I would like to (c)pickle.dump this, but I am not sure if it is possible. 

Esto no termina ni imprime ningún resultado.

Al final de todo el proceso, me gustaría asegurarme de que la cola de trabajo esté vacía y que todas las funciones paralelas hayan terminado de escribirse en la cola de salida antes de que se itere para sacar los resultados. ¿Tienes sugerencias sobre cómo hacer que funcione?

Tienes un error tipográfico en la línea que crea los procesos. Debe ser mp.Process , no mp.process . Esto es lo que está causando la excepción que obtienes.

Además, no está haciendo bucles en sus trabajadores, por lo que en realidad solo consumen un solo elemento de la cola y luego salen. Sin saber más sobre la lógica requerida, no es fácil dar consejos específicos, pero probablemente querrá incluir el cuerpo de su función de worker dentro de un bucle de while True y agregar una condición en el cuerpo para salir cuando el trabajo esté terminado.

Tenga en cuenta que, si no agrega una condición para salir explícitamente del bucle, sus trabajadores simplemente se detendrán para siempre cuando la cola esté vacía. Podría considerar el uso de la técnica llamada píldora venenosa para indicar a los trabajadores que pueden salir. Encontrará un ejemplo y una discusión útil en el artículo de PyMOTW sobre la comunicación entre procesos .

En cuanto a la cantidad de procesos a usar, tendrá que realizar un punto de referencia un poco para encontrar lo que funciona para usted, pero, en general, un proceso por núcleo es un buen punto de partida cuando su carga de trabajo está vinculada a la CPU. Si su carga de trabajo está vinculada a IO, es posible que tenga mejores resultados con un mayor número de trabajadores.

El siguiente código logra los resultados esperados. Sigue las sugerencias hechas por @tawmas.

Este código permite utilizar múltiples núcleos en un proceso que requiere que ellos puedan actualizar la cola que alimenta los datos a los trabajadores durante el procesamiento:

 import multiprocessing as mp def worker(working_queue, output_queue): while True: if working_queue.empty() == True: break #this is the so-called 'poison pill' else: picked = working_queue.get() if picked % 2 == 0: output_queue.put(picked) else: working_queue.put(picked+1) return if __name__ == '__main__': static_input = xrange(100) working_q = mp.Queue() output_q = mp.Queue() results_bank = [] for i in static_input: working_q.put(i) processes = [mp.Process(target=worker,args=(working_q, output_q)) for i in range(mp.cpu_count())] for proc in processes: proc.start() for proc in processes: proc.join() results_bank = [] while True: if output_q.empty() == True: break results_bank.append(output_q.get_nowait()) print len(results_bank) # length of this list should be equal to static_input, which is the range used to populate the input queue. In other words, this tells whether all the items placed for processing were actually processed. results_bank.sort() print results_bank