由於均一工作溝通主要使用 Slack,如果 Airflow各階段的執行結果都能在 Slack跳訊息通知,就不需要一直切換到 Airflow之中確認狀態;另外也不是每個人都有權限可以打開 Airflow,但都可以藉由 slack通知來了解資料更新的情況。 — 由於均一工作溝通主要使用 Slack,如果 Airflow各階段的執行結果都能在 Slack跳訊息通知,就不需要一直切換到 Airflow之中確認狀態;另外也不是每個人都有權限可以打開 Airflow,但都可以藉由 slack通知來了解資料更新的情況。 SlackAPIPostOperator官方文件 有另外一個類似的 SlackWebhookOperator,似乎要設定更多,因此我偷懶(誤)選擇了 SlackAPIPostOperator. 我們一起來看一下範例 from airflow import models, util
from airflow.providers.slack.operators.slack import SlackAPIPostOperator
...
# 這是預先設定在 Docker中設定的 token
slack_token = models.Variable.get('SLACK_SECRET')
# 要傳入 slack的訊息
slack_msg = "Dialy pipeline has compeleted with success. <@U1234ABCD> ..."
default_args = {
"gcp_conn_id": "GCP_secret",
"use_legacy_sql": False,
"location": "US",
"wait_for_completion": True,
}
with models.DAG(
"send_result_to_slack",
default_args=default_args,
start_date=utils.dates.days_ago(0),
schedule_interval=None,
max_active_runs=1,
catchup=False,
) as dag:
pipeline_finish_notification = SlackAPIPostOperator(
task_id='pipeline_finish_notification',
text=slack_msg,
channel='#2022-data-scrum-task',
username='Airflow',
token = slack_token
)