multiprocesamiento en python: compartir objetos grandes (por ejemplo, dataframe de pandas) entre múltiples procesos

Estoy usando multiproceso de Python, más precisamente

from multiprocessing import Pool p = Pool(15) args = [(df, config1), (df, config2), ...] #list of args - df is the same object in each tuple res = p.map_async(func, args) #func is some arbitrary function p.close() p.join() 

Este enfoque tiene un gran consumo de memoria; consumiendo prácticamente toda mi memoria RAM (en cuyo punto se vuelve extremadamente lento, por lo que el multiprocesamiento es bastante inútil). Supongo que el problema es que df es un objeto enorme (un gran dataframe de pandas) y se copia para cada proceso. He intentado usar multiprocessing.Value Valorar compartir el dataframe sin copiar

 shared_df = multiprocessing.Value(pandas.DataFrame, df) args = [(shared_df, config1), (shared_df, config2), ...] 

(como se sugiere en la memoria compartida multiproceso de Python ), pero eso me da TypeError: this type has no size (¿es lo mismo que compartir un objeto complejo entre procesos de Python?, al que desafortunadamente no entiendo la respuesta).

Estoy utilizando el multiprocesamiento por primera vez y tal vez mi comprensión no sea (todavía) lo suficientemente buena. ¿Es multiprocessing.Value realidad es incluso lo correcto para usar en este caso? He visto otras sugerencias (p. Ej., Cola) pero ahora estoy un poco confundido. ¿Qué opciones hay para compartir la memoria y cuál sería la mejor en este caso?

El primer argumento de Value es typecode_or_type . Eso se define como:

typecode_or_type determina el tipo del objeto devuelto: es un tipo de ctypes o un código de tipo de un carácter del tipo utilizado por el módulo de matriz. * args se pasa al constructor para el tipo.

Énfasis mío. Entonces, simplemente no puede poner un dataframe de pandas en un Value , tiene que ser un tipo de ctypes .

En su lugar, podría usar un gestor de procesos multiprocessing.Manager para administrar su instancia de dataframe de singleton a todos sus procesos. Hay algunas maneras diferentes de terminar en el mismo lugar: probablemente la más fácil es simplemente colocar su dataframe en el Namespace de Namespace del administrador.

 from multiprocessing import Manager mgr = Manager() ns = mgr.Namespace() ns.df = my_dataframe # now just give your processes access to ns, ie most simply # p = Process(target=worker, args=(ns, work_unit)) 

Ahora se puede acceder a su instancia de dataframe a cualquier proceso que pase una referencia al Administrador. O simplemente pase una referencia al Namespace , está más limpio.

Una cosa que no cubrí / no son los eventos y la señalización: si sus procesos necesitan esperar a que otros terminen de ejecutarse, deberá agregarlos. Aquí hay una página con algunos ejemplos de Event que también cubren con una un poco más de detalle cómo utilizar el Namespace de Namespace del administrador.

(tenga en cuenta que nada de esto aborda si el multiprocessing dará como resultado beneficios tangibles de rendimiento, esto solo le brinda las herramientas para explorar esa pregunta)

Puede compartir un dataframe de pandas entre procesos sin sobrecarga de memoria creando un proceso secundario data_handler. Este proceso recibe llamadas de otros hijos con solicitudes de datos específicos (es decir, una fila, una celda específica, una porción, etc.) de su objeto de dataframe muy grande. Solo el proceso data_handler mantiene su dataframe en la memoria a diferencia de un Gestor como el Espacio de nombres, lo que hace que el dataframe se copie en todos los procesos secundarios. Vea a continuación un ejemplo de trabajo. Esto se puede convertir en pool.

¿Necesitas una barra de progreso para esto? vea mi respuesta aquí: https://stackoverflow.com/a/55305714/11186769

 import time import Queue import numpy as np import pandas as pd import multiprocessing from random import randint #========================================================== # DATA HANDLER #========================================================== def data_handler( queue_c, queue_r, queue_d, n_processes ): # Create a big dataframe big_df = pd.DataFrame(np.random.randint( 0,100,size=(100, 4)), columns=list('ABCD')) # Handle data requests finished = 0 while finished < n_processes: try: # Get the index we sent in idx = queue_c.get(False) except Queue.Empty: continue else: if idx == 'finished': finished += 1 else: try: # Use the big_df here! B_data = big_df.loc[ idx, 'B' ] # Send back some data queue_r.put(B_data) except: pass # big_df may need to be deleted at the end. #import gc; del big_df; gc.collect() #========================================================== # PROCESS DATA #========================================================== def process_data( queue_c, queue_r, queue_d): data = [] # Save computer memory with a generator generator = ( randint(0,x) for x in range(100) ) for g in generator: """ Lets make a request by sending in the index of the data we want. Keep in mind you may receive another child processes return call, which is fine if order isnt important. """ #print(g) # Send an index value queue_c.put(g) # Handle the return call while True: try: return_call = queue_r.get(False) except Queue.Empty: continue else: data.append(return_call) break queue_c.put('finished') queue_d.put(data) #========================================================== # START MULTIPROCESSING #========================================================== def multiprocess( n_processes ): combined = [] processes = [] # Create queues queue_data = multiprocessing.Queue() queue_call = multiprocessing.Queue() queue_receive = multiprocessing.Queue() for process in range(n_processes): if process == 0: # Load your data_handler once here p = multiprocessing.Process(target = data_handler, args=(queue_call, queue_receive, queue_data, n_processes)) processes.append(p) p.start() p = multiprocessing.Process(target = process_data, args=(queue_call, queue_receive, queue_data)) processes.append(p) p.start() for i in range(n_processes): data_list = queue_data.get() combined += data_list for p in processes: p.join() # Your B values print(combined) if __name__ == "__main__": multiprocess( n_processes = 4 )