Multiprocesamiento Compartir objetos no serializables entre procesos.

Hay tres preguntas como posibles duplicados (pero demasiado específicas):

  • Cómo configurar correctamente objetos proxy multiprocesamiento para objetos que ya existen
  • Compartir objeto con proceso (multiproceso)
  • ¿Puedo usar un ProcessPoolExecutor desde un futuro?

Al responder esta pregunta, las otras tres preguntas pueden ser contestadas. Con suerte me dejo claro:

Una vez que creé un objeto en algún proceso creado por multiprocesamiento:

  1. ¿Cómo paso una referencia a ese objeto a otro proceso?
  2. (no es tan importante) ¿Cómo me aseguro de que este proceso no muera mientras tengo una referencia?

Ejemplo 1 (resuelto)

from concurrent.futures import * def f(v): return lambda: v * v if __name__ == '__main__': with ThreadPoolExecutor(1) as e: # works with ThreadPoolExecutor l = list(e.map(f, [1,2,3,4])) print([g() for g in l]) # [1, 4, 9, 16] 

Ejemplo 2

Supongamos que f devuelve un objeto con estado mutable. Este objeto idéntico debe ser accesible desde otros procesos.

Ejemplo 3

Tengo un objeto que tiene un archivo abierto y un locking. ¿Cómo puedo otorgar acceso a otros procesos?

Recordatorio

No quiero que este error específico no aparezca. O una solución a este caso de uso específico. La solución debe ser lo suficientemente general como para compartir objetos inamovibles entre procesos. Los objetos pueden potencialmente crearse en cualquier proceso. Una solución que hace que todos los objetos se puedan mover y preservar la identidad también puede ser buena.

Cualquier sugerencia es bienvenida, cualquier solución parcial o fragmentos de código que indiquen cómo implementar una solución valen la pena. Así podemos crear una solución juntos.

Aquí hay un bash de resolver esto pero sin el multiprocesamiento: https://github.com/niccokunzmann/pynet/blob/master/documentation/done/tools.rst

Preguntas

¿Qué quieres que hagan los otros procesos con las referencias?

Las referencias se pueden pasar a cualquier otro proceso creado con multiprocesamiento (duplicado 3). Uno puede acceder a los atributos, llame a la referencia. Los accesos accedidos pueden o no ser apoderados.

¿Cuál es el problema con solo usar un proxy?

Tal vez no hay problema sino un reto. Mi impresión fue que un proxy tiene un administrador y que un administrador tiene su propio proceso, por lo que el objeto que no se puede serializar debe ser serializado y transferido (parcialmente resuelto con StacklessPython / fork). También existen proxies para objetos especiales – es difícil pero no imposible construir un proxy para todos los objetos (solucionable).

¿Solución? – Proxy + Manager?

Eric Urban demostró que la serialización no es el problema. El verdadero desafío está en los ejemplos 2 y 3: la sincronización de estado. Mi idea de una solución sería crear una clase de proxy especial para un administrador. Esta clase proxy

  1. Toma un constuctor para objetos inserializables.
  2. toma un objeto serializable y lo transfiere al proceso del administrador.
  3. (problema) de acuerdo con 1. el objeto unserializable se debe crear en el proceso del administrador.

La mayoría de las veces no es realmente deseable pasar la referencia de un objeto existente a otro proceso. En su lugar, crea la clase que desea compartir entre procesos:

 class MySharedClass: # stuff... 

Entonces haces un administrador de proxy como este:

 import multiprocessing.managers as m class MyManager(m.BaseManager): pass # Pass is really enough. Nothing needs to be done here. 

Entonces registras tu clase en ese Gerente, así:

 MyManager.register("MySharedClass", MySharedClass) 

Luego, una vez que se inicie y comience el administrador, con manager.start() puede crear instancias compartidas de su clase con manager.MySharedClass . Esto debería funcionar para todas las necesidades. El proxy devuelto funciona exactamente como los objetos originales, excepto por algunas excepciones descritas en la documentación .

Antes de leer esta respuesta, tenga en cuenta que la solución explicada es terrible. Tenga en cuenta la advertencia al final de la respuesta.

Encontré una manera de compartir el estado de un objeto a través de multiprocessing.Array . multiprocessing.Array . Así que hice esta clase que comparte de forma transparente su estado a través de todos los procesos:

 import multiprocessing as m import pickle class Store: pass class Shareable: def __init__(self, size = 2**10): object.__setattr__(self, 'store', m.Array('B', size)) o = Store() # This object will hold all shared values s = pickle.dumps(o) store(object.__getattribute__(self, 'store'), s) def __getattr__(self, name): s = load(object.__getattribute__(self, 'store')) o = pickle.loads(s) return getattr(o, name) def __setattr__(self, name, value): s = load(object.__getattribute__(self, 'store')) o = pickle.loads(s) setattr(o, name, value) s = pickle.dumps(o) store(object.__getattribute__(self, 'store'), s) def store(arr, s): for i, ch in enumerate(s): arr[i] = ch def load(arr): l = arr[:] return bytes(arr) 

Puede pasar instancias de esta clase (y sus subclases) a cualquier otro proceso y sincronizará su estado a través de todos los procesos. Esto fue probado con este código:

 class Foo(Shareable): def __init__(self): super().__init__() self.f = 1 def foo(self): self.f += 1 def f(s): sf += 1 if __name__ == '__main__': import multiprocessing as m import time s = Foo() print(sf) p = m.Process(target=f, args=(s,)) p.start() time.sleep(1) print(sf) 

La “magia” de esta clase es que almacena todos sus atributos en otra instancia de la clase Store . Esta clase no es muy especial. Es solo una clase que puede tener atributos arbitrarios. (Un dict habría hecho también.)

Sin embargo, esta clase tiene algunas peculiaridades realmente desagradables. Encontré dos

La primera peculiaridad es que debe especificar la cantidad de espacio que ocupará la instancia de la Store como máximo. Esto se debe a que multiprocessing.Array tiene un tamaño estático. Por lo tanto, el objeto que puede ser decapado solo puede ser tan grande como la matriz.

El segundo capricho es que no puede usar esta clase con ProcessPoolExecutors o grupos simples. Si intentas hacer esto, obtienes un error:

 >>> s = Foo() >>> with ProcessPoolExecutor(1) as e: ... e.submit(f, args=(s,)) ...  Traceback (most recent call last):  RuntimeError: SynchronizedArray objects should only be shared between processes through inheritance 

advertencia
Probablemente no debería usar este enfoque, ya que usa una cantidad incontrolable de memoria, es demasiado complicado en comparación con el uso de un proxy (vea mi otra respuesta) y podría fallar de manera espectacular.

Sólo tiene que utilizar python sin stack. Puede serializar casi cualquier cosa con pickle , incluyendo funciones. Aquí serializo y deserializo un lambda usando el módulo pickle . Esto es similar a lo que intentas hacer en tu ejemplo.

Aquí está el enlace de descarga de Stackless Python http://www.stackless.com/wiki/Download

 Python 2.7.5 Stackless 3.1b3 060516 (default, Sep 23 2013, 20:17:03) [GCC 4.6.3] on linux2 Type "help", "copyright", "credits" or "license" for more information. >>> f = 5 >>> g = lambda : f * f >>> g() 25 >>> import pickle >>> p = pickle.dumps(g) >>> m = pickle.loads(p) >>> m() 25 >>>