Devolución de llamada para el apio apply_async

Utilizo celery en mi aplicación para ejecutar tareas periódicas. Veamos un ejemplo simple a continuación.

 from myqueue import Queue @perodic_task(run_every=timedelta(minutes=1)) def process_queue(): queue = Queue() uid, questions = queue.pop() if uid is None: return job = group(do_stuff(q) for q in questions) job.apply_async() def do_stuff(question): try: ... except: ... raise 

Como puede ver en el ejemplo anterior, uso celery para ejecutar tareas asíncronas, pero (ya que es una cola) necesito hacer queue.fail(uid) en caso de excepción en do_stuff o queue.ack(uid) contrario. En esta situación, sería muy claro y útil tener alguna callback de mi tarea en ambos casos: on_failure y on_success .

Vi algo de documentación , pero nunca vi prácticas de uso de devoluciones de llamada con apply_async . ¿Es posible hacer eso?

Subclase la clase de tarea y sobrecargue las funciones on_success y on_failure:

 class CallbackTask(Task): def on_success(self, retval, task_id, args, kwargs): pass def on_failure(self, exc, task_id, args, kwargs, einfo): pass @celery.task(base=CallbackTask) # this does the trick def add(x, y): return x + y 

Puede especificar devoluciones de llamada de éxito y error a través del enlace y link_err kwargs cuando llame a apply_async. Los documentos de apio incluyen un ejemplo claro: http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks