Advancing Apache Airflow Workflow Schedules
By Google's definition, “Apache Airflow is a workflow management platform for data engineering pipelines.” This powerful tool lets us create pipelines of varying complexity. In this blog post, I will dive deep into the concepts that give you more control over data intervals and workflow schedules in Apache Airflow.
In Airflow, a “DAG” is responsible for ensuring that the tasks associated with it run at the right time and in the right order. A “DAG Run”, on the other hand, is an object that represents an instance of the workflow at a point in time.
How does the scheduler decide when to schedule DAG Runs?
DAG Runs are scheduled mainly based on two parameters: schedule_interval and start_date. Whenever a DAG object is created, the schedule_interval is defined either implicitly or explicitly. The default schedule_interval is one day, which means new DAG Runs will be scheduled each day starting from the start_date. We can also use the end_date parameter to specify when the scheduler should stop scheduling new DAG Runs. (It is also worth mentioning that, starting from v1.8.0, it is suggested to set a static start_date value rather than a dynamic one. Further details regarding dynamic start dates can be found here.)
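To make these three parameters concrete, here is a minimal sketch using plain datetime arithmetic (no Airflow required; the variable names and values are illustrative, not from any real DAG) of how start_date, schedule_interval, and end_date together bound the set of scheduled runs:

```python
from datetime import datetime, timedelta

# Illustrative values, not taken from any real DAG.
start_date = datetime(2022, 1, 1)
end_date = datetime(2022, 1, 5)
schedule_interval = timedelta(days=1)  # Airflow's default of one day

# Enumerate the interval start times the scheduler would consider.
scheduled = []
interval_start = start_date
while interval_start <= end_date:
    scheduled.append(interval_start)
    interval_start += schedule_interval

print(len(scheduled))  # one run per day from start_date through end_date
```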
Catchup and execution_date
So far we have schedule_interval, start_date, and end_date to schedule DAG Runs. In theory, these three parameters already give us a good degree of control over our workflow and its series of DAG Runs (especially when working with bulk datasets).
But wait... what happens when start_date refers to a time earlier than the actual trigger time of the DAG Run? In Airflow, such cases are handled with the catchup concept. If catchup is enabled at the DAG level, the scheduler creates a new DAG Run for each interval that has not been run by the time of the trigger.
This allows us to run workflows based on time intervals and keep our task definitions atomic, so the catchup concept is especially valuable when working with datasets that can easily be split into specific time frames.
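The catchup behaviour can be sketched with plain datetime arithmetic (no Airflow required; the function and variable names here are illustrative):

```python
from datetime import datetime, timedelta

def missed_intervals(start_date, now, interval):
    """Return the logical dates of every interval that has fully elapsed
    between start_date and now -- the runs catchup would create."""
    runs = []
    interval_start = start_date
    while interval_start + interval <= now:
        runs.append(interval_start)
        interval_start += interval
    return runs

# A daily schedule whose start_date lies three days in the past:
# catchup would schedule three runs, one per missed day.
runs = missed_intervals(datetime(2022, 1, 1), datetime(2022, 1, 4), timedelta(days=1))
print([r.strftime("%Y-%m-%d") for r in runs])
```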
When a workflow is started, either manually through the UI or by the scheduler, the catchup mechanism may start multiple DAG Runs automatically for different time intervals (see Image 1, Catchup=enabled). So how do we differentiate these DAG Runs when their start dates are the same? If you check the Airflow UI -> Browse -> DAG Runs, there is a table with a column execution_date (also known as logical_date). The execution_date is defined in the context of schedule intervals and represents the start time of the corresponding schedule interval. This variable can therefore be used to differentiate DAG Runs that are running at the same time.
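As a sketch of how the logical date could be used inside a task, the snippet below simulates the context dictionary Airflow passes to a python_callable (the simulated call at the bottom is an assumption for illustration; in a real task Airflow supplies the context at runtime):

```python
from datetime import datetime

def report_interval(**context):
    # 'execution_date' (aka 'logical_date') identifies which schedule
    # interval this DAG Run belongs to, even if several runs start together.
    return f"processing data for {context['execution_date']:%Y-%m-%d}"

# Simulate the context for a backfilled 2019 run.
print(report_interval(execution_date=datetime(2019, 1, 1)))
# → processing data for 2019-01-01
```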
I know things are getting complicated, so let's consider a really simple DAG example and see how Airflow handles scheduling its DAG Runs.
In the code snippet below, the start_date is set to the first day of 2019, the schedule_interval is set to a yearly interval, and catchup is set to True. When the workflow is triggered by the scheduler, three DAG Runs are created; when it is triggered manually through the UI, four are created.
from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator


def print_hello():
    print("Hello world!")


default_args = {
    'owner': 'irem.ertuerk',
    'start_date': datetime(2019, 1, 1),
}

dag = DAG('dag-example',
          description='Simple Dag Example',
          schedule_interval="@yearly",
          default_args=default_args,
          catchup=True)

dummy_operator = DummyOperator(task_id='dummy_task', retries=3, dag=dag)
hello_operator = PythonOperator(task_id='hello_task', python_callable=print_hello, dag=dag)

dummy_operator >> hello_operator
As highlighted in Image 1, the details of each DAG Run include the Logical Date (aka Execution Date), Start Date, and End Date. The start date and end date represent the actual execution time, while the logical date represents the time at which the DAG Run was expected to be executed.
Since we set start_date to 2019 in our DAG object, when the first DAG Run is triggered in 2022, the scheduler realizes that the DAG Runs for 2019, 2020, and 2021 are missing, backfills them, and sets the corresponding logical date on each run.
It is important to highlight that, as seen in the image, the scheduler created three DAG Runs, for 2019, 2020, and 2021. You might ask: “Why is there no DAG Run for 2022?”, since we have already passed the interval start of 2022-01-01. Airflow is designed for ETL pipelines, so it makes sense to run the workflow for 2022 only once all of that year's data has been collected, i.e. at the end of the schedule interval, just before 2023. Therefore the DAG Run for 2022 is scheduled for 2023-01-01 00:00.
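This timing rule can be illustrated with a small sketch (plain datetime, no Airflow required; the values mirror the yearly example above):

```python
from datetime import datetime

# The run covering [2022-01-01, 2023-01-01) is only triggered once the
# interval has ended, when all of 2022's data exists.
interval_start = datetime(2022, 1, 1)   # becomes the run's logical date
interval_end = datetime(2023, 1, 1)
run_triggered_at = interval_end         # actual trigger time

print(f"logical date {interval_start:%Y-%m-%d}, triggered at {run_triggered_at:%Y-%m-%d}")
```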
As seen in Image 2, the data interval start and end are defined by the scheduler and do not necessarily correlate with the actual start and end times shown in UTC.
I hope this article helps you understand how Airflow schedules different DAG Runs. Feel free to share your feedback and thoughts!
PS: In the next blog post, I will explain task communication and accessing Airflow parameters via Jinja templating. Stay tuned!
References
- Airflow Official Documentation: FAQ, Concepts, DAG Run, Template References
- Other blog posts: Apache Airflow Tips and Best Practices, Airflow Schedule Interval 101