Carga paralela de archivos de entrada en Pandas Dataframe

Tengo un requisito, donde tengo tres archivos de entrada y necesito cargarlos dentro del dataframe de Pandas, antes de fusionar dos de los archivos en un solo dataframe.

La extensión del archivo siempre cambia, podría ser .txt una vez y .xlsx o .csv otra vez.

¿Cómo puedo ejecutar este proceso en paralelo para ahorrar el tiempo de espera / carga?

Este es mi código en este momento,

from time import time # to measure the time taken to run the code start_time = time() Primary_File = "//ServerA/Testing Folder File Open/Report.xlsx" Secondary_File_1 = "//ServerA/Testing Folder File Open/Report2.csv" Secondary_File_2 = "//ServerA/Testing Folder File Open/Report2.csv" import pandas as pd # to work with the data frames Primary_df = pd.read_excel (Primary_File) Secondary_1_df = pd.read_csv (Secondary_File_1) Secondary_2_df = pd.read_csv (Secondary_File_2) Secondary_df = Secondary_1_df.merge(Secondary_2_df, how='inner', on=['ID']) end_time = time() print(end_time - start_time) 

Me toma alrededor de 20 minutos cargar mi primary_df y secondary_df. Por lo tanto, estoy buscando una solución eficiente que posiblemente utilice el parallel processing para ahorrar tiempo. Lo cronometré mediante la operación de lectura y la mayoría del tiempo toma aproximadamente 18 minutos y 45 segundos.

Configuración de hardware: procesador Intel i5, RAM de 16 GB y SO de 64 bits

Pregunta hecha Elegible para recompensa: – Mientras busco un código de trabajo con pasos detallados – usando un paquete en el entorno de anaconda que permite cargar mis archivos de entrada en paralelo y almacenarlos en un dataframe de pandas por separado. Esto debería eventualmente ahorrar tiempo.

Prueba esto:

 from time import time import pandas as pd from multiprocessing.pool import ThreadPool start_time = time() pool = ThreadPool(processes=3) Primary_File = "//ServerA/Testing Folder File Open/Report.xlsx" Secondary_File_1 = "//ServerA/Testing Folder File Open/Report2.csv" Secondary_File_2 = "//ServerA/Testing Folder File Open/Report2.csv" # Define a function for the thread def import_xlsx(file_name): df_xlsx = pd.read_excel(file_name) # print(df_xlsx.head()) return df_xlsx def import_csv(file_name): df_csv = pd.read_csv(file_name) # print(df_csv.head()) return df_csv # Create two threads as follows Primary_df = pool.apply_async(import_xlsx, (Primary_File, )).get() Secondary_1_df = pool.apply_async(import_csv, (Secondary_File_1, )).get() Secondary_2_df = pool.apply_async(import_csv, (Secondary_File_2, )).get() Secondary_df = Secondary_1_df.merge(Secondary_2_df, how='inner', on=['ID']) end_time = time() 

Intente usar el código @ Cezary.Sz pero usando (borre las llamadas a .get() ), en su lugar:

 Primary_df_job = pool.apply_async(import_xlsx, (Primary_File, )) Secondary_1_df_job = pool.apply_async(import_csv, (Secondary_File_1, )) Secondary_2_df_job = pool.apply_async(import_csv, (Secondary_File_2, )) 

Entonces

 Secondary_1_df = Secondary_1_df_job.get() Secondary_2_df = Secondary_2_df_job.get() 

Y puede usar los marcos de datos, mientras Primary_df_job está cargando.

 Secondary_df = Secondary_1_df.merge(Secondary_2_df, how='inner', on=['ID']) 

Cuando necesites Primary_df en tu código, usa

 Primary_df = Primary_df_job.get() 

Esto bloqueará la ejecución hasta que Primary_df_job haya finalizado.

¿Por qué no usar asyncio sobre multiprocessing ?

En lugar de utilizar varios subprocesos, es posible que desee aprovechar primero el nivel de E / S con un Async CSV Dict Reader (que puede paralizarse utilizando el multiprocessing para varios archivos). Luego, puede concentrar los dictados y luego cargar estos diccionarios en pandas o cargar los dictados individuales en pandas y concat allí. Sin embargo, pandas no admite asyncio por lo que tendrá una pérdida de rendimiento en algún momento.

Desafortunadamente, debido a GIL (Global Interpreter Lock) en Python, varios subprocesos no se ejecutan simultáneamente: todos los subprocesos utilizan el mismo núcleo de una sola CPU . Eso significa que si crea varios subprocesos para cargar sus archivos, el tiempo total será igual (o incluso mayor) que el tiempo necesario para cargar esos archivos uno por uno.

Más sobre GIL: https://wiki.python.org/moin/GlobalInterpreterLock

Para acelerar el tiempo de carga, puede intentar cambiar de csv / excel a pickle (o HDF).

Usted da los detalles del hardware pero no la parte más interesante: la cantidad de discos que tiene, el tipo de RAID que tiene y el sistema de archivos desde el que está leyendo.

Si solo tiene un disco, no tiene RAID y un sistema de archivos regular (ext4, XFS, etc.), como el que tiene principalmente en las computadoras portátiles, no podrá boost el ancho de banda simplemente lanzando CPU (multiproceso o multiproceso) a la problema. El uso de varios subprocesos o E / S asíncronas ayudará a enmascarar un poco la latencia, pero no boostá el ancho de banda, porque es probable que ya lo esté saturando con un solo proceso de lectura.

Entonces, utilizando el código sugerido por @ Cezary.Sz, intente mover uno de los archivos a un almacenamiento externo USB 3.0 o al almacenamiento SDSX. Si está ejecutando en una estación de trabajo grande, mire los detalles del hardware para ver si hay varios discos disponibles, y si se ejecuta en un clúster grande, busque un sistema de archivos paralelo (BeeGFS, Lustre, etc.)