Multiprocesamiento en Python mientras se limita el número de procesos en ejecución

Me gustaría ejecutar varias instancias de program.py simultáneamente, mientras limito la cantidad de instancias que se ejecutan al mismo tiempo (por ejemplo, a la cantidad de núcleos de CPU disponibles en mi sistema). Por ejemplo, si tengo 10 núcleos y tengo que hacer 1000 ejecuciones de program.py en total, solo se crearán y ejecutarán 10 instancias en un momento dado.

He intentado usar el módulo de multiprocesamiento, multiproceso y colas, pero no hay nada que me parezca que se preste a una implementación fácil. El mayor problema que tengo es encontrar una manera de limitar la cantidad de procesos que se ejecutan simultáneamente. Esto es importante porque si creo 1000 procesos a la vez, se convierte en equivalente a una bomba de bifurcación. No necesito que los resultados se devuelvan de los procesos mediante progtwigción (se envíen al disco) y todos los procesos se ejecutan de forma independiente.

¿Alguien puede darme sugerencias o un ejemplo de cómo podría implementar esto en Python, o incluso bash? Publico el código que he escrito hasta ahora usando colas, pero no funciona como es debido y puede que ya esté en el camino equivocado.

Muchas gracias.

Sé que mencionaste que el enfoque de Pool.map no tiene mucho sentido para ti. El mapa es solo una forma fácil de darle una fuente de trabajo, y una aplicación que se puede aplicar a cada uno de los elementos. La func para el mapa podría ser cualquier punto de entrada para realizar el trabajo real en el argumento dado.

Si eso no parece correcto para usted, tengo una respuesta bastante detallada aquí sobre el uso de un patrón de Productor-Consumidor: https://stackoverflow.com/a/11196615/496445

Esencialmente, creas una cola e inicias N número de trabajadores. Luego, alimenta la cola desde el hilo principal o crea un proceso Producer que alimenta la cola. Los trabajadores siguen tomando el trabajo de la cola y nunca habrá más trabajo simultáneo que la cantidad de procesos que ha iniciado.

También tiene la opción de poner un límite en la cola, de modo que bloquee al productor cuando ya haya demasiado trabajo pendiente, si necesita poner restricciones también en la velocidad y los recursos que consume el productor.

La función de trabajo que recibe la llamada puede hacer lo que quieras. Esto puede ser una envoltura alrededor de algún comando del sistema, o puede importar su lib de python y ejecutar la rutina principal. Existen sistemas de administración de procesos específicos que le permiten configurar configuraciones para ejecutar ejecutables arbitrarios con recursos limitados, pero este es solo un enfoque básico de Python para hacerlo.

Fragmentos de esa otra respuesta mía:

Piscina Básica:

 from multiprocessing import Pool def do_work(val): # could instantiate some other library class, # call out to the file system, # or do something simple right here. return "FOO: %s" % val pool = Pool(4) work = get_work_args() results = pool.map(do_work, work) 

Utilizando un gestor de procesos y productor.

 from multiprocessing import Process, Manager import time import itertools def do_work(in_queue, out_list): while True: item = in_queue.get() # exit signal if item == None: return # fake work time.sleep(.5) result = item out_list.append(result) if __name__ == "__main__": num_workers = 4 manager = Manager() results = manager.list() work = manager.Queue(num_workers) # start for workers pool = [] for i in xrange(num_workers): p = Process(target=do_work, args=(work, results)) p.start() pool.append(p) # produce data # this could also be started in a producer process # instead of blocking iters = itertools.chain(get_work_args(), (None,)*num_workers) for item in iters: work.put(item) for p in pool: p.join() print results 

Debe utilizar un supervisor de procesos. Un enfoque sería usar la API proporcionada por Circus para hacer eso “programáticamente”, el sitio de documentación ahora está fuera de línea, pero creo que es solo un problema temporal, de todos modos, puede usar el circo para manejar esto. Otro enfoque sería utilizar el supervisor y establecer el número de parámetros del proceso en función del número de núcleos que tiene.

Un ejemplo utilizando circo:

 from circus import get_arbiter arbiter = get_arbiter("myprogram", numprocesses=3) try: arbiter.start() finally: arbiter.stop() 

Bash script en lugar de Python, pero lo uso a menudo para el parallel processing simple:

 #!/usr/bin/env bash waitForNProcs() { nprocs=$(pgrep -f $procName | wc -l) while [ $nprocs -gt $MAXPROCS ]; do sleep $SLEEPTIME nprocs=$(pgrep -f $procName | wc -l) done } SLEEPTIME=3 MAXPROCS=10 procName=myPython.py for file in ./data/*.txt; do waitForNProcs ./$procName $file & done 

O para casos muy simples, otra opción es xargs donde P establece el número de procesos

 find ./data/ | grep txt | xargs -P10 -I SUB ./myPython.py SUB 

Si bien hay muchas respuestas sobre el uso de multiprocessing.pool, no hay muchos fragmentos de código sobre cómo usar multiprocessing.Process, que es más beneficioso cuando el uso de la memoria es importante. iniciar 1000 procesos sobrecargará la CPU y matará la memoria. Si cada proceso y sus líneas de datos requieren un uso intensivo de la memoria, el sistema operativo o el propio Python limitarán la cantidad de procesos paralelos. Desarrollé el siguiente código para limitar la cantidad simultánea de trabajos enviados a la CPU en lotes. El tamaño del lote se puede escalar proporcionalmente al número de núcleos de CPU. En mi PC con Windows, la cantidad de trabajos por lote puede ser eficiente hasta 4 veces la capacidad de CPU disponible.

 import multiprocessing def func_to_be_multiprocessed(q,data): q.put(('s')) q = multiprocessing.Queue() worker = [] for p in range(number_of_jobs): worker[p].append(multiprocessing.Process(target=func_to_be_multiprocessed, \ args=(q,data)...)) num_cores = multiprocessing.cpu_count() Scaling_factor_batch_jobs = 3.0 num_jobs_per_batch = num_cores * Scaling_factor_batch_jobs num_of_batches = number_of_jobs // num_jobs_per_batch for i_batch in range(num_of_batches): floor_job = i_batch * num_jobs_per_batch ceil_job = floor_job + num_jobs_per_batch for p in worker[floor_job : ceil_job]: worker.start() for p in worker[floor_job : ceil_job]: worker.join() for p in worker[ceil_job :]: worker.start() for p in worker[ceil_job :]: worker.join() for p in multiprocessing.active_children(): p.terminate() result = [] for p in worker: result.append(q.get()) 

El único problema es que, si alguno de los trabajos en cualquier lote no se pudo completar y conduce a una situación de locking, el rest de los lotes de trabajos no se iniciarán. Por lo tanto, la función que se va a procesar debe tener rutinas de manejo de errores adecuadas.