Airflow: Externally trigger a DAG when a condition matches

Ngu Truong
Nov 2 · 5 min read

Overall, I need to build a data pipeline architecture that pulls data from operational databases and imports it into our Data Warehouse (DWH). To implement a pipeline, I usually compose many DAGs. A DAG (Directed Acyclic Graph) is an Airflow term; it defines a sequence of operations and how often they should run. In our architecture, DAGs belong to one of these ordered stages: Datalake, Staging, Datawarehouse, or Datamart. To schedule a whole pipeline of many DAGs to run in the right order, I faced the problem of triggering a DAG when a condition matches. I considered several options and, in the end, came up with a solution that subclasses Airflow's BaseSensorOperator.

A DAG can be executed on a schedule interval; however, there are use cases where we want it to be triggered when a condition matches. For example, consider two DAGs, csv_to_s3 and s3_to_dwh. DAG csv_to_s3 is scheduled to run daily and loads a CSV file into S3. DAG s3_to_dwh has the same schedule; it takes care of transforming the CSV file and then loads the result into the DWH. Obviously, s3_to_dwh needs to wait until csv_to_s3 has finished. In other words, there must be a mechanism to postpone s3_to_dwh and trigger it after csv_to_s3 is marked done.

In my context, I have two DAGs called Source and Target, belonging respectively to the Datalake and Staging stages. The Source outputs a CSV file and loads it into S3; the Target has to wait until the Source is done, so it can pull the CSV file and process it afterward. Briefly, I would like to trigger the Target only if the Source succeeds.

It is clear that we cannot express the above relationship with Airflow's schedule interval alone. Therefore, I looked for a way to externally trigger the Target from the Source.

Here are the solutions I considered:

  • Push models: trigger_dag command and its wrapper REST API, DAG triggers DAG
  • Pull models: S3KeySensor, Variable-Sensor Compound

Let’s go through these bullets.

Considered solutions

trigger_dag command

Airflow has a very rich command-line interface that allows many types of operations on a DAG, starting services, and supporting development and testing. We can externally trigger a DAG Run with the trigger_dag command.

Assume we have a DAG with id `example_trigger_target_dag`; we can trigger it and also pass a message to it by executing the command:

trigger_dag built-in command
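A minimal sketch of the command (Airflow 1.10-era CLI); the `--conf` JSON and the `message` key are just an assumed payload shape:

```shell
# Trigger the target DAG and attach a JSON payload it can read later
airflow trigger_dag example_trigger_target_dag \
    --conf '{"message": "Hello World"}'
```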

The message is held inside a dag_run object in the target DAG, and we can read it as follows:

Read the message in dag_run object
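A minimal sketch of a callable (e.g. for a PythonOperator) that reads the payload; the function name and the `message` key are assumptions:

```python
def read_message(**context):
    """Read the payload passed via `trigger_dag --conf` from the dag_run object."""
    dag_run = context["dag_run"]
    message = dag_run.conf.get("message") if dag_run.conf else None
    print(f"Remotely received value: {message}")
    return message
```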

REST API

Airflow exposes a REST API. It is available through the webserver. Endpoints are available at /api/experimental/.

We can trigger the Target with id example_trigger_target_dag by requesting an endpoint like this:

HTTP trigger_dag
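A sketch of the request against the experimental endpoint of a locally running webserver (host and port are assumptions):

```shell
# Create a DAG Run via the experimental REST API, with a payload in "conf"
curl -X POST \
    http://localhost:8080/api/experimental/dags/example_trigger_target_dag/dag_runs \
    -H 'Content-Type: application/json' \
    -d '{"conf": {"message": "Hello World"}}'
```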

The message is read in the same way as when using trigger_dag command.

With this solution, we can set up an AWS Lambda that triggers the DAG whenever a new file lands in the S3 bucket.

DAG triggers DAG

From the Source, we can easily trigger the Target if we know its dag_id.

The Target runs right after the Source DAG finishes

In the Source, we can pass the message within dag_run_obj.payload:

Whenever the Source finishes, it automatically triggers the Target with id `example.trigger_dag.target_dag`:
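A sketch of the Source side using TriggerDagRunOperator (Airflow 1.10 style, where a python_callable receives a dag_run_obj and returning it fires the trigger); the source dag_id and the payload contents are assumptions:

```python
from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.utils.dates import days_ago


def conditionally_trigger(context, dag_run_obj):
    # Attach a message for the Target; returning the object fires the trigger
    dag_run_obj.payload = {"message": "Hello from the Source"}
    return dag_run_obj


with DAG("example.trigger_dag.source_dag", schedule_interval="@daily",
         start_date=days_ago(1)) as dag:
    trigger = TriggerDagRunOperator(
        task_id="trigger_target",
        trigger_dag_id="example.trigger_dag.target_dag",
        python_callable=conditionally_trigger,
    )
```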

All three trigger_dag-family solutions are easy to implement but do not satisfy my needs. The trigger_dag command is a manual, ad-hoc tool. An AWS Lambda that triggers the DAG when a new file lands in the S3 bucket is hard to debug. With DAG-triggers-DAG, whenever we add another target DAG, we need to update the source DAG with the new target's dag_id; this becomes hard to manage as the system scales up. Also, I want to run DAGs at different intervals: some daily, some monthly.

S3KeySensor

Basically, S3KeySensor inherits from the BaseSensorOperator. It waits for a key (a file-like instance on S3) to be present in an S3 bucket. We can use it as the first operator in the DAG so that, technically, the rest of the DAG only executes once the S3 file is available.

S3KeySensor mechanism

To use the S3KeySensor operator, you need to provide these args:

List of attributes
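A sketch of the sensor in a DAG (Airflow 1.10 import path); the bucket name, key, and timing values are assumptions:

```python
from airflow import DAG
from airflow.sensors.s3_key_sensor import S3KeySensor
from airflow.utils.dates import days_ago

with DAG("example.s3_key_sensor", schedule_interval="@daily",
         start_date=days_ago(1)) as dag:
    # Block the rest of the DAG until the key exists in the bucket
    wait_for_file = S3KeySensor(
        task_id="wait_for_customers_csv",
        bucket_name="my-datalake-bucket",       # assumed bucket
        bucket_key="customers_2019_11_01.csv",  # the key must be known up front
        poke_interval=60,                       # re-check every minute
        timeout=6 * 60 * 60,                    # fail after 6 hours of waiting
    )
```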

With S3KeySensor, I got really close to a solution. However, the S3KeySensor requires a bucket_key (i.e. the name of the file), and this value is commonly dynamic, so we cannot provide it up front. For example, the file name can have the format `customers_YYYY_MM_DD.csv`, and the Target waits for a specific file whose date part satisfies its particular requirement.

Variable-Sensor Compound

Through the S3KeySensor experiment, I decided to implement my own Sensor inheriting from BaseSensorOperator. A Sensor in Airflow is a kind of Operator that waits for a condition to match. The condition is checked by the poke method, a predicate that returns True/False.

The idea is that the Source sets an Airflow Variable indicating that it has finished its work. The Target has a Sensor as the first operator in its DAG. The Sensor checks the Variable and determines whether it should trigger the series of operators that follow.

Variable-Sensor Compound mechanism

In the example below, both DAGs are scheduled daily.

After finishing its ETL process, the Source sets a Variable with its dag_id as the key and its execution time as the value, e.g. `example.airflow_variable_operator_20191101_0000`

AirflowVariableOperator is defined here
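A sketch of what such an operator could look like (Airflow 1.10 imports); the exact key/value split between dag_id and timestamp is my assumption based on the example value above:

```python
from airflow.models import BaseOperator, Variable
from airflow.utils.decorators import apply_defaults


class AirflowVariableOperator(BaseOperator):
    """Final operator of the Source DAG: records that the run finished by
    writing an Airflow Variable keyed by dag_id, valued with the run time."""

    @apply_defaults
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def execute(self, context):
        key = context["dag"].dag_id                                # e.g. 'example.airflow_variable_operator'
        value = context["execution_date"].strftime("%Y%m%d_%H%M")  # e.g. '20191101_0000'
        Variable.set(key, value)
        self.log.info("Set Variable %s = %s", key, value)
```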

The Target checks whether the Source has already executed on that day:

AirflowVariableSensorOperator is defined here
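A sketch of the matching sensor (Airflow 1.10 imports); the same-day comparison in poke mirrors the Variable format assumed above:

```python
from airflow.models import Variable
from airflow.sensors.base_sensor_operator import BaseSensorOperator
from airflow.utils.decorators import apply_defaults


class AirflowVariableSensorOperator(BaseSensorOperator):
    """First operator of the Target DAG: pokes until the upstream DAG
    has recorded a run for the same day via its Variable."""

    @apply_defaults
    def __init__(self, upstream_dag_id, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.upstream_dag_id = upstream_dag_id

    def poke(self, context):
        stored = Variable.get(self.upstream_dag_id, default_var="")
        today = context["execution_date"].strftime("%Y%m%d")
        # True only when the Source already ran on this execution day
        return stored.startswith(today)
```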

With this solution, I can set up multiple DAGs, each with its own interval, and make sure that they are triggered only when the condition matches!

Outcome

After considering all the current solutions, I chose the Variable-Sensor Compound mechanism for my data pipeline. For now, it satisfies my needs: I can externally trigger a DAG when a condition matches, and the condition-match function can be defined depending on specific needs. In addition, the DAGs in the pipeline can run at different intervals.

The example code can be found here.
