Aprovechando la “Copia en escritura” para copiar datos a procesos de trabajo de multiprocesamiento.Pool ()

Tengo un poco de código Python multiprocessing que se ve un poco así:

 import time from multiprocessing import Pool import numpy as np class MyClass(object): def __init__(self): self.myAttribute = np.zeros(100000000) # basically a big memory struct def my_multithreaded_analysis(self): arg_lists = [(self, i) for i in range(10)] pool = Pool(processes=10) result = pool.map(call_method, arg_lists) print result def analyze(self, i): time.sleep(10) return i ** 2 def call_method(args): my_instance, i = args return my_instance.analyze(i) if __name__ == '__main__': my_instance = MyClass() my_instance.my_multithreaded_analysis() 

Después de leer las respuestas sobre cómo funciona la memoria en otras respuestas de StackOverflow como esta, el uso de memoria de multiprocesamiento de Python tenía la impresión de que no usaría la memoria en proporción a la cantidad de procesos que usé para el multiprocesamiento, ya que es de copia y escritura. No he modificado ninguno de los atributos de my_instance . Sin embargo, veo alta memoria para todos los procesos cuando ejecuto top dice que la mayoría de mis procesos están usando mucha memoria (esto es el máximo rendimiento de OSX, pero puedo replicar en Linux).

Mi pregunta es básicamente: ¿estoy interpretando esto correctamente en el sentido de que mi instancia de MyClass está duplicada en la piscina? Y si es así, ¿cómo puedo evitar esto? ¿No debería usar una construcción como esta? Mi objective es reducir el uso de memoria para un análisis computacional.

 PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE 2494 Python 0.0 00:01.75 1 0 7 765M 0B 0B 2484 2484 sleeping 2493 Python 0.0 00:01.85 1 0 7 765M 0B 0B 2484 2484 sleeping 2492 Python 0.0 00:01.86 1 0 7 765M 0B 0B 2484 2484 sleeping 2491 Python 0.0 00:01.83 1 0 7 765M 0B 0B 2484 2484 sleeping 2490 Python 0.0 00:01.87 1 0 7 765M 0B 0B 2484 2484 sleeping 2489 Python 0.0 00:01.79 1 0 7 167M 0B 597M 2484 2484 sleeping 2488 Python 0.0 00:01.77 1 0 7 10M 0B 755M 2484 2484 sleeping 2487 Python 0.0 00:01.75 1 0 7 8724K 0B 756M 2484 2484 sleeping 2486 Python 0.0 00:01.78 1 0 7 9968K 0B 755M 2484 2484 sleeping 2485 Python 0.0 00:01.74 1 0 7 171M 0B 594M 2484 2484 sleeping 2484 Python 0.1 00:16.43 4 0 18 775M 0B 12K 2484 2235 sleeping 

Todo lo que se envíe a pool.map (y los métodos relacionados) no está utilizando recursos compartidos de copia en escritura. Los valores son “decapados” (mecanismo de serialización de Python) , enviados a través de tuberías a los procesos de trabajo y no seleccionados, lo que reconstruye el objeto en el niño desde cero. Por lo tanto, cada niño en este caso termina con una versión de copia en escritura de los datos originales (que nunca usa, porque se le dijo que usara la copia enviada a través de IPC), y una recreación personal de los datos originales que fueron Reconstruido en el niño y no se comparte.

Si desea aprovechar las ventajas de copiar y escribir de forking, no puede enviar datos (u objetos que hagan referencia a los datos) a través de la tubería. Debe almacenarlos en una ubicación que se pueda encontrar en el niño accediendo a sus propios objetos globales. Así por ejemplo:

 import time from multiprocessing import Pool import numpy as np class MyClass(object): def __init__(self): self.myAttribute = np.zeros(100000000) # basically a big memory struct def my_multithreaded_analysis(self): arg_lists = list(range(10)) # Don't pass self pool = Pool(processes=10) result = pool.map(call_method, arg_lists) print result def analyze(self, i): time.sleep(10) return i ** 2 def call_method(i): # Implicitly use global copy of my_instance, not one passed as an argument return my_instance.analyze(i) # Constructed globally and unconditionally, so the instance exists # prior to forking in commonly accessible location my_instance = MyClass() if __name__ == '__main__': my_instance.my_multithreaded_analysis() 

Al no pasarse a self , evita hacer copias y solo usa el objeto global único que fue copiado en escritura asignado en el elemento secundario. Si necesita más de un objeto, puede hacer una list global o una asignación de dict a las instancias del objeto antes de crear el grupo, luego pasar el índice o la clave que puede buscar el objeto como parte de los argumentos al pool.map . Luego, la función de trabajo utiliza el índice / clave (que tuvo que ser decapado y enviado al hijo a través de IPC) para buscar el valor (copia en escritura asignada) en el dict global (también copia en escritura asignada), así que usted copia información barata para buscar datos costosos en el niño sin copiarlos.

Alternativamente, para aprovechar los beneficios de copia y escritura de forking, al mismo tiempo que se conserva cierta apariencia de encapsulación, podría aprovechar los atributos de clase y los métodos de clase sobre los globals puros .

 import time from multiprocessing import Pool import numpy as np class MyClass(object): myAttribute = np.zeros(100000000) # basically a big memory struct # myAttribute is a class-attribute @classmethod def my_multithreaded_analysis(cls): arg_list = [i for i in range(10)] pool = Pool(processes=10) result = pool.map(analyze, arg_list) print result @classmethod def analyze(cls, i): time.sleep(10) # If you wanted, you could access cls.myAttribute w/o worry here. return i ** 2 """ We don't need this proxy step ! def call_method(args): my_instance, i = args return my_instance.analyze(i) """ if __name__ == '__main__': my_instance = MyClass() # Note that now you can instantiate MyClass anywhere in your app, # While still taking advantage of copy-on-write forking my_instance.my_multithreaded_analysis() 

Nota 1: Sí, admito que class-attributes class-methods son globales glorificados. Pero compra un poco de encapsulación …

Nota 2: En lugar de crear explícitamente las arg_lists anteriores, puede pasar implícitamente la instancia (self) a cada tarea creada por Pool , pasando el analyze(self) método de instancia enlazada analyze(self) a Pool.map() y dispararse en el pie incluso más fácil!