Airflow:TriggerDAGRunOperator

許博淳
數據共筆
Published in
May 8, 2022

Airflow的每一個節點都是由 DAG組成,因此我們必然會遇到要觸發 DAG執行的情況,藉由一連串的觸發,最後才能完成整條 pipeline.

圖片來源:https://public.bnbstatic.com/static/academy/uploads-original/cfcf2ea486034b96b36722bb79d78e95.png

TriggerDAGRunOperator官方文件

以下是均一其中一個 TriggerDAGRunOperator的範例

from airflow import models, utils
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from datetime import date, timedelta
...

with models.DAG(
"weekly_pipeline",
default_args=default_args,
start_date=utils.dates.days_ago(0),
schedule_interval=None,
max_active_runs=1,
catchup=False,
# 宣告要去哪一個路徑找尋對應 DAGs
template_searchpath = ['/opt/airflow/dags/city_weekly_report/'],
) as dag:
trigger_GA_to_GBQ = TriggerDagRunOperator(
task_id="trigger_GA_to_GBQ",
trigger_dag_id="GA_to_GBQ",
)
trigger_GCS_to_GBQ = TriggerDagRunOperator(
task_id="trigger_GCS_to_GBQ",
trigger_dag_id="GCS_to_GBQ",
)
trigger_ip_address_mapping = TriggerDagRunOperator(
task_id="trigger_ip_address_mapping",
trigger_dag_id="ip_address_mapping",
trigger_rule="all_done",
)
trigger_git_pull_procedures_to_GBQ = TriggerDagRunOperator(
task_id="trigger_git_pull_procedures_to_GBQ",
trigger_dag_id="git_pull_procedures_to_GBQ",
)
...

trigger_GCS_to_GBQ >> trigger_ip_address_mapping
[
trigger_GA_to_GBQ, trigger_ip_address_mapping
] >> trigger_git_pull_procedures_to_GBQ >> ...

從上述語法我們可以了解到

  • task_id是要執行的工作任務名稱。
  • trigger_dag_id是實際執行的工作任務名稱

以 trigger_GA_to_GBQ為例,這隻的名稱叫做 trigger_GA_to_GBQ,這隻會去路徑 [‘/opt/airflow/dags/city_weekly_report/’] 觸發 trigger_GA_to_GBQ。

我們可以很簡單就發現到,這隻 DAG就是一系列的執行工作,我們將父 DAG寫在比較上層的流程中,執行細節就寫在這裡,這樣每一層分別理解,也好除錯。

我們還可以看到執行順序

  • trigger_GCS_to_GBQ 執行完才執行 trigger_ip_address_mapping.
  • trigger_GA_to_GBQ 和trigger_ip_address_mapping都執行完才執行 trigger_git_pull_procedures_to_GBQ.

--

--