Integrating Slack Alerts in Airflow

Kaxil Naik
Published in DataReply
6 min read · Dec 11, 2018

You just triggered your Airflow DAG that sends data to your clients. Confident that it will succeed (why would it not? You wrote it; there is no way it can fail), you go for coffee with your colleagues in the company kitchen, where the awesome coffee machine is waiting to serve you the most delicious coffee ☕. You discuss how to make the company better (of course you don't talk about how awesome the new Avengers trailer is!). Then you finally head back to your seat with a smirk, expecting to see a green status on your DAG. But wait… what just happened? Your DAG failed. Of course, it was not your fault: the letters "DAG" decided to rearrange themselves into "DGA" while you were having coffee, hence the failure. At that very moment, your boss comes over and asks, "How's your work going? Was the data sent to all clients?" And you wish there had been some way to get an alert on your phone while you were having coffee.

Well, I could go on and on with this silly story, but the point is that you need alerting when your DAG fails so that you can take action as early as possible. Airflow has a built-in capability to send alerts by email, but those tend to get lost in the pile of 1,000 other unread emails. It is simply easier to get alerts somewhere your entire team keeps an eye on: Slack.

There are two ways in which you can integrate Slack with Airflow.

(1) Using Slack Legacy Tokens:

Legacy tokens are an older method of generating tokens for testing and development. Slack itself doesn't recommend using them, but it is the simplest method, so you can still use it; just bear in mind that it can be deprecated at any time.

Follow these steps:

  • Create a Slack Token from https://api.slack.com/custom-integrations/legacy-tokens. You will see the list of Slack Workspaces your email is associated with. Click on Create Token next to the workspace where you want to send alerts.
  • Use the SlackAPIPostOperator (remember to install the Slack dependencies: pip install apache-airflow[slack]) in your DAG as below:

from airflow.operators.slack_operator import SlackAPIPostOperator

SlackAPIPostOperator(
    task_id='failure',
    token='YOUR_TOKEN',
    text='Hello World !',
    channel='SLACK_CHANNEL',  # Replace with your Slack channel name
    username='airflow'
)

You can try this example in IPython or in a Jupyter notebook as follows:

An example of running the code in IPython and the sample message received on Slack

However, this is just an example of sending a message on Slack, not of alerting on task failures. Each task in Airflow accepts a parameter called on_failure_callback (a callable) to which you pass a function to be called when the task fails.
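Conceptually, when a task fails, Airflow invokes that callable with a context dictionary describing the failed run. A minimal, Airflow-free sketch of that contract (the context dict here is a simplified stand-in for the real one, and the values are made up for illustration):

```python
# Stand-in for Airflow's failure-callback contract: on task failure,
# Airflow calls the callable with a context dict describing the run.
def slack_failed_task(context):
    # The real callback would build and execute a Slack operator here;
    # we just return the message text to show what the callback receives.
    ti = context["task_instance"]
    return f":red_circle: Task Failed: {ti}"

# Simplified stand-in for the context Airflow passes in.
context = {"task_instance": "fail_task", "execution_date": "2018-12-11"}
print(slack_failed_task(context))  # :red_circle: Task Failed: fail_task
```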

Example:

def slack_failed_task(context):
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel="#datalabs",
        token="...",
        text=':red_circle: Task Failed',
        username='airflow',
    )
    return failed_alert.execute(context=context)


task_with_failed_slack_alerts = BashOperator(
    task_id='fail_task',
    bash_command='exit 1',
    on_failure_callback=slack_failed_task,
    dag=dag,
)

Now when you run the DAG with the above task, it will send you an alert as shown in the image below:

Example of alerts on task failures. Note that the task ran six times, so there are six alerts.

This is useful, but there are still two issues with the above code:

  • The Slack token is stored in plain text
  • The Slack alert message isn't properly formatted

The first issue can be resolved by storing the Slack token in an Airflow Connection, in the password field, as follows:

I also recommend running pip install apache-airflow[crypto], which enables encryption of connection passwords in the metadata database.

Now let's update our function to pick up the token and channel name from the connection, and also improve the alert format:

from airflow.hooks.base_hook import BaseHook
from airflow.operators.slack_operator import SlackAPIPostOperator

SLACK_CONN_ID = 'slack'


def task_fail_slack_alert(context):
    """
    Sends a message to a Slack channel.
    If you want to send it to a "user" -> use "@user",
    if "public channel" -> use "#channel",
    if "private channel" -> use "channel"
    """
    slack_channel = BaseHook.get_connection(SLACK_CONN_ID).login
    slack_token = BaseHook.get_connection(SLACK_CONN_ID).password
    failed_alert = SlackAPIPostOperator(
        task_id='slack_failed',
        channel=slack_channel,
        token=slack_token,
        text="""
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
            task=context.get('task_instance').task_id,
            dag=context.get('task_instance').dag_id,
            exec_date=context.get('execution_date'),
            log_url=context.get('task_instance').log_url,
        )
    )
    return failed_alert.execute(context=context)
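The alert template itself is plain string formatting, so it can be sanity-checked without a running Airflow. Here, SimpleNamespace serves as a hypothetical stand-in for the TaskInstance, carrying only the fields the template uses (all the values are made up):

```python
from types import SimpleNamespace

# Hypothetical stand-in for an Airflow TaskInstance.
ti = SimpleNamespace(
    task_id="fail_task",
    dag_id="demo_dag",
    log_url="http://localhost:8080/log?task_id=fail_task",
)

slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
    task=ti.task_id,
    dag=ti.dag_id,
    exec_date="2018-12-11T00:00:00",
    log_url=ti.log_url,
)
print(slack_msg)
```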

A sample alert from this function looks as follows:

As you can see, it also gives you a Log URL so that you can go directly to the log of the failed task.

(2) Using Slack Web Hooks:

Slack recommends using Incoming Webhooks to send data into Slack.
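Under the hood, an incoming webhook is nothing more than an HTTP POST of a JSON body to the webhook URL. A stdlib sketch that only builds the request (the URL is a placeholder, and nothing is actually sent here):

```python
import json
import urllib.request

# Placeholder webhook URL; replace with the one from your Slack app settings.
webhook_url = "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"

# The minimal payload Slack expects: a JSON object with a "text" field.
payload = json.dumps({"text": ":red_circle: Task Failed"}).encode("utf-8")

req = urllib.request.Request(
    webhook_url,
    data=payload,
    headers={"Content-Type": "application/json"},
)
# urllib.request.urlopen(req) would actually send it; left out on purpose.
print(req.get_method(), req.full_url)
```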

Note: There was a bug in SlackWebhookOperator in Airflow≤1.10.3 (Bug Jira Issue). This was fixed in 1.10.4 with this PR (fix commit).

Follow the steps below:

  • Create a Slack app if you don't have one already.
  • Enable Incoming Webhooks on the next page
  • Create an Incoming Webhook by clicking on Add New Webhook to Workspace on the same page

You will see something similar to below image:

So go ahead and pick a channel that the app will post to, and then click to Authorize your app. You’ll be sent back to your app settings, and you should now see a new entry under the Webhook URLs for Your Workspace section, with a Webhook URL that’ll look something like this:

https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
  • Create an Airflow connection for Slack with HTTP connection and the part after https://hooks.slack.com/services should go under password:
    Host: https://hooks.slack.com/services
    Conn Type: HTTP
    Password: /T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
  • Create a Python function
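To double-check how the URL splits between the connection's Host and Password fields, here is a quick sanity check using the placeholder URL from above:

```python
# Placeholder webhook URL from the example above.
webhook_url = "https://hooks.slack.com/services/T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX"

# The Host field of the Airflow connection.
host = "https://hooks.slack.com/services"

# Everything after the host goes into the connection's Password field.
token = webhook_url[len(host):]
print(token)  # /T00000000/B00000000/XXXXXXXXXXXXXXXXXXXXXXXX
```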

Note (reminding you again in case you skipped it above): there was a bug in SlackWebhookOperator in Airflow ≤1.10.3 (Bug Jira Issue). This was fixed in 1.10.4 with this PR (fix commit).

from airflow.hooks.base_hook import BaseHook
from airflow.contrib.operators.slack_webhook_operator import SlackWebhookOperator

SLACK_CONN_ID = 'slack'


def task_fail_slack_alert(context):
    slack_webhook_token = BaseHook.get_connection(SLACK_CONN_ID).password
    slack_msg = """
:red_circle: Task Failed.
*Task*: {task}
*Dag*: {dag}
*Execution Time*: {exec_date}
*Log Url*: {log_url}
""".format(
        task=context.get('task_instance').task_id,
        dag=context.get('task_instance').dag_id,
        exec_date=context.get('execution_date'),
        log_url=context.get('task_instance').log_url,
    )
    failed_alert = SlackWebhookOperator(
        task_id='slack_test',
        http_conn_id='slack',
        webhook_token=slack_webhook_token,
        message=slack_msg,
        username='airflow',
    )
    return failed_alert.execute(context=context)

Bonus Tip

You can add on_failure_callback to default_args when defining the DAG, as below, so that you get an alert if any task in the DAG fails:

default_args = {
    'owner': 'airflow',
    'start_date': airflow.utils.dates.days_ago(2),
    'retries': 0,
    'on_failure_callback': task_fail_slack_alert
}

dag = DAG(
    dag_id=DAG_NAME,
    default_args=default_args,
    schedule_interval=schedule_interval,
)
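Conceptually, every key in default_args is applied to each task in the DAG unless the task overrides it. A simplified, Airflow-free illustration of that merge (the string value stands in for the real callable):

```python
default_args = {
    "owner": "airflow",
    "retries": 0,
    "on_failure_callback": "task_fail_slack_alert",  # stand-in for the callable
}

# Task-level kwargs win over default_args, just like in Airflow.
task_kwargs = {"retries": 2}
effective = {**default_args, **task_kwargs}

print(effective["retries"])              # 2 (task-level override)
print(effective["on_failure_callback"])  # inherited from default_args
```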

You can follow the same steps to integrate Slack with Google Cloud Composer.

Let me know in the comments if you have any issues.


Kaxil Naik
DataReply

Apache Airflow PMC Member and Committer | Open Source Advocate