Aplicar reducir en la salida del generador con multiprocesamiento

Tengo una función de generador (Python) que funciona así

def Mygenerator(x, y, z, ...): while True: # code that makes two matrices based on sequences of input arrays yield (matrix1, matrix2) 

Lo que quiero hacer es agregar la salida de este generador. Esta línea hace el trabajo:

 M1, M2 = reduce(lambda x, y: x[0] + y[0], x[1] + y[1], Mygenerator(x, y, z, ...)) 

Me gustaría paralelizar esto para acelerar los cálculos. Es importante que las salidas de Mygenerator se reduzcan a medida que se list(Mygenerator(...)) , ya que list(Mygenerator(...)) tomaría demasiada memoria.

Para responder a mi propia pregunta, encontré una solución que parece funcionar como esperaba:

Primero, Mygenerator ya no es un generador sino una función. Además, en lugar de recorrer los segmentos de x, y, z, ahora paso un segmento a la función en ese momento:

 def Myfunction(x_segment, y_segment, z_segment): # code that makes two matrices based on input arrays return (matrix1, matrix2) 

El uso de multiprocessing.Pool La imap con la función imap (generador) parece funcionar:

 pool = multiprocessing.Pool(ncpus) results = pool.imap(Myfunction, ( (x[i], y[i], z[i]) for i in range(len(x)) ) M1, M2 = reduce(lambda r1, r2: (r1[0] + r2[0], r1[1] + r2[1]), (result for result in results)) pool.close() pool.join() 

donde cambié x y y en la expresión lambda a r1 y r2 para evitar confusiones con las otras variables con el mismo nombre. Cuando bash usar un generador con multiprocessing , tengo algunos problemas con pickle.

La única decepción con esta solución es que realmente no aceleró mucho los cálculos. Supongo que eso tiene que ver con las operaciones generales. Cuando se utilizan 8 núcleos, la velocidad de procesamiento se incrementó en aproximadamente un 10%. Al reducir a 4 núcleos se duplicó la velocidad. Esto parece ser lo mejor que puedo hacer con mi tarea particular, a menos que haya alguna otra manera de hacer el paralelismo …

La función imap era necesaria para usar aquí, ya que map almacenaría todos los valores devueltos en la memoria antes de la operación de reduce , y en este caso eso no sería posible.