Integrating Airflow with Nagios

Valentin Omota
THG Tech Blog
Published in
8 min readOct 7, 2020

Motivation

Airflow is a really convenient and powerful scheduling platform that facilitates better visibility over workflows via its functional yet simple UI. However, one thing that Airflow naively lacks is some basic alerting functionality. Having an increasing number of directed acyclic graphs (DAGs) that may run once a day or as often as every 15 minutes, keeping on top of each one’s performance is basically impossible.

Airflow UI

Hence, what Airflow could benefit from is a service that monitors each DAG and alerts on any issue as soon as it happens, at any point during the day. And this is where Nagios comes in.

But surely you can do some of that with Airflow directly, so why Nagios? Well, it’s true, you could define an EmailOperator task in Airflow to send an email if anything is wrong and append it to each DAG. In this way, if a DAG fails, you’ll get an email about it - simple, but maybe not ideal. Why would you want a workflow that does some complex and specific tasks also sending an email? What happens if the emailing part fails, has the entire DAG failed? Do you really want to add this functionality to all your DAGs? What happens when you need to change the functionality of the emailing task, do you change all your workflows?

In this sense, Nagios is a much better choice — you have just one centralised and separate service to handle alerting that could in theory check all your DAGs. Moreover, if all your services are being monitored by Nagios, you’d be better off sticking with it and keeping alerting consistent.

Nagios Alert for DAG duration

One problem though, Nagios requires some metrics or endpoints to work with and Airflow does not really expose significant metrics. Therefore, an extension to Airflow is required.

Exposing Airflow metrics

Airflow has an API, but it is still in the experimental phase. It has nice functionality to trigger, pause and start DAGs or to get some metrics about the latest DAG runs and their tasks. Those metrics might be enough to alert on but, if you want to see more than when a DAG ran, and if it failed or not, the native Airflow API will not be enough.

Fortunately, Airflow is customisable via plugins — which are just python files that can be copied into /airflow/plugins directory to be picked up by the service on start up. These plugins become part of the code base and can access everything the scheduler or the web services have access to. Coming up with a script to access and expose metrics on an endpoint is not that hard. In fact, somebody already did it here https://github.com/airflow-plugins/airflow_api_plugin/

So, going through this plugin code we can see how it works: first thing that’s needed is some way to access the metrics of a DAG run:

from airflow.models import DagBag, DagRun

It’s just that simple! The plugin can import core classes of the code base and use them.

session = settings.Session()query = session.query(DagRun)query = query.filter(DagRun.external_trigger =(request.args.get('external_trigger') in ['true', 'True']))runs = query.order_by(DagRun.execution_date).all()

The plugin has access to the same airflow.cfg config file, so it has access to the same metastore without any extra effort. Using the SQLAlchemy based Session class in Airflow, it can query the database for all the needed information.

And secondly: what about a way to expose these metrics on an endpoint? That’s just a Flask Blueprint routing the endpoint to the functionality.

@airflow_api_blueprint.route(
'/dag_runs/<dag_run_id>', methods=['GET'])

That’s everything! At this point Airflow will serve the DAG run metrics as if it was always designed to do so with no need for any extra connections to the database or any extra services.

Nagios Plugin

Nagios is easily customisable as well with its plugins and command definitions. As long as you have some metrics and a way to process them, Nagios can handle the rest.

Our proposed implementation is a python plugin; it is a script that can be run by Nagios and feeds back something recognisable to the service. The plugin usage would look like:

Usage: airflow_plugin.py [options] -H hostname arg1 arg2 arg3Options:
-h, --help show this help message and exit
-H STRING VM hostname/ address
-w INT Warning threshold. See specific Alert
definition
-c INT Critical threshold. See specific Alert
definition
Failed DAG Runs Alert:
--failed-dag-runs Failed DAG Runs flag;
No extra options
available
Scheduled Latency Alert:
--scheduled-latency Scheduled Latency flag;
-w -c OPTIONAL;
-w is defined in minutes, defaults to 2;
-c is defined in minutes, defaults to 5
Healthcheck Alert:
--healthcheck Healthcheck of the API; No extra options
available

So, a Nagios command to check for scheduling latencies in DAG runs would look like:

python3 $USER1$/airflow_plugin.py --scheduled-latency \
-H $HOSTADDRESS$ \
-w 15 \
-c 30

To go a bit more into implementation details, the workflow for such a plugin would generally go like this:

get a list of all active DAGs
loop through each DAG

get metrics for a DAG
process metrics into some concrete result

if result is greater than the CRIT threshold
append DAG to list of critical
if result is greater than the WARN threshold
append DAG to list of warnings
if list of criticals is not empty
exit with code 2 (which is interpreted by Nagios as CRIT)
if list of warnings is not empty
exit with code 1 (which is interpreted by Nagios as WARN)
else
exit with code 0 (which is interpreted by Nagios as OK)

Everything else is handled by Nagios. Being Nagios, this doesn’t have to be Python, it could as well be bash or PowerShell or any scripting language.

The overall workflow of the plugin is not complicated or some brand new technique, this is a typical way of doing it. The smart thing about all this is how you process the metrics you get, that is, how you go about that process metrics bit. This is obviously dependent on what you want to get out of your alert. It could be as simple as parsing JSON for a certain value or as complex as processing multiple days of metrics into an average and calculating the deviation of the most recent run.

Types of alerts

As for the alerts we are interested in, first and foremost is a healthcheck. We are relying on the API so we need to know if the it’s up. This alert can be easily defined in the Nagios plugin by retrieving a list of all DAGs. The call will test not only the API, but also the web server and the connection to the database.

def healthcheck_alert(hostname):
try:
response = requests.get(
f'http://{hostname}:8080/api/v1/dags'
)
response.raise_for_status()
except Exception as err:
sys.stdout.write(str(err))
sys.exit(2) # CRIT
if response.status_code != 200:
sys.stdout.write(response.text)
sys.exit(2) # CRIT
sys.exit(0) # OK

Secondly, we want alerts firing on DAG failures. That’s easily achievable in the Nagios Plugin: just retrieve the last run of each DAG and return CRITICAL if any failed.

def failed_dag_runs_alert(hostname):
failed_dag_runs = []
response = requests.get(f'http://{hostname}:8080/api/v1/dags')
dag_list = response.json()['response']['dags']
for dag in dags:
response = requests.get(
f'http://{hostname}:8080/api/v1/metrics',
params={dag_id:dag['dag_id']}
)
dag_runs = response.json()['response']['dag_runs']
# Check if the latest run failed
latest_dag_run = dag_runs[0]
if latest_dag_run['state'] == 'failed'
failed_dag_runs.append(dag['dag_id'])
if failed_dag_runs:
sys.stdout.write(failed_dag_runs)
sys.exit(2) # CRIT
sys.exit(0) # OK

In addition, we would like to see if any of the DAGs ran longer than usual. How much is longer? How long is usual? For the latter, this is easily achievable by getting the mean of the last x runs of a DAG— for instance, if we include 7 runs for a daily job, we can assume that the mean would be quite representative. For the former, the easiest would be to use hard thresholds: if it ran 15 mins more then set to WARN and if it ran 30 mins more then set to CRIT. However, this could miss quick jobs that normally take 1 or 2 mins taking many times longer. We could calculate the deviation from the mean and if it’s greater than 30% then set to WARN and if it’s greater than 50% then set to CRIT. This would require more time to fine-tune the right thresholds, especially if there are DAGs with big differences in run times.

# Example for hard thresholds of 15 and 30 minutes
def dag_duration_alert(hostname, warn_threshold=15,
crit_threshold=30, window_size=7):
warn_duration = []
crit_duration = []
response = requests.get(f'http://{hostname}:8080/api/v1/dags')
dag_list = response.json()['response']['dags']
for dag in dag_list:
response = requests.get(
f'http://{hostname}:8080/api/v1/metrics',
params={dag_id:dag['dag_id']}
)
dag_runs = response.json()['response']['dag_runs']
dag_runs = dag_runs[:window_size]
dag_durations = []
for dar_run in dag_runs[1:]:
dag_durations.append(
dag_run['end_date'] - dag_run['start_date'])
median_dag_duration = statistics.median(dag_durations)
latest_dag_duration = dag_duration[0]
deviation_from_median = abs(
latest_dag_duration - median_dag_duration)
if deviation_from_median >= crit_threshold:
crit_duration.append(dag['dag_id'])
elif deviation_from_median >= warn_threshold:
warn_duration.append(dag['dag_id'])
if crit_duration:
sys.stdout.write(crit_duration)
sys.exit(2) # CRIT
elif warn_duration:
sys.stdout.write(warn_duration)
sys.exit(1) # WARN
else:
sys.exit(0) # OK

Finally, another relevant thing to check is whether the DAG ran on time. This check validates more the scheduler rather than the DAG itself, but even so, if your workflows are time critical, you wouldn’t want to suffer any latency. Conveniently, Airflow stores the next run time of a certain DAG run. With this information, we can compare it to the actual run time. Again, while it ideally should always start on time, the same problem as above arises: how much more is a WARN problem and how much more is a CRIT problem?

# Example for hard thresholds of 2 and 5 minutes
def dag_duration_alert(hostname, warn_threshold=2,
crit_threshold=5):
warn_latencies = []
crit_latencies = []
response = requests.get(f'http://{hostname}:8080/api/v1/dags')
dag_list = response.json()['response']['dags']
for dag in dag_list:
response = requests.get(
f'http://{hostname}:8080/api/v1/metrics',
params={dag_id:dag['dag_id']}
)
dag_runs = response.json()['response']['dag_runs']
dag_runs = dag_runs[:window_size]
last_dag_run = dag_runs[0]

start_date = last_dag_run['start_date']

# Calculate the actual start date: that is
# at the end of the scheduling interval from the
# execution_date
scheduled_date = croniter.croniter(
last_dag_run['schedule_interval'],
last_dag_run['execution_date'],
).get_next(datetime)
latency = (start_date - scheduled_date).total_seconds / 60 if latency >= crit_threshold:
crit_latencies.append(dag['dag_id'])
elif latency >= warn_threshold:
warn_latencies.append(dag['dag_id'])
if crit_latencies:
sys.stdout.write(crit_latencies)
sys.exit(2) # CRIT
elif warn_latencies:
sys.stdout.write(warn_latencies)
sys.exit(1) # WARN
else:
sys.exit(0) # OK

Conclusion

Integrating Airflow with Nagios is straightforward with some knowledge of Python and two plugins— one for Airflow and one for Nagios. This plugins give you the freedom to be creative with either of them without affecting the other. They are also examples of how to add custom checks to Nagios (for Airflow or other systems) or to extend Airflow!

We’re recruiting

Find out about the exciting opportunities at THG here:

--

--