Notificar a apio tarea de cierre trabajador

Estoy usando apio 2.4.1 con python 2.6, el servidor de rabbitmq y django. Me gustaría que mi tarea fuera capaz de limpiar correctamente si el trabajador se cierra. Por lo que sé, no se puede suministrar un destructor de tareas, así que intenté conectarme a la señal worker_shutdown .

Nota: AbortableTask solo funciona con el backend de la base de datos, así que no puedo usar eso.

from celery.signals import worker_shutdown @task def mytask(*args) obj = DoStuff() def shutdown_hook(*args): print "Worker shutting down" # cleanup nicely obj.stop() worker_shutdown.connect(shutdown_hook) # blocking call that monitors a network connection obj.stuff() 

Sin embargo, el gancho de cierre nunca se llama. Ctrl-C’ing el trabajador no mata la tarea y tengo que matarlo manualmente desde el shell.

Entonces, si esta no es la forma correcta de hacerlo, ¿cómo permito que las tareas se cierren correctamente?

worker_shutdown solo lo envía MainProcess , no los trabajadores de la agrupación secundaria. Todas las señales worker_* , except for worker_process_init , se refieren a MainProcess .

Sin embargo, el gancho de cierre nunca se llama. Ctrl-C’ing el trabajador no mata la tarea y tengo que matarlo manualmente desde el shell.

El trabajador nunca termina una tarea bajo un cierre normal (en caliente). Incluso si una tarea tarda días en completarse, el trabajador no completará el cierre hasta que se complete. Puede configurar --soft-time-limit , o – --time-limit para indicar a la instancia cuándo está bien terminar la tarea.

Entonces, para agregar cualquier tipo de proceso de limpieza de procesos, primero debe asegurarse de que las tareas puedan completarse. Como la limpieza no se llamaría antes de que eso suceda.

Para agregar un paso de limpieza a los procesos de trabajo en grupo, puede usar algo como:

 from celery import platforms from celery.signals import worker_process_init def cleanup_after_tasks(signum, frame): # reentrant code here (see http://docs.python.org/library/signal.html) def install_pool_process_sighandlers(**kwargs): platforms.signals["TERM"] = cleanup_after_tasks platforms.signals["INT"] = cleanup_after_tasks worker_process_init.connect(install_pool_process_sighandlers)