<?xml version="1.0" encoding="UTF-8"?><rss xmlns:dc="http://purl.org/dc/elements/1.1/" xmlns:content="http://purl.org/rss/1.0/modules/content/" xmlns:atom="http://www.w3.org/2005/Atom" version="2.0" xmlns:cc="http://cyber.law.harvard.edu/rss/creativeCommonsRssModule.html">
    <channel>
        <title><![CDATA[Stories by Breno Jones Agrelli on Medium]]></title>
        <description><![CDATA[Stories by Breno Jones Agrelli on Medium]]></description>
        <link>https://medium.com/@bjagrelli?source=rss-a129336de44f------2</link>
        <image>
            <url>https://cdn-images-1.medium.com/fit/c/150/150/1*lbAOFGXE-8mF9lbuRmAzwA.jpeg</url>
            <title>Stories by Breno Jones Agrelli on Medium</title>
            <link>https://medium.com/@bjagrelli?source=rss-a129336de44f------2</link>
        </image>
        <generator>Medium</generator>
        <lastBuildDate>Sat, 30 May 2026 02:26:25 GMT</lastBuildDate>
        <atom:link href="https://medium.com/@bjagrelli/feed" rel="self" type="application/rss+xml"/>
        <webMaster><![CDATA[yourfriends@medium.com]]></webMaster>
        <atom:link href="http://medium.superfeedr.com" rel="hub"/>
        <item>
            <title><![CDATA[Apache Airflow: Custom Task Triggering for Efficient Data Pipelines]]></title>
            <link>https://medium.com/gbtech/apache-airflow-custom-task-triggering-for-efficient-data-pipelines-7fd6f563129e?source=rss-a129336de44f------2</link>
            <guid isPermaLink="false">https://medium.com/p/7fd6f563129e</guid>
            <category><![CDATA[python]]></category>
            <category><![CDATA[data-engineering]]></category>
            <category><![CDATA[data-pipeline]]></category>
            <category><![CDATA[data-orchestration]]></category>
            <category><![CDATA[apache-airflow]]></category>
            <dc:creator><![CDATA[Breno Jones Agrelli]]></dc:creator>
            <pubDate>Mon, 04 Sep 2023 12:38:23 GMT</pubDate>
            <atom:updated>2025-06-18T17:50:48.862Z</atom:updated>
            <content:encoded><![CDATA[<figure><img alt="Image with a dark blue background on the left and a light salmon background on the right. On the left side, the phrase “Apache Airflow: Custom Task Triggering for Efficient Data Pipelines” in white and the Grupo Boticário logo with its symbol in shades of blue and orange. On the right side, a set of industrial pipes and valves in shades of orange and gray." src="https://cdn-images-1.medium.com/max/1024/1*0ccCHQqEoFCnFHvaDu2_pw.png" /><figcaption>Photo by <a href="https://unsplash.com/@karosu?utm_source=medium&amp;utm_medium=referral">夜 咔罗</a> on <a href="https://unsplash.com?utm_source=medium&amp;utm_medium=referral">Unsplash</a></figcaption></figure><h3>Introduction</h3><p>Apache Airflow is an indispensable tool for orchestrating data pipelines and a must-know for any data engineer in 2023. Like any tool, Airflow has its advantages and disadvantages. While it offers excellent built-in functionality, some use cases call for custom solutions. One of Airflow’s most captivating aspects is its high degree of customizability, which makes it a fascinating tool for data engineers like myself.</p><p>To fully comprehend the concepts and solutions presented in this article, readers should have a foundational understanding of Airflow, including familiarity with DAGs, tasks and operators, and the trigger rules that govern task execution, as well as basic Python programming skills. While you don’t need to be an Airflow expert, having these basics in place will enable you to follow along and explore the customization magic that Airflow has to offer.</p><p>For those new to the world of data engineering and Apache Airflow, think of Directed Acyclic Graphs (DAGs) as a visual representation of your data pipeline’s workflow. Picture a flowchart where each box represents a task, and arrows illustrate the order in which tasks should be executed. 
Importantly, DAGs don’t allow cycles, meaning tasks can’t loop back onto themselves or create circular dependencies. This guarantees a well-defined order in which data moves through the pipeline.</p><figure><img alt="Yellow circles representing vertices with arrows connecting them as edges. They follow the structure of a directed acyclic graph." src="https://cdn-images-1.medium.com/max/1024/1*HiyWshYzIlcRYGONZteD7w.png" /><figcaption>A graph is formed by vertices and by edges connecting them, while a directed graph has oriented edges. A directed acyclic graph (DAG) is a type of directed graph without cycles. <strong>Source: </strong><a href="https://en.wikipedia.org/wiki/Directed_acyclic_graph"><strong>Wikipedia</strong></a></figcaption></figure><h3>Problem Definition</h3><p>Let’s begin by defining the problem we want to solve: ingesting data from an API with three endpoints. For this purpose, we will create three tasks, one for each endpoint, each responsible for making HTTP requests and storing the raw responses in a location such as a storage bucket. For illustration purposes, we will focus on the extraction phase, but keep in mind that this solution can be adapted to various scenarios, not only those involving data extraction from APIs.</p><figure><img alt="An ETL diagram showing, from left to right, a gear icon representing a REST API, the Apache Airflow logo and a database management system logo, connected by arrows from left to right." src="https://cdn-images-1.medium.com/max/1024/1*AIxxmENqigBPhk71NrkLWw.png" /><figcaption>In this scenario, we will leverage Apache Airflow to orchestrate an ETL process that involves fetching data from an API and loading it into a database.</figcaption></figure><h3>Real-World Scenario: API Quota Constraint</h3><p>By default, we would execute these three tasks in parallel, since the endpoints are independent and can be queried individually. 
This parallel execution would speed up the process and avoid dependencies between tasks.</p><p>However, let’s introduce a constraint: imagine our API has a low request quota, so querying the three endpoints in parallel risks exceeding it and would lead to frequent extraction failures. In this case, we are left with no choice but to run the tasks sequentially.</p><p>To meet the requirements, we must ensure the following:</p><ul><li>The tasks must run independently, so that the failure of one task does not prevent the others from executing. That way we don’t delay the job, and debugging becomes easier.</li><li>Downstream tasks should only be executed if all the extraction tasks are successful, ensuring that data processing is done with up-to-date information.</li></ul>
<h3>DAG Structure</h3><p>The following code defines the DAG shown right below it:</p><pre>from datetime import datetime<br>from airflow import DAG<br>from airflow.operators.dummy_operator import DummyOperator<br>from airflow.operators.python_operator import PythonOperator<br>from airflow.exceptions import AirflowFailException<br><br><br># Define the default arguments for the DAG<br>default_args = {<br>    &#39;owner&#39;: &#39;airflow&#39;,<br>    &#39;start_date&#39;: datetime(2023, 7, 30),<br>}<br><br><br># Create the Airflow DAG<br>dag = DAG(&#39;example_dag&#39;, default_args=default_args, schedule_interval=None)<br><br><br># Define the task functions<br>def task_success(**kwargs):<br>    task_instance = kwargs[&#39;task_instance&#39;]<br>    print(f&quot;Task {task_instance.task_id} succeeded.&quot;)<br>    return True<br><br>def task_failure(**kwargs):<br>    task_instance = kwargs[&#39;task_instance&#39;]<br>    print(f&quot;Task {task_instance.task_id} failed.&quot;)<br>    raise AirflowFailException(f&quot;Task {task_instance.task_id} failed.&quot;)<br><br><br># Define the tasks<br>with dag:<br><br>    start = DummyOperator(task_id=&#39;start&#39;)<br><br>    extract_1 = PythonOperator(<br>        task_id=&#39;extract_1&#39;,<br>        python_callable=task_failure,  # forced to fail for this experiment<br>        provide_context=True<br>    )<br><br>    extract_2 = PythonOperator(<br>        task_id=&#39;extract_2&#39;,<br>        python_callable=task_success,<br>        provide_context=True<br>    )<br><br>    extract_3 = PythonOperator(<br>        task_id=&#39;extract_3&#39;,<br>        python_callable=task_success,<br>        provide_context=True<br>    )<br><br>    transform = DummyOperator(<br>        task_id=&#39;transform&#39;<br>    )<br><br>    end = DummyOperator(<br>        task_id=&#39;end&#39;<br>    )<br><br><br># Define the dependencies<br>start &gt;&gt; extract_1 &gt;&gt; extract_2 &gt;&gt; extract_3 &gt;&gt; transform &gt;&gt; end</pre><figure><img alt="The DAG with the following tasks in order: start, extract_1, extract_2, extract_3, transform and end." src="https://cdn-images-1.medium.com/max/1024/0*qsmNXiSVo_KmXu3n" /></figure><p>Our DAG includes a start dummy task (<strong>start</strong>), three extraction tasks (<strong>extract_1</strong>, <strong>extract_2</strong>, <strong>extract_3</strong>), a transformation task (<strong>transform</strong>), and an end dummy task (<strong>end</strong>). Two Python functions, <em>task_success</em> and <em>task_failure</em>, force a task into a success or a failed state, respectively.</p><p>For the sake of this experiment, the <strong>extract_1</strong> task is forced to fail, so we can test whether our solution works as expected. Let’s execute this DAG as it is and see what happens.</p><figure><img alt="The DAG with the following tasks and their states, in order: start, success, extract_1, failed, extract_2, upstream_failed, extract_3, upstream_failed, transform, upstream_failed, end, upstream_failed." 
src="https://cdn-images-1.medium.com/max/1024/0*5skz5AePKk-OX2AW" /></figure><p>With the default trigger rule (all_success), this first run fails to meet both premises. Each extraction task only runs if its upstream task succeeds, so when <strong>extract_1</strong> fails, <strong>extract_2</strong> and <strong>extract_3</strong> are marked as upstream_failed without ever running, which violates the first premise. Additionally, the incomplete execution prevents us from validating the second premise, as we need all tasks to run for a comprehensive check.</p><p>An initial attempt to resolve this is to explicitly set the trigger_rule parameter of all three extraction tasks to “all_done”, which makes a task run once its upstream tasks have finished, whatever their final state. That way, even if <strong>extract_1</strong> fails, <strong>extract_2</strong> is still executed, and so on. Our tasks should now look like this:</p><pre>    extract_1 = PythonOperator(<br>        task_id=&#39;extract_1&#39;,<br>        python_callable=task_failure,<br>        provide_context=True,<br>        trigger_rule=&#39;all_done&#39;<br>    )<br><br>    extract_2 = PythonOperator(<br>        task_id=&#39;extract_2&#39;,<br>        python_callable=task_success,<br>        provide_context=True,<br>        trigger_rule=&#39;all_done&#39;<br>    )<br><br>    extract_3 = PythonOperator(<br>        task_id=&#39;extract_3&#39;,<br>        python_callable=task_success,<br>        provide_context=True,<br>        trigger_rule=&#39;all_done&#39;<br>    )</pre><p>Let’s execute the DAG once more and check the results:</p><figure><img alt="The DAG with the following tasks and their states, in order: start, success, extract_1, failed, extract_2, success, extract_3, success, transform, success, end, success." src="https://cdn-images-1.medium.com/max/1024/0*j_R14JAr_lEMEK_7" /></figure><p>Although this modification addresses the first premise by allowing tasks to proceed independently, it falls short on the second premise. 
In this case, the transformation task relies solely on the state of <strong>extract_3</strong>, its only direct upstream task, disregarding the status of <strong>extract_1</strong> and <strong>extract_2</strong>.</p><p>Unfortunately, adjusting trigger rules alone is not enough to tackle this challenge in our sequential setup. We need to create our own tailored solution, one that meets both premises and delivers the result we want.</p><h3>Custom Solution</h3><p>To address the problem, we need a function that checks the states of specific task instances and halts the job if any of them failed, or lets the downstream tasks proceed if all extractions succeeded.</p><p>For this purpose, we can write a Python function called <em>check_tasks_state</em> and run it with a PythonOperator. The function receives a task ID prefix and retrieves the current DAG run from the task context; the DAG run provides essential information about the ongoing execution, including its task instances. The function then checks whether every task whose ID prefix (the part before the first underscore) matches the given one has completed successfully. If any of them has not, it raises an <em>AirflowFailException</em> to indicate the failure.</p><pre>def check_tasks_state(task_prefix, **context):<br>    &quot;&quot;&quot;<br>    Check the states of the Airflow tasks with a given task prefix within the context of a specific DAG run.<br>    <br>    Parameters:<br>        task_prefix (str): The prefix of the task IDs to check.<br>        **context (dict): Additional keyword arguments (context) passed to the function. It should contain the &quot;dag_run&quot; key<br>            referring to the current DAG run instance.<br>    <br>    Returns:<br>        bool: True if all tasks with the specified prefix have successfully completed; otherwise, raises an AirflowFailException.<br>    <br>    Raises:<br>        AirflowFailException: If any of the tasks with the specified prefix did not complete successfully.<br>    &quot;&quot;&quot;<br>    # Retrieve the &#39;dag_run&#39; object from the context, which contains information about the current DAG run.<br>    dag_run = context[&quot;dag_run&quot;]<br>    # Iterate over the task instances of the DAG run to check their states.<br>    for task_instance in dag_run.get_task_instances():<br>        # Extract the prefix of the current task&#39;s ID (the part before the first underscore).<br>        _task_prefix = task_instance.task_id.split(&quot;_&quot;)[0]<br>        # Only tasks whose prefix matches &#39;task_prefix&#39; are checked; any state other than &quot;success&quot; is a failure.<br>        if task_prefix == _task_prefix and task_instance.state != &quot;success&quot;:<br>            # Raise an &#39;AirflowFailException&#39; so this task fails immediately, without retries.<br>            raise AirflowFailException(f&quot;Task {task_instance.task_id} failed.&quot;)<br>    # If all tasks with the specified prefix have completed successfully, return True.<br>    return True</pre><p>To integrate the custom function into the DAG, we can add an intermediate task called <strong>check</strong> between <strong>extract_3</strong> and <strong>transform</strong>. 
The <strong>check</strong> task calls the <em>check_tasks_state</em> function and acts as a checkpoint, ensuring that downstream tasks proceed only when all “extract” tasks have successfully completed.</p><pre>    check = PythonOperator(<br>        task_id=&#39;check&#39;,<br>        python_callable=check_tasks_state,<br>        op_args=[&quot;extract&quot;],  # passed as the task_prefix argument<br>        provide_context=True<br>    )</pre><p>After that, we need to add this task to our dependency chain:</p><pre>start &gt;&gt; extract_1 &gt;&gt; extract_2 &gt;&gt; extract_3 &gt;&gt; check &gt;&gt; transform &gt;&gt; end</pre><p>Below is the DAG execution using this solution.</p><figure><img alt="The DAG with the following tasks and their states, in order: start, success, extract_1, failed, extract_2, success, extract_3, success, check, failed, transform, upstream_failed, end, upstream_failed." src="https://cdn-images-1.medium.com/max/1024/0*2O3hsPgyN3IqhXKn" /></figure><p>Now we have a <strong>check</strong> task that fails whenever any of the extraction tasks fails. With this custom solution, we achieve the desired outcome and meet both premises: tasks are executed independently, and downstream tasks are only executed when all extraction tasks are successful.</p><p>Here’s another run, this time with all extraction tasks succeeding.</p><figure><img alt="The DAG with the following tasks and their states, in order: start, success, extract_1, success, extract_2, success, extract_3, success, check, success, transform, success, end, success." src="https://cdn-images-1.medium.com/max/1024/0*t3Ten201fMj8dyDS" /></figure><p>The final code can be found <a href="https://gist.github.com/bjagrelli/0fc8d79f7e9924f1ed9ae4b25012e529"><strong>here</strong></a>.</p><h3>Conclusion</h3><p>Apache Airflow offers a powerful framework for orchestrating data pipelines, and its flexibility lets us craft custom solutions to address specific challenges. 
By adding a custom checkpoint task run by a PythonOperator, we can control when downstream tasks execute and ensure robust data processing while respecting the pipeline’s requirements and constraints.</p><p>Keep in mind that this script serves as a foundational example that can be further customized and extended to address other use cases and challenges. Data engineers can adapt the code to include additional tasks, implement different task dependencies, or even integrate external services to enrich the pipeline’s capabilities. For instance, the <em>check_tasks_state</em> function can be modified to accommodate more complex validation logic, such as checking specific task outputs or dynamically changing the task execution order based on external events.</p><p>Moreover, this approach is not limited to data extraction scenarios. Data transformation, loading, and even complex workflow orchestration can benefit from similar custom task triggering techniques. Data engineers can leverage Airflow’s vast ecosystem of operators and sensors to integrate with diverse data sources, cloud platforms, or other tools to create comprehensive and sophisticated data processing pipelines.</p><p><strong>Follow me on LinkedIn: </strong><a href="https://www.linkedin.com/in/bjagrelli/">https://www.linkedin.com/in/bjagrelli/</a></p><hr><p><a href="https://medium.com/gbtech/apache-airflow-custom-task-triggering-for-efficient-data-pipelines-7fd6f563129e">Apache Airflow: Custom Task Triggering for Efficient Data Pipelines</a> was originally published in <a href="https://medium.com/gbtech">tecnologia no Grupo Boticário</a> on Medium, where people are continuing the conversation by highlighting and responding to this story.</p>]]></content:encoded>
        </item>
    </channel>
</rss>