Flujo de air no progtwigdo correctamente Python

Código:

Python versión 2.7.xy versión de flujo de air 1.5.1

mi script dag es este

from airflow import DAG from airflow.operators import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'xyz', 'depends_on_past': False, 'start_date': datetime(2015,10,13), 'email': ['xyz@email.in'], 'schedule_interval':timedelta(minutes=5), 'email_on_failure': True, 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG('testing', default_args=default_args) run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag) for i in range(5): t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) t.set_upstream(run_this_first) 

Desde allí puede ver que estoy creando un DAG con 6 tareas, la primera tarea (Inicio1) comienza primero, después de lo cual comienzan las otras cinco tareas.

Actualmente he dado 5 minutos de retraso entre el inicio de DAG

    Se ha ejecutado perfectamente para las seis tareas del primer tipo, pero después de cinco minutos, el DAG no se reinicia.

    Ha pasado más de 1 hora, pero el DAG no se reinicia. Realmente no sé si estoy equivocado.

    Sería realmente bueno si alguien pudiera señalarme qué es lo que está mal. Intenté limpiar utilizando airflow testing clear para que sucediera lo mismo. Se ejecutó en primer lugar y luego me quedé allí.

    Lo único que muestra la línea de comandos es Getting all instance for DAG testing

    Cuando cambio la posición de schedule_interval, simplemente se ejecuta sin ningún intervalo de progtwigción paralelo. Es decir, en 5 minutos se completó 300 o más instancias de tareas. No hay un intervalo de 5 minutos

    Código 2:

     from airflow import DAG from airflow.operators import BashOperator from datetime import datetime, timedelta default_args = { 'owner': 'xyz', 'depends_on_past': False, 'start_date': datetime(2015,10,13), 'email': ['xyz@email.in'], 'email_on_failure': True, 'email_on_retry': True, 'retries': 1, 'retry_delay': timedelta(minutes=5), } dag = DAG('testing',schedule_interval=timedelta(minutes=5),default_args=default_args)#Schedule here run_this_first = BashOperator(task_id='Start1',bash_command='date', dag=dag) for i in range(5): t = BashOperator(task_id="Orders1"+str(i), bash_command='sleep 5',dag=dag) t.set_upstream(run_this_first) 

    Para el Código 2, supongo que la razón por la que se ejecuta cada minuto es:

    1. La hora de inicio es 2015-10-13 00:00.

    2. El intervalo de progtwigción es de 5 minutos.

    3. Cada latido del progtwigdor (5 segundos de forma predeterminada), se comprobará su DAG

      • Primera comprobación: fecha de inicio (no se encontró la última fecha de ejecución) + intervalo del progtwigdor <¿hora actual? Si es así, se ejecutará el DAG y se registrará el último tiempo de ejecución. (Ej. 2015-10-13 00:00 + 5min <¿actual?)
      • Segunda comprobación del siguiente latido: último tiempo de ejecución + intervalo del progtwigdor <¿hora actual? Si es así, el DAG se ejecutará de nuevo.
      • ….

    La solución es configurar la datetime.now() - schedule_interval del DAG como datetime.now() - schedule_interval .

    Y también si quieres depurar:

    1. Configurando el LOGGINGLEVEL para debug en settings.py

    2. Modificar el método de clase is_queueable() de airflow.models.TaskInstance para

    :

     def is_queueable(self, flag_upstream_failed=False): logging.debug('Checking whether task instance is queueable or not!') if self.execution_date > datetime.now() - self.task.schedule_interval: logging.debug('Too early to execute: execution_date {0} + task.schedule_interval {1} > datetime.now() {2}'.format(self.execution_date, self.task.schedule_interval, datetime.now())) return False ... 

    Debido a que la hora de inicio (2015-10-13 00:00) es menos que la hora actual, se activa el relleno de flujo de air. Se ejecutará del 2015-10-13 00:00 cuando cada segundo que detecte el progtwigdor de flujo de air (su Fecha de inicio), pero la Fecha de ejecución esté entre 5 min (tiempo de intervalo de tarea).

    Ver el nombre del registro:

     $tree airflow/logs/testing/ testing/ |-- Orders10 | |-- 2015-10-13T00:00:00 | |-- 2015-10-13T00:05:00 | -- 2015-10-13T00:10:00 |-- Orders11 | |-- 2015-10-13T00:00:00 | |-- 2015-10-13T00:05:00 | -- 2015-10-13T00:10:00 |-- Orders12 | |-- 2015-10-13T00:00:00 | |-- 2015-10-13T00:05:00 | -- 2015-10-13T00:10:00 |-- Orders13 | |-- 2015-10-13T00:00:00 | |-- 2015-10-13T00:05:00 | -- 2015-10-13T00:10:00 |-- Orders14 | |-- 2015-10-13T00:00:00 | |-- 2015-10-13T00:05:00 | -- 2015-10-13T00:10:00 -- Start1 |-- 2015-10-13T00:00:00 |-- 2015-10-13T00:05:00 |-- 2015-10-13T00:10:00 -- 2015-10-13T00:15:00 

    Ver el tiempo de creación de registros:

     $ll airflow/logs/testing/Start1 -rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:00:00 -rw-rw-r-- 1 admin admin 4192 Nov 9 14:50 2015-10-13T00:05:00 -rw-rw-r-- 1 admin admin 4192 Nov 9 14:51 2015-10-13T00:10:00 -rw-rw-r-- 1 admin admin 4192 Nov 9 14:52 2015-10-13T00:15:00 

    Además, puede ver las instancias de tareas en la interfaz de usuario web:

    Instancias de tareas de flujo de aire