Los informes arrojaron resultados de una larga tarea de apio

Problema

He segmentado una tarea de larga duración en subtareas lógicas, por lo que puedo informar los resultados de cada subtarea a medida que se completa. Sin embargo, estoy tratando de informar los resultados de una tarea que nunca se completará de manera efectiva (en lugar de eso dará los valores a medida que avanza), y me cuesta trabajo hacerlo con mi solución existente.

Fondo

Estoy construyendo una interfaz web para algunos progtwigs Python que he escrito. Los usuarios pueden enviar trabajos a través de formularios web y luego volver a consultar para ver el progreso del trabajo.

Digamos que tengo dos funciones, a las que se accede a través de formularios separados:

  • med_func : tarda aproximadamente 1 minuto en ejecutarse, los resultados se transfieren a render() , lo que produce datos adicionales.
  • long_func : Devuelve un generador. Cada yield toma el orden de 30 minutos, y debe informarse al usuario. Hay tantos rendimientos que podemos considerar este iterador como infinito (termina solo cuando se revoca ).

Código, implementación actual

Con med_func , med_func resultados de la siguiente manera:

Al enviar el formulario, AsyncResult un AsyncResult en una sesión de Django :

  task_result = med_func.apply_async([form], link=render.s()) request.session["task_result"] = task_result 

La vista de Django para la página de resultados accede a este AsyncResult . Cuando una tarea se completa, los resultados se guardan en un objeto que se pasa como contexto a una plantilla de Django.

 def results(request): """ Serve (possibly incomplete) results of a session's latest run. """ session = request.session try: # Load most recent task task_result = session["task_result"] except KeyError: # Already cleared, or doesn't exist if "results" not in session: session["status"] = "No job submitted" else: # Extract data from Asynchronous Tasks session["status"] = task_result.status if task_result.ready(): session["results"] = task_result.get() render_task = task_result.children[0] # Decorate with rendering results session["render_status"] = render_task.status if render_task.ready(): session["results"].render_output = render_task.get() del(request.session["task_result"]) # Don't need any more return render_to_response('results.html', request.session) 

Esta solución solo funciona cuando la función realmente termina . No puedo encadenar las subtareas lógicas de long_func , porque hay un número desconocido de yield s (cada iteración del bucle long_func puede no producir un resultado).

Pregunta

¿Hay alguna forma sensata de acceder a los objetos cedidos desde una tarea de apio extremadamente larga, para que puedan mostrarse antes de que se agote el generador?

Para que Celery sepa cuál es el estado actual de la tarea, establece algunos metadatos en el resultado que tenga. Puede usarlo para almacenar otros tipos de metadatos.

 def yielder(): for i in range(2**100): yield i @task def report_progress(): for progress in yielder(): # set current progress on the task report_progress.backend.mark_as_started( report_progress.request.id, progress=progress) def view_function(request): task_id = request.session['task_id'] task = AsyncResult(task_id) progress = task.info['progress'] # do something with your current progress 

No arrojaría un montón de datos allí, pero funciona bien para rastrear el progreso de una tarea de larga duración.

La respuesta de Pablo es genial. Como alternativa al uso de mark_as_started , puede usar Task método update_state Task . En última instancia, hacen lo mismo, pero el nombre “update_state” es un poco más apropiado para lo que estás tratando de hacer. Opcionalmente, puede definir un estado personalizado que indique que su tarea está en progreso (he llamado a mi estado personalizado “PROGRESO”):

 def yielder(): for i in range(2**100): yield i @task def report_progress(): for progress in yielder(): # set current progress on the task report_progress.update_state(state='PROGRESS', meta={'progress': progress}) def view_function(request): task_id = request.session['task_id'] task = AsyncResult(task_id) progress = task.info['progress'] # do something with your current progress 

Parte de apio:

 def long_func(*args, **kwargs): i = 0 while True: yield i do_something_here(*args, **kwargs) i += 1 @task() def test_yield_task(task_id=None, **kwargs): the_progress = 0 for the_progress in long_func(**kwargs): cache.set('celery-task-%s' % task_id, the_progress) 

Lado del cliente, tarea inicial:

 r = test_yield_task.apply_async() request.session['task_id'] = r.task_id 

Prueba de último valor cedido:

  v = cache.get('celery-task-%s' % session.get('task_id')) if v: do_someting() 

Si no te gusta usar el caché, o si es imposible, puedes usar db, file o cualquier otro lugar en el que el trabajador del apio y el servidor tengan ambos accesos. Con el caché es la solución más simple, pero los trabajadores y el servidor tienen que usar el mismo caché.

Un par de opciones a considerar:

1 – grupos de tareas. Si puede enumerar todas las sub tareas desde el momento de la invocación, puede aplicar el grupo como un todo, que devuelve un objeto TaskSetResult que puede usar para monitorear los resultados del grupo en su totalidad, o de tareas individuales en el grupo. – consulte esto cuando sea necesario cuando necesite verificar el estado.

2 – devoluciones de llamada. Si no puede enumerar todas las sub tareas (¡o incluso si puede!), Puede definir un enlace / callback que sea el último paso de la tarea: se llama cuando finaliza el rest de la tarea. El gancho sería contra un URI en su aplicación que ingiera el resultado y lo haga disponible a través de DB o API interna de la aplicación.

Una combinación de estos podría resolver su desafío.

Vea también este gran preso PyCon de uno de los ingenieros de Instagram.

http://blogs.vmware.com/vfabric/2013/04/how-instagram-feeds-work-celery-and-rabbitmq.html

En la marca de video a las 16:00, explica cómo estructuran largas listas de sub-tareas.

Personalmente, me gustaría ver la hora de inicio, la duración, el progreso (número de elementos producidos), la hora de finalización (o ETA), el estado y cualquier otra información útil. Sería bueno si se viera similar a una pantalla relacionada, tal vez como ps en Linux. Es, después de todo, un estado de proceso.

Podría incluir algunas opciones para pausar o eliminar la tarea y / o para “abrirla” y mostrar información detallada sobre los niños o los resultados.