ThreadPoolExecutor: ¿cómo limitar el tamaño máximo de la cola?

Estoy usando la clase ThreadPoolExecutor del paquete concurrent.futures

def some_func(arg): # does some heavy lifting # outputs some results from concurrent.futures import ThreadPoolExecutor with ThreadPoolExecutor(max_workers=1) as executor: for arg in range(10000000): future = executor.submit(some_func, arg) 

pero necesito limitar el tamaño de la cola de alguna manera, ya que no quiero que se creen millones de futuros a la vez, ¿hay una forma sencilla de hacerlo o debo atenerme a queue.Queue and threading package para lograr esto?

ThreadPoolExecutor de Python no tiene la función que estás buscando, pero la clase proporcionada se puede subdividir fácilmente de la siguiente manera para proporcionarla:

 class ThreadPoolExecutorWithQueueSizeLimit(futures.ThreadPoolExecutor): def __init__(self, maxsize=50, *args, **kwargs): super(ThreadPoolExecutorWithQueueSizeLimit, self).__init__(*args, **kwargs) self._work_queue = Queue.Queue(maxsize=maxsize) 

He estado haciendo esto cortando el rango. Aquí hay un ejemplo de trabajo.

 from time import time, strftime, sleep, gmtime from random import randint from itertools import islice from concurrent.futures import ThreadPoolExecutor, as_completed def nap(id, nap_length): sleep(nap_length) return nap_length def chunked_iterable(iterable, chunk_size): it = iter(iterable) while True: chunk = tuple(islice(it, chunk_size)) if not chunk: break yield chunk if __name__ == '__main__': startTime = time() range_size = 10000000 chunk_size = 10 nap_time = 2 # Iterate in chunks. # This consumes less memory and kicks back initial results sooner. for chunk in chunked_iterable(range(range_size), chunk_size): with ThreadPoolExecutor(max_workers=chunk_size) as pool_executor: pool = {} for i in chunk: function_call = pool_executor.submit(nap, i, nap_time) pool[function_call] = i for completed_function in as_completed(pool): result = completed_function.result() i = pool[completed_function] print('{} completed @ {} and slept for {}'.format( str(i + 1).zfill(4), strftime("%H:%M:%S", gmtime()), result)) print('==--- Script took {} seconds. ---=='.format( round(time() - startTime))) 

introduzca la descripción de la imagen aquí

La desventaja de este enfoque es que los trozos son sincrónicos. Todos los subprocesos de un fragmento deben completarse antes de agregar el siguiente fragmento al grupo.

debe usar un semáforo como este https://www.bettercodebytes.com/theadpoolexecutor-with-a-bounded-queue-in-python/

algo anda mal con la respuesta de andres.riancho , que si configuramos max_size of queue, cuando cerramos el grupo, self._work_queue.put (None) no puede poner el límite de max_size, por lo que nuestra encuesta nunca saldrá.

  def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True self._work_queue.put(None) if wait: for t in self._threads: t.join(sys.maxint)