configurando s3 para registros en el flujo de air

Estoy usando docker-compose para configurar un clúster de flujo de air escalable. Basé mi enfoque en este Dockerfile https://hub.docker.com/r/puckel/docker-airflow/

Mi problema es configurar los registros para escribir / leer desde s3. Cuando un dag ha completado recibo un error como este

*** Log file isn't local. *** Fetching here: http://ea43d4d49f35:8793/log/xxxxxxx/2017-06-26T11:00:00 *** Failed to fetch log file from worker. *** Reading remote logs... Could not read logs from s3://buckets/xxxxxxx/airflow/logs/xxxxxxx/2017-06- 26T11:00:00 

Configuré una nueva sección en el archivo airflow.cfg como este

 [MyS3Conn] aws_access_key_id = xxxxxxx aws_secret_access_key = xxxxxxx aws_default_region = xxxxxxx 

Y luego especificó la ruta s3 en la sección de registros remotos en airflow.cfg

 remote_base_log_folder = s3://buckets/xxxx/airflow/logs remote_log_conn_id = MyS3Conn 

¿Configuré esto correctamente y hay un error? ¿Hay alguna receta para el éxito que me estoy perdiendo?

    – Actualizar

    Intenté exportar en formatos URI y JSON y ninguno parecía funcionar. Luego exporté aws_access_key_id y aws_secret_access_key y luego el flujo de air comenzó a recogerlo. Ahora me sale su error en los registros del trabajador.

     6/30/2017 6:05:59 PMINFO:root:Using connection to: s3 6/30/2017 6:06:00 PMERROR:root:Could not read logs from s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00 6/30/2017 6:06:00 PMERROR:root:Could not write logs to s3://buckets/xxxxxx/airflow/logs/xxxxx/2017-06-30T23:45:00 6/30/2017 6:06:00 PMLogging into: /usr/local/airflow/logs/xxxxx/2017-06-30T23:45:00 

    – Actualizar

    Encontré este enlace también https://www.mail-archive.com/dev@airflow.incubator.apache.org/msg00462.html

    Luego realicé el shell en una de mis máquinas de trabajo (separadas del servidor web y del progtwigdor) y ejecuté este bit de código en Python

     import airflow s3 = airflow.hooks.S3Hook('s3_conn') s3.load_string('test', airflow.conf.get('core', 'remote_base_log_folder')) 

    Recibo este error.

     boto.exception.S3ResponseError: S3ResponseError: 403 Forbidden 

    Intenté exportar varios tipos diferentes de AIRFLOW_CONN_ AIRFLOW_CONN_ como se explica aquí en la sección de conexiones https://airflow.incubator.apache.org/concepts.html y por otras respuestas a esta pregunta.

     s3://:@S3 {"aws_account_id":"","role_arn":"arn:aws:iam:::role/"} {"aws_access_key_id":"","aws_secret_access_key":""} 

    También he exportado AWS_ACCESS_KEY_ID y AWS_SECRET_ACCESS_KEY sin éxito.

    Estas credenciales se almacenan en una base de datos, por lo que, una vez que las agregué en la interfaz de usuario, los trabajadores deben seleccionarlas, pero no pueden escribir / leer registros por algún motivo.

    Debe configurar la conexión s3 a través de la interfaz de usuario de flujo de air. Para esto, debe ir a la pestaña Admin -> Conexiones en la interfaz de usuario de flujo de air y crear una nueva fila para su conexión S3.

    Una configuración de ejemplo sería:

    Id de conexión: my_conn_S3

    Tipo de conexión: S3

    Extra: {“aws_access_key_id”: “your_aws_key_id”, “aws_secret_access_key”: “your_aws_secret_key”}

    ACTUALIZACIÓN Airflow 1.10 facilita mucho el registro .

    Para el registro de s3, configure el enlace de conexión según la respuesta anterior

    y luego simplemente agregue lo siguiente a airflow.cfg

      [core] # Airflow can store logs remotely in AWS S3. Users must supply a remote # location URL (starting with either 's3://...') and an Airflow connection # id that provides access to the storage location. remote_base_log_folder = s3://my-bucket/path/to/logs remote_log_conn_id = MyS3Conn # Use server-side encryption for logs stored in S3 encrypt_s3_logs = False 

    Para gcs logging,

    1. Instale primero el paquete gcp_api, así: pip install apache-airflow [gcp_api].

    2. Configure el gancho de conexión según la respuesta anterior

    3. Agregue lo siguiente a airflow.cfg

       [core] # Airflow can store logs remotely in AWS S3. Users must supply a remote # location URL (starting with either 's3://...') and an Airflow connection # id that provides access to the storage location. remote_logging = True remote_base_log_folder = gs://my-bucket/path/to/logs remote_log_conn_id = MyGCSConn 

    NOTA: A partir de Airflow 1.9, el registro remoto se ha modificado significativamente . Si está utilizando 1.9, siga leyendo.

    Referencia aqui

    Instrucciones completas:

    1. Cree un directorio para almacenar configuraciones y colóquelo de manera que se pueda encontrar en PYTHONPATH. Un ejemplo es $ AIRFLOW_HOME / config

    2. Cree archivos vacíos llamados $ AIRFLOW_HOME / config / log_config.py y $ AIRFLOW_HOME / config / __ init__.py

    3. Copie el contenido de airflow / config_templates / airflow_local_settings.py en el archivo log_config.py que acaba de crearse en el paso anterior.

    4. Personaliza las siguientes porciones de la plantilla:

       #Add this variable to the top of the file. Note the trailing slash. S3_LOG_FOLDER = 's3:///' Rename DEFAULT_LOGGING_CONFIG to LOGGING CONFIG LOGGING_CONFIG = ... Add a S3TaskHandler to the 'handlers' block of the LOGGING_CONFIG variable 's3.task': { 'class': 'airflow.utils.log.s3_task_handler.S3TaskHandler', 'formatter': 'airflow.task', 'base_log_folder': os.path.expanduser(BASE_LOG_FOLDER), 's3_log_folder': S3_LOG_FOLDER, 'filename_template': FILENAME_TEMPLATE, }, Update the airflow.task and airflow.task_runner blocks to be 's3.task' instead >of 'file.task'. 'loggers': { 'airflow.task': { 'handlers': ['s3.task'], ... }, 'airflow.task_runner': { 'handlers': ['s3.task'], ... }, 'airflow': { 'handlers': ['console'], ... }, } 
    5. Asegúrese de que se haya definido un gancho de conexión s3 en Flujo de air, según la respuesta anterior . El gancho debe tener acceso de lectura y escritura al grupo s3 definido anteriormente en S3_LOG_FOLDER.

    6. Actualice $ AIRFLOW_HOME / airflow.cfg para que contenga:

       task_log_reader = s3.task logging_config_class = log_config.LOGGING_CONFIG remote_log_conn_id =  
    7. Reinicie el servidor web y el progtwigdor de Airflow, y active (o espere) la ejecución de una nueva tarea.

    8. Verifique que los registros se muestren para las tareas recién ejecutadas en el grupo que ha definido.

    9. Verifique que el visor de almacenamiento s3 esté funcionando en la interfaz de usuario. Levanta una tarea recién ejecutada y verifica que veas algo como:

       *** Reading remote log from gs:///example_bash_operator/run_this_last/2017-10-03T00:00:00/16.log. [2017-10-03 21:57:50,056] {cli.py:377} INFO - Running on host chrisr-00532 [2017-10-03 21:57:50,093] {base_task_runner.py:115} INFO - Running: ['bash', '-c', u'airflow run example_bash_operator run_this_last 2017-10-03T00:00:00 --job_id 47 --raw -sd DAGS_FOLDER/example_dags/example_bash_operator.py'] [2017-10-03 21:57:51,264] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,263] {__init__.py:45} INFO - Using executor SequentialExecutor [2017-10-03 21:57:51,306] {base_task_runner.py:98} INFO - Subtask: [2017-10-03 21:57:51,306] {models.py:186} INFO - Filling up the DagBag from /airflow/dags/example_dags/example_bash_operator.py 

    (Actualizado a partir de Airflow 1.10.2)

    Aquí hay una solución si no usa la IU de administrador.

    Mi flujo de air no se ejecuta en un servidor persistente … (Se lanza de nuevo todos los días en un contenedor Docker, en Heroku). Sé que me estoy perdiendo muchas características excelentes, pero en mi configuración mínima, nunca toque la IU de administración o el archivo cfg. En su lugar, tengo que establecer variables de entorno específicas de Airflow en un script de bash, que reemplaza el archivo .cfg.

    flujo de air apache [s3]

    En primer lugar, necesita tener instalado el subpaquete s3 para escribir sus registros de Airflow en S3. ( boto3 funciona bien para los trabajos de Python dentro de sus DAG, pero el S3Hook depende del subpaquete s3).

    Una nota más: la instalación de Conda no maneja esto todavía , así que tengo que hacer pip install apache-airflow[s3] .

    Variables de entorno

    En un script de bash, establezco estas variables core . A partir de estas instrucciones, pero utilizando la convención de nomenclatura AIRFLOW__{SECTION}__{KEY} para las variables de entorno, hago:

     export AIRFLOW__CORE__REMOTE_LOGGING=True export AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://bucket/key export AIRFLOW__CORE__REMOTE_LOG_CONN_ID=s3_uri export AIRFLOW__CORE__ENCRYPT_S3_LOGS=False 

    ID de conexión S3

    El s3_uri anterior es un ID de conexión que s3_uri . En Airflow, corresponde a otra variable de entorno, AIRFLOW_CONN_S3_URI . El valor de eso es su ruta S3, que tiene que estar en forma de URI. Eso es

     s3://access_key:secret_key@bucket/key 

    Almacene esto, sin embargo, maneja otras variables de entorno sensibles.

    Con esta configuración, Airflow podrá escribir sus registros en S3. s3://bucket/key/dag/task_id/timestamp/1.log la ruta de s3://bucket/key/dag/task_id/timestamp/1.log .


    Apéndice sobre la actualización de Airflow 1.8 a Airflow 1.10

    Recientemente actualicé mi línea de producción de Airflow 1.8 a 1.9, y luego a 1.10. La buena noticia es que los cambios son bastante pequeños; El rest del trabajo fue simplemente descubrir matices con las instalaciones del paquete (no relacionadas con la pregunta original sobre los registros de S3).

    (1) En primer lugar, necesitaba actualizar a Python 3.6 con Airflow 1.9.

    (2) El nombre del paquete cambió de airflow de airflow a airflow de airflow apache-airflow con 1.9. También podría encontrarse con esto en su pip install .

    (3) El paquete psutil tiene que estar en un rango de versión específico para Airflow. Es posible que encuentre esto cuando esté haciendo pip install apache-airflow .

    (4) Se necesitan encabezados python3-dev con Airflow 1.9+.

    (5) Estos son los cambios sustanciales: export AIRFLOW__CORE__REMOTE_LOGGING=True ahora es obligatorio. Y

    (6) Los registros tienen una ruta ligeramente diferente en S3, que actualicé en la respuesta: s3://bucket/key/dag/task_id/timestamp/1.log .

    ¡Pero eso es todo! Los registros no funcionaron en 1.9, por lo que recomiendo ir directamente a 1.10, ahora que está disponible.

    Para completar la respuesta de Arne con las actualizaciones recientes de Airflow, no necesita configurar task_log_reader a un valor diferente al predeterminado: task

    Como si siguiera la plantilla de registro predeterminada airflow / config_templates / airflow_local_settings.py , puede ver desde este compromiso (tenga en cuenta que el nombre del controlador cambió a 's3': {'task'... lugar de s3.task ) ese es el valor en la La carpeta remota ( REMOTE_BASE_LOG_FOLDER ) reemplazará el controlador con el correcto:

     REMOTE_LOGGING = conf.get('core', 'remote_logging') if REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('s3://'): DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['s3']) elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('gs://'): DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['gcs']) elif REMOTE_LOGGING and REMOTE_BASE_LOG_FOLDER.startswith('wasb'): DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['wasb']) elif REMOTE_LOGGING and ELASTICSEARCH_HOST: DEFAULT_LOGGING_CONFIG['handlers'].update(REMOTE_HANDLERS['elasticsearch']) 

    Más detalles sobre cómo iniciar sesión en / leer desde S3: https://github.com/apache/incubator-airflow/blob/master/docs/howto/write-logs.rst#writing-logs-to-amazon-s3

    Solo una nota al margen para cualquiera que siga las instrucciones muy útiles de la respuesta anterior : Si se topa con este problema: “ModuleNotFoundError: No hay un módulo llamado ‘airflow.utils.log.logging_mixin.RedirectStdHandler'” como se hace referencia aquí (lo que sucede cuando se usa el flujo de air 1.9), la solución es simple: use más bien esta plantilla base: https://github.com/apache/incubator-airflow/blob/v1-9-stable/airflow/config_templates/airflow_local_settings.py (y siga todas las demás instrucciones en la respuesta anterior )

    La plantilla actual incubator-airflow / airflow / config_templates / airflow_local_settings.py presente en la twig maestra contiene una referencia a la clase “airflow.utils.log.s3_task_handler.S3TaskHandler”, que no está presente en apache-airflow == 1.9.0 python paquete. ¡Espero que esto ayude!

    Haz que funcione con Airflow 10 en kube. Tengo los siguientes juegos env var:

     AIRFLOW_CONN_LOGS_S3=s3://id:secret_uri_encoded@S3 AIRFLOW__CORE__REMOTE_LOGGING=True AIRFLOW__CORE__REMOTE_BASE_LOG_FOLDER=s3://xxxx/logs AIRFLOW__CORE__REMOTE_LOG_CONN_ID=logs_s3