Accurately Measuring DAG Run Time in Apache Airflow

A critical path approach

Ali Masri
Towards Data Engineering
4 min read · Jan 16, 2024

Overview

As data engineers, we often rely on metrics to optimize and troubleshoot our data pipelines, and there’s no metric more fundamental than the run time of our workflows. When using Apache Airflow, the conventional wisdom suggests measuring a DAG’s run time by simply subtracting its `start_date` from its `end_date`. However, this approach paints an incomplete picture, plagued by inaccuracies due to task retries, execution delays, and parallel processing. In this article, we will present a solution that captures the true performance of our pipelines.
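For context, the conventional measurement usually boils down to something like the sketch below. The helper name `wall_clock_runtime` is ours, and it assumes a completed `DagRun` object (for example, one pulled from the task context):

from airflow.models.dagrun import DagRun

def wall_clock_runtime(dag_run: DagRun) -> float:
    # Elapsed seconds between the run starting and finishing; this
    # includes scheduler delays, queue time, retries, and idle gaps.
    return (dag_run.end_date - dag_run.start_date).total_seconds()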

Shortcomings of the (end_date - start_date) Approach

  1. Manual Task Restart: Airflow allows users to manually retry or restart tasks. If a task is restarted long after the DAG’s initial run, it artificially inflates the DAG’s perceived run time.
  2. Delayed Task Execution: Tasks might not start immediately because no executor slot is available or the scheduler is lagging. Such delays push `start_date` and `end_date` farther apart than the actual compute time warrants.
  3. Time Zone Complications: Airflow timestamps are timezone-aware. Depending on how your Airflow instance is configured, discrepancies could arise from timezone conversions, leading to erroneous run time calculations.
  4. Upstream Failures and Cleared Tasks: A task might execute successfully but later be cleared and re-run because of an upstream failure or a data issue. The subsequent runs add more time to the measured duration.
  5. External Factors: Issues such as network latency, external system downtime, or data availability can cause delays between tasks that do not reflect the actual computational time of the DAG.

To overcome these limitations, we need a calculation that can reflect the DAG’s computational complexity more faithfully by considering the DAG’s critical path rather than just wall-clock time.
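As a hypothetical illustration, consider a diamond-shaped DAG in which task A (10 s) fans out to B (30 s) and C (20 s), both of which feed D (5 s). B and C run in parallel, so the critical path is A → B → D, or 45 seconds of actual compute, even if scheduling gaps and a late manual retry stretch the wall-clock span between the run’s `start_date` and `end_date` to many minutes.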

Introducing the `dag_runtime` Function

The `dag_runtime` function provides a better approximation of the DAG run time by focusing on the critical path rather than the wall-clock time from the DAG’s start to end:

from airflow.models.dagrun import DagRun

def dag_runtime(dag_run: DagRun, roots) -> float:
    def node_runtime(task) -> float:
        # Get the list of downstream tasks
        children = task.downstream_list
        # Obtain the duration (in seconds) of the task itself
        my_duration = dag_run.get_task_instance(task.task_id).duration
        # For tasks with no recorded duration, default to zero
        if my_duration is None:
            my_duration = 0
        # Base case: if there are no children, return the task's own duration
        if not children:
            return my_duration
        # Recursive case: add the task's duration to the longest child path
        return my_duration + max(node_runtime(child) for child in children)

    # Handle the case with no root tasks
    if not roots:
        return 0
    # Calculate the run time as the critical path starting from the root tasks
    return max(node_runtime(root) for root in roots)
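One caveat worth mentioning: because `node_runtime` recurses through `downstream_list` without caching, a task that several branches converge on is visited once per incoming path, which can become slow in wide, diamond-heavy DAGs. A variant of the same idea that memoizes results per `task_id` might look like this; the name `dag_runtime_memoized` is ours and not part of the original snippet:

from airflow.models.dagrun import DagRun

def dag_runtime_memoized(dag_run: DagRun, roots) -> float:
    # Cache the critical-path length below each task so shared
    # downstream subgraphs are computed only once.
    cache = {}

    def node_runtime(task) -> float:
        if task.task_id in cache:
            return cache[task.task_id]
        ti = dag_run.get_task_instance(task.task_id)
        # Treat missing task instances or durations as zero seconds
        my_duration = ti.duration if ti and ti.duration else 0
        children = task.downstream_list
        longest_child = max((node_runtime(child) for child in children), default=0)
        cache[task.task_id] = my_duration + longest_child
        return cache[task.task_id]

    return max((node_runtime(root) for root in roots), default=0)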

How Does the `dag_runtime` Function Address the Shortcomings?

  1. Manual Task Restart: The function uses task instance durations directly, so large time gaps caused by manual restarts after the DAG has completed do not inflate the result.
  2. Delayed Task Execution: Since we’re looking at the durations of individual tasks and their critical sequence, the delays in scheduling do not misrepresent the actual computational time spent on the DAG.
  3. Time Zone Complications: Task instance durations are computed within the context of each individual execution, mitigating the impact of any timezone-related issues.
  4. Upstream Failures and Cleared Tasks: By considering individual task runtimes, the function better isolates the effect of task re-runs caused by upstream issues.
  5. External Factors: The function focuses on active compute time, which means that it accounts only for the time the tasks are actually running and does not consider external wait times.

Usage: Implementing `dag_runtime` in Your Airflow Task

To leverage the `dag_runtime` function in your Airflow pipeline, you can create a custom task that calculates the DAG run time based on the longest path. The following example demonstrates how to define a task using the `@task` decorator and utilize `dag_runtime` within it:

from airflow.decorators import task
from airflow.models.dagrun import DagRun

@task
def notify_success(**context):
    dag = context["dag"]
    # Build a DagRun handle for the current run so task instances can be looked up
    dag_run = DagRun(dag_id=dag.dag_id, run_id=context["run_id"])
    total_duration_by_tasks = dag_runtime(dag_run, dag.roots)
    print(f"The total DAG run time by critical path is: {total_duration_by_tasks} seconds")

Place this custom task at the end of your DAG so that it executes after all other tasks complete and captures the full run. When the DAG runs, the computed run time appears in the task log, ready for review, analysis, and more accurate performance monitoring.
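To make the wiring concrete, here is a minimal sketch of a DAG with `notify_success` at its tail. The DAG id, schedule, and the upstream `extract` and `transform` tasks are purely illustrative:

from datetime import datetime

from airflow.decorators import dag, task

@dag(schedule=None, start_date=datetime(2024, 1, 1), catchup=False)
def example_runtime_dag():
    @task
    def extract():
        ...  # placeholder for real work

    @task
    def transform():
        ...  # placeholder for real work

    # notify_success is the critical-path measurement task defined above
    extract() >> transform() >> notify_success()

example_runtime_dag()

Because `notify_success` sits downstream of every other task, the default trigger rule means it runs once everything before it has succeeded, which is exactly when the full critical path is known.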

Conclusion

Accurately gauging the run time of DAGs in Apache Airflow is essential for optimization and efficient resource allocation. This article introduced a method that calculates DAG run time based on the critical path, so that parallel task execution, manual interventions, and external delays no longer distort the measurement and the result reflects a DAG’s true execution time more precisely.

By embedding the `dag_runtime` function into your Airflow tasks, you gain a valuable metric that assists in identifying performance bottlenecks and improving the scheduling of data workflows. With this enhanced level of clarity, you can ensure your data engineering efforts are both effective and efficient, leading to more reliable and timely data insights.

Implement this run time measurement in your Airflow pipelines, and you’ll be equipped to drive your data engineering processes with newfound accuracy and control.
