Python Chunking CSV Multiproccessing de archivos

Estoy usando el siguiente código para dividir un archivo CSV en varios fragmentos (provenientes de aquí )

def worker(chunk): print len(chunk) def keyfunc(row): return row[0] def main(): pool = mp.Pool() largefile = 'Counseling.csv' num_chunks = 10 start_time = time.time() results = [] with open(largefile) as f: reader = csv.reader(f) reader.next() chunks = itertools.groupby(reader, keyfunc) while True: # make a list of num_chunks chunks groups = [list(chunk) for key, chunk in itertools.islice(chunks, num_chunks)] if groups: result = pool.map(worker, groups) results.extend(result) else: break pool.close() pool.join() 

Sin embargo, parece que el número de trozos siempre permanece constante independientemente del número de trozos que elijo usar. Por ejemplo, si elijo tener 1 o 10 fragmentos, siempre obtengo esta salida al procesar un archivo de muestra. Idealmente, me gustaría fragmentar un archivo para que se distribuya equitativamente.

Tenga en cuenta que el archivo real que estoy fragmentando tiene más de 13 millones de filas y por eso lo estoy procesando pieza por pieza. ¡Eso es un deber!

 6 7 1 ... 1 1 94 --- 0.101687192917 seconds --- 

De acuerdo con los comentarios , deseamos que cada proceso funcione en un segmento de 10000 filas. Eso no es demasiado difícil de hacer; vea la receta de iter/islice continuación. Sin embargo, el problema con el uso

 pool.map(worker, ten_thousand_row_chunks) 

es que pool.map intentará poner todos los trozos en una cola de tareas a la vez . Si esto requiere más memoria de la que está disponible, obtendrá un MemoryError . (Nota: pool.imap el mismo problema .)

Entonces, en lugar de eso, necesitamos llamar a pool.map iterativa, en partes de cada fragmento.

 import itertools as IT import multiprocessing as mp import csv def worker(chunk): return len(chunk) def main(): # num_procs is the number of workers in the pool num_procs = mp.cpu_count() # chunksize is the number of lines in a chunk chunksize = 10**5 pool = mp.Pool(num_procs) largefile = 'Counseling.csv' results = [] with open(largefile, 'rb') as f: reader = csv.reader(f) for chunk in iter(lambda: list(IT.islice(reader, chunksize*num_procs)), []): chunk = iter(chunk) pieces = list(iter(lambda: list(IT.islice(chunk, chunksize)), [])) result = pool.map(worker, pieces) results.extend(result) print(results) pool.close() pool.join() main() 

Cada chunk constará de hasta un chunksize*num_procs líneas desde el archivo. Estos son datos suficientes para dar a todos los trabajadores de la agrupación algo en lo que trabajar, pero no demasiado grandes como para causar un error MemoryError, dado que el chunksize no está configurado demasiado grande.

Cada chunk se divide en partes, y cada pieza consta de hasta el chunksize filas del archivo. Estas piezas son luego enviadas a pool.map .


¿Cómo funciona iter(lambda: list(IT.islice(iterator, chunksize)), []) :

Este es un modismo para agrupar un iterador en trozos de tamaño de chunksize. Veamos cómo funciona en un ejemplo:

 In [111]: iterator = iter(range(10)) 

Observe que cada vez que se IT.islice(iterator, 3) se IT.islice(iterator, 3) una nueva porción de 3 elementos del iterador:

 In [112]: list(IT.islice(iterator, 3)) Out[112]: [0, 1, 2] In [113]: list(IT.islice(iterator, 3)) Out[113]: [3, 4, 5] In [114]: list(IT.islice(iterator, 3)) Out[114]: [6, 7, 8] 

Cuando quedan menos de 3 elementos en el iterador, solo se devuelve lo que queda:

 In [115]: list(IT.islice(iterator, 3)) Out[115]: [9] 

Y si lo vuelves a llamar, obtienes una lista vacía:

 In [116]: list(IT.islice(iterable, 3)) Out[116]: [] 

lambda: list(IT.islice(iterator, chunksize)) es una función que devuelve la list(IT.islice(iterator, chunksize)) cuando se llama. Es un “one-liner” que es equivalente a

 def func(): return list(IT.islice(iterator, chunksize)) 

Finalmente, iter(callable, sentinel) devuelve otro iterador. Los valores producidos por este iterador son los valores devueltos por el invocable. Continúa produciendo valores hasta que el que se puede llamar devuelve un valor igual al centinela. Asi que

 iter(lambda: list(IT.islice(iterator, chunksize)), []) 

continuará devolviendo la list(IT.islice(iterator, chunksize)) valores list(IT.islice(iterator, chunksize)) hasta que ese valor sea la lista vacía:

 In [121]: iterator = iter(range(10)) In [122]: list(iter(lambda: list(IT.islice(iterator, 3)), [])) Out[122]: [[0, 1, 2], [3, 4, 5], [6, 7, 8], [9]] 

En primer lugar, itertools.groupby no tendrá ningún sentido real si los registros no están ya ordenados en la columna de clave. Además, si el requisito es simplemente dividir el archivo csv en un número predeterminado de filas y dárselo a un trabajador, entonces no tiene que hacer todo esto.

Una implementación simple será:

 import csv from multiprocessing import Pool def worker(chunk): print len(chunk) def emit_chunks(chunk_size, file_path): lines_count = 0 with open(file_path) as f: reader = csv.reader(f) chunk = [] for line in reader: lines_count += 1 chunk.append(line) if lines_count == chunk_size: lines_count = 0 yield chunk chunk = [] else: continue if chunk : yield chunk def main(): chunk_size = 10 gen = emit_chunks(chunk_size, 'c:/Temp/in.csv') p = Pool(5) p.imap(worker, gen) print 'Completed..' 

* Edición: cambiado a pool.imap en lugar de pool.map