這個系列會紀錄我在學習資料工程的筆記,希望可以幫助到我以外,也可以幫助到正在學習的路上且有點焦慮的你。
什麼是 Airflow?
Airflow 是 Airbnb 的開源專案,當時 Airflow 為了管理複雜的 data pipeline 而使用 Python 開發的工具,後來成為 Apache 的開源專案。
Airflow 是一個 Workflow Management System,用來管理任務調度與執行,有良好的使用者介面可以操作。
Airflow 想解決什麼問題?
熟悉 Linux 環境的讀者應該知道,欲針對某項任務排程,可以使用 crontab 指令進行排程,該任務會在指定的時間點運行,在任務數量少的情境下的確十分方便,但請讀者試想一個情境:
若我們所要管理的任務高達上百種,那使用 crontab 指令進行任務的排程是否就不恰當了?
這時候我們就可以使用 Airflow!剛才有提到,Airflow 有友善的使用者介面,可以查看目前任務的狀態以及運行所產生的 log,在監控或是 debug 方面都方便許多。另外,由於 Airflow 是由 Python 所開發的,在定義工作流程也十分靈活。
介紹完 Airflow 的優點後,我們先認識 workflow 與 DAG 的概念。
Workflow in Data Engineering
在資料工程的 workflow 中,以常見的 ETL data pipeline 來舉例,簡單來說會經歷以下環節:
- 原始資料搜集(Extract)
- 資料處理(Transform)
- 存入 Data Warehouse(Load)
- 資料應用:dashboard 呈現、機器學習模型訓練等
Workflow in Airflow
在 Airflow 的概念中,Workflow 與 DAG 是指同一個概念,並不一定包含上述我們所描述的資料工程 Workflow 的整個環節,而是可以將所有需要排程(在設定的時間執行)的任務聚集在一起,透過Airflow 這個工具去管理,所以可以是上述 Workflow 的一部分。
簡單來說 Workflow 與 DAG 在 Airflow 的觀念裡就是管理一系列任務的執行時間、上下游關係等等。
可以搭配稍後的 DAG 介紹閱讀會比較完整。
什麼是 DAG?
DAG(Directed Acyclic Graph) 有向無環圖
從中文名稱可以很好理解:
「有向」:這裡的有向指的是 Airflow 的任務排序具有方向性,可以自行設定任務間的上下游關係,實際執行之後會按照設定的順序執行任務。
「無環」:無環的意思是無循環,執行完一次指定的任務後並不會再重頭執行一遍。
DAG 負責管理一個 Workflow 內任務之間的依賴關係,還有所有運行任務所需的參數。
這些設定在 DAG 創建之後都會存在 Metadata Database 內(稍後詳細說明)。
接下來會介紹 Airflow 中各個 Component,這裡放上一張 Airflow 架構圖,讀者可以時不時回來搭配這張圖閱讀本文
Webserver
這是 Airflow 的 User Interface 介面,可以在這個介面上創造、管理、監控 DAGs,也可以將 workflow 視覺化,查看目前每個任務的狀態等,適合用來 debug 用。
Scheduler
負責監控排定的 Workflow,將預計執行的 Task 送到 Executor,確保任務在對的時間、對的上下游關係下執行。
Scheduler 會去解析存放 DAG 檔案的資料夾,將 DAG 中設定的參數更新至 Metadata Database 中。
Executor
負責任務的調度。Executor 從 Scheduler 接收到接下來要運行的任務訊息 ,Executor 會啟動進程或是容器去執行任務,要注意的是,Executor 只負責任務的啟動,不負責執行。
常見的 Executor 有 LocalExecutor、CeleryExecutor、KubernetesExecutor。
DAG Processor
DAG Processor 負責解析 DAG 資料夾內的檔案 ,並且將定義 DAG 的參數設定更新至 Metadata Database 中。
等等,這個是不是跟 Scheduler 很像?是的, DAG Processor 是 Scheduler 中的一部分,當 DAG Processor 存在時,Scheduler 就不會直接解析 DAG 檔案。
按照官網的說法,會將 DAG processor 獨立出來是希望 Scheduler 專心做調度任務的工作,這樣設計可以確保擴充性與安全性高,進階的用法就讓讀者自行嘗試了。
Worker
負責實際執行任務。是由 Executor 發起的進程或是容器,負責 run Task 內所設定的程式碼、Script 等。
Task 任務
在 DAG 裡可以設定多個任務,任務之間有可以有上下游關係,任務可以做的事情很多,包含資料處理、資料存取,由開發者自行設定,基本上就是將你預計執行的步驟切分成多個任務,便可以在指定的時間點依序運行任務。
任務間的上下游關係有多種設定方式,也可以指定同時執行多個任務再執行下一個任務,稍後在實作篇會再多做說明。
在 Airflow 中會用不同狀態來紀錄任務當下的進度,可以搭配下圖作參考:
none
: 尚未加入排程的任務scheduled
: 已加入排成的任務,等待queued
: 任務已經分配給 Executor,等待 Worker 執行running
: 正在執行中的任務success
: 執行成功的任務restarting
: 任務在運行時被外部請求重啟failed
: 失敗的任務skipped
: 被跳過的摁物upstream_failed
: 上游的任務失敗,導致該任務中止up_for_retry
: 任務失敗,但未達到 retry 的上限up_for_reschedule
: 預計重新排程的任務deferred
: 該任務被延後removed
: 在 DAG run 時任務消失了
Metadata Database
用來存放 DAGs 的參數設定與任務的所有資訊,包含 DAGs 的實例、目前的運行狀態,每個任務的起始運行時間、間隔運行時間、目前的狀態等等,基本上所有操作的資訊都會存放在這裡。
note:要啟動 Airflow 最少需要以下的 components,而其他 components 則是 optional 的:
- Scheduler
- Webserver
- 存放 DAG 檔案的資料夾
- Metadata Database
那實際上 Airflow 內各個 Components 是如何彼此合作的?
- Scheduler 定期掃描 DAG 的資料夾,檢查 Metadata Database 中是否已經有 DAG Run 實例,如果沒有就創建一個,如果有就辨認是哪一個實例。
- Scheduler 查看 DAG 任務間的 dependencies,將任務加到 work queue 中,並且更新 Metadata Database 中 Taskinstance 的狀態為
queue
。 - Scheduler 將 queue 中的任務分配給 Executor,Executor 接收到任務後會啟動 Worker 去執行任務。
- 當 Worker 執行任務時, Taskinstance 的狀態改為
running
。 - 根據執行任務的結果將 Taskinstance 的狀態改為
failed
、success
等。 - 一旦所有 queued 的任務都執行完成,Scheduler 會根據目前 DAG Run 所有任務的執行結果,將 DAG 的狀態改為
failed
或success
,如果任務還沒有執行完,那 DAG Run 的狀態會維持running
。
以上是我對 Airflow 概念的基本介紹,如果對你的學習有幫助,也請幫我拍手👏
參考資料:
https://medium.com/@dustinstansbury/understanding-apache-airflows-key-concepts-a96efed52b1a
https://www.youtube.com/watch?v=K9AnJ9_ZAXE&ab_channel=coder2j
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html