Use la matriz numpy en la memoria compartida para multiprocesamiento

Me gustaría usar una matriz numpy en la memoria compartida para usar con el módulo de multiprocesamiento. La dificultad es usarlo como una matriz numpy, y no solo como una matriz ctypes.

from multiprocessing import Process, Array import scipy def f(a): a[0] = -a[0] if __name__ == '__main__': # Create the array N = int(10) unshared_arr = scipy.rand(N) arr = Array('d', unshared_arr) print "Originally, the first two elements of arr = %s"%(arr[:2]) # Create, start, and finish the child processes p = Process(target=f, args=(arr,)) p.start() p.join() # Printing out the changed values print "Now, the first two elements of arr = %s"%arr[:2] 

Esto produce una salida tal como:

 Originally, the first two elements of arr = [0.3518653236697369, 0.517794725524976] Now, the first two elements of arr = [-0.3518653236697369, 0.517794725524976] 

Se puede acceder a la matriz en forma de ctypes, por ejemplo, arr[i] tiene sentido. Sin embargo, no es una matriz numpy y no puedo realizar operaciones como -1*arr o arr.sum() . Supongo que una solución sería convertir el array ctypes en un array numpy. Sin embargo (además de no poder hacer que esto funcione), no creo que se comparta más.

Parece que habría una solución estándar para lo que tiene que ser un problema común.

Para agregar a las respuestas de @unutbu (ya no está disponible) y de @Henry Gomersall. Podría usar shared_arr.get_lock() para sincronizar el acceso cuando sea necesario:

 shared_arr = mp.Array(ctypes.c_double, N) # ... def f(i): # could be anything numpy accepts as an index such another numpy array with shared_arr.get_lock(): # synchronize access arr = np.frombuffer(shared_arr.get_obj()) # no data copying arr[i] = -arr[i] 

Ejemplo

 import ctypes import logging import multiprocessing as mp from contextlib import closing import numpy as np info = mp.get_logger().info def main(): logger = mp.log_to_stderr() logger.setLevel(logging.INFO) # create shared array N, M = 100, 11 shared_arr = mp.Array(ctypes.c_double, N) arr = tonumpyarray(shared_arr) # fill with random values arr[:] = np.random.uniform(size=N) arr_orig = arr.copy() # write to arr from different processes with closing(mp.Pool(initializer=init, initargs=(shared_arr,))) as p: # many processes access the same slice stop_f = N // 10 p.map_async(f, [slice(stop_f)]*M) # many processes access different slices of the same array assert M % 2 # odd step = N // 10 p.map_async(g, [slice(i, i + step) for i in range(stop_f, N, step)]) p.join() assert np.allclose(((-1)**M)*tonumpyarray(shared_arr), arr_orig) def init(shared_arr_): global shared_arr shared_arr = shared_arr_ # must be inherited, not passed as an argument def tonumpyarray(mp_arr): return np.frombuffer(mp_arr.get_obj()) def f(i): """synchronized.""" with shared_arr.get_lock(): # synchronize access g(i) def g(i): """no synchronization.""" info("start %s" % (i,)) arr = tonumpyarray(shared_arr) arr[i] = -1 * arr[i] info("end %s" % (i,)) if __name__ == '__main__': mp.freeze_support() main() 

Si no necesita acceso sincronizado o crea sus propios lockings, mp.Array() es necesario. Puedes usar mp.sharedctypes.RawArray en este caso.

El objeto Array tiene get_obj() un método get_obj() , que devuelve la matriz ctypes que presenta una interfaz de búfer. Creo que lo siguiente debería funcionar …

 from multiprocessing import Process, Array import scipy import numpy def f(a): a[0] = -a[0] if __name__ == '__main__': # Create the array N = int(10) unshared_arr = scipy.rand(N) a = Array('d', unshared_arr) print "Originally, the first two elements of arr = %s"%(a[:2]) # Create, start, and finish the child process p = Process(target=f, args=(a,)) p.start() p.join() # Print out the changed values print "Now, the first two elements of arr = %s"%a[:2] b = numpy.frombuffer(a.get_obj()) b[0] = 10.0 print a[0] 

Cuando se ejecuta, esto imprime el primer elemento de a ahora es 10.0, mostrando a y b son solo dos vistas en la misma memoria.

Para asegurarse de que aún sea seguro para multiprocesadores, creo que tendrá que usar los métodos de acquire y release que existen en el objeto Array , y su locking integrado para asegurarse de que se pueda acceder a él de forma segura (aunque no estoy Un experto en el módulo multiprocesador).

Si bien las respuestas ya proporcionadas son buenas, hay una solución mucho más fácil para este problema siempre que se cumplan dos condiciones:

  1. Está en un sistema operativo compatible con POSIX (por ejemplo, Linux, Mac OSX); y
  2. Los procesos secundarios necesitan acceso de solo lectura a la matriz compartida.

En este caso, no es necesario que juegues con variables compartidas explícitamente, ya que los procesos secundarios se crearán utilizando un fork. Un hijo bifurcado comparte automáticamente el espacio de memoria de los padres. En el contexto del multiprocesamiento de Python, esto significa que comparte todas las variables de nivel de módulo ; tenga en cuenta que esto no se aplica a los argumentos que usted pasa explícitamente a sus procesos secundarios o a las funciones a las que llama en un multiprocessing.Pool .

Un ejemplo simple:

 import multiprocessing import numpy as np # will hold the (implicitly mem-shared) data data_array = None # child worker function def job_handler(num): # built-in id() returns unique memory ID of a variable return id(data_array), np.sum(data_array) def launch_jobs(data, num_jobs=5, num_worker=4): global data_array data_array = data pool = multiprocessing.Pool(num_worker) return pool.map(job_handler, range(num_jobs)) # create some random data and execute the child jobs mem_ids, sumvals = zip(*launch_jobs(np.random.rand(10))) # this will print 'True' on POSIX OS, since the data was shared print(np.all(np.asarray(mem_ids) == id(data_array))) 

He escrito un pequeño módulo de python que utiliza la memoria compartida POSIX para compartir matrices numpy entre los intérpretes de python. Tal vez lo encuentres a mano.

https://pypi.python.org/pypi/SharedArray

Así es como funciona:

 import numpy as np import SharedArray as sa # Create an array in shared memory a = sa.create("test1", 10) # Attach it as a different array. This can be done from another # python interpreter as long as it runs on the same computer. b = sa.attach("test1") # See how they are actually sharing the same memory block a[0] = 42 print(b[0]) # Destroying a does not affect b. del a print(b[0]) # See how "test1" is still present in shared memory even though we # destroyed the array a. sa.list() # Now destroy the array "test1" from memory. sa.delete("test1") # The array b is not affected, but once you destroy it then the # data are lost. print(b[0]) 

Puede usar el módulo sharedmem : https://bitbucket.org/cleemesser/numpy-sharedmem

Aquí está su código original, esta vez usando una memoria compartida que se comporta como una matriz NumPy (note la última instrucción adicional que llama a la función sum() NumPy):

 from multiprocessing import Process import sharedmem import scipy def f(a): a[0] = -a[0] if __name__ == '__main__': # Create the array N = int(10) unshared_arr = scipy.rand(N) arr = sharedmem.empty(N) arr[:] = unshared_arr.copy() print "Originally, the first two elements of arr = %s"%(arr[:2]) # Create, start, and finish the child process p = Process(target=f, args=(arr,)) p.start() p.join() # Print out the changed values print "Now, the first two elements of arr = %s"%arr[:2] # Perform some NumPy operation print arr.sum()