Procesando un solo archivo desde múltiples procesos

Tengo un único archivo de texto grande en el que quiero procesar cada línea (hacer algunas operaciones) y almacenarlas en una base de datos. Dado que un solo progtwig simple está tardando demasiado tiempo, quiero que se realice a través de múltiples procesos o subprocesos. Cada subproceso / proceso debe leer los DIFERENTES datos (diferentes líneas) de ese único archivo y hacer algunas operaciones en su parte de datos (líneas) y colocarlos en la base de datos para que al final, tenga toda la información procesada y mis datos. La base de datos se vuelca con los datos que necesito.

Pero no soy capaz de darme cuenta de cómo abordar esto.

Lo que buscas es un patrón de Productor / Consumidor

Ejemplo de enhebrado básico

Aquí hay un ejemplo básico utilizando el módulo de subprocesamiento (en lugar de multiprocesamiento)

import threading import Queue import sys def do_work(in_queue, out_queue): while True: item = in_queue.get() # process result = item out_queue.put(result) in_queue.task_done() if __name__ == "__main__": work = Queue.Queue() results = Queue.Queue() total = 20 # start for workers for i in xrange(4): t = threading.Thread(target=do_work, args=(work, results)) t.daemon = True t.start() # produce data for i in xrange(total): work.put(i) work.join() # get the results for i in xrange(total): print results.get() sys.exit() 

No compartirías el objeto de archivo con los hilos. Producirías trabajo para ellos al proporcionar la cola con líneas de datos. Luego, cada hilo recogería una línea, la procesaría y luego la devolvería en la cola.

Hay algunas instalaciones más avanzadas integradas en el módulo de multiprocesamiento para compartir datos, como listas y tipos especiales de cola . Existen compensaciones para el uso de multiprocesamiento frente a subprocesos y depende de si su trabajo está vinculado a la CPU o está vinculado a la IO.

Ejemplo de multiprocesamiento básico.

Aquí hay un ejemplo muy básico de un pool de multiprocesamiento.

 from multiprocessing import Pool def process_line(line): return "FOO: %s" % line if __name__ == "__main__": pool = Pool(4) with open('file.txt') as source_file: # chunk the work into batches of 4 lines at a time results = pool.map(process_line, source_file, 4) print results 

Un Pool es un objeto de conveniencia que gestiona sus propios procesos. Dado que un archivo abierto puede iterar sobre sus líneas, puede pasarlo a pool.map() , que lo recorrerá y entregará líneas a la función de trabajo. El mapa bloquea y devuelve el resultado completo cuando se hace. Tenga en cuenta que este es un ejemplo excesivamente simplificado y que pool.map() leerá todo el archivo en la memoria de una sola vez antes de entregar el trabajo. Si espera tener archivos grandes, tenga esto en cuenta. Hay formas más avanzadas de diseñar una configuración de productor / consumidor.

Manual “pool” con límite y reorganización de línea.

Este es un ejemplo manual de Pool.map , pero en lugar de consumir todo un iterable de una sola vez, puede establecer un tamaño de cola para que solo lo alimente pieza por pieza tan rápido como pueda procesar. También agregué los números de línea para que pueda rastrearlos y consultarlos si así lo desea, más adelante.

 from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() line_no, line = item # exit signal if line == None: return # fake work time.sleep(.5) result = (line_no, line) out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data with open("source.txt") as f: iters = itertools.chain(f, (None,)*num_workers) for num_and_line in enumerate(iters): work.put(num_and_line) for p in pool: p.join() # get the results # example: [(1, "foo"), (10, "bar"), (0, "start")] print sorted(results) 

Aquí hay un ejemplo realmente estúpido que cociné:

 import os.path import multiprocessing def newlinebefore(f,n): f.seek(n) c=f.read(1) while c!='\n' and n > 0: n-=1 f.seek(n) c=f.read(1) f.seek(n) return n filename='gpdata.dat' #your filename goes here. fsize=os.path.getsize(filename) #size of file (in bytes) #break the file into 20 chunks for processing. nchunks=20 initial_chunks=range(1,fsize,fsize/nchunks) #You could also do something like: #initial_chunks=range(1,fsize,max_chunk_size_in_bytes) #this should work too. with open(filename,'r') as f: start_byte=sorted(set([newlinebefore(f,i) for i in initial_chunks])) end_byte=[i-1 for i in start_byte] [1:] + [None] def process_piece(filename,start,end): with open(filename,'r') as f: f.seek(start+1) if(end is None): text=f.read() else: nbytes=end-start+1 text=f.read(nbytes) # process text here. createing some object to be returned # You could wrap text into a StringIO object if you want to be able to # read from it the way you would a file. returnobj=text return returnobj def wrapper(args): return process_piece(*args) filename_repeated=[filename]*len(start_byte) args=zip(filename_repeated,start_byte,end_byte) pool=multiprocessing.Pool(4) result=pool.map(wrapper,args) #Now take your results and write them to the database. print "".join(result) #I just print it to make sure I get my file back ... 

La parte difícil aquí es asegurarse de que dividimos el archivo en caracteres de nueva línea para que no se pierda ninguna línea (o solo lea líneas parciales). Luego, cada proceso lee que es parte del archivo y devuelve un objeto que puede ser colocado en la base de datos por el hilo principal. Por supuesto, es posible que incluso tenga que hacer esta parte en trozos para no tener que guardar toda la información en la memoria de una vez. (esto se logra fácilmente: simplemente divida la lista de “argumentos” en X trozos y llame a pool.map(wrapper,chunk) – vea aquí )

Bueno, divida el único archivo grande en varios archivos más pequeños y haga que cada uno de ellos se procese en hilos separados.