Apio detener la ejecución de una cadena.

Tengo una tarea check_orders que se ejecuta periódicamente. Hace un grupo de tareas para que pueda medir el tiempo de ejecución de las tareas, y realizar algo cuando están terminadas (este es el propósito de res.join [1] y grouped_subs) Las tareas que se agrupan son pares de Tareas encadenadas.

Lo que quiero es para cuando la primera tarea no cumpla con una condición (falla) no ejecute la segunda tarea en la cadena. No puedo entender esto por mi vida y siento que esta es una funcionalidad bastante básica para un administrador de colas de trabajo. Cuando bash las cosas que he comentado después de [2] (generar excepciones, eliminar las devoluciones de llamada) … nos quedamos atascados en la unión () en check_orders por alguna razón (rompe el grupo). También he intentado establecer ignore_result en False para todas estas tareas, pero aún no funciona.

@task(ignore_result=True) def check_orders(): # check all the orders and send out appropriate notifications grouped_subs = [] for thingy in things: ... grouped_subs.append(chain(is_room_open.subtask((args_sub_1, )), notify.subtask((args_sub_2, ), immutable=True))) res = group(grouped_subs).apply_async() res.join() #[1] logger.info('Done checking orders at %s' % current_task.request.id)) @task(ignore_result=True) def is_room_open(args_sub_1): #something time consuming if http_req_and_parse(args_sub_1): # go on and do the notify task return True else: # [2] # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how? # None of the following things work: # is_room_open.update_state(state='FAILURE') # raise celery.exceptions.Ignore() # raise Exception('spam', 'eggs') # current_task.request.callbacks[:] = [] @task(ignore_result=True) def notify(args_sub_2): # something else time consuming, only do this if the first part of the chain # passed a test (the chained tasks before this were 'successful' notify_user(args_sub_2) 

En mi opinión, este es un caso de uso común que no recibe suficiente cariño en la documentación.

Suponiendo que desea abortar una cadena a mitad de camino mientras sigue reportando SUCCESS como estado de las tareas completadas, y no envía ningún registro de errores o lo que sea (de lo contrario, solo puede generar una excepción), una forma de lograrlo es:

 @app.task(bind=True) # Note that we need bind=True for self to work def task1(self, other_args): #do_stuff if end_chain: self.request.callbacks = None return #Other stuff to do if end_chain is False 

Así que en tu ejemplo:

 @app.task(ignore_result=True, bind=True) def is_room_open(self, args_sub_1): #something time consuming if http_req_and_parse(args_sub_1): # go on and do the notify task return True else: self.request.callbacks = None 

Trabajará. Tenga en cuenta que en lugar de ignore_result=True y subtask() puede usar el acceso directo .si() como lo indica @ abbasov-alexander

Editado para trabajar con el modo EAGER, según lo sugerido por @PhilipGarnero en los comentarios.

Es increíble que un caso tan común no se trate en ninguna documentación oficial. Tuve que hacer frente al mismo problema (pero al usar shared_tasks con la opción de shared_tasks , por lo que tenemos visibilidad del objeto self ), escribí un decorador personalizado que maneja automáticamente la revocación:

 def revoke_chain_authority(a_shared_task): """ @see: https://gist.github.com/bloudermilk/2173940 @param a_shared_task: a @shared_task(bind=True) celery function. @return: """ @wraps(a_shared_task) def inner(self, *args, **kwargs): try: return a_shared_task(self, *args, **kwargs) except RevokeChainRequested, e: # Drop subsequent tasks in chain (if not EAGER mode) if self.request.callbacks: self.request.callbacks[:] = [] return e.return_value return inner 

Puedes usarlo de la siguiente manera:

 @shared_task(bind=True) @revoke_chain_authority def apply_fetching_decision(self, latitude, longitude): #... if condition: raise RevokeChainRequested(False) 

Vea la explicación completa aquí . ¡Espero eso ayude!

En primer lugar, parece que si en la función existe la excepción ignore_result no te ayuda.

En segundo lugar, usa immutable = True. Significa que la siguiente función (en nuestro caso es notificar ) no tiene argumentos adicionales. notify.subtask((args_sub_2, ), immutable=False) debe usar notify.subtask((args_sub_2, ), immutable=False) si es adecuado para su decisión.

Tercero, puedes usar atajos:

notify.si(args_sub_2) lugar de notify.subtask((args_sub_2, ), immutable=True)

y

is_room_open.s(args_sub_1) en is_room_open.subtask((args_sub_1, )) lugar is_room_open.subtask((args_sub_1, ))

Intenta usarlo codigo:

 @task def check_orders(): # check all the orders and send out appropriate notifications grouped_subs = [] for thingy in things: ... grouped_subs.append(chain(is_room_open.s(args_sub_1), notify.s(args_sub_2))) res = group(grouped_subs).apply_async() res.join() #[1] logger.info('Done checking orders at %s' % current_task.request.id)) @task def is_room_open(args_sub_1): #something time consuming if http_req_and_parse(args_sub_1): # go on and do the notify task return True else: # [2] # STOP THE CHAIN SOMEHOW! Don't execute the rest of the chain, how? # None of the following things work: # is_room_open.update_state(state='FAILURE') # raise celery.exceptions.Ignore() # raise Exception('spam', 'eggs') # current_task.request.callbacks[:] = [] return False @task def notify(result, args_sub_2): if result: # something else time consuming, only do this if the first part of the chain # passed a test (the chained tasks before this were 'successful' notify_user(args_sub_2) return True return False 

Si desea excepciones de captura, debe utilizar la callback como tal

is_room_open.s(args_sub_1, link_error=log_error.s())

 from proj.celery import celery @celery.task def log_error(task_id): result = celery.AsyncResult(task_id) result.get(propagate=False) # make sure result written. with open(os.path.join('/var/errors', task_id), 'a') as fh: fh.write('--\n\n%s %s %s' % ( task_id, result.result, result.traceback))