python pool apply_async y map_async no se bloquean en la cola completa

Soy bastante nuevo en Python. Estoy usando el módulo de multiprocesamiento para leer líneas de texto en una entrada estándar, convertirlas de alguna manera y escribirlas en una base de datos. Aquí hay un fragmento de mi código:

batch = [] pool = multiprocessing.Pool(20) i = 0 for i, content in enumerate(sys.stdin): batch.append(content) if len(batch) >= 10000: pool.apply_async(insert, args=(batch,i+1)) batch = [] pool.apply_async(insert, args=(batch,i)) pool.close() pool.join() 

Ahora que todo funciona bien, hasta que puedo procesar enormes archivos de entrada (cientos de millones de líneas) que canalizo en mi progtwig de Python. En algún momento, cuando mi base de datos se vuelve más lenta, veo que la memoria se llena.

Después de jugar un poco, resultó que pool.apply_async y pool.map_async nunca se bloquean, por lo que la cola de las llamadas que se procesarán crece más y más.

¿Cuál es el enfoque correcto para mi problema? Espero un parámetro que pueda establecer, que bloquee la llamada pool.apply_async, tan pronto como se scope una cierta longitud de cola. AFAIR en Java se puede dar a ThreadPoolExecutor un BlockingQueue con una longitud fija para ese propósito.

¡Gracias!

En caso de que alguien termine aquí, así es como resolví el problema: dejé de usar el multiprocesamiento. Así es como lo hago ahora:

 #set amount of concurrent processes that insert db data processes = multiprocessing.cpu_count() * 2 #setup batch queue queue = multiprocessing.Queue(processes * 2) #start processes for _ in range(processes): multiprocessing.Process(target=insert, args=(queue,)).start() #fill queue with batches batch=[] for i, content in enumerate(sys.stdin): batch.append(content) if len(batch) >= 10000: queue.put((batch,i+1)) batch = [] if batch: queue.put((batch,i+1)) #stop processes using poison-pill for _ in range(processes): queue.put((None,None)) print "all done." 

en el método de inserción, el procesamiento de cada lote se envuelve en un bucle que se extrae de la cola hasta que recibe la píldora venenosa:

 while True: batch, end = queue.get() if not batch and not end: return #poison pill! complete! [process the batch] print 'worker done.' 

Las funciones apply_async y map_async están diseñadas para no bloquear el proceso principal. Para hacerlo, el Pool mantiene una Queue interna cuyo tamaño, desafortunadamente, es imposible de cambiar.

La forma en que se puede resolver el problema es mediante el uso de un Semaphore inicializado con el tamaño que desea que tenga la cola. Adquiere y libera el semáforo antes de alimentar a la piscina y después de que un trabajador haya completado la tarea.

Aquí hay un ejemplo trabajando con Python 2.6 o superior.

 from threading import Semaphore from multiprocessing import Pool def task_wrapper(f): """Python2 does not allow a callback for method raising exceptions, this wrapper ensures the code run into the worker will be exception free. """ try: return f() except: return None class TaskManager(object): def __init__(self, processes, queue_size): self.pool = Pool(processes=processes) self.workers = Semaphore(processes + queue_size) def new_task(self, f): """Start a new task, blocks if queue is full.""" self.workers.acquire() self.pool.apply_async(task_wrapper, args=(f, ), callback=self.task_done)) def task_done(self): """Called once task is done, releases the queue is blocked.""" self.workers.release() 

Otro ejemplo utilizando concurrent.futures implementación de grupos concurrent.futures .

apply_async devuelve un objeto AsyncResult , en el que puede wait :

 if len(batch) >= 10000: r = pool.apply_async(insert, args=(batch, i+1)) r.wait() batch = [] 

Aunque si desea hacer esto de una manera más limpia, debe usar un multiprocessing.Queue con un maxsize de 10000, y derivar una clase Worker del multiprocessing.Process que obtiene de esa cola.

No es bonito, pero puede acceder al tamaño de la cola interna y esperar hasta que esté por debajo del tamaño máximo deseado antes de agregar nuevos elementos:

 max_pool_queue_size = 20 for i in range(10000): pool.apply_async(some_func, args=(...)) while pool._taskqueue.qsize() > max_pool_queue_size: time.sleep(1)