Road to trigger Airflow’s dag through a form

Benjamin Berriot
Maisons du Monde

--

In the data team at Maisons du Monde, we use Airflow as the main job orchestrator for our data lake. All the jobs that collect, transform and expose data run through Airflow, which is deployed on Google Cloud Composer.

What we want

We want to be able to trigger a dag with arguments filled in through a form in the Airflow UI. This is useful for launching a machine learning pipeline or an ETL job with specific arguments, something we do often in our daily work.

Disclaimer: this is probably not the best way to do this, because it relies on a low-level API that can change at any moment. The following code was tested with Apache Airflow 1.10.2 and Python 2 (I know…). Airflow is running on Google Cloud Composer v1.8.2.

How to trigger a dag from the UI?

First, in Airflow there is a button that can trigger the dag on demand.

First button on the left

Let’s search the Airflow code to see how this button works. We start by finding where the button is implemented in the HTML template file.

The button is linked to airflow.trigger and passes the dag_id to that function. Next, let’s look in views.py for the /trigger route.

Perfect, now we have the code to launch a specific dag with its dag id.

How to pass arguments to the dag execution?

This is the most important part: we want to send information to the dag through a form. To achieve that we will use Airflow’s Jinja templating feature (https://airflow.apache.org/docs/stable/concepts.html#id1), in particular the dag_run variable. When we create a new dag_run, we can pass it a dict through the conf argument.

dag.create_dagrun(
    run_id=run_id,
    execution_date=execution_date,
    state=State.RUNNING,
    conf=run_conf,  # here
    external_trigger=True
)

We just need to parse all the arguments sent by the form, fill this dict and then create a new dag_run with it. Pretty simple.
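To make the parsing step concrete, here is a minimal sketch of turning submitted form values into the conf dict. It is not the plugin's actual code: the function name build_run_conf and the "required" option are assumptions made for illustration, and it only needs the standard library.

```python
def build_run_conf(form_fields, submitted):
    """Build the conf dict for a dag_run from submitted form values.

    form_fields: dict mapping field name -> options (hypothetical layout),
                 e.g. {"table_id": {"required": True}}
    submitted:   dict-like object holding the raw values sent by the form
    """
    run_conf = {}
    missing = []
    for name, options in form_fields.items():
        # Treat absent and blank values the same way
        value = (submitted.get(name) or "").strip()
        if value:
            run_conf[name] = value
        elif options.get("required", False):
            missing.append(name)
    if missing:
        raise ValueError("Missing required fields: " + ", ".join(sorted(missing)))
    return run_conf


fields = {
    "dataset_id": {"required": True},
    "table_id": {"required": True},
    "date": {"required": False},
}
run_conf = build_run_conf(fields, {"dataset_id": "sales", "table_id": " orders ", "date": ""})
print(run_conf)
```

The resulting dict is what would be passed as conf=run_conf when creating the dag_run. Optional fields left empty are simply omitted, so the dag has to handle their absence.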

Create the plugin

Now that we have all the pieces of code we need, it’s time to build the final code.

An important point: we want to keep the plugin as simple as possible and make it easy to add a new form, so we decided to describe the forms in a JSON file.

The plugin tree is as follows:

plugins/
custom_launcher/
custom_launcher.py
config.json
templates/
custom_launcher/
main.html

We can start by creating the dag that we want to launch. Here is an example:

It’s a simple dag with one task that launches a query on BigQuery: the operator needs a dataset id, a table id and a date.

Then, we create the json file (config.json) to describe the form that we want.

The first entry in the file (here: “Z_QUERY_LAUNCHER”) is the dag_id we want to trigger. The other entries correspond to the form inputs, with some additional information such as whether the field is required, the placeholder, etc.
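As an illustration of this layout, here is a hypothetical config and a few lines that read it. Only the dag_id “Z_QUERY_LAUNCHER” and the three inputs (dataset id, table id, date) come from the description above. The nesting, the key names "required" and "placeholder", and the placeholder values are assumptions, so the real config.json may look different.

```python
import json

# Hypothetical config.json content (assumed layout): the top-level key is
# the dag_id, and each nested key describes one form input.
EXAMPLE_CONFIG = """
{
  "Z_QUERY_LAUNCHER": {
    "dataset_id": {"required": true, "placeholder": "my_dataset"},
    "table_id": {"required": true, "placeholder": "my_table"},
    "date": {"required": false, "placeholder": "YYYY-MM-DD"}
  }
}
"""


def required_fields(fields):
    """Return the sorted names of the inputs marked as required."""
    return sorted(name for name, opts in fields.items() if opts.get("required"))


form_config = json.loads(EXAMPLE_CONFIG)

for dag_id, fields in form_config.items():
    print("{0}: required fields = {1}".format(dag_id, required_fields(fields)))
```

With this kind of layout, adding a new form only means adding a new top-level entry to the file.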

Now we need to create the HTML template for the form.

Next comes the most important file: the Python file that parses the config file, renders the HTML and triggers the dag.

The plugin exposes two endpoints: the root endpoint ‘/’, which renders the HTML, and ‘/trigger’, which triggers the dag.
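As a rough sketch of what the ‘/trigger’ endpoint has to prepare, the helper below assembles the keyword arguments shown earlier for dag.create_dagrun. The helper name build_dagrun_kwargs is hypothetical, the string "running" stands in for Airflow’s State.RUNNING so that the snippet runs without Airflow installed, and the "manual__" prefix follows the convention Airflow 1.10 uses for manually triggered runs.

```python
from datetime import datetime


def build_dagrun_kwargs(run_conf, execution_date):
    """Assemble keyword arguments for dag.create_dagrun (illustrative only)."""
    return {
        "run_id": "manual__{0}".format(execution_date.isoformat()),
        "execution_date": execution_date,
        "state": "running",  # stand-in for airflow.utils.state.State.RUNNING
        "conf": run_conf,  # the arguments collected from the form
        "external_trigger": True,
    }


kwargs = build_dagrun_kwargs({"table_id": "orders"}, datetime(2019, 6, 1, 12, 0, 0))
print(kwargs["run_id"])  # manual__2019-06-01T12:00:00
```

In the real endpoint, these values would be passed to dag.create_dagrun for the dag matching the submitted dag_id.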

Finally the plugin class:

# Defining the plugin class
class AirflowCustomLauncher(AirflowPlugin):
    name = "custom_launcher"
    admin_views = [CustomLauncher(category="Custom Launch", name=k, endpoint=k) for k in form_config]
    flask_blueprints = [bp]

The ‘admin_views’ argument adds new views to Airflow. To use a single class with a different endpoint for each form, we pass the argument ‘endpoint=endpoint_name’.

Final result

We can see the new tab in the menu containing the form created following the config.json file.

When we click on the submit button, a new dag_run is created:

If we go to the rendered tab, we have for each dag_run all arguments sent to the dag:

Et voila, we can launch our dag through a form!

Production state

Time for production: we deployed the code on Google Cloud Composer and ran into some bugs:

  • The config.json file was not found, because the path was not the same on our local Airflow and on Composer. Since config.json lives in the plugins folder, we replaced the hard-coded path (never a good solution) “/home/airflow/plugins/custom_launcher/config.json” with “conf.get('core', 'plugins_folder') + '/custom_launcher/config.json'”.
  • After deploying to Composer, we saw high CPU usage on our Kubernetes cluster. The problem was really hard to identify, but we noticed that network usage rose at the same moment as CPU usage, so we looked for code that uses the network. We found that the instruction “dagbag = models.DagBag(settings.DAGS_FOLDER)” sat outside the CustomLauncher class and was executed every time Airflow parsed the file, causing the high network and CPU usage. Moving this instruction inside the class fixed the problem.
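Both fixes can be sketched without Airflow. In the snippet below, config_path takes the plugins folder as a parameter (in the plugin it would come from conf.get('core', 'plugins_folder')), and load_dagbag is a stand-in for models.DagBag(settings.DAGS_FOLDER) that simply counts its calls. The class and method names are hypothetical, and calling the loader from within a request handler is one possible reading of "inside the class".

```python
import os


def config_path(plugins_folder):
    """Build the config path from the plugins folder instead of hard-coding it."""
    return os.path.join(plugins_folder, "custom_launcher", "config.json")


load_calls = []


def load_dagbag():
    """Stand-in for models.DagBag(settings.DAGS_FOLDER); records each call."""
    load_calls.append(1)
    return {"Z_QUERY_LAUNCHER": "dag object"}


class CustomLauncherSketch(object):
    """Illustrative view: the expensive load happens on demand."""

    def trigger(self, dag_id):
        # Loaded only when a form is submitted, not each time the
        # scheduler or webserver parses the plugin file
        dagbag = load_dagbag()
        return dagbag[dag_id]


# Defining the class (which is what happens on file parsing) loads nothing
calls_after_import = len(load_calls)

launcher = CustomLauncherSketch()
dag = launcher.trigger("Z_QUERY_LAUNCHER")
calls_after_trigger = len(load_calls)
print(calls_after_import, calls_after_trigger)  # 0 1
```

The trade-off is that the dags are loaded on every form submission, which is acceptable here because submissions are rare compared to how often Airflow parses the plugin file.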

Conclusion

This feature is really useful for our team and, judging from some Airflow meetups, it is a missing feature for a lot of users, which is why we are sharing our work. Digging into the Airflow code was very interesting and helped us understand how DAGs are triggered in Airflow.
