¿Cómo paralelizar un cálculo de sum en python numpy?

Tengo una sum que estoy tratando de calcular, y tengo dificultades para paralelizar el código. El cálculo que estoy tratando de paralelizar es un poco complejo (usa matrices numpy y matrices dispersas). Escupe una matriz numpy, y quiero sumr las matrices de salida de unos 1000 cálculos. Idealmente, mantendría una sum stream en todas las iteraciones. Sin embargo, no he podido averiguar cómo hacer esto.

Hasta ahora, he intentado usar la función paralela de joblib y la función pool.map con el paquete de multiprocesamiento de python. Para ambos, uso una función interna que devuelve una matriz numpy. Estas funciones devuelven una lista, que convierto a una matriz numpy y luego la sumo.

Sin embargo, después de que la función paralela de Joblib complete todas las iteraciones, el progtwig principal nunca continúa ejecutándose (parece que el trabajo original se encuentra en un estado suspendido, utilizando una CPU del 0%). Cuando uso pool.map, recibo errores de memoria después de completar todas las iteraciones.

¿Hay una manera de simplemente paralelizar una sum de arrays?

Editar : El objective es hacer algo como lo siguiente, excepto en paralelo.

def summers(num_iters): sumArr = np.zeros((1,512*512)) #initialize sum for index in range(num_iters): sumArr = sumArr + computation(index) #computation returns a 1 x 512^2 numpy array return sumArr 

Descubrí cómo hacer una paralelización de una sum de arreglos con multiprocesamiento, apply_async y devoluciones de llamada, así que estoy publicando esto aquí para otras personas. Utilicé la página de ejemplo para Parallel Python para la clase de callback Suma, aunque en realidad no usé ese paquete para la implementación. Sin embargo, me dio la idea de utilizar devoluciones de llamada. Aquí está el código simplificado para lo que terminé usando, y hace lo que quería que hiciera.

 import multiprocessing import numpy as np import thread class Sum: #again, this class is from ParallelPython's example code (I modified for an array and added comments) def __init__(self): self.value = np.zeros((1,512*512)) #this is the initialization of the sum self.lock = thread.allocate_lock() self.count = 0 def add(self,value): self.count += 1 self.lock.acquire() #lock so sum is correct if two processes return at same time self.value += value #the actual summation self.lock.release() def computation(index): array1 = np.ones((1,512*512))*index #this is where the array-returning computation goes return array1 def summers(num_iters): pool = multiprocessing.Pool(processes=8) sumArr = Sum() #create an instance of callback class and zero the sum for index in range(num_iters): singlepoolresult = pool.apply_async(computation,(index,),callback=sumArr.add) pool.close() pool.join() #waits for all the processes to finish return sumArr.value 

También pude hacer que esto funcionara utilizando un mapa paralelizado, que se sugirió en otra respuesta. Lo había intentado antes, pero no lo estaba implementando correctamente. Ambas formas funcionan, y creo que esta respuesta explica bastante bien la cuestión de qué método usar (map o apply.async). Para la versión del mapa, no necesita definir la clase Suma y la función de veranos se convierte en

 def summers(num_iters): pool = multiprocessing.Pool(processes=8) outputArr = np.zeros((num_iters,1,512*512)) #you wouldn't have to initialize these sumArr = np.zeros((1,512*512)) #but I do to make sure I have the memory outputArr = np.array(pool.map(computation, range(num_iters))) sumArr = outputArr.sum(0) pool.close() #not sure if this is still needed since map waits for all iterations return sumArr 

No estoy seguro de entender el problema. ¿Está intentando particionar una lista en un grupo de trabajadores, hacer que mantengan una sum de sus cálculos y sumr el resultado?

 #!/bin/env python import sys import random import time import multiprocessing import numpy as np numpows = 5 numitems = 25 nprocs = 4 def expensiveComputation( i ): time.sleep( random.random() * 10 ) return np.array([i**j for j in range(numpows)]) def listsum( l ): sum = np.zeros_like(l[0]) for item in l: sum = sum + item return sum def partition(lst, n): division = len(lst) / float(n) return [ lst[int(round(division * i)): int(round(division * (i + 1)))] for i in xrange(n) ] def myRunningSum( l ): sum = np.zeros(numpows) for item in l: sum = sum + expensiveComputation(item) return sum if __name__ == '__main__': random.seed(1) data = range(numitems) pool = multiprocessing.Pool(processes=4,) calculations = pool.map(myRunningSum, partition(data,nprocs)) print 'Answer is:', listsum(calculations) print 'Expected answer: ', np.array([25.,300.,4900.,90000.,1763020.]) 

(la función de partición que viene de Python: cortar una lista en n particiones de longitud casi igual )