Process.join () y la cola no funcionan con grandes números

Estoy tratando de dividir por bucle, es decir

N = 1000000 for i in xrange(N): #do something 

utilizando multiprocesamiento. El proceso funciona bien con valores pequeños de N. El problema surge cuando uso valores mayores de N. Algo extraño sucede antes o durante el proceso de p.join () y el progtwig no responde. Si pongo print i, en lugar de q.put (i) en la definición de la función f, todo funciona bien.

Apreciaría cualquier ayuda. Aquí está el código.

 from multiprocessing import Process, Queue def f(q,nMin, nMax): # function for multiprocessing for i in xrange(nMin,nMax): q.put(i) if __name__ == '__main__': nEntries = 1000000 nCpu = 10 nEventsPerCpu = nEntries/nCpu processes = [] q = Queue() for i in xrange(nCpu): processes.append( Process( target=f, args=(q,i*nEventsPerCpu,(i+1)*nEventsPerCpu) ) ) for p in processes: p.start() for p in processes: p.join() print q.qsize() 

Está intentando boost su cola sin límites y se está uniendo a un subproceso que está esperando espacio en la cola, por lo que su proceso principal se detiene esperando que se complete, y nunca lo hará.

Si saca los datos de la cola antes de la unión, funcionará bien.

Una técnica que podrías usar es algo como esto:

 while 1: running = any(p.is_alive() for p in processes) while not queue.empty(): process_queue_data() if not running: break 

De acuerdo con la documentación, el p.is_alive () debe realizar una unión implícita, pero también parece implicar que la mejor práctica podría ser realizar explícitamente uniones en todos los subprocesos después de esto.

Edit: Aunque eso está bastante claro, puede que no sea todo ese rendimiento. La forma en que lo haga funcionar mejor será altamente específica para la tarea y la máquina (y, en general, no debería crear tantos procesos a la vez, de todos modos, a menos que algunos se bloqueen en la E / S).

Además de reducir la cantidad de procesos a la cantidad de CPU, algunas soluciones fáciles para hacerlo un poco más rápido (de nuevo, dependiendo de las circunstancias) podrían tener este aspecto:

 liveprocs = list(processes) while liveprocs: try: while 1: process_queue_data(q.get(False)) except Queue.Empty: pass time.sleep(0.5) # Give tasks a chance to put more data in if not q.empty(): continue liveprocs = [p for p in liveprocs if p.is_alive()]