Airflow Setup and Teardown Tasks

Hussein Awala
Published in Apache Airflow · Aug 21, 2023

In big data, most workflows need infrastructure resources to be created and configured before they start, and those resources must be reconfigured or cleaned up once the workflow finishes. This has long been a challenge for data engineers, because commonly used schedulers offer no easy way to implement such tasks.

A Real-World Example

Consider Spark ETL (extract, transform, load) pipelines running on Google Cloud Platform’s DataProc service. GCP recommends job-scoped ephemeral clusters: a cluster is created for a specific job and shut down when that job completes.

The DataProc service is optimized to create such clusters in approximately 90 seconds. At that speed, a dedicated cluster per job is cost-efficient and highly scalable, and it adds little latency. Users can also create one cluster for a group of jobs with similar characteristics, which reduces the startup latency per job even further.

Airflow Before AIP-52 Implementation

Airflow is one of the most widely used schedulers in the data ecosystem. Many GCP users choose it to orchestrate their workflows, largely because GCP provides Cloud Composer, a fully managed workflow orchestration service built on Apache Airflow.

Although Airflow supports several types of callback functions at the DAG and task levels, building workflows such as ephemeral DataProc clusters has historically been difficult. Callbacks have no visible state and no built-in retry mechanism, and their results cannot easily be used by other tasks. Users had several alternatives for implementing such workflows.


Utilizing Standard Tasks:

The initial approach involved the creation and subsequent deletion of the cluster using regular tasks:

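# a cluster per DAG
create_cluster >> run_job1 >> run_job2 >> run_job3 >> delete_cluster

# a cluster per job group: job1 on cluster 1, then job2 and job3 on cluster 2
# (written as one chain because `a >> b` returns `b`, so joining two
# parenthesized groups would link only the last task of each group)
(
    create_cluster_1 >> run_job1 >> delete_cluster_1
    >> create_cluster_2 >> run_job2 >> run_job3 >> delete_cluster_2
)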

This solution works only if we assume that jobs always finish successfully, which is unrealistic. Once failure scenarios are taken into account, it becomes overly intricate.

To handle failures, we had to make delete_cluster a downstream task of every job task and set its trigger rule to ALL_DONE, so that it runs after all jobs regardless of their state. This is feasible only when the DAG’s dependency structure is simple enough to add every job task as an upstream dependency of delete_cluster.

A second challenge is re-running the create_cluster task after an issue has been fixed, in particular when a job task’s state is cleared. We had to re-run create_cluster manually through the user interface (UI) or the command-line interface (CLI), wait for it to finish, and only then clear the job task and its downstream tasks to resume execution.

Utilizing DAG/Task Callbacks:

The alternative approach involved leveraging the on_execute_callback to create the cluster and employing on_success_callback and on_failure_callback to handle cluster deletion:

def create_cluster(context):
    # create the DataProc cluster
    ...


def create_cluster_if_not_exist(context):
    # check whether the DataProc cluster exists and create it if not
    ...


def delete_cluster(context):
    # delete the DataProc cluster
    ...


# a cluster per DAG
# DAG() takes no on_execute_callback (it is a task-level argument), so it is
# passed through default_args and runs before every task; that is why the
# "if not exist" variant is used here.
with DAG(
    ...,
    default_args={"on_execute_callback": create_cluster_if_not_exist},
    on_success_callback=delete_cluster,
    on_failure_callback=delete_cluster,
):
    run_job1 >> run_job2 >> run_job3

# a cluster per job group:
# one cluster for job1 and another one shared by job2 and job3
with DAG(
    ...,
):
    run_job1 = RunJob(
        ...,
        on_execute_callback=create_cluster,
        on_success_callback=delete_cluster,
        on_failure_callback=delete_cluster,
    )

    # no on_success_callback here: the cluster stays up for run_job3
    run_job2 = RunJob(
        ...,
        on_execute_callback=create_cluster,
        on_failure_callback=delete_cluster,
    )

    run_job3 = RunJob(
        ...,
        on_execute_callback=create_cluster_if_not_exist,
        on_success_callback=delete_cluster,
        on_failure_callback=delete_cluster,
    )

    run_job1 >> run_job2 >> run_job3

In simple scenarios, this approach may be enough. However, working out the order in which the callbacks fire, and therefore when the cluster actually exists, quickly becomes difficult.

This solution also has limitations. A failure in a callback does not stop the task from running; it is treated as a “soft fail”. Consequently, there is no guarantee that the cluster is available when the job starts.

Additionally, on_execute_callback and on_success_callback run as part of the task, so their execution time is included in the task’s duration, which can distort task monitoring.
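The effect of a soft fail can be illustrated with a toy task runner. This is a simplified model written for this article, not Airflow’s implementation; run_task, broken_create_cluster, and run_job are hypothetical names:

```python
import logging

log = logging.getLogger("toy_runner")


def run_task(execute, on_execute_callback=None):
    """Toy runner: a callback error is logged but never re-raised."""
    if on_execute_callback is not None:
        try:
            on_execute_callback({})  # an empty dict stands in for the context
        except Exception:
            log.exception("on_execute_callback failed; running the task anyway")
    return execute()


def broken_create_cluster(context):
    # simulates a cluster creation that fails
    raise RuntimeError("cluster quota exceeded")


def run_job():
    return "job started without a cluster"


# The callback raises, yet the job body still runs.
result = run_task(run_job, on_execute_callback=broken_create_cluster)
```

The job only discovers the missing cluster once it tries to use it, which is exactly the situation a blocking setup step is meant to prevent.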

Incorporating Cluster Management Code within the Job Task:

In this approach, cluster creation and deletion are integrated directly into the job operator’s execute method, so a failure in either the creation or the job fails the task itself. Cluster deletion, however, must be handled carefully so that a failed deletion does not trigger a full task retry:

from airflow.models.baseoperator import BaseOperator


class DataProcJob(BaseOperator):
    # create_cluster, run_job, delete_cluster_if_exists and
    # max_delete_attempts are placeholders defined elsewhere

    def execute(self, context):
        try:
            create_cluster()
            run_job()
        finally:
            # to avoid failing the task when the cluster deletion fails,
            # run it in a loop and swallow the exceptions
            for _ in range(max_delete_attempts):
                try:
                    delete_cluster_if_exists()
                    break
                except Exception:
                    pass

    def on_kill(self):
        # if the task is forcibly killed, the cluster must still be deleted
        for _ in range(max_delete_attempts):
            try:
                delete_cluster_if_exists()
                break
            except Exception:
                pass

This solution makes the operator logic more intricate, and it has the same task-monitoring problem as the second option.
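The retry-and-swallow cleanup used in this approach can be captured in a small standalone helper. The sketch below is plain Python; best_effort and flaky_delete are hypothetical names, and logging each failed attempt (instead of silently passing) is an assumption about what one would want in practice:

```python
import logging
import time

log = logging.getLogger(__name__)


def best_effort(action, max_attempts=3, delay_seconds=0.0):
    """Call action until it succeeds or attempts run out; never raise.

    Returns True if one attempt succeeded, False otherwise.
    """
    for attempt in range(1, max_attempts + 1):
        try:
            action()
            return True
        except Exception:
            log.warning(
                "cleanup attempt %d/%d failed", attempt, max_attempts, exc_info=True
            )
            if attempt < max_attempts:
                time.sleep(delay_seconds)
    return False


# Usage: a deletion that fails twice before succeeding.
calls = []


def flaky_delete():
    calls.append(1)
    if len(calls) < 3:
        raise RuntimeError("transient API error")


deleted = best_effort(flaky_delete, max_attempts=5)
```

Even with such a helper, the cleanup remains hidden inside the job task, so its failures are visible only in the task logs.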

Airflow 2.7.0 and AIP-52

Airflow 2.7.0 introduced an exciting improvement through AIP-52 (an Airflow Improvement Proposal): support for automatic setup and teardown tasks.

This feature lets us define setup and teardown tasks for a set of tasks. Airflow runs the setup task before these tasks start and runs the teardown task after all of them have finished, irrespective of their individual states. Moreover, when a task’s state is cleared, Airflow also clears the state of its associated setup and teardown tasks. This ensures that a task never runs before its prerequisites are in place and never finishes without the environment being cleaned up.

Example:

[Figure: DAG graph]

In this example, Airflow runs setup_task before the remaining tasks because it is upstream of all of them. Once t1, t2, and t3 have finished in any end state (success, failure, skipped, or upstream failed), Airflow runs teardown_task. Additionally, if the state of any of these tasks (t1, t2, or t3) is cleared, Airflow also clears the state of the associated setup and teardown tasks. It then runs the setup task before the cleared task, even if the setup is not a direct upstream of it (note: there is a known issue in this scenario, which will be resolved in version 2.7.1).

DataProc DAG code:

In the following code, I implement a method to create a setup and a teardown task for each DataprocSubmitJobOperator task, and I use this method to configure three jobs:

[Listing: DataProc DAG with setup/teardown tasks]

The feature also supports dynamic tasks and dynamic task groups. To illustrate, we can restructure the DAG above by replacing the task-creation method with a task group, and then use the task group’s expand_kwargs method to create the clusters and submit the jobs dynamically.

[Listing: DataProc DAG with setup/teardown tasks and dynamic task group]

Contrasting with Previous Solutions:

Of the three earlier methods, the setup and teardown approach most resembles the first one: the setup and teardown are regular Airflow tasks. The notable difference is that we no longer need to wire the delete cluster task downstream of every job task, nor set its trigger rule to ALL_DONE manually. Moreover, the create cluster task is re-run automatically whenever an issue is fixed and the job task is cleared.

Compared with the second and third solutions, a failure in the setup task now blocks the job tasks. For monitoring, the setup and job tasks are independent, so retries, timeouts, and notifications can be customized for each of them. As for the teardown task, the on_failure_fail_dagrun parameter determines whether a failure in the teardown marks the DagRun as failed.

Other use cases:

  • DynamoDB Scaling: Before a batch ingestion task, increase the DynamoDB write capacity to accommodate the incoming data load. After the task concludes, revert the capacity to its original state.
  • Ephemeral Cluster Management: As in the GCP DataProc case, it’s common to create temporary AWS EMR and Databricks clusters before running Spark batch jobs and to remove them as soon as the jobs finish.
  • ML Model Training Infrastructure: Before starting a machine learning training job, provision GPU-equipped virtual machines. After the training job completes, scale the resources down to optimize utilization.

Summary:

Managing the infrastructure behind pipeline jobs has long been a challenge in big data workflows. In Airflow, the earlier solutions created and deleted infrastructure resources through regular tasks or callbacks, and each had drawbacks. Airflow 2.7.0 and AIP-52 changed this by introducing automatic setup and teardown tasks, which streamline infrastructure management and task execution. They combine the benefits of the earlier solutions without their complexity: setup and teardown tasks are re-run when a task is cleared, and their retries, timeouts, and notifications can be configured individually.

Hussein Awala: Sr. Data Engineer @Voodoo, Apache Airflow Committer and PMC member