viewflow.io: implementando una tarea de cola

Me gustaría implementar el siguiente caso de uso con la biblioteca ViewFlow :

Problema

Los procesos de un Flujo particular, iniciado por un usuario, deben esperar en una cola antes de ejecutar un trabajo de apio. Cada usuario tiene una cola de estos procesos. En función de una planificación, o activada manualmente, se permite que continúe el siguiente proceso en la cola.

Ejemplo

Un nodo dentro de mi flujo entra en una cola nombrada. Otra lógica dentro de la aplicación determina, para cada cola, cuándo permitir que la siguiente tarea continúe. Se selecciona la siguiente tarea en la cola y se llama al método done () de su activación.

Un flujo de ejemplo podría verse así:

class MyFlow(Flow): start = flow.Start(...).Next(queue_wait) queue_wait = QueueWait("myQueue").Next(job) job = celery.Job(...).Next(end) end = flow.End() 

Pregunta

¿Cuál sería el mejor enfoque para implementar la cola? En el ejemplo anterior, no sé qué debería ser “QueueWait”.

    He leído los documentos y el código del flujo de visualización, pero aún no me queda claro si esto se puede hacer con las clases integradas de Nodo y Activación, como func.Function, o si necesito ampliar con clases personalizadas.

    Después de mucha experimentación, llegué a una solución viable y simple:

     from viewflow.flow import base from viewflow.flow.func import FuncActivation from viewflow.activation import STATUS class Queue(base.NextNodeMixin, base.UndoViewMixin, base.CancelViewMixin, base.DetailsViewMixin, base.Event): """ Node that halts the flow and waits in a queue. To process the next waiting task call the dequeue method, optionally specifying the task owner. Example placing a job in a queue:: class MyFlow(Flow): wait = Queue().Next(this.job) job = celery.Job(send_stuff).Next(this.end) end = flow.End() somewhere in the application code: MyFlow.wait.dequeue() or: MyFlow.wait.dequeue(process__myprocess__owner=user) Queues are logically separated by the task_type, so new queues defined in a subclass by overriding task_type attribute. """ task_type = 'QUEUE' activation_cls = FuncActivation def __init__(self, **kwargs): super(Queue, self).__init__(**kwargs) def dequeue(self, **kwargs): """ Process the next task in the queue by created date/time. kwargs is used to add task filter arguments, thereby effectively splitting the queue into subqueues. This could be used to implement per-user queues. Returns True if task was found and dequeued, False otherwise """ filter_kwargs = {'flow_task_type': self.task_type, 'status': STATUS.NEW} if kwargs is not None: filter_kwargs.update(kwargs) task = self.flow_cls.task_cls.objects.filter(**filter_kwargs).order_by('created').first() if task is not None: lock = self.flow_cls.lock_impl(self.flow_cls.instance) with lock(self.flow_cls, task.process_id): task = self.flow_cls.task_cls._default_manager.get(pk=task.pk) activation = self.activation_cls() activation.initialize(self, task) activation.prepare() activation.done() return True return False 

    Intenté hacerlo lo más genérico posible y admitir la definición de múltiples colas con nombre, así como subcolas, como las colas por usuario.