pandas multiprocesamiento aplicar

Estoy tratando de usar el multiprocesamiento con el dataframe de pandas, que es dividir el dataframe en 8 partes. aplique alguna función a cada parte usando apply (con cada parte procesada en un proceso diferente).

EDIT: Aquí está la solución que finalmente encontré:

import multiprocessing as mp import pandas.util.testing as pdt def process_apply(x): # do some stuff to data here def process(df): res = df.apply(process_apply, axis=1) return res if __name__ == '__main__': p = mp.Pool(processes=8) split_dfs = np.array_split(big_df,8) pool_results = p.map(aoi_proc, split_dfs) p.close() p.join() # merging parts processed by different processes parts = pd.concat(pool_results, axis=0) # merging newly calculated parts to big_df big_df = pd.concat([big_df, parts], axis=1) # checking if the dfs were merged correctly pdt.assert_series_equal(parts['id'], big_df['id']) 

Una versión más genérica basada en la solución de autor, que permite ejecutarlo en cada función y dataframe:

 from multiprocessing import Pool from functools import partial import numpy as np def parallelize(data, func, num_of_processes=8): data_split = np.array_split(data, num_of_processes) pool = Pool(num_of_processes) data = pd.concat(pool.map(func, data_split)) pool.close() pool.join() return data def run_on_subset(func, data_subset): return data_subset.apply(func, axis=1) def parallelize_on_rows(data, func, num_of_processes=8): return parallelize(data, partial(run_on_subset, func), num_of_processes) 

Así que la siguiente línea:

 df.apply(some_func, axis=1) 

Se convertirá:

 parallelize_on_rows(df, some_func) 

Como no tengo mucho de su script de datos, esta es una conjetura, pero sugeriría usar p.map lugar de apply_async con la callback.

 p = mp.Pool(8) pool_results = p.map(process, np.array_split(big_df,8)) p.close() p.join() results = [] for result in pool_results: results.extend(result) 

También me encuentro con el mismo problema cuando uso multiprocessing.map() para aplicar la función a una parte diferente de un gran dataframe.

Solo quiero agregar varios puntos en caso de que otras personas encuentren el mismo problema que yo.

  1. recuerde agregar if __name__ == '__main__':
  2. ejecute el archivo en un archivo .py , si usa el ipython/jupyter notebook , no podrá ejecutar el multiprocessing (esto es cierto para mi caso, aunque no tengo la menor idea)