Python multiprocesamiento con generador.

Estoy tratando de procesar un archivo (cada línea es un documento json). El tamaño del archivo puede subir hasta 100’s de mbs a gb’s. Así que escribí un código generador para obtener cada documento línea por línea desde el archivo.

def jl_file_iterator(file): with codecs.open(file, 'r', 'utf-8') as f: for line in f: document = json.loads(line) yield document 

Mi sistema tiene 4 núcleos, por lo que me gustaría procesar 4 líneas del archivo en paralelo. Actualmente tengo este código que toma 4 líneas a la vez y llama al código para parallel processing

 threads = 4 files, i = [], 1 for jl in jl_file_iterator(input_path): files.append(jl) if i % (threads) == 0: # pool.map(processFile, files) parallelProcess(files, o) files = [] i += 1 if files: parallelProcess(files, o) files = [] 

Este es mi código donde ocurre el procesamiento real

 def parallelProcess(files, outfile): processes = [] for i in range(len(files)): p = Process(target=processFile, args=(files[i],)) processes.append(p) p.start() for i in range(len(files)): processes[i].join() def processFile(doc): extractors = {} ... do some processing on doc o.write(json.dumps(doc) + '\n') 

Como puede ver, espero que todas las 4 líneas terminen de procesarse antes de enviar los siguientes 4 archivos para procesar. Pero lo que me gustaría hacer es que, tan pronto como un proceso finalice el procesamiento del archivo, deseo comenzar la siguiente línea que se asignará al procesador liberado. ¿Cómo puedo hacer eso?

PD: el problema es que es un generador que no puedo cargar todos los archivos y usar algo como un mapa para ejecutar los procesos.

Gracias por tu ayuda

Como @pvg dijo en un comentario, una cola (limitada) es la forma natural de mediar entre un productor y consumidores con diferentes velocidades, asegurando que todos se mantengan lo más ocupados posible pero sin dejar que el productor avance.

Aquí hay un ejemplo ejecutable autónomo. La cola está restringida a un tamaño máximo igual al número de procesos de trabajo. Si los consumidores se ejecutan mucho más rápido que el productor, podría ser sensato dejar que la cola se haga más grande que eso.

En su caso específico, probablemente tendría sentido pasar líneas a los consumidores y dejar que hagan la parte document = json.loads(line) en paralelo.

 import multiprocessing as mp NCORE = 4 def process(q, iolock): from time import sleep while True: stuff = q.get() if stuff is None: break with iolock: print("processing", stuff) sleep(stuff) if __name__ == '__main__': q = mp.Queue(maxsize=NCORE) iolock = mp.Lock() pool = mp.Pool(NCORE, initializer=process, initargs=(q, iolock)) for stuff in range(20): q.put(stuff) # blocks until q below its max size with iolock: print("queued", stuff) for _ in range(NCORE): # tell workers we're done q.put(None) pool.close() pool.join() 

Así que terminé corriendo esto con éxito. Creando trozos de líneas de mi archivo y ejecutando las líneas paralelamente. Publicándolo aquí para que pueda ser útil para alguien en el futuro.

 def run_parallel(self, processes=4): processes = int(processes) pool = mp.Pool(processes) try: pool = mp.Pool(processes) jobs = [] # run for chunks of files for chunkStart,chunkSize in self.chunkify(input_path): jobs.append(pool.apply_async(self.process_wrapper,(chunkStart,chunkSize))) for job in jobs: job.get() pool.close() except Exception as e: print e def process_wrapper(self, chunkStart, chunkSize): with open(self.input_file) as f: f.seek(chunkStart) lines = f.read(chunkSize).splitlines() for line in lines: document = json.loads(line) self.process_file(document) # Splitting data into chunks for parallel processing def chunkify(self, filename, size=1024*1024): fileEnd = os.path.getsize(filename) with open(filename,'r') as f: chunkEnd = f.tell() while True: chunkStart = chunkEnd f.seek(size,1) f.readline() chunkEnd = f.tell() yield chunkStart, chunkEnd - chunkStart if chunkEnd > fileEnd: break