Objetos de memoria compartida en multiprocesamiento.

Supongamos que tengo un gran número en la matriz de memoria, tengo una función de func que toma en esta matriz gigante como entrada (junto con algunos otros parámetros). func con diferentes parámetros se puede ejecutar en paralelo. Por ejemplo:

 def func(arr, param): # do stuff to arr, param # build array arr pool = Pool(processes = 6) results = [pool.apply_async(func, [arr, param]) for param in all_params] output = [res.get() for res in results] 

Si uso la biblioteca de multiprocesamiento, entonces esa matriz gigante se copiará varias veces en diferentes procesos.

¿Hay una manera de permitir que diferentes procesos compartan la misma matriz? Este objeto de matriz es de solo lectura y nunca se modificará.

Lo que es más complicado, si arr no es una matriz, sino un objeto de python arbitrario, ¿hay alguna forma de compartirlo?

[Editado]

Leí la respuesta pero todavía estoy un poco confundido. Dado que fork () es copia en escritura, no debemos invocar ningún costo adicional al generar nuevos procesos en la biblioteca de multiprocesamiento de Python. Pero el siguiente código sugiere que hay una gran sobrecarga:

 from multiprocessing import Pool, Manager import numpy as np; import time def f(arr): return len(arr) t = time.time() arr = np.arange(10000000) print "construct array = ", time.time() - t; pool = Pool(processes = 6) t = time.time() res = pool.apply_async(f, [arr,]) res.get() print "multiprocessing overhead = ", time.time() - t; 

salida (y, por cierto, el costo aumenta a medida que aumenta el tamaño de la matriz, por lo que sospecho que todavía hay gastos generales relacionados con la copia de memoria):

 construct array = 0.0178790092468 multiprocessing overhead = 0.252444982529 

¿Por qué hay una sobrecarga tan grande, si no copiamos la matriz? ¿Y qué parte me salva la memoria compartida?

Si usa un sistema operativo que usa la semántica de la fork() copia en escritura fork() (como cualquier Unix común), siempre y cuando nunca altere su estructura de datos, estará disponible para todos los procesos secundarios sin ocupar memoria adicional. No tendrá que hacer nada especial (excepto asegurarse de no alterar el objeto).

Lo más eficiente que puede hacer para resolver su problema sería empaquetar su matriz en una estructura de matriz eficiente (usando numpy o array ), colocarla en la memoria compartida, envolverla con multiprocessing.Array . multiprocessing.Array y pasarla a sus funciones. Esta respuesta muestra cómo hacer eso .

Si desea un objeto compartido grabable , tendrá que envolverlo con algún tipo de sincronización o locking. multiprocessing proporciona dos métodos para hacer esto : uno que usa memoria compartida (adecuado para valores simples, arrays o ctypes) o un proxy de Manager , donde un proceso guarda la memoria y un administrador arbitra el acceso desde otros procesos (incluso a través de una red) .

El enfoque de Manager se puede usar con objetos Python arbitrarios, pero será más lento que el equivalente al uso de la memoria compartida porque los objetos deben ser serializados / deserializados y enviados entre procesos.

Hay una gran cantidad de bibliotecas y enfoques de parallel processing disponibles en Python . multiprocessing es una biblioteca excelente y bien redondeada, pero si tiene necesidades especiales, tal vez uno de los otros enfoques podría ser mejor.

Me encuentro con el mismo problema y escribí una pequeña clase de utilidad de memoria compartida para solucionarlo.

Estoy usando multiprocesamiento. Arreglo real (lockfree), y también el acceso a los arreglos no está sincronizado (lockfree), tenga cuidado de no disparar sus propios pies.

Con la solución obtengo aceleraciones por un factor de aproximadamente 3 en un i7 de cuatro núcleos.

Aquí está el código: siéntase libre de usarlo y mejorarlo, y por favor reporte cualquier error.

 ''' Created on 14.05.2013 @author: martin ''' import multiprocessing import ctypes import numpy as np class SharedNumpyMemManagerError(Exception): pass ''' Singleton Pattern ''' class SharedNumpyMemManager: _initSize = 1024 _instance = None def __new__(cls, *args, **kwargs): if not cls._instance: cls._instance = super(SharedNumpyMemManager, cls).__new__( cls, *args, **kwargs) return cls._instance def __init__(self): self.lock = multiprocessing.Lock() self.cur = 0 self.cnt = 0 self.shared_arrays = [None] * SharedNumpyMemManager._initSize def __createArray(self, dimensions, ctype=ctypes.c_double): self.lock.acquire() # double size if necessary if (self.cnt >= len(self.shared_arrays)): self.shared_arrays = self.shared_arrays + [None] * len(self.shared_arrays) # next handle self.__getNextFreeHdl() # create array in shared memory segment shared_array_base = multiprocessing.RawArray(ctype, np.prod(dimensions)) # convert to numpy array vie ctypeslib self.shared_arrays[self.cur] = np.ctypeslib.as_array(shared_array_base) # do a reshape for correct dimensions # Returns a masked array containing the same data, but with a new shape. # The result is a view on the original array self.shared_arrays[self.cur] = self.shared_arrays[self.cnt].reshape(dimensions) # update cnt self.cnt += 1 self.lock.release() # return handle to the shared memory numpy array return self.cur def __getNextFreeHdl(self): orgCur = self.cur while self.shared_arrays[self.cur] is not None: self.cur = (self.cur + 1) % len(self.shared_arrays) if orgCur == self.cur: raise SharedNumpyMemManagerError('Max Number of Shared Numpy Arrays Exceeded!') def __freeArray(self, hdl): self.lock.acquire() # set reference to None if self.shared_arrays[hdl] is not None: # consider multiple calls to free self.shared_arrays[hdl] = None self.cnt -= 1 self.lock.release() def __getArray(self, i): return self.shared_arrays[i] @staticmethod def getInstance(): if not SharedNumpyMemManager._instance: SharedNumpyMemManager._instance = SharedNumpyMemManager() return SharedNumpyMemManager._instance @staticmethod def createArray(*args, **kwargs): return SharedNumpyMemManager.getInstance().__createArray(*args, **kwargs) @staticmethod def getArray(*args, **kwargs): return SharedNumpyMemManager.getInstance().__getArray(*args, **kwargs) @staticmethod def freeArray(*args, **kwargs): return SharedNumpyMemManager.getInstance().__freeArray(*args, **kwargs) # Init Singleton on module load SharedNumpyMemManager.getInstance() if __name__ == '__main__': import timeit N_PROC = 8 INNER_LOOP = 10000 N = 1000 def propagate(t): i, shm_hdl, evidence = t a = SharedNumpyMemManager.getArray(shm_hdl) for j in range(INNER_LOOP): a[i] = i class Parallel_Dummy_PF: def __init__(self, N): self.N = N self.arrayHdl = SharedNumpyMemManager.createArray(self.N, ctype=ctypes.c_double) self.pool = multiprocessing.Pool(processes=N_PROC) def update_par(self, evidence): self.pool.map(propagate, zip(range(self.N), [self.arrayHdl] * self.N, [evidence] * self.N)) def update_seq(self, evidence): for i in range(self.N): propagate((i, self.arrayHdl, evidence)) def getArray(self): return SharedNumpyMemManager.getArray(self.arrayHdl) def parallelExec(): pf = Parallel_Dummy_PF(N) print(pf.getArray()) pf.update_par(5) print(pf.getArray()) def sequentialExec(): pf = Parallel_Dummy_PF(N) print(pf.getArray()) pf.update_seq(5) print(pf.getArray()) t1 = timeit.Timer("sequentialExec()", "from __main__ import sequentialExec") t2 = timeit.Timer("parallelExec()", "from __main__ import parallelExec") print("Sequential: ", t1.timeit(number=1)) print("Parallel: ", t2.timeit(number=1)) 

Este es el caso de uso previsto para Ray , que es una biblioteca para Python paralelo y distribuido. Bajo el capó, serializa los objetos utilizando el diseño de datos de Flecha de Apache (que es un formato de copia cero) y los almacena en un almacén de objetos de memoria compartida para que puedan acceder a ellos mediante múltiples procesos sin crear copias.

El código se vería como el siguiente.

 import numpy as np import ray ray.init() @ray.remote def func(array, param): # Do stuff. return 1 array = np.ones(10**6) # Store the array in the shared memory object store once # so it is not copied multiple times. array_id = ray.put(array) result_ids = [func.remote(array_id, i) for i in range(4)] output = ray.get(result_ids) 

Si no llama a ray.put entonces la matriz aún se almacenará en la memoria compartida, pero eso se hará una vez por invocación de func , que no es lo que desea.

Tenga en cuenta que esto funcionará no solo para las matrices, sino también para los objetos que las contienen , por ejemplo, los diccionarios que mapean los ints a las matrices de la siguiente manera.

Puede comparar el rendimiento de la serialización en Ray versus pickle ejecutando lo siguiente en IPython.

 import numpy as np import pickle import ray ray.init() x = {i: np.ones(10**7) for i in range(20)} # Time Ray. %time x_id = ray.put(x) # 2.4s %time new_x = ray.get(x_id) # 0.00073s # Time pickle. %time serialized = pickle.dumps(x) # 2.6s %time deserialized = pickle.loads(serialized) # 1.9s 

La serialización con Ray es solo un poco más rápida que el pickle, pero la deserialización es 1000x más rápida debido al uso de la memoria compartida (este número dependerá, por supuesto, del objeto).

Ver la documentación de Ray . Puedes leer más sobre la serialización rápida usando Ray y Arrow . Tenga en cuenta que soy uno de los desarrolladores de Ray.