Airflow:PythonOperator

許博淳
數據共筆
Published in
May 8, 2022

Airflow在其官網中就表明他是一個純粹的 Python架構,因此對 Python的支援很廣泛,作為多數資料工程人員熟悉的語言,如果能用 Python操作一定是相對容易的。

Python

PythonOperator官方文件

在 Airflow中我們可以使用 Python完成做非常多樣的操作,以下的範例是將 GitLab中的程式碼。

from airflow import models, utils
from airflow.operators.python_operator import PythonOperator
from airflow.providers.google.cloud.operators.bigquery import BigQueryExecuteQueryOperator
from datetime import timedelta, date
# 將 .py檔放在獨立資料夾,避免版面混亂
# 要使用的時候獨立 import
from py_modules import Git_Pull_Procedures_to_GBQ
...with models.DAG(
'git_pull_procedures_to_GBQ',
default_args=default_args,
start_date=utils.dates.days_ago(0),
schedule_interval=None,
dagrun_timeout=timedelta(minutes=500),
max_active_runs=1,
catchup=False,
) as dag:
git_pull_procedures_to_GBQ = PythonOperator(
task_id='git_pull_procedures_to_GBQ',
python_callable=Git_Pull_Procedures_to_GBQ.main,
op_kwargs={
'GCP_SERVICE_ACCOUNT_CREDENTIALS': gcp_service_account_credentails
})
git_pull_procedures_to_GBQ

從上述語法我們可以了解到

  • task名稱是 git_pull_procedures_to_GBQ,應該是負責把 git的東西 pull到 Google BigQuery中。
  • python_callable是要使用的 Python檔案或function,這邊呼叫的是 Git_Pull_Procedures_to_GBQ檔案中的 main function。
  • op_kwargs 是呼叫 Python檔案時所需的參數,這邊的例子就是傳入 Google Cloud Platform所需要的憑證。

理論上我們可以在 python_callable中塞入一大堆 python 程式碼,做出相當複雜的操作,但因為這樣版面會很混亂,均一秉持著一個檔案一個功能的邏輯,因此我們獨立成一隻 .py檔。

另外還有一個好處是,獨立的 .py檔要測試會很好處理,可以在 local端建構到好,再修改少數credential的設定,就可以直接應用在 Airflow中。

--

--