Cómo generar futuro solo si hay trabajadores libres disponibles

Estoy intentando enviar información extraída de las líneas de un archivo grande a un proceso que se ejecuta en algún servidor.

Para acelerar esto, me gustaría hacer esto con algunos hilos en paralelo.

Usando el backport Python 2.7 de concurrent.futures intenté esto:

f = open("big_file") with ThreadPoolExecutor(max_workers=4) as e: for line in f: e.submit(send_line_function, line) f.close() 

Sin embargo, esto es problemático, porque todos los futuros se envían instantáneamente, por lo que mi máquina se queda sin memoria, porque el archivo completo se carga en la memoria.

Mi pregunta es si hay una manera fácil de enviar un nuevo futuro solo cuando haya un trabajador libre disponible.

Podrías iterar sobre trozos del archivo usando

 for chunk in zip(*[f]*chunksize): 

(Esta es una aplicación de la receta del mero , que recostack elementos del iterador f en grupos de tamaño de tamaño de chunksize . Nota: Esto no consume todo el archivo de una vez, ya que zip devuelve un iterador en Python3).


 import concurrent.futures as CF import itertools as IT import logging logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s %(threadName)s] %(message)s', datefmt='%H:%M:%S') def worker(line): line = line.strip() logger.info(line) chunksize = 1024 with CF.ThreadPoolExecutor(max_workers=4) as executor, open("big_file") as f: for chunk in zip(*[f]*chunksize): futures = [executor.submit(worker, line) for line in chunk] # wait for these futures to complete before processing another chunk CF.wait(futures) 

Ahora, en los comentarios, usted señala acertadamente que esto no es óptimo. Podría haber algún trabajador que requiera mucho tiempo y que retenga una gran cantidad de trabajos.

Por lo general, si cada llamada al trabajador toma aproximadamente la misma cantidad de tiempo, entonces esto no es un gran problema. Sin embargo, aquí hay una forma de avanzar el identificador de archivos a pedido. Utiliza una threading.Condition para notificar al sprinkler para que avance el identificador de archivo.

 import logging import threading import Queue logger = logging.getLogger(__name__) logging.basicConfig(level=logging.DEBUG, format='[%(asctime)s %(threadName)s] %(message)s', datefmt='%H:%M:%S') SENTINEL = object() def worker(cond, queue): for line in iter(queue.get, SENTINEL): line = line.strip() logger.info(line) with cond: cond.notify() logger.info('notify') def sprinkler(cond, queue, num_workers): with open("big_file") as f: for line in f: logger.info('advancing filehandle') with cond: queue.put(line) logger.info('waiting') cond.wait() for _ in range(num_workers): queue.put(SENTINEL) num_workers = 4 cond = threading.Condition() queue = Queue.Queue() t = threading.Thread(target=sprinkler, args=[cond, queue, num_workers]) t.start() threads = [threading.Thread(target=worker, args=[cond, queue])] for t in threads: t.start() for t in threads: t.join()