Revocar la tarea de apio antes de ejecutar utilizando la base de datos django

Estoy usando la base de datos Django en lugar de RabbitMQ por razones de concurrencia.

Pero no puedo resolver el problema de revocar una tarea antes de que se ejecute.

Encontré algunas respuestas sobre este asunto, pero no parecen estar completas o no puedo obtener suficiente ayuda.

  • primera respuesta
  • segunda respuesta

¿Cómo puedo extender la tabla de tareas de apio usando un modelo, agregar un campo booleano (revocado) para establecer cuando no quiero que la tarea se ejecute?

Gracias.

Como Celery realiza un seguimiento de las tareas por una ID, todo lo que realmente necesita es poder saber qué ID se han cancelado. En lugar de modificar los elementos internos de kombu , puede crear su propia tabla (o memcached etc.) que solo rastrea las ID canceladas, y luego verificar si la ID para la tarea actual cancelable está en ella.

Esto es lo que hacen internamente los transportes que admiten un comando de revoke remota:

Todos los nodos de trabajo conservan una memoria de identificadores de tareas revocados, ya sea en memoria o persistentes en el disco (consulte Revocaciones persistentes). (De documentos de apio)

Cuando usa el transporte django, usted es responsable de hacerlo usted mismo. En este caso, depende de cada tarea verificar si se ha cancelado.

Así que la forma básica de su tarea (el registro agregado en lugar de una operación real) se convierte en:

 from celery import shared_task from celery.exceptions import Ignore from celery.utils.log import get_task_logger from .models import task_canceled logger = get_task_logger(__name__) @shared_task def my_task(): if task_canceled(my_task.request.id): raise Ignore logger.info("Doing my stuff") 

Puede ampliar y mejorar esto de varias maneras, como creando una clase de Tareable Cancelable como en una de las otras respuestas a las que se vinculó, pero esta es la forma básica. Lo que te falta ahora es el modelo y la función para comprobarlo.

Tenga en cuenta que el ID en este caso será un ID de cadena como a5644f08-7d30-43ff-a61e-81c165ad9e19 , no un entero. Su modelo puede ser tan simple como esto:

 from django.db import models class CanceledTask(models.Model): task_id = models.CharField(max_length=200) def cancel_task(request_id): CanceledTask.objects.create(task_id=request_id) def task_canceled(request_id): return CanceledTask.objects.filter(task_id=request_id).exists() 

Ahora puede verificar el comportamiento observando los registros de depuración de su servicio de apio mientras hace cosas como:

 my_task.delay() models.cancel_task(my_task.delay())