Forma correcta de crear flujos de trabajo dynamics en Airflow.

Problema

¿Hay alguna forma en Airflow para crear un flujo de trabajo tal que el número de tareas B * sea desconocido hasta que se complete la Tarea A? He visto subdags, pero parece que solo puede funcionar con un conjunto estático de tareas que deben determinarse en la creación de Dag.

¿Dag triggers funcionaría? Y si es así, ¿podría dar un ejemplo?

Tengo un problema en el que es imposible saber la cantidad de tareas B que se necesitarán para calcular la tarea C hasta que la tarea A se haya completado. Cada tarea B. * tomará varias horas para calcular y no se puede combinar.

|---> Task B.1 --| |---> Task B.2 --| Task A ------|---> Task B.3 --|-----> Task C | .... | |---> Task BN --| 

Idea # 1

No me gusta esta solución porque tengo que crear un ExternalTaskSensor de locking y toda la Tarea B * llevará entre 2 y 24 horas en completarse. Así que no considero que esta sea una solución viable. Seguramente hay una forma más fácil? ¿O fue que Airflow no fue diseñado para esto?

 Dag 1 Task A -> TriggerDagRunOperator(Dag 2) -> ExternalTaskSensor(Dag 2, Task Dummy B) -> Task C Dag 2 (Dynamically created DAG though python_callable in TriggerDagrunOperator) |-- Task B.1 --| |-- Task B.2 --| Task Dummy A --|-- Task B.3 --|-----> Task Dummy B | .... | |-- Task BN --| 

Edición 1:

A partir de ahora esta pregunta todavía no tiene una gran respuesta . Varias personas me han contactado buscando una solución.

Aquí es cómo lo hice con una solicitud similar sin ningún tipo de subdags:

Primero crea un método que devuelva los valores que quieras

 def values_function(): return values 

A continuación, crea el método que generará los trabajos dinámicamente:

 def group(number, **kwargs): #load the values if needed in the command you plan to execute dyn_value = "{{ task_instance.xcom_pull(task_ids='push_func') }}" return BashOperator( task_id='JOB_NAME_{}'.format(number), bash_command='script.sh {} {}'.format(dyn_value, number), dag=dag) 

Y luego combinarlos:

 push_func = PythonOperator( task_id='push_func', provide_context=True, python_callable=values_function, dag=dag) complete = DummyOperator( task_id='All_jobs_completed', dag=dag) for i in values_function(): push_func >> group(i) >> complete 

He descubierto una manera de crear flujos de trabajo basados ​​en el resultado de tareas anteriores.
Básicamente, lo que quieres hacer es tener dos subagrupaciones con lo siguiente:

  1. Xcom inserta una lista (o lo que sea que necesite para crear el flujo de trabajo dynamic más adelante) en la subetiqueta que se ejecuta primero (consulte test1.py def return_list() return_list def return_list() )
  2. Pase el objeto dag principal como un parámetro a su segundo subdag.
  3. Ahora, si tiene el objeto dag principal, puede usarlo para obtener una lista de sus instancias de tareas. De esa lista de instancias de tareas, puede filtrar una tarea de la ejecución actual usando parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1] ), probablemente se podría agregar más filtros aquí
  4. Con esa instancia de tarea, puede usar xcom pull para obtener el valor que necesita especificando el dag_id en el de la primera subetiqueta: dag_id='%s.%s' % (parent_dag_name, 'test1')
  5. Use la lista / valor para crear sus tareas dinámicamente

Ahora he probado esto en mi instalación de flujo de air local y funciona bien. No sé si la parte de extracción de xcom tendrá algún problema si hay más de una instancia del dag ejecutándose al mismo tiempo, pero es probable que use una clave única o algo así para identificar de forma única al xcom. valor que quieras. Probablemente, se podría optimizar el 3. paso para estar 100% seguro de obtener una tarea específica del dag principal actual, pero para mi uso esto funciona lo suficientemente bien, creo que solo se necesita un objeto task_instance para usar xcom_pull.

También limpio los xcoms para la primera subdivisión antes de cada ejecución, solo para asegurarme de que no obtengo ningún valor incorrecto accidentalmente.

Soy bastante malo en explicar, así que espero que el siguiente código aclare todo:

test1.py

 from airflow.models import DAG import logging from airflow.operators.python_operator import PythonOperator from airflow.operators.postgres_operator import PostgresOperator log = logging.getLogger(__name__) def test1(parent_dag_name, start_date, schedule_interval): dag = DAG( '%s.test1' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date, ) def return_list(): return ['test1', 'test2'] list_extract_folder = PythonOperator( task_id='list', dag=dag, python_callable=return_list ) clean_xcoms = PostgresOperator( task_id='clean_xcoms', postgres_conn_id='airflow_db', sql="delete from xcom where dag_id='{{ dag.dag_id }}'", dag=dag) clean_xcoms >> list_extract_folder return dag 

test2.py

 from airflow.models import DAG, settings import logging from airflow.operators.dummy_operator import DummyOperator log = logging.getLogger(__name__) def test2(parent_dag_name, start_date, schedule_interval, parent_dag=None): dag = DAG( '%s.test2' % parent_dag_name, schedule_interval=schedule_interval, start_date=start_date ) if len(parent_dag.get_active_runs()) > 0: test_list = parent_dag.get_task_instances(settings.Session, start_date=parent_dag.get_active_runs()[-1])[-1].xcom_pull( dag_id='%s.%s' % (parent_dag_name, 'test1'), task_ids='list') if test_list: for i in test_list: test = DummyOperator( task_id=i, dag=dag ) return dag 

y el flujo de trabajo principal:

test.py

 from datetime import datetime from airflow import DAG from airflow.operators.subdag_operator import SubDagOperator from subdags.test1 import test1 from subdags.test2 import test2 DAG_NAME = 'test-dag' dag = DAG(DAG_NAME, description='Test workflow', catchup=False, schedule_interval='0 0 * * *', start_date=datetime(2018, 8, 24)) test1 = SubDagOperator( subdag=test1(DAG_NAME, dag.start_date, dag.schedule_interval), task_id='test1', dag=dag ) test2 = SubDagOperator( subdag=test2(DAG_NAME, dag.start_date, dag.schedule_interval, parent_dag=dag), task_id='test2', dag=dag ) test1 >> test2 

OA: “¿Hay alguna forma en Airflow para crear un flujo de trabajo tal que la cantidad de tareas B * sea desconocida hasta que se complete la Tarea A?”

La respuesta corta es no. El flujo de air generará el flujo DAG antes de comenzar a ejecutarlo.

Dicho esto, llegamos a una conclusión simple, es que no tenemos tal necesidad. Cuando desee poner en paralelo algunos trabajos, debe evaluar los recursos que tiene disponibles y no la cantidad de elementos para procesar.

Lo hicimos así: generamos dinámicamente un número fijo de tareas, digamos 10, que dividirá el trabajo. Por ejemplo, si necesitamos procesar 100 archivos, cada tarea procesará 10 de ellos. Voy a publicar el código más tarde hoy.

Actualizar

Aquí está el código, perdón por el retraso.

 from datetime import datetime, timedelta import airflow from airflow.operators.dummy_operator import DummyOperator args = { 'owner': 'airflow', 'depends_on_past': False, 'start_date': datetime(2018, 1, 8), 'email': ['myemail@gmail.com'], 'email_on_failure': True, 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(seconds=5) } dag = airflow.DAG( 'parallel_tasks_v1', schedule_interval="@daily", catchup=False, default_args=args) # You can read this from variables parallel_tasks_total_number = 10 start_task = DummyOperator( task_id='start_task', dag=dag ) # Creates the tasks dynamically. # Each one will elaborate one chunk of data. def create_dynamic_task(current_task_number): return DummyOperator( provide_context=True, task_id='parallel_task_' + str(current_task_number), python_callable=parallelTask, # your task will take as input the total number and the current number to elaborate a chunk of total elements op_args=[current_task_number, int(parallel_tasks_total_number)], dag=dag) end = DummyOperator( task_id='end', dag=dag) for page in range(int(parallel_tasks_total_number)): created_task = create_dynamic_task(page) start_task >> created_task created_task >> end 

Explicación del código:

Aquí tenemos una sola tarea de inicio y una única tarea de finalización (ambas ficticias).

Luego, desde la tarea de inicio con el bucle for, creamos 10 tareas con el mismo python que se puede llamar. Las tareas se crean en la función create_dynamic_task.

A cada python que se puede llamar, le pasamos como argumentos el número total de tareas paralelas y el índice de tareas actual.

Supongamos que tiene 1000 elementos para elaborar: la primera tarea recibirá en la entrada que debería elaborar la primera parte de 10 partes. Dividirá los 1000 artículos en 10 partes y elaborará el primero.

Creo que he encontrado una solución mejor para esto en https://github.com/mastak/airflow_multi_dagrun , que utiliza la simple puesta en cola de DagRuns al activar múltiples dagruns, similar a TriggerDagRuns . La mayoría de los créditos van a https://github.com/mastak , aunque tuve que parchear algunos detalles para que funcionara con el flujo de air más reciente.

La solución utiliza un operador personalizado que activa varios DagRuns :

 from airflow import settings from airflow.models import DagBag from airflow.operators.dagrun_operator import DagRunOrder, TriggerDagRunOperator from airflow.utils.decorators import apply_defaults from airflow.utils.state import State from airflow.utils import timezone class TriggerMultiDagRunOperator(TriggerDagRunOperator): CREATED_DAGRUN_KEY = 'created_dagrun_key' @apply_defaults def __init__(self, op_args=None, op_kwargs=None, *args, **kwargs): super(TriggerMultiDagRunOperator, self).__init__(*args, **kwargs) self.op_args = op_args or [] self.op_kwargs = op_kwargs or {} def execute(self, context): context.update(self.op_kwargs) session = settings.Session() created_dr_ids = [] for dro in self.python_callable(*self.op_args, **context): if not dro: break if not isinstance(dro, DagRunOrder): dro = DagRunOrder(payload=dro) now = timezone.utcnow() if dro.run_id is None: dro.run_id = 'trig__' + now.isoformat() dbag = DagBag(settings.DAGS_FOLDER) trigger_dag = dbag.get_dag(self.trigger_dag_id) dr = trigger_dag.create_dagrun( run_id=dro.run_id, execution_date=now, state=State.RUNNING, conf=dro.payload, external_trigger=True, ) created_dr_ids.append(dr.id) self.log.info("Created DagRun %s, %s", dr, now) if created_dr_ids: session.commit() context['ti'].xcom_push(self.CREATED_DAGRUN_KEY, created_dr_ids) else: self.log.info("No DagRun created") session.close() 

Luego, puede enviar varios dagruns desde la función invocable en su PythonOperator, por ejemplo:

 from airflow.operators.dagrun_operator import DagRunOrder from airflow.models import DAG from airflow.operators import TriggerMultiDagRunOperator from airflow.utils.dates import days_ago def generate_dag_run(**kwargs): for i in range(10): order = DagRunOrder(payload={'my_variable': i}) yield order args = { 'start_date': days_ago(1), 'owner': 'airflow', } dag = DAG( dag_id='simple_trigger', max_active_runs=1, schedule_interval='@hourly', default_args=args, ) gen_target_dag_run = TriggerMultiDagRunOperator( task_id='gen_target_dag_run', dag=dag, trigger_dag_id='common_target', python_callable=generate_dag_run ) 

Creé una bifurcación con el código en https://github.com/flinz/airflow_multi_dagrun

Encontré esta publicación de Medium que es muy similar a esta pregunta. Sin embargo, está lleno de errores tipográficos y no funciona cuando intenté implementarlo.

Mi respuesta a la anterior es la siguiente:

Si está creando tareas dinámicamente, debe hacerlo iterando sobre algo que no haya sido creado por una tarea ascendente, o se puede definir independientemente de esa tarea. Aprendí que no puedes pasar las fechas de ejecución u otras variables de flujo de air a algo fuera de una plantilla (por ejemplo, una tarea) como muchos otros han señalado anteriormente. Véase también este post .