Python multiprocessing.Pool kill * específico * proceso de ejecución prolongada o locking

Necesito ejecutar un grupo de muchas conexiones y consultas de bases de datos paralelas. Me gustaría usar un Multiprocessing.Pool o concurrent.futures ProcessPoolExecutor. Python 2.7.5

En algunos casos, las solicitudes de consulta tardan demasiado tiempo o nunca terminarán (proceso colgado / zombie). Me gustaría finalizar el proceso específico desde Multiprocessing.Pool o concurrent.futures ProcessPoolExecutor que ha caducado.

Este es un ejemplo de cómo matar / reabastecer todo el grupo de procesos, pero lo ideal sería minimizar el problema de la CPU, ya que solo quiero eliminar un proceso específico de larga duración que no haya devuelto datos después de un tiempo de espera.

Por alguna razón, el código a continuación no parece poder terminar / unirse al grupo de procesos después de que todos los resultados se devuelven y completan. Puede tener que ver con matar procesos de trabajadores cuando se produce un tiempo de espera, sin embargo, el Grupo crea nuevos trabajadores cuando son asesinados y los resultados son los esperados.

from multiprocessing import Pool import time import numpy as np from threading import Timer import thread, time, sys def f(x): time.sleep(x) return x if __name__ == '__main__': pool = Pool(processes=4, maxtasksperchild=4) results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()] while results: try: x, result = results.pop(0) start = time.time() print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start) except Exception as e: print str(e) print '%d Timeout Exception! in %f' % (x, time.time()-start) for p in pool._pool: if p.exitcode is None: p.terminate() pool.terminate() pool.join() 

No entiendo completamente tu pregunta. Dice que desea detener un proceso específico, pero luego, en su fase de manejo de excepciones, está llamando a terminar en todos los trabajos. No estoy seguro de por qué estás haciendo eso. Además, estoy bastante seguro de que el uso de variables internas de multiprocessing.Pool no es del todo segura. Habiendo dicho todo eso, creo que su pregunta es por qué este progtwig no termina cuando se agota el tiempo. Si ese es el problema, entonces lo siguiente es el truco:

 from multiprocessing import Pool import time import numpy as np from threading import Timer import thread, time, sys def f(x): time.sleep(x) return x if __name__ == '__main__': pool = Pool(processes=4, maxtasksperchild=4) results = [(x, pool.apply_async(f, (x,))) for x in np.random.randint(10, size=10).tolist()] result = None start = time.time() while results: try: x, result = results.pop(0) print result.get(timeout=5), '%d done in %f Seconds!' % (x, time.time()-start) except Exception as e: print str(e) print '%d Timeout Exception! in %f' % (x, time.time()-start) for i in reversed(range(len(pool._pool))): p = pool._pool[i] if p.exitcode is None: p.terminate() del pool._pool[i] pool.terminate() pool.join() 

El punto es que necesitas eliminar elementos de la piscina; simplemente llamar a terminar en ellos no es suficiente.

En su solución, está manipulando las variables internas de la agrupación. El grupo se basa en 3 subprocesos diferentes para poder operar correctamente, no es seguro intervenir en sus variables internas sin estar realmente al tanto de lo que está haciendo.

No hay una forma clara de detener el tiempo de espera de los procesos en los Grupos de Python estándar, pero hay implementaciones alternativas que exponen dicha característica.

Puedes echar un vistazo a las siguientes bibliotecas:

Guijarro

de billar

Para evitar el acceso a las variables internas, puede guardar multiprocessing.current_process().pid de la tarea en ejecución en la memoria compartida. Luego repita el proceso principal sobre multiprocessing.active_children() y elimine el pid destino si existe.
Sin embargo, después de dicha terminación externa de los trabajadores, se vuelven a crear, pero la agrupación se convierte en inaccesible y también requiere una terminación explícita antes de la join()

También me encontré con este problema.

El código original y la versión editada por @stacksia tienen el mismo problema: en ambos casos, se eliminarán todos los procesos que se pool._pool se pool._pool tiempo de espera para uno solo de los procesos (es decir, cuando se realice el bucle sobre pool._pool ).

Encuentra debajo mi solución. Implica crear un archivo .pid para cada proceso de trabajo como lo sugiere @luart. Funcionará si hay una manera de etiquetar cada proceso de trabajo (en el código a continuación, x hace este trabajo). Si alguien tiene una solución más elegante (como guardar PID en la memoria), compártala.

 #!/usr/bin/env python from multiprocessing import Pool import time, os import subprocess def f(x): PID = os.getpid() print 'Started:', x, 'PID=', PID pidfile = "/tmp/PoolWorker_"+str(x)+".pid" if os.path.isfile(pidfile): print "%s already exists, exiting" % pidfile sys.exit() file(pidfile, 'w').write(str(PID)) # Do the work here time.sleep(x*x) # Delete the PID file os.remove(pidfile) return x*x if __name__ == '__main__': pool = Pool(processes=3, maxtasksperchild=4) results = [(x, pool.apply_async(f, (x,))) for x in [1,2,3,4,5,6]] pool.close() while results: print results try: x, result = results.pop(0) start = time.time() print result.get(timeout=3), '%d done in %f Seconds!' % (x, time.time()-start) except Exception as e: print str(e) print '%d Timeout Exception! in %f' % (x, time.time()-start) # We know which process gave us an exception: it is "x", so let's kill it! # First, let's get the PID of that process: pidfile = '/tmp/PoolWorker_'+str(x)+'.pid' PID = None if os.path.isfile(pidfile): PID = str(open(pidfile).read()) print x, 'pidfile=',pidfile, 'PID=', PID # Now, let's check if there is indeed such process runing: for p in pool._pool: print p, p.pid if str(p.pid)==PID: print 'Found it still running!', p, p.pid, p.is_alive(), p.exitcode # We can also double-check how long it's been running with system 'ps' command:" tt = str(subprocess.check_output('ps -p "'+str(p.pid)+'" o etimes=', shell=True)).strip() print 'Run time from OS (may be way off the real time..) = ', tt # Now, KILL the m*$@r: p.terminate() pool._pool.remove(p) pool._repopulate_pool() # Let's not forget to remove the pidfile os.remove(pidfile) break pool.terminate() pool.join() 

Muchas personas sugieren guijarros. Se ve bien, pero solo está disponible para Python 3. Si alguien tiene una forma de importar guijarros para Python 2.6, sería genial.