由於均一工作溝通主要使用 Slack,如果 Airflow各階段的執行結果都能在 Slack跳訊息通知,就不需要一直切換到 Airflow之中確認狀態;另外也不是每個人都有權限可以打開 Airflow,但都可以藉由 slack通知來了解資料更新的情況。
有另外一個類似的 SlackWebhookOperator,似乎要設定更多,因此我偷懶(誤)選擇了 SlackAPIPostOperator.
我們一起來看一下範例
from airflow import models, util
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
...
# 這是預先設定在 Docker中設定的 token
slack_token = models.Variable.get('SLACK_SECRET')
# 要傳入 slack的訊息
slack_msg = "Dialy pipeline has compeleted with success. <@U1234ABCD> ..."
default_args = {
"gcp_conn_id": "GCP_secret",
"use_legacy_sql": False,
"location": "US",
"wait_for_completion": True,
}
with models.DAG(
"send_result_to_slack",
default_args=default_args,
start_date=utils.dates.days_ago(0),
schedule_interval=None,
max_active_runs=1,
catchup=False,
) as dag:
pipeline_finish_notification = SlackAPIPostOperator(
task_id='pipeline_finish_notification',
text=slack_msg,
channel='#2022-data-scrum-task',
username='Airflow',
token = slack_token
)pipeline_finish_notification
從上述語法中我們可以理解到
- task_id 是工作任務名稱。
- text 是要傳送的 slack訊息,這裡預先在前面先宣告了。
- channel 是可以用頻道名稱或是頻道 id,頻道名稱需要加上 #,我選名稱是因為比較好識別
- username 是 Airflow bot的名稱
- token 是可以寫訊息到 slack的權限
當我們成功戳了 slackbot之後,就可以在頻道中看到以下訊息。
關於如何創立一個 slack bot,以及如何取得 api 權限,我獨立寫一篇 Slackbot:建立機器人與 API串接