Ejecutar una tarea después de que se hayan completado todas las tareas

Estoy escribiendo una aplicación que necesita ejecutar una serie de tareas en paralelo y luego una sola tarea con los resultados de todas las tareas ejecutadas:

@celery.task def power(value, expo): return value ** expo @celery.task def amass(values): print str(values) 

Es un ejemplo muy artificial y simplificado, pero espero que el punto resulte bien. Básicamente, tengo muchos elementos que necesitan ejecutarse a través del power , pero solo quiero ejecutar la amass de los resultados de todas las tareas. Todo esto debería suceder de forma asíncrona, y no necesito nada del método de amass .

¿Alguien sabe cómo configurar esto en apio para que todo se ejecute de forma asíncrona y se llame a una sola callback con una lista de los resultados después de que todo se haya dicho y hecho?

He configurado este ejemplo para ejecutarse con un chord como recomendó Alexander Afanasiev:

 from time import sleep import random tasks = [] for i in xrange(10): tasks.append(power.s((i, 2))) sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms callback = amass.s() r = chord(tasks)(callback) 

Desafortunadamente, en el ejemplo anterior, todas las tareas en las tasks se inician solo cuando se llama al método de chord . ¿Hay alguna forma de que cada tarea pueda comenzar por separado y luego podría agregar una callback al grupo para que se ejecute cuando todo haya terminado?

Aquí hay una solución que funcionó para mis propósitos:

tareas.py :

 from time import sleep import random @celery.task def power(value, expo): sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms return value ** expo @celery.task def amass(results, tasks): completed_tasks = [] for task in tasks: if task.ready(): completed_tasks.append(task) results.append(task.get()) # remove completed tasks tasks = list(set(tasks) - set(completed_tasks)) if len(tasks) > 0: # resend the task to execute at least 1 second from now amass.delay(results, tasks, countdown=1) else: # we done print results 

Caso de uso:

 tasks = [] for i in xrange(10): tasks.append(power.delay(i, 2)) amass.delay([], tasks) 

Lo que debería hacer es iniciar todas las tareas lo antes posible de forma asíncrona. Una vez que todos han sido publicados en la cola, la tarea de amass también se publicará en la cola. La tarea de acumulación continuará repostándose hasta que todas las demás tareas se hayan completado.

El apio tiene muchas herramientas para la mayoría de los flujos de trabajo que puedas imaginar.

Parece que necesitas usar el acorde . Aquí hay una cita de docs:

Un acorde es como un grupo pero con una callback. Un acorde consiste en un grupo de encabezado y un cuerpo, donde el cuerpo es una tarea que debe ejecutarse después de que todas las tareas en el encabezado estén completas.

Mirando este fragmento de su pregunta, parece que está pasando una list como encabezado de acorde, en lugar de un group :

 from time import sleep import random tasks = [] for i in xrange(10): tasks.append(power.s((i, 2))) sleep(random.randint(10, 1000) / 1000.0) # sleep for 10-1000ms callback = amass.s() r = chord(tasks)(callback) 

La conversión de la list a un group debe dar como resultado el comportamiento que está esperando:

 ... callback = amass.s() tasks = group(tasks) r = chord(tasks)(callback) 

La respuesta que @ alexander-afanasiev te dio es esencialmente correcta: usa un acorde.

Su código está bien, pero tasks.append(power.s((i, 2))) no está ejecutando realmente la subtarea, solo agrega subtareas a una lista. Es chord(...)(...) el que envía tantos mensajes al intermediario como subtareas que ha definido en la lista de tasks , más un mensaje más para la subtarea de callback. Cuando se llama chord , vuelve tan pronto como sea posible.

Si desea saber cuándo ha terminado el acorde, puede realizar una encuesta para completar, como con una sola tarea usando r.ready() en su muestra.