He escrito un DAG con múltiples PythonOperators
task1 = af_op.PythonOperator(task_id='Data_Extraction_Environment', provide_context=True, python_callable=Task1, dag=dag1) def Task1(**kwargs): return(kwargs['dag_run'].conf.get('file'))
Desde PythonOperator estoy llamando al método “Task1”. Ese método está devolviendo un valor, ese valor que necesito pasar al siguiente PythonOperator. ¿Cómo puedo obtener el valor de la variable “task1” o cómo puedo obtener el valor que se devuelve del método Task1?
actualizado :
def Task1(**kwargs): file_name = kwargs['dag_run'].conf.get[file] task_instance = kwargs['task_instance'] task_instance.xcom_push(key='file', value=file_name) return file_name t1 = PythonOperator(task_id = 'Task1',provide_context=True,python_callable=Task1,dag=dag) t2 = BashOperator( task_id='Moving_bucket', bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1',key='file') }} ', dag=dag, ) t2.set_upstream(t1)
Es posible que desee revisar XCOM de Airflow: https://airflow.apache.org/concepts.html#xcoms
Si devuelve un valor de una función, este valor se almacena en xcom. En su caso, podría acceder a él desde otros códigos de Python:
task_instance = kwargs['task_instance'] task_instance.xcom_pull(task_ids='Task1')
o en una plantilla como tal:
{{ task_instance.xcom_pull(task_ids='Task1') }}
Si desea especificar una clave, puede presionar en XCOM (estar dentro de una tarea):
task_instance = kwargs['task_instance'] task_instance.xcom_push(key='the_key', value=my_str)
Luego, más adelante, puedes acceder a él de esta manera:
task_instance.xcom_pull(task_ids='my_task', key='the_key')
EDITAR 1
Pregunta de seguimiento: en lugar de usar el valor en otra función, ¿cómo puedo pasar el valor a otro PythonOperator como – “t2 =” BashOperator (task_id = ‘Moving_bucket’, bash_command = ‘python /home/raw.py “% s” ‘% file_name, dag = dag) “— quiero acceder a file_name que se devuelve con” Task1 “. ¿Cómo se puede lograr esto?
En primer lugar, me parece que, de hecho, el valor no se pasa a otro PythonOperator
sino a un BashOperator
.
En segundo lugar, esto ya está cubierto en mi respuesta anterior. El campo bash_command
tiene una plantilla (vea template_fields
en la fuente: https://github.com/apache/incubator-airflow/blob/master/airflow/operators/bash_operator.py ). Por lo tanto, podemos usar la versión con plantilla:
BashOperator( task_id='Moving_bucket', bash_command='python /home/raw.py {{ task_instance.xcom_pull(task_ids='Task1') }} ', dag=dag, )
Editar 2
Explicación: El flujo de air funciona de esta manera: ejecutará la Tarea 1, luego llenará xcom y luego ejecutará la siguiente tarea. Entonces, para que su ejemplo funcione, primero necesita ejecutar Task1 y luego ejecutar Moving_bucket en sentido descendente de Task1.
Como está utilizando una función de retorno, también puede omitir la key='file'
de xcom_pull
y no establecerla manualmente en la función.