Multiprocesamiento y memoria de Python.

Estoy usando multiprocessing.imap_unordered para realizar un cálculo en una lista de valores:

 def process_parallel(fnc, some_list): pool = multiprocessing.Pool() for result in pool.imap_unordered(fnc, some_list): for x in result: yield x pool.terminate() 

Cada llamada a fnc devuelve un objeto ENORME como resultado, por diseño. Puedo almacenar N instancias de tal objeto en la RAM, donde N ~ cpu_count, pero no mucho más (no cientos).

Ahora, usar esta función requiere demasiada memoria. La memoria se gasta por completo en el proceso principal, no en los trabajadores.

¿Cómo almacena imap_unordered los resultados finales? Me refiero a los resultados que ya fueron devueltos por los trabajadores pero que aún no se han transmitido al usuario. Pensé que era inteligente y solo los computé “perezosamente” según fuera necesario, pero aparentemente no.

Parece que, dado que no puedo consumir los resultados de process_parallel lo suficientemente rápido, el grupo mantiene en cola estos enormes objetos desde fnc algún lugar, internamente, y luego explota. Hay alguna manera de evitar esto? ¿Limitar su cola interna de alguna manera?


Estoy usando Python2.7. Aclamaciones.

Como puede ver al buscar en el archivo fuente correspondiente ( python2.7/multiprocessing/pool.py ), el IMapUnorderedIterator utiliza una instancia de collections.deque para almacenar los resultados. Si entra un nuevo elemento, se agrega y elimina en la iteración.

Como sugirió, si entra otro objeto enorme mientras el subproceso principal aún está procesando el objeto, también se almacenarán en la memoria.

Lo que podrías probar es algo como esto:

 it = pool.imap_unordered(fnc, some_list) for result in it: it._cond.acquire() for x in result: yield x it._cond.release() 

Esto debería hacer que la tarea-resultado-receptor-hilo se bloquee mientras procesa un ítem si está tratando de poner el siguiente objeto en el deque. Por lo tanto, no debe haber más de dos de los grandes objetos en la memoria. Si eso funciona para tu caso, no lo sé;)

La solución más simple que se me ocurre sería agregar un cierre para envolver su función fnc que usaría un semáforo para controlar el número total de ejecuciones simultáneas de trabajos que se pueden ejecutar al mismo tiempo (supongo que el proceso / subproceso principal incrementará el semáforo). El valor del semáforo podría calcularse según el tamaño del trabajo y la memoria disponible.