Python Airflow – Resultado de retorno de PythonOperator

He escrito un DAG con múltiples PythonOperators

task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment', provide_context=True, python_callable=Task1, dag=dag1) def Task1(**kwargs): return(kwargs['dag_run'].conf.get('file')) 

Desde PythonOperator estoy llamando al método “Task1”. Ese método está devolviendo un valor, ese valor que necesito pasar al siguiente PythonOperator. ¿Cómo puedo obtener el valor de la variable “task1” o cómo puedo obtener el valor que se devuelve del método Task1?

actualizado :

  def Task1(**kwargs): file_name = kwargs['dag_run'].conf.get[file] task_instance = kwargs['task_instance'] task_instance.xcom_push(key='file', value=file_name) return file_name t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag) t2 = BashOperator( task_id='Moving_bucket', bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ', dag=dag, ) t2.set_upstream(t1) 

Es posible que desee revisar XCOM de Airflow: https://airflow.apache.org/concepts.html#xcoms

Si devuelve un valor de una función, este valor se almacena en xcom. En su caso, podría acceder a él desde otros códigos de Python:

 task_instance = kwargs['task_instance'] task_instance.xcom_pull(task_ids='Task1') 

o en una plantilla como tal:

 {{ task_instance.xcom_pull(task_ids='Task1') }} 

Si desea especificar una clave, puede presionar en XCOM (estar dentro de una tarea):

 task_instance = kwargs['task_instance'] task_instance.xcom_push(key='the_key', value=my_str) 

Luego, más adelante, puedes acceder a él de esta manera:

 task_instance.xcom_pull(task_ids='my_task', key='the_key') 

EDITAR 1

Pregunta de seguimiento: en lugar de usar el valor en otra función, ¿cómo puedo pasar el valor a otro PythonOperator como – “t2 =” BashOperator (task_id = ‘Moving_bucket’, bash_command = ‘python /home/raw.py “% s” ‘% file_name, dag = dag) “— quiero acceder a file_name que se devuelve con” Task1 “. ¿Cómo se puede lograr esto?

En primer lugar, me parece que, de hecho, el valor no se pasa a otro PythonOperator sino a un BashOperator .

En segundo lugar, esto ya está cubierto en mi respuesta anterior. El campo bash_command tiene una plantilla (vea template_fields en la fuente: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py ). Por lo tanto, podemos usar la versión con plantilla:

 BashOperator( task_id='Moving_bucket', bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ', dag=dag, ) 

Editar 2

Explicación: El flujo de air funciona de esta manera: ejecutará la Tarea 1, luego llenará xcom y luego ejecutará la siguiente tarea. Entonces, para que su ejemplo funcione, primero necesita ejecutar Task1 y luego ejecutar Moving_bucket en sentido descendente de Task1.

Como está utilizando una función de retorno, también puede omitir la key='file' de xcom_pull y no establecerla manualmente en la función.