Flujo de air: pasar un valor dynamic al operador Sub DAG

Soy nuevo en Airflow.
Me he encontrado con un escenario en el que Parent DAG necesita pasar un número dynamic (digamos n ) a Sub DAG.
Donde como SubDAG usará este número para crear dinámicamente n tareas paralelas.

La documentación del flujo de air no cubre una manera de lograr esto. Así que he explorado un par de maneras:

Opción – 1 (utilizando xcom Pull)

He intentado pasar como un valor de xcom, pero por alguna razón, SubDAG no está resolviendo el valor pasado.

Archivo Dag principal

 def load_dag(**kwargs): number_of_runs = json.dumps(kwargs['dag_run'].conf['number_of_runs']) dag_data = json.dumps({ "number_of_runs": number_of_runs }) return dag_data # ------------------ Tasks ------------------------------ load_config = PythonOperator( task_id='load_config', provide_context=True, python_callable=load_dag, dag=dag) t1 = SubDagOperator( task_id=CHILD_DAG_NAME, subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config') }}'" ), default_args=default_args, dag=dag, ) 

Sub Dag File

 def sub_dag(parent_dag_name, child_dag_name, args, num_of_runs): dag_subdag = DAG( dag_id='%s.%s' % (parent_dag_name, child_dag_name), default_args=args, schedule_interval=None) variabe_names = {} for i in range(num_of_runs): variabe_names['task' + str(i + 1)] = DummyOperator( task_id='dummy_task', dag=dag_subdag, ) return dag_subdag 

Opcion 2

También he intentado pasar number_of_runs como una variable global, que no estaba funcionando.

Opción – 3

También intentamos escribir este valor en un archivo de datos. Pero sub DAG está lanzando el File doesn't exist error . Esto podría ser porque estamos generando dinámicamente este archivo.

Puede alguien ayudarme con esto.

Lo he hecho con la Opción 3. La clave es devolver un dag válido sin tareas, si el archivo no existe. Así que load_config generará un archivo con su número de tareas o más información si es necesario. Su fábrica de subdagras se vería algo así como:

 def subdag(...): sdag = DAG('%s.%s' % (parent, child), default_args=args, schedule_interval=timedelta(hours=1)) file_path = "/path/to/generated/file" if os.path.exists(file_path): data_file = open(file_path) list_tasks = data_file.readlines() for task in list_tasks: DummyOperator( task_id='task_'+task, default_args=args, dag=sdag, ) return sdag 

En dag generation verás una subetiqueta sin tareas. En la ejecución de dag, una vez que se realiza load_config, puede ver que genera una subetiqueta generada dinámicamente

La opción 1 debería funcionar si simplemente cambia la llamada a xcom_pull para incluir el dag_id del dag principal. Por defecto, la llamada xcom_pull buscará el task_id 'load_config' en su propio dag que no existe.

así que cambia la macro de llamada x_com a:

 subdag=sub_dag(PARENT_DAG_NAME, CHILD_DAG_NAME, default_args, "'{{ ti.xcom_pull(task_ids='load_config', dag_id='" + PARENT_DAG_NAME + "' }}'" ), 

Si el nombre de archivo en el que está escribiendo no es dynamic (por ejemplo, está escribiendo sobre el mismo archivo una y otra vez para cada instancia de tarea), la respuesta de Jaime funcionará:

 file_path = "/path/to/generated/file" 

Pero si necesita un nombre de archivo único o desea que cada instancia de tarea escriba un contenido diferente para las tareas ejecutadas en paralelo, el flujo de air no funcionará en este caso, ya que no hay forma de pasar la fecha de ejecución o la variable fuera de una plantilla. Echa un vistazo a este post .

Eche un vistazo a mi respuesta aquí , en la que describo una forma de crear una tarea de forma dinámica en función de los resultados de una tarea ejecutada anteriormente utilizando xcoms y subdags.