Pase una lista de cadenas como parámetro de una tarea dependiente en Airflow

Estoy tratando de pasar una lista de cadenas de una tarea a otra a través de XCom, pero parece que no consigo que la lista insertada se interprete de nuevo como una lista.

Por ejemplo, cuando hago esto en una función blah que se ejecuta en ShortCircuitOperator :

 paths = ['gs://{}/{}'.format(bucket, obj) for obj in my_list] kwargs['ti'].xcom_push(key='return_value', value=full_paths) 

y luego quiero usar dicha lista como un parámetro de un operador. Por ejemplo,

 run_task_after_blah = AfterBlahOperator( task_id='run-task-after-blah', ..., input_paths="{{ ti.xcom_pull(task_ids='find-paths') }}", ..., ) 

Espero que input_paths sea ​​igual a las paths pero no lo hace porque la representación ocurre primero y luego se asigna, y la representación de la plantilla convierte el retorno de xcom_pull a una lista de cadenas (y, posteriormente, mi AfterBlahOperator inserta eso como el valor de un elemento en una JSON.

Intenté concatenar las paths en una cadena separada por algún separador y empujar eso al XCom y luego dividirlo cuando se extrae del XCom, pero cuando el XCom se procesa primero, obtengo esa lista de cadenas cuando se llama la función de split la plantilla o la cadena de paths concatenada original si la función de split se aplica al parámetro (como en "{{ ti.xcom_pull(task_ids='find-paths') }}".split(';') .

XCom parece funcionar muy bien para valores individuales como parámetros de tareas o valores múltiples cuando los valores extraídos pueden procesarse aún más, pero no para que valores_múltiples se conviertan en ‘uno’ como parámetro de una tarea.

¿Hay alguna forma de hacer esto sin tener que escribir una función adicional que devuelva precisamente esa lista de cadenas? O tal vez estoy abusando demasiado de XCom, pero hay muchos operadores en Airflow que toman una lista de elementos como parámetro (por ejemplo, generalmente la ruta completa a múltiples archivos que son el resultado de alguna tarea previa, por lo tanto no se conoce de antemano).

Jinja hace cadenas, por lo que si obtiene un XCom a través de plantillas, siempre será una cadena. En su lugar, deberá buscar el XCom donde tenga acceso al objeto TaskInstance . Algo como esto:

 class AfterBlahOperator(BaseOperator): def __init__(self, ..., input_task_id, *args, **kwargs): ... self.input_task_id = input_task_id super(AfterBlahOperator, self).__init__(*args, **kwargs) def execute(self, context): input_paths = context['ti'].xcom_pull(task_ids=self.input_task_id) for path in input_paths: ... 

Esto es similar a cómo lo buscaría dentro de un PythonOperator , del cual los documentos de XCom proporcionan un ejemplo de.

Tenga en cuenta que aún puede admitir un parámetro input_paths separado para cuando se pueda codificar en un DAG, solo necesitará una verificación adicional para ver de qué parámetro leer el valor.