How to Trigger an Apache Airflow DAG Failure Alarm in Slack?
First of all, my thoughts and prayers go to Türkiye for the earthquake. Special thanks to the Airflow contributors for their support and for motivating me to write this blog. Thanks also to my teammates for supporting me.
1-) Why did we integrate Airflow with Slack?
Airflow is a workflow orchestration tool for data engineers: an open-source platform to programmatically author, schedule, and monitor workflows. It is widely used for orchestrating big data workflows.
Slack is a workplace communication platform that allows team members to communicate through chat channels, direct messages, and file sharing. It helps teams to stay organized and reduces the need for email, enabling real-time collaboration and discussion. Slack integrates with a variety of tools and services to help teams work more efficiently.
We use Slack in the company for communication and for scheduling jobs, and we have also integrated alarms and operations into Slack. One of the platforms we use this way is Airflow, where these alarms help us meet the SLA times of our big data platform.
Quick info: SLA stands for Service Level Agreement. The term is very broad, but for data engineers an SLA defines the expected level of service and performance of the data infrastructure and systems they are responsible for. This may include elements such as the availability and reliability of data storage systems, data processing and transfer speeds, and response times for data queries.
2-) How do we handle Airflow alarms?
First of all, we use Prometheus to handle alarms in Airflow. Prometheus is a time-series monitoring and alerting system that fits almost any environment and scales well with your resources. Once Prometheus is integrated with Airflow, you don't need to add a separate alarm for every new DAG you create. You can set up the Prometheus and Airflow integration on Kubernetes with the following stages.
- First of all, create a secret.yml in your namespace for managing the Slack hook and credentials.
apiVersion: v1
kind: Secret
metadata:
  name: slack-secret
type: Opaque
data:
  SLACK_API_TOKEN: <Base64 encoded Slack API token>
  SLACK_CHANNEL: <Base64 encoded Slack channel name>
  SLACK_HOOK: <Base64 encoded Slack webhook URL, if you only want to manage a hook>
In this example, if you want to use the Slack API, provide SLACK_API_TOKEN and SLACK_CHANNEL. If you only send alarms to one specific channel, I recommend using SLACK_HOOK instead, because a webhook is already bound to a target channel when you create it, so you don't need to specify SLACK_CHANNEL in the secret.yml.
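If you are unsure how to produce the Base64 values for the secret, here is a small helper sketch; the token, channel, and webhook URL below are placeholders, not real credentials:

import base64


def b64(value: str) -> str:
    # Kubernetes Secret data values must be Base64 encoded.
    return base64.b64encode(value.encode("utf-8")).decode("ascii")


# Replace the placeholders with your own token, channel, and webhook URL.
print("SLACK_API_TOKEN:", b64("xoxb-your-bot-token"))
print("SLACK_CHANNEL:", b64("#airflow-alerts"))
print("SLACK_HOOK:", b64("https://hooks.slack.com/services/T000/B000/XXXX"))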
- Secondly, create the Prometheus alerting setup with the following YAML: a Prometheus instance plus a PrometheusRule object that it selects through the role: prometheus-rulefiles label.
---
apiVersion: monitoring.coreos.com/v1
kind: Prometheus
metadata:
  name: airflow-prometheus
spec:
  replicas: 1
  serviceAccountName: prometheus-sa
  ruleSelector:
    matchLabels:
      role: prometheus-rulefiles
  serviceMonitorSelector:
    matchLabels:
      app: myapp
  serviceMonitorNamespaceSelector: {}
  resources:
    limits:
      memory: 1024Mi
      cpu: 1
  nodeSelector:
    kubernetes.io/os: linux
---
apiVersion: monitoring.coreos.com/v1
kind: PrometheusRule
metadata:
  name: airflow-rules
  labels:
    role: prometheus-rulefiles
spec:
  groups:
  - name: airflow
    rules:
    - alert: AirflowDAGFailed
      expr: airflowlog_dag_state{dag_id="example_dag", state="failed"} > 0
      for: 5m
      labels:
        severity: critical
      annotations:
        description: DAG example_dag has failed in Airflow
        summary: "Airflow DAG Failure: example_dag"
    - alert: AirflowTaskFailed
      expr: airflowlog_task_state{task_id="example_task", state="failed"} > 0
      for: 5m
      labels:
        severity: critical
      annotations:
        description: Task example_task has failed in Airflow
        summary: "Airflow Task Failure: example_task"
You can edit the groups and alert rules of this example for your own DAGs. This YAML raises critical-severity alerts for a failed DAG and a failed task; if you want other severities, look at the Prometheus alert severity levels.
- Third, you must create the Alertmanager configuration that integrates Prometheus with Slack. The following YAML is a good solution for this problem (the api_url should be the webhook URL you stored as SLACK_HOOK).
route:
  receiver: slack-receiver
  group_wait: 30s
  group_interval: 5m
  repeat_interval: 12h
receivers:
- name: slack-receiver
  slack_configs:
  - api_url: <SLACK_HOOK webhook URL>
    send_resolved: true
    title: |-
      [{{ .Status | toUpper }}{{ if eq .Status "firing" }}:{{ .Alerts.Firing | len }}{{ end }}] {{ .CommonLabels.alertname }} for {{ .CommonLabels.name }}
    text: >-
      <!channel>
      *Alert details*:
      {{ range .Alerts -}}
      *Description:* {{ .Annotations.description }}
      {{ end }}
This YAML is important because it connects Prometheus to Slack through Alertmanager and also defines the alert text.
Finally, we apply these YAML files to Kubernetes. (If you use a CI/CD solution, this step is a perfect candidate for automation.)
As you can clearly see, with Prometheus the alarms arrive automatically. But you still have to clear the failed DAG in the Airflow interface, so what if we could manage that from Slack?
3-) Why do you need that?
When I first asked myself this question, I was really intrigued. There are situations where you can't open the Airflow UI, the REST API, or the CLI, which means you want to operate this feature from Slack. You can also manage the DAG-clearing flow through Slack events. (For example, if you decide that person X should not clear the tasks, you can block that at the code level.)
You are free to manage alerts with the combined support of Slack and Airflow. For example, an integrated Airflow failure alarm would look like this:
This is an example of the alarm that fires when a task has an error. I include the task name, DAG name, execution time, and log URL in the alarm. You can also see that this alert works with SlackWebhookOperator, which means the message is sent over a webhook, so you can't listen to this event without an event handler. (I write about this situation in the fourth part.)
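For reference, a minimal sketch of such a failure callback might look like the code below. It assumes a Slack webhook connection named slack_webhook is configured in Airflow and that apache-airflow-providers-slack is installed; the connection id and parameter names can differ between provider versions, so check your release.

from airflow.providers.slack.operators.slack_webhook import SlackWebhookOperator


def slack_failure_alert(context):
    # Collect task name, DAG name, execution time, and log URL from the context.
    ti = context["task_instance"]
    message = (
        ":red_circle: *Airflow task failed*\n"
        f"*Task*: {ti.task_id}\n"
        f"*DAG*: {ti.dag_id}\n"
        f"*Execution time*: {context.get('execution_date')}\n"
        f"*Log URL*: {ti.log_url}"
    )
    # "slack_webhook" is a hypothetical Airflow connection id for the webhook.
    return SlackWebhookOperator(
        task_id="slack_failure_alert",
        slack_webhook_conn_id="slack_webhook",
        message=message,
    ).execute(context=context)

You would then attach slack_failure_alert as the on_failure_callback in each DAG's default_args, which is exactly why this callback has to be repeated for every DAG.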
In this example we manage the alarm system inside the DAG itself. That is a disadvantage, because the same callback has to be added to every DAG, which makes this code block a bad smell in your codebase. (This is where Prometheus is very valuable, because with Prometheus you don't write any alarm code in the DAG.)
Comparing this feature with Prometheus, handling many DAGs this way is a bad idea: this alarm fires only once, while Prometheus keeps firing at whatever interval you configure. (You can also pair this alarm with retries in Airflow in case the failure was caused by a momentary problem.) A one-shot alarm can therefore be missed by the operations team. On the other hand, if you have a team and don't want to set up Prometheus, you can build an alarm system like this in Airflow; a Slack channel is watched by the operations team much more quickly and easily, so an alarm like this is still beneficial.
As you can clearly see, this alarm system uses Slack actions over Socket Mode (Slack recommends this approach because handling events is much easier than with a plain webhook).
The advantages of this approach are:
- The alarm arrives once per DAG, and it is much easier to read than a Prometheus alert.
- You can easily track which user clicked the button, as in the following example.
- The message uses Block Kit, so you can enrich it with Block Kit components (a short sketch is shown after these lists).
The disadvantages of this approach are:
- As mentioned previously, this alarm is triggered only once, which makes alarm tracking a problem when you have many DAGs.
- Clearly, all of this works with events, and Airflow is not designed for handling events.
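For completeness, here is a minimal sketch of the kind of Block Kit alarm message with a button, as mentioned in the advantages above. It assumes the bot token and channel from the earlier secret are available as the SLACK_API_TOKEN and SLACK_CHANNEL environment variables; the clear_dag action id and the message content are illustrative assumptions.

import os

from slack_sdk import WebClient

client = WebClient(token=os.environ["SLACK_API_TOKEN"])

blocks = [
    {
        "type": "section",
        "text": {
            "type": "mrkdwn",
            "text": ":red_circle: *DAG example_dag failed*",
        },
    },
    {
        "type": "actions",
        "elements": [
            {
                "type": "button",
                "text": {"type": "plain_text", "text": "Clear DAG"},
                "value": "button_1",  # read back in the event handler below
                "action_id": "clear_dag",  # must match the handler's action id
            }
        ],
    },
]

client.chat_postMessage(
    channel=os.environ["SLACK_CHANNEL"],
    text="DAG example_dag failed",  # fallback text for notifications
    blocks=blocks,
)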
4-) Future Works
As I mentioned, Airflow is not designed for handling events. If you write an event handler for Slack events (integrated via Socket Mode or a Flask app; my personal choice is Socket Mode), you can connect it to your DAGs from that handler. The following code block helps with this request. In this code you listen for clicks on the buttons in the alarm message, and the handler runs as a separate application outside Airflow.
import os

from slack_bolt import App
from slack_bolt.adapter.socket_mode import SocketModeHandler
from slack_sdk.errors import SlackApiError

# The bot token (xoxb-...) is used for the Web API calls made by Bolt.
app = App(token=os.environ["SLACK_API_TOKEN"])


# Listen for clicks on the buttons attached to the alarm message.
# The action id here must match the action_id of the button in the Block Kit message.
@app.action("clear_dag")
def handle_click(ack, body, respond):
    try:
        # Acknowledge the interaction so Slack does not show a timeout warning.
        ack()
        user = body["user"]["username"]
        button_value = body["actions"][0]["value"]
        if button_value == "button_1":
            respond(text=f"{user} clicked {button_value}")
        elif button_value == "button_2":
            respond(text=f"{user} clicked {button_value}")
        else:
            respond(text="No button was clicked.")
    except SlackApiError as e:
        respond(text=f"Error: {e}")


if __name__ == "__main__":
    # Socket Mode needs the app-level token (xapp-...), not the bot token.
    SocketModeHandler(app, os.environ["SLACK_APP_TOKEN"]).start()
You can also check the following page for writing custom actions and using Socket Mode: https://slack.dev/bolt-python/concepts#socket-mode
I wanted to improve this alarm with information about the failure reason, but I haven't succeeded yet because we store our task logs in Elasticsearch. The next piece of future work is fetching those logs from Elasticsearch.
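A rough, hypothetical sketch of that future work is below. The index pattern, field names, and client version are assumptions and must be adapted to your Airflow Elasticsearch logging setup.

from elasticsearch import Elasticsearch

es = Elasticsearch("http://elasticsearch:9200")  # hypothetical address


def fetch_task_log(dag_id: str, task_id: str, run_id: str) -> list[str]:
    # Pull the log lines of one task instance so the alarm can show a failure reason.
    result = es.search(
        index="airflow-logs-*",  # hypothetical index pattern
        query={
            "bool": {
                "filter": [
                    {"term": {"dag_id": dag_id}},
                    {"term": {"task_id": task_id}},
                    {"term": {"run_id": run_id}},
                ]
            }
        },
        size=50,
    )
    return [hit["_source"].get("message", "") for hit in result["hits"]["hits"]]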
The final piece of future work is clearing tasks through the Airflow REST API, because clearing tasks through the Airflow CLI is not recommended by the Airflow contributors.
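A minimal sketch of that last idea, assuming Airflow 2's stable REST API with basic auth enabled; the URL, credentials, and payload fields are placeholders and should be checked against your Airflow version.

import requests

AIRFLOW_API = "http://airflow-webserver:8080/api/v1"  # hypothetical address


def clear_failed_task(dag_id: str, dag_run_id: str, task_id: str) -> None:
    # Clear a failed task instance through the REST API instead of the CLI or UI.
    response = requests.post(
        f"{AIRFLOW_API}/dags/{dag_id}/clearTaskInstances",
        auth=("airflow_user", "airflow_password"),  # hypothetical credentials
        json={
            "dry_run": False,
            "task_ids": [task_id],
            "dag_run_id": dag_run_id,
            "only_failed": True,
        },
    )
    response.raise_for_status()

Calling a function like this from the Slack button handler would close the loop: the failure alarm arrives in Slack, and a click clears the task without opening the Airflow UI.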
5-) Conclusion
As a result, I built an alarm flow that is integrated with Slack and does not require opening the Airflow UI.
I hope you enjoy this article :) Thanks for reading.
Take care of yourself!
Emin Can