Multiprocesamiento en un modelo con dataframe como entrada

Quiero utilizar el multiprocesamiento en un modelo para obtener predicciones utilizando un dataframe como entrada. Tengo el siguiente código:

def perform_model_predictions(model, dataFrame, cores=4): try: with Pool(processes=cores) as pool: result = pool.map(model.predict, dataFrame) return result # return model.predict(dataFrame) except AttributeError: logging.error("AttributeError occurred", exc_info=True) 

El error que estoy recibiendo es:

 raise TypeError("sparse matrix length is ambiguous; use getnnz()" TypeError: sparse matrix length is ambiguous; use getnnz() or shape[0] 

Creo que el problema está en el hecho de que estoy pasando un dataframe como segundo parámetro a la función pool.map . Cualquier consejo o ayuda será apreciada.

El truco es dividir su dataframe en trozos. map espera una lista de objetos que serán procesados ​​por model.predict . Aquí hay un ejemplo completo de trabajo, con el modelo obviamente burlado:

 import numpy as np import pandas as pd from multiprocessing import Pool no_cores = 4 large_df = pd.concat([pd.Series(np.random.rand(1111)), pd.Series(np.random.rand(1111))], axis = 1) chunk_size = len(large_df) // no_cores + no_cores chunks = [df_chunk for g, df_chunk in large_df.groupby(np.arange(len(large_df)) // chunk_size)] class model(object): @staticmethod def predict(df): return np.random.randint(0,2) def perform_model_predictions(model, dataFrame, cores): try: with Pool(processes=cores) as pool: result = pool.map(model.predict, dataFrame) return result # return model.predict(dataFrame) except AttributeError: logging.error("AttributeError occurred", exc_info=True) perform_model_predictions(model, chunks, no_cores) 

Tenga en cuenta que la cantidad de fragmentos aquí se selecciona de manera que coincida con la cantidad de núcleos (o simplemente con cualquier número que desee asignar). De esta manera, cada núcleo obtiene una parte justa y el multiprocessing no dedica mucho tiempo a la serialización de objetos.

Si desea procesar cada fila ( pd.Series ) por separado, el tiempo dedicado a la serialización podría ser una preocupación. En tal caso, recomiendo usar joblib y leer documentos en sus diversos backends. No escribí en él, ya que parecía que querías predecir en pd.Dataframe .

Advertencia extra

Puede suceder que el multiprocessing , en lugar de obtener un mejor rendimiento, lo empeore. Ocurre en situaciones bastante raras cuando su model.predict llamadas a módulos externos que ellos mismos generan hilos. Escribí sobre el tema aquí . Larga historia corta, de nuevo joblib podría ser una respuesta.