Airflow: 獨立功能成物件,像 python function一樣傳遞參數

許博淳
數據共筆
Published in
Jul 12, 2022

如果你看了上一篇(Airflow: 使用 Xcom / Variable傳遞參數)發現那不是你要的,你要的是像 python的 function一樣傳遞,不用定義區域/全域變數,那你就來對地方了!

以下就以傳送訊息到 slack為例來和大家說明。

以下範例中,在某一個dag之中宣告一個傳送訊息的工作任務,使用 SlackWebhookOperator

. . .
from airflow.providers.slack.operators.slack import SlackAPIPostOperator


with models.DAG(
"daily_pipeline",
start_date=utils.dates.days_ago(0),
schedule_interval=None,
max_active_runs=1,
catchup=False,
) as dag:
trigger_GA_to_GBQ = TriggerDagRunOperator(
task_id="trigger_GA_to_GBQ",
trigger_dag_id="GA_to_GBQ",
wait_for_completion=True,
)
send_Slack_msg = SlackWebhookOperator(
task_id='send_Slack_msg',
text=
"Daily pipeline has completed with success. <@UTEDB6A5A> <@U7U2NE2RX>",
channel='#pipeline通知',
username='Airflow',
token=slack_webhook_token
)

如果能將傳訊息的 operator獨立成一個檔案,這樣每個 DAG想要使用的時候只要 import來使用就可以。

宣告一個用來傳 slack訊息的工作任務,命名為 send_message_to_slack.py

from airflow.decorators import task
from airflow.models import Variable
from slack_sdk import WebClient

# token 事先在 dokcer中的 variable檔案中定義
slack_token = Variable.get('SLACK_SECRET')
client = WebClient(token=slack_token)
# 定義 task,後續可以直接像 python的 function一樣使用
# 可以定義的內容和一般 DAG一樣,包含 trigger_rule
@task(trigger_rule="all_success")
def send_user_defined_message(message):
client.chat_postMessage(
channel = "#2022-data-scrum-task",
text = message
)

使用上述宣告的工作任務傳送訊息到 slack,就可以像一般 python一樣, import進來後

. . .from pathlib import Path
file_name = Path(__file__).stem
from slack_message import send_message_to_slack
file
# dependencies
# 像在
xxx前置工作 >> send_message_to_slack.send_user_defined_message(f"{file_name} was failed")

這樣就可以傳送以下的訊息,甚至可以包含 slack icon。

--

--