Apache Airflow: Custom Task Triggering for Efficient Data Pipelines

Breno Jones Agrelli
Published in gb.tech · Sep 4, 2023

Introduction

Apache Airflow is an indispensable tool for orchestrating data pipelines and a must-know for any data engineer in 2023. Like any tool, it has its advantages and disadvantages. While it boasts excellent built-in functionality, some use cases call for custom solutions. One of the most captivating aspects of Airflow is its high degree of customizability, which makes it a fascinating tool for data engineers like me.

To fully comprehend the concepts and solutions presented in this article, readers are expected to have a foundational understanding of Airflow. This includes familiarity with DAGs, knowledge of tasks and operators, an understanding of trigger rules for task execution, and basic Python programming skills. While you don’t need to be an Airflow expert, having these basics in place will enable you to follow along and explore the customization magic that Airflow has to offer.

For those new to the world of data engineering and Apache Airflow, imagine Directed Acyclic Graphs (DAGs) as a visual representation of your data pipeline’s workflow. Picture a flowchart where each box represents a task, and arrows illustrate the order in which tasks should be executed. Importantly, DAGs don’t allow cycles, meaning the tasks can’t loop back onto themselves or create circular dependencies. This ensures a clear path for data to move through the pipeline without confusion.

A graph is formed by vertices and by edges connecting them, while a directed graph has oriented edges. A directed acyclic graph (DAG) is a type of directed graph without cycles. Source: Wikipedia

Problem Definition

Let’s begin by defining the problem we want to solve: ingesting data from an API with three endpoints. For this purpose, we will create three tasks, one for each endpoint, responsible for making HTTP requests and storing the responses in a raw format, such as a storage bucket. For illustration purposes, we will focus on the extraction phase, but keep in mind that this solution can be adapted to various scenarios, not only involving data extraction from APIs.
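To make the extraction step concrete, here is a minimal sketch of what one such task’s callable might look like. The base URL, endpoint name, and the storage step below are placeholders for illustration, not part of the original example:

import json

import requests

BASE_URL = "https://api.example.com/v1"  # hypothetical API


def extract_endpoint(endpoint, **context):
    """Fetch a single endpoint and persist the raw response."""
    response = requests.get(f"{BASE_URL}/{endpoint}", timeout=30)
    # Raise on HTTP errors so Airflow marks the task instance as failed.
    response.raise_for_status()
    payload = json.dumps(response.json())
    # Placeholder: in practice, upload `payload` to a storage bucket here,
    # e.g. via a provider hook such as S3Hook or GCSHook.
    print(f"Fetched {len(payload)} bytes from '{endpoint}'")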

In this scenario, we will leverage Apache Airflow to orchestrate an ETL process that involves fetching data from an API and loading it into a database.

Real-world scenario: API Quota Constraint

By default, we would execute these three tasks in parallel since the endpoints are independent and can be queried individually. This parallel execution would speed up the process and eliminate inter-task dependencies.
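In Airflow’s dependency syntax, that default parallel layout would be a simple fan-out (using the task names we will define later in this article):

# Fan out: the three extraction tasks run in parallel after `start`,
# and `transform` waits for all of them under the default trigger rule.
start >> [extract_1, extract_2, extract_3] >> transform >> end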

However, let’s introduce a constraint: imagine our API enforces a low request quota, so parallel execution would trigger quota-exceeded errors and cause frequent extraction failures. In this case, we have no choice but to opt for a sequential execution pattern.

To meet the requirements, we must ensure the following:

  • The execution of tasks must be independent, so that the failure of one task does not impact the execution of the others. That way a single failure doesn’t delay the whole job, and debugging becomes easier.
  • Downstream tasks should only be executed if all the extraction tasks are successful, ensuring that data processing is done with up-to-date information.

DAG Structure

The following code will give us the DAG shown right below it:

from datetime import datetime
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.exceptions import AirflowFailException


# Define the default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': datetime(2023, 7, 30),
}


# Create the Airflow DAG
dag = DAG('example_dag', default_args=default_args, schedule_interval=None)


# Define the task functions
def task_success(**kwargs):
    task_instance = kwargs['task_instance']
    print(f"Task {task_instance.task_id} succeeded.")
    return True


def task_failure(**kwargs):
    task_instance = kwargs['task_instance']
    print(f"Task {task_instance.task_id} failed.")
    raise AirflowFailException(f"Task {task_instance.task_id} failed.")


# Define the tasks
with dag:

    start = DummyOperator(task_id='start')

    extract_1 = PythonOperator(
        task_id='extract_1',
        python_callable=task_failure,
        provide_context=True
    )

    extract_2 = PythonOperator(
        task_id='extract_2',
        python_callable=task_success,
        provide_context=True
    )

    extract_3 = PythonOperator(
        task_id='extract_3',
        python_callable=task_success,
        provide_context=True
    )

    transform = DummyOperator(
        task_id='transform'
    )

    end = DummyOperator(
        task_id='end'
    )


# Define the dependencies
start >> extract_1 >> extract_2 >> extract_3 >> transform >> end

The DAG with the following tasks in order: start, extract_1, extract_2, extract_3, transform, and end.

Our Directed Acyclic Graph (DAG) includes a start dummy task (start), three extraction tasks (extract_1, extract_2, extract_3), a transformation task (transform), and an end dummy task (end). Two Python functions, task_success and task_failure, force a success or a failure state so we can simulate both outcomes. Let’s execute this DAG as it is and see what happens.

For the sake of this experiment, the extract_1 task is forced to fail so that we can verify whether our solution works as expected.

The DAG run with the following task states: start (success), extract_1 (failed), extract_2 (upstream_failed), extract_3 (upstream_failed), transform (upstream_failed), end (upstream_failed).

Initially executing the tasks with default trigger rules results in a failure to meet both premises. extract_2 and extract_3 can only be executed if extract_1 is successful, violating the first premise. Additionally, the incomplete execution prevents us from validating the second premise, as we require all tasks to run for a comprehensive check.
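For reference, this default behavior comes from the all_success trigger rule, which every task uses unless told otherwise:

from airflow.utils.trigger_rule import TriggerRule

# Unless overridden, every task uses "all_success": it is scheduled only
# after all of its direct upstream tasks finished in the success state.
assert TriggerRule.ALL_SUCCESS == "all_success"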

An initial attempt to resolve this is to explicitly set the trigger_rule parameter of all three extraction tasks to “all_done”. That way, even if extract_1 fails, extract_2 still executes, and so on. Our tasks should now look like this:

    extract_1 = PythonOperator(
        task_id='extract_1',
        python_callable=task_failure,
        provide_context=True,
        trigger_rule='all_done'
    )

    extract_2 = PythonOperator(
        task_id='extract_2',
        python_callable=task_success,
        provide_context=True,
        trigger_rule='all_done'
    )

    extract_3 = PythonOperator(
        task_id='extract_3',
        python_callable=task_success,
        provide_context=True,
        trigger_rule='all_done'
    )

Let’s execute the DAG once more and check the results:

The DAG run with the following task states: start (success), extract_1 (failed), extract_2 (success), extract_3 (success), transform (success), end (success).

Well, although this modification addresses the first premise by allowing tasks to proceed independently, it falls short on the second. A trigger rule only inspects a task’s direct upstream tasks, so transform relies solely on the state of extract_3 and disregards the status of extract_1 and extract_2.

Unfortunately, the built-in functionality of Airflow proves insufficient to tackle this challenge. We need to create our own tailored solution, one that can meet both premises and deliver us the result we want.

Custom Solution

To address the problem and fulfill the desired outcome, we need a function to check the states of specific task instances and halt the job if any task fails or proceed with downstream tasks if all extractions are successful.

For this purpose, we can create a Python function called check_tasks_state and run it with a PythonOperator. The function verifies the states of Airflow tasks with a given task prefix within the context of a specific DAG run. The DAG run provides essential information about the ongoing execution. The function checks whether all tasks with the specified prefix have completed successfully. If any task has failed, it raises an AirflowFailException to indicate the failure.

def check_tasks_state(task_prefix, **context):
    """
    This function is used to check the states of Airflow tasks with a given task prefix within the context of a specific DAG run.

    Parameters:
        task_prefix (str): The prefix of the task IDs to check.
        **context (dict): Additional keyword arguments (context) passed to the function. It should contain the "dag_run" key
            referring to the current DAG run instance.

    Returns:
        bool: True if all tasks with the specified prefix have successfully completed; otherwise, raises an AirflowFailException.

    Raises:
        AirflowFailException: If any of the tasks with the specified prefix did not complete successfully.
    """
    # Retrieve the 'dag_run' object from the context, which contains information about the current DAG run.
    dag_run = context["dag_run"]
    # Iterate over the task instances of the DAG run to check their states.
    for task_instance in dag_run.get_task_instances():
        # Extract the prefix of the current task's ID.
        _task_prefix = task_instance.task_id.split("_")[0]
        # Check if the task's ID matches the provided 'task_prefix'.
        if task_prefix == _task_prefix:
            # If the task is found with a matching prefix, check if it has a state of "success".
            if task_instance.state == "success":
                # If the task is successful, move on to the next task without doing anything.
                pass
            else:
                # If the task has not completed successfully, raise an 'AirflowFailException' to indicate the failure.
                raise AirflowFailException(f"Task {task_instance.task_id} failed.")
    # If all tasks with the specified prefix have completed successfully, return True.
    return True

To integrate the custom function into the DAG, we can add an intermediate task called check between extract_3 and transform. The check task calls the check_tasks_state function and acts as a checkpoint to ensure downstream tasks proceed only when all “extract” tasks have successfully completed.

    check = PythonOperator(
        task_id='check',
        python_callable=check_tasks_state,
        op_args=["extract"],
        provide_context=True
    )

After that, we need to add this task to our dependency chain.

start >> extract_1 >> extract_2 >> extract_3 >> check >> transform >> end

Below is the DAG execution using this solution.

The DAG run with the following task states: start (success), extract_1 (failed), extract_2 (success), extract_3 (success), check (failed), transform (upstream_failed), end (upstream_failed).

Now we have a check task that fails whenever any of the extraction tasks fails. Using this customized solution, we achieve the desired outcome that meets both premises — tasks are executed independently, and downstream tasks are only executed when all extraction tasks are successful.

Here’s another run, this time with all tasks in a successful state.

The DAG run with the following task states: start (success), extract_1 (success), extract_2 (success), extract_3 (success), check (success), transform (success), end (success).

The final code can be found here.

Conclusion

In conclusion, Apache Airflow offers a powerful framework for orchestrating data pipelines, and with its flexibility, we can craft custom solutions to address specific challenges. By incorporating a custom task using the PythonOperator, we can efficiently manage task execution and ensure robust data processing while meeting requirements and constraints.

It is essential to recognize that this script serves as a foundational example that can be further customized and extended to address various use cases and challenges. Data engineers can adapt the code to include additional tasks, implement different task dependencies, or even integrate external services to enrich the pipeline’s capabilities. For instance, the check_tasks_state function can be modified to accommodate more complex validation logic, such as checking specific task outputs or dynamically changing the task execution order based on external events.
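As one hedged sketch of such an extension, the variant below reuses the prefix-matching logic from check_tasks_state but additionally pulls a value that each extraction task is assumed to have pushed to XCom. The min_rows threshold and the row-count convention are hypothetical, not part of the original example:

def check_tasks_output(task_prefix, min_rows, **context):
    """Variation on check_tasks_state: validate task states *and* an
    XCom value (assumed to be a row count returned by each task)."""
    dag_run = context["dag_run"]
    this_task_instance = context["task_instance"]
    for task_instance in dag_run.get_task_instances():
        if task_instance.task_id.split("_")[0] != task_prefix:
            continue
        if task_instance.state != "success":
            raise AirflowFailException(f"Task {task_instance.task_id} failed.")
        # Hypothetical convention: each extraction task returns its row
        # count, which the PythonOperator pushes to XCom automatically.
        row_count = this_task_instance.xcom_pull(task_ids=task_instance.task_id)
        if row_count is None or row_count < min_rows:
            raise AirflowFailException(
                f"Task {task_instance.task_id} produced too few rows: {row_count}"
            )
    return True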

Moreover, this approach is not limited to data extraction scenarios. Data transformation, loading, and even complex workflow orchestration can benefit from similar custom task triggering techniques. Data engineers can leverage Airflow’s vast ecosystem of operators and sensors to integrate with diverse data sources, cloud platforms, or other tools to create comprehensive and sophisticated data processing pipelines.

Follow me on LinkedIn: https://www.linkedin.com/in/bjagrelli/
