Airflow:BrachPythonOperator

許博淳
數據共筆
Published in
May 8, 2022

Airflow:PythonOperator這篇文章我們已經講過 Airflow和 Python的愛恨情仇(誤),所以這邊不再重述。

BranchPythonOperator官方文件

BranchPythonOperator和 PythonOperator的運作原理很類似,不同的地方是,PythonOperator會 call .py檔,但不一定有回傳結果,BranchPythonOperator 則是會回傳一到多個 task_id。

換句話說 BranchPythonOperator就是用來決定後續要執行什麼工作,以下用一個均一的例子說明。

from airflow.operators.python_operator import BranchPythonOperator
...

def _check_is_Monday():
if date.today().weekday() == 0:
return 'trigger_weekly_pipeline'
return 'trigger_daily_pipeline'


with models.DAG(
'main_pipeline',
start_date=utils.dates.days_ago(0),
schedule_interval=None,
max_active_runs=1,
catchup=False,
) as dag:
check_is_Monday = BranchPythonOperator(
task_id="check_is_Monday",
python_callable=_check_is_Monday,
)
trigger_weekly_pipeline = TriggerDagRunOperator(
task_id="trigger_weekly_pipeline",
trigger_dag_id="weekly_pipeline",
wait_for_completion=True,
)
trigger_daily_pipeline = TriggerDagRunOperator(
task_id="trigger_daily_pipeline",
trigger_dag_id="daily_pipeline",
wait_for_completion=True,
)
...# 這邊可以注意到 BranchPythonOperator後的 operator要用 []包起來
check_is_Monday >> [trigger_weekly_pipeline, trigger_daily_pipeline] >> ...

從上述語法我們可以了解到

  • task名稱是 check_is_Monday,應該是判斷今天是否是星期一。
  • python_callable是要使用的 Python檔案或 function,這邊呼叫的是 _check_is_Monday這個 function。

從 _check_is_Monday我們可以看到,如果是星期一,回傳 trigger_weekly_pipeline,反之回傳 trigger_daily_pipeline。

這邊很明顯就是一個條件判斷式,根據星期幾決定要執行哪一個 pipeline,實務上不同條件下執行不同的 DAGs的組合時就可以使用 BranchPythonOperator。

另外可以注意到的是,不論是 weekly_pipeline還是 daily_pipeline這邊看起來都是單一個 DAG,但事實上其內容可能有多個 DAGs,只是我們這邊用大 DAG包裝了多個子 dags。

這樣操作維持每一個檔案一個邏輯,可以讓每一層的流程更好理解,也會更好維護。

--

--