Mesa de lectura paralela en pandas

¿Hay una manera de paralelizar una invocación de read_table ()? En mi caso es CPU enlazado debido a la fecha de análisis. No veo ninguna forma de lograrlo leyendo los documentos. Lo único que viene a la mente es dividir el archivo de entrada, llamar a read_table en paralelo y luego concatenar los marcos de datos.

Esto leerá los archivos CSV en paralelo y los concatenará. El bit molesto es que no manejará tipos numpy , por lo que no puede analizar fechas. He estado luchando con el mismo problema, pero hasta ahora parece que las bibliotecas como execnet no pueden manejar tipos que no están incorporados. Es por eso que convierto DataFrames en json antes de enviar. Elimina los tipos básicos de Python.

Edición: si necesita analizar las fechas, tal vez un enfoque más sensato sería leer de forma remota los archivos CSV , analizar las fechas y guardarlas como pickle en el disco duro. Luego puedes leer los archivos de pickle en el proceso principal y concatenarlos. No he intentado eso para ver si conduciría a una ganancia en el rendimiento.

remote_read_csv.py

 import cPickle as pickle if __name__ == '__channelexec__': reader = pickle.loads(channel.receive()) for filename in channel: channel.send(reader(filename).to_json()) 

Esto a continuación hace uso del módulo anterior. Lo probé en IPython.

 from pandas import DataFrame, concat, read_csv, read_json from numpy import random import execnet import remote_read_csv import cPickle as pickle import itertools import psutil ### Create dummy data and save to CSV def rdf(): return DataFrame((random.rand(4, 3) * 100).astype(int)) d1 = rdf() d2 = rdf() d3 = rdf() dfsl = [d1, d2, d3] names = 'd1.csv d2.csv d3.csv'.split() for i in range(3): dfsl[i].to_csv(names[i]) ### Read CSV files in separate threads then concatenate reader = pickle.dumps(read_csv) def set_gateways(remote_module, *channel_sends): gateways = [] channels = [] for i in range(psutil.NUM_CPUS): gateways.append(execnet.makegateway()) channels.append(gateways[i].remote_exec(remote_module)) for send in channel_sends: channels[i].send(send) return (gateways, channels) def para_read(names): gateways, channels = set_gateways(remote_read_csv, reader) mch = execnet.MultiChannel(channels) queue = mch.make_receive_queue() channel_ring = itertools.cycle(mch) for f in names: channel = channel_ring.next() channel.send(f) dfs = [] for i in range(len(names)): channel, df = queue.get() dfs.append(df) [gw.exit() for gw in gateways] return concat([read_json(i) for i in dfs], keys=names) para_read(names)