La forma más fácil de leer archivos csv con multiprocesamiento en Pandas

Aquí está mi pregunta.
Con un montón de archivos .csv (u otros archivos). Pandas es una forma fácil de leerlos y guardarlos en formato Dataframe . Pero cuando la cantidad de archivos era enorme, quiero leerlos con multiprocesamiento para ahorrar algo de tiempo.

Mi primer bash

Divido manualmente los archivos en diferentes caminos. Utilizando por separado:

 os.chdir("./task_1) files = os.listdir('.') files.sort() for file in files: filename,extname = os.path.splitext(file) if extname == '.csv': f = pd.read_csv(file) df = (f.VALUE.as_matrix()).reshape(75,90) 

Y luego combinarlos.

¿Cómo ejecutarlos con pool para lograr mi problema?
Cualquier consejo sería apreciado!

Utilizando Pool :

 import os import pandas as pd from multiprocessing import Pool # wrap your csv importer in a function that can be mapped def read_csv(filename): 'converts a filename to a pandas dataframe' return pd.read_csv(filename) def main(): # set up your pool pool = Pool(processes=8) # or whatever your hardware can support # get a list of file names files = os.listdir('.') file_list = [filename for filename in files if filename.split('.')[1]=='csv'] # have your pool map the file names to dataframes df_list = pool.map(read_csv, file_list) # reduce the list of dataframes to a single dataframe combined_df = pd.concat(df_list, ignore_index=True) if __name__ == '__main__': main() 

dask biblioteca dask está diseñada para abordar no solo su problema, sino también su problema.

Si no está en contra de usar otra biblioteca, podría usar el marco de trabajo de Graphlab . Esto crea un objeto similar a los marcos de datos que es muy rápido para leer datos si el rendimiento es un gran problema.

No consigo que map / map_async funcione, pero logré trabajar con apply_async .

Dos formas posibles (no tengo idea de cuál es mejor):

  • A) Concat al final.
  • B) Concat durante

Encuentro glob fáciles de listar y archivos fitler desde un directorio

 from glob import glob import pandas as pd from multiprocessing import Pool folder = "./task_1/" # note the "/" at the end file_list = glob(folder+'*.xlsx') def my_read(filename): f = pd.read_csv(filename) return (f.VALUE.as_matrix()).reshape(75,90) #DF_LIST = [] # A) end DF = pd.DataFrame() # B) during def DF_LIST_append(result): #DF_LIST.append(result) # A) end global DF # B) during DF = pd.concat([DF,result], ignore_index=True) # B) during pool = Pool(processes=8) for file in file_list: pool.apply_async(my_read, args = (file,), callback = DF_LIST_append) pool.close() pool.join() #DF = pd.concat(DF_LIST, ignore_index=True) # A) end print(DF.shape)