How to add custom KPIs to Airflow

Hariom Sharma
Mar 31 · 4 min read

Written by Hariom Sharma & Thanigaivel Shanmugam

Apache Airflow is an open-source workflow management platform. Airflow allows you to programmatically author and schedule workflows and monitor them via the built-in Airflow user interface. Airflow is designed under the principle of “configuration as code”. Airflow uses directed acyclic graphs (DAGs) to manage workflow orchestration.

The standard Airflow UI provides options to view pipeline job status, along with a few filters. As the number of pipelines increases, these built-in status views and filters make it difficult to find the information you need.

More pipelines = more difficult to monitor

We see the root of the problem as poor visibility into the data pipeline status.

To mitigate this challenge, Airflow can be customized with a custom Key Performance Indicator (KPI) by making changes to the underlying codebase.

The standard Airflow UI provides just three tabs (All, Active, Paused) to show the DAG status. This can be extended with additional custom KPIs by modifying two files: dags.html and views.py.

For example, the below screenshot has the custom KPI added to the Airflow UI:

Code Changes to add the Custom KPI field:

1) dags.html

In this example, we add a custom KPI for “Failed” jobs. In dags.html, add one more line for the “Failed” KPI in the same format as the existing KPIs (All, Active, Paused). This defines the additional tab to be displayed.

<a href="{{ url_for('Airflow.index', status='failed', search=request.args.get('search', None), tags=request.args.get('tags', None)) }}" class="btn {{'btn-primary' if status_filter == 'failed' else 'btn-default'}}" title="Show only failed DAGS">Failed <span class="badge">{{ "{:,}".format(status_count_failed) }}</span></a>
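The template line above relies on two small expressions: one picks the button class depending on whether the tab is selected, and one formats the count with thousands separators. As a minimal sketch in plain Python, the same logic looks like this. The helper name tab_badge is hypothetical and only for illustration; Airflow evaluates these expressions inside dags.html itself.

```python
def tab_badge(tab, status_filter, count):
    """Mirror the two Jinja expressions used by the 'Failed' tab link.

    Hypothetical helper for illustration only; it is not part of Airflow.
    """
    # Highlight the tab only when it is the currently selected filter.
    css_class = "btn-primary" if status_filter == tab else "btn-default"
    # "{:,}" inserts thousands separators into the count shown in the badge.
    label = "{:,}".format(count)
    return css_class, label


print(tab_badge("failed", "failed", 1234))  # ('btn-primary', '1,234')
print(tab_badge("failed", "active", 7))     # ('btn-default', '7')
```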

This change will add the additional “Failed” tab to the UI.

2) views.py

The contents of the KPI tab are defined in views.py. This is done using the predefined ORM (object-relational mapping) classes available in Airflow, which are mapped to the Airflow metadata tables. These ORM classes are used to query data from the metadata tables and pass the results to the UI.

There are three steps to get the data to the UI:

i. Prepare the filter condition for the KPI

ii. Create an object for the KPI counter

iii. Return the filtered DAG object and counter back to the Airflow base view

Here, we fetch information about failed DAGs from the DagRun ORM class. On the backend, this runs a query against the Airflow metadata database.

Prepare the filter condition for the KPI

To select the data to display, we use the DagRun and DagModel ORM classes, which are mapped to the Airflow metadata tables. A failed_dags query object is created by joining the two on dag_id and keeping only the rows whose run state equals 'failed'.

failed_dags = dags_query.filter(DagRun.dag_id == DagModel.dag_id, DagRun.state == 'failed')
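To make the join easier to picture, here is a minimal sketch of roughly the SQL that such a filter corresponds to, run against an in-memory SQLite database. The two tables are heavily simplified stand-ins for the Airflow metadata tables (the real ones have many more columns), and the sample DAG names and the failed_dag_ids helper are made up for illustration. DISTINCT is used here so that a DAG with several failed runs is listed only once.

```python
import sqlite3

# Simplified stand-ins for the Airflow metadata tables (assumption: only
# the columns needed for this illustration are modelled).
conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE dag (dag_id TEXT PRIMARY KEY);
    CREATE TABLE dag_run (dag_id TEXT, state TEXT);
    INSERT INTO dag VALUES ('etl_orders'), ('etl_users'), ('reports');
    INSERT INTO dag_run VALUES
        ('etl_orders', 'failed'),
        ('etl_orders', 'failed'),
        ('etl_users', 'success'),
        ('reports', 'failed');
""")


def failed_dag_ids(connection):
    """Return the ids of DAGs that have at least one failed run."""
    rows = connection.execute(
        """
        SELECT DISTINCT dag.dag_id
        FROM dag
        JOIN dag_run ON dag_run.dag_id = dag.dag_id
        WHERE dag_run.state = 'failed'
        ORDER BY dag.dag_id
        """
    ).fetchall()
    return [row[0] for row in rows]


print(failed_dag_ids(conn))  # ['etl_orders', 'reports']
```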

Create an object for KPI Counter

A counter variable is created to show the number of results next to the tab name in the UI. The code below does that by running a SQL query directly against the Airflow metadata table. The date arithmetic (current_timestamp - 90) is database-specific and may need adjusting for your metadata database.

status_count_failed = session.execute(
    "SELECT count(distinct dag_id) FROM dag_run WHERE state IN ('failed') AND execution_date > current_timestamp - 90").scalar()
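One way to avoid database-specific date arithmetic is to compute the cutoff in Python and pass it as a query parameter. The following is a minimal sketch of that idea against an in-memory SQLite table. The function name count_failed_dags, the simplified dag_run table, the sample rows, and the fixed "now" value are all assumptions made for illustration; this is not the code used in views.py.

```python
import sqlite3
from datetime import datetime, timedelta


def count_failed_dags(connection, now, window_days=90):
    """Count distinct DAGs with a failed run in the last `window_days` days."""
    # Compute the cutoff in Python so the SQL stays portable.
    cutoff = (now - timedelta(days=window_days)).isoformat(sep=" ")
    return connection.execute(
        "SELECT COUNT(DISTINCT dag_id) FROM dag_run "
        "WHERE state = ? AND execution_date > ?",
        ("failed", cutoff),
    ).fetchone()[0]


conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE dag_run (dag_id TEXT, state TEXT, execution_date TEXT)"
)
now = datetime(2021, 3, 31, 12, 0, 0)  # arbitrary fixed time for the example
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?, ?)",
    [
        ("etl_orders", "failed", "2021-03-30 00:00:00"),
        ("etl_orders", "failed", "2021-03-29 00:00:00"),
        ("reports", "failed", "2020-06-01 00:00:00"),  # outside the window
        ("etl_users", "success", "2021-03-30 00:00:00"),
    ],
)

print(count_failed_dags(conn, now))  # 1
```

Note that this counter only looks at the last 90 days, like the query above, while the failed_dags filter has no time window, so the badge count and the number of rows listed in the tab can differ.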

Return the filtered DAG object and counter back to the Airflow base view

In the if/elif chain on arg_status_filter, add a branch for 'failed' with the two assignments shown below. This assigns the newly created query object and the counter variable to the standard variables.

if arg_status_filter == 'active':
    current_dags = active_dags
    num_of_all_dags = status_count_active
elif arg_status_filter == 'paused':
    current_dags = paused_dags
    num_of_all_dags = status_count_paused
elif arg_status_filter == 'failed':
    current_dags = failed_dags
    num_of_all_dags = status_count_failed
else:
    current_dags = all_dags
    num_of_all_dags = all_dags_count
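As the number of KPIs grows, the if/elif chain gets longer with each new tab. One alternative, sketched below, is to keep the query objects and counters in dictionaries keyed by status and fall back to "all" for unknown or missing filters. This is not how Airflow's views.py is written; select_dags and the two dictionaries are hypothetical names, and plain lists stand in for the real query objects.

```python
def select_dags(status_filter, dags_by_status, counts_by_status):
    """Pick the DAG collection and its count for the requested status tab.

    Falls back to the "all" entry when the filter is missing or unknown,
    which mirrors the final else branch of the if/elif chain.
    """
    key = status_filter if status_filter in dags_by_status else "all"
    return dags_by_status[key], counts_by_status[key]


# Plain lists stand in for the SQLAlchemy query objects (assumption).
dags_by_status = {
    "all": ["a", "b", "c"],
    "active": ["a"],
    "paused": ["b"],
    "failed": ["c"],
}
counts_by_status = {name: len(items) for name, items in dags_by_status.items()}

print(select_dags("failed", dags_by_status, counts_by_status))  # (['c'], 1)
print(select_dags(None, dags_by_status, counts_by_status))      # (['a', 'b', 'c'], 3)
```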

As a final step, the counter variable needs to be passed to the template as the last keyword argument of the render call, as shown below.

if arg_status_filter else None),
num_runs=num_runs,
tags=tags,
state_color=state_color_mapping,
status_filter=arg_status_filter,
status_count_all=all_dags_count,
status_count_active=status_count_active,
status_count_paused=status_count_paused,
status_count_failed=status_count_failed)

Code change snapshot:

1) dags.html

Added: line 7

2) views.py

Added: lines 8, 15–18, 26–28, 36–38.

How to build the new code changes

In the root directory of the Airflow code repo, execute the statement below:

python setup.py install

Now start the Airflow webserver using the command below:

./build/airflow webserver -p 8080

Once the webserver has started successfully, open it on localhost:

http://localhost:8080

Here you get the magic: the new “Failed” tab appears next to the standard ones.

We can now add as many KPI tabs as we need (e.g. Running Jobs, LongRunning Jobs, and so on). Each KPI fetches its latest information from the metadata tables. We can also write more complex queries to get more information based on various criteria.
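When several state-based KPIs are added, their counters can be computed with one grouped query instead of one query per tab. The sketch below shows the idea on an in-memory SQLite table. The KPI_STATES tuple, the kpi_counts helper, and the simplified dag_run table are assumptions for illustration, and a KPI such as LongRunning Jobs would need a different query than a plain state match.

```python
import sqlite3

# Hypothetical list of run states that each get their own KPI tab.
KPI_STATES = ("failed", "running")


def kpi_counts(connection, states=KPI_STATES):
    """Return {state: number of distinct DAGs with a run in that state}."""
    placeholders = ", ".join("?" for _ in states)
    rows = connection.execute(
        "SELECT state, COUNT(DISTINCT dag_id) FROM dag_run "
        f"WHERE state IN ({placeholders}) GROUP BY state",
        states,
    ).fetchall()
    # Start from zero so states without any matching run still appear.
    counts = {state: 0 for state in states}
    counts.update(dict(rows))
    return counts


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE dag_run (dag_id TEXT, state TEXT)")
conn.executemany(
    "INSERT INTO dag_run VALUES (?, ?)",
    [
        ("etl_orders", "failed"),
        ("etl_orders", "failed"),
        ("reports", "failed"),
        ("etl_users", "running"),
        ("etl_users", "success"),
    ],
)

print(kpi_counts(conn))  # {'failed': 2, 'running': 1}
```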

Groupon Product and Engineering

All things technology from Groupon staff
