Advancing Apache Airflow Workflow Schedules

Irem Ertürk
4 min read · Feb 7, 2022


By Google's definition, “Apache Airflow is a workflow management platform for data engineering pipelines.” This amazing tool lets us create pipelines of varying complexity, so in this blog post I will take a deep dive into the concepts that give us more control over data intervals and workflow schedules in Apache Airflow for different needs.

In Airflow, a “DAG” is responsible for ensuring that the tasks associated with it run at the right time and in the right order. A “DAG Run”, on the other hand, is an object representing an instance of that workflow at a given point in time.

How does the scheduler decide when to schedule DAG Runs?

DAG Runs are scheduled mainly based on two parameters, schedule_interval and start_date. Whenever a DAG object is created, the schedule_interval is defined either implicitly or explicitly. Its default value is one day, which means new DAG Runs are scheduled each day starting from the start_date. We can also use the end_date parameter to specify when the scheduler should stop scheduling new DAG Runs. (It is also worth mentioning that, starting from v1.8.0, it is recommended to set a static start_date value rather than a dynamic one. Further details regarding the dynamic start date can be found here.)
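For instance, a minimal sketch (the DAG id here is just illustrative, not the example used later in this post) of how these three parameters appear on a DAG object could look like this:

from datetime import datetime
from airflow import DAG

dag = DAG('scheduling-basics',              # hypothetical DAG id
          start_date=datetime(2022, 1, 1),  # the first interval starts here
          end_date=datetime(2022, 12, 31),  # no new DAG Runs are scheduled after this
          schedule_interval="@daily")       # explicit here, but one day is also the default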

Catchup and execution_date

So far we have schedule_interval, start_date, and end_date to schedule DAG Runs. In theory, these three parameters already give us a good degree of control over our workflow and its series of DAG Runs (especially when working with bulk datasets).

But wait… what happens when start_date refers to an earlier time than the actual time the DAG Run starts running? In Airflow, such cases are handled with the catchup concept. If catchup is enabled at the DAG level, the scheduler executes a new DAG Run for each interval that has not been run by the time of the trigger.
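As a small sketch (with an assumed daily schedule and a hypothetical DAG id), the only thing needed to control this behaviour is the catchup flag on the DAG object; the comment describes what the scheduler would do:

from datetime import datetime
from airflow import DAG

# If this DAG were first enabled on 2022-01-05, catchup=True would make the
# scheduler backfill the daily intervals for Jan 1-4 that were never run;
# with catchup=False only the most recent interval would be scheduled.
dag = DAG('catchup-demo',
          start_date=datetime(2022, 1, 1),
          schedule_interval="@daily",
          catchup=True)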

Image 1: Example schedule plans of DAG Runs with Catchup enabled and disabled

This allows us to run workflows based on time intervals and keep our task definitions atomic; the catchup concept is therefore especially valuable when we are working with datasets that can easily be split into specific time frames.

When a workflow is triggered, either manually through the UI or by the scheduler, the catchup mechanism might start multiple DAG Runs automatically for different time intervals (check Image 1, Catchup=enabled). Then… how do we differentiate these DAG Runs, since their start_date becomes the same value? If you check the Airflow UI -> Browse -> DAG Runs, there is a table with an execution_date column (also known as logical_date). The execution_date is defined in terms of schedule intervals and represents the start time of the corresponding schedule interval. This value can therefore be used to differentiate DAG Runs that start at the same time.
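As a hedged sketch (assuming Airflow 2.x, where the task context is passed to the Python callable automatically; the DAG and task names are illustrative, not from this article), a task can read this value from its context to tell backfilled runs apart:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def report_run(**context):
    # execution_date (also exposed as logical_date in newer Airflow versions)
    # marks the start of the schedule interval this run covers,
    # not the moment the run actually started.
    print("This run covers the interval starting at:", context["execution_date"])

dag = DAG('execution-date-demo',
          start_date=datetime(2022, 1, 1),
          schedule_interval="@daily",
          catchup=True)

report_task = PythonOperator(task_id='report_run',
                             python_callable=report_run,
                             dag=dag)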

I know things are getting complicated, so let's consider a really simple DAG example and see how Airflow handles scheduling its DAG Runs.

In the code snippet below, the start_date is set to the first day of 2019, schedule_interval is set to a yearly interval, and catchup is set to True. When the workflow is triggered by the scheduler, the scheduler will create three different DAG Runs, and four different DAG Runs if it is manually triggered through the UI.

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

def print_hello():
    print("Hello world!")

default_args = {
    'owner': 'irem.ertuerk',
    'start_date': datetime(2019, 1, 1),   # static start date in the past
}

dag = DAG('dag-example',
          description='Simple Dag Example',
          schedule_interval="@yearly",     # one DAG Run per year
          default_args=default_args,
          catchup=True)                    # backfill the missed intervals

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

dummy_operator >> hello_operator

As highlighted in Image 2, the details of each DAG Run include the Logical Date (aka Execution Date), Start Date, and End Date. Both the start date and end date represent the actual time of execution, while the logical date represents the start of the schedule interval that the DAG Run covers.

Image 2: DAG Run details

Since we specified start_date as the first day of 2019 in our DAG object, when the first DAG Run is triggered in 2022 the scheduler realizes that the DAG Runs for 2019, 2020, and 2021 are missing, backfills them, and sets the corresponding logical date on each run.

It is important to highlight that, as seen in the image, the scheduler created three DAG Runs: for 2019, 2020, and 2021. You might ask “why is there no DAG Run for 2022?”, since we are already past the interval start of 2022-01-01. Airflow is designed for ETL pipelines, so it makes sense to run the workflow for 2022 only once all of its data has been collected, which is at the end of the schedule interval, basically just before 2023. Therefore the DAG Run for 2022 will be scheduled on 2023-01-01 00:00.
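To make this concrete, here is a rough illustration in plain Python (no Airflow required) of the yearly intervals implied by this example and when the run for each interval is created:

from datetime import datetime

# Intervals implied by start_date=2019-01-01 and schedule_interval="@yearly".
# A run for an interval is only created once that interval has fully passed.
intervals = [
    (datetime(2019, 1, 1), datetime(2020, 1, 1)),  # backfilled when the DAG is triggered in 2022
    (datetime(2020, 1, 1), datetime(2021, 1, 1)),  # backfilled
    (datetime(2021, 1, 1), datetime(2022, 1, 1)),  # backfilled
    (datetime(2022, 1, 1), datetime(2023, 1, 1)),  # scheduled on 2023-01-01, once the interval ends
]
for start, end in intervals:
    print(f"logical date {start:%Y-%m-%d}: run scheduled at {end:%Y-%m-%d}")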

Image 3: Details of the first DAG Run, corresponding to 2019

As seen in Image 3, the data interval start and end are defined by the scheduler and do not necessarily coincide with the actual start and end times shown in UTC.
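As a hedged sketch (assuming Airflow 2.2 or newer, where the data interval boundaries are part of the task context; the DAG and task names are illustrative), a task can read the interval the scheduler computed for it:

from datetime import datetime
from airflow import DAG
from airflow.operators.python import PythonOperator

def show_interval(**context):
    # The scheduler-computed interval boundaries, which may differ from the
    # actual wall-clock start and end time of the run itself.
    print("data interval:", context["data_interval_start"], "->", context["data_interval_end"])

dag = DAG('data-interval-demo',
          start_date=datetime(2022, 1, 1),
          schedule_interval="@daily",
          catchup=False)

show_task = PythonOperator(task_id='show_interval',
                           python_callable=show_interval,
                           dag=dag)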

I hope this article helps you understand how Airflow schedules different DAG Runs. Feel free to share your feedback and thoughts!

PS: In the next blog post, I will explain task communication and accessing Airflow parameters with Jinja templating. Stay tuned!
