資料工程大小事(三)— 淺談Airflow(概念篇)

Jesshsieh
9 min readApr 6, 2024

--

這個系列會紀錄我在學習資料工程的筆記,希望可以幫助到我以外,也可以幫助到正在學習的路上且有點焦慮的你。

什麼是 Airflow?

Airflow 是 Airbnb 的開源專案,當時 Airflow 為了管理複雜的 data pipeline 而使用 Python 開發的工具,後來成為 Apache 的開源專案。
Airflow 是一個 Workflow Management System,用來管理任務調度與執行,有良好的使用者介面可以操作。

Airflow 想解決什麼問題?

熟悉 Linux 環境的讀者應該知道,欲針對某項任務排程,可以使用 crontab 指令進行排程,該任務會在指定的時間點運行,在任務數量少的情境下的確十分方便,但請讀者試想一個情境:
若我們所要管理的任務高達上百種,那使用 crontab 指令進行任務的排程是否就不恰當了?
這時候我們就可以使用 Airflow!剛才有提到,Airflow 有友善的使用者介面,可以查看目前任務的狀態以及運行所產生的 log,在監控或是 debug 方面都方便許多。另外,由於 Airflow 是由 Python 所開發的,在定義工作流程也十分靈活。

介紹完 Airflow 的優點後,我們先認識 workflow 與 DAG 的概念。

Workflow in Data Engineering

在資料工程的 workflow 中,以常見的 ETL data pipeline 來舉例,簡單來說會經歷以下環節:

  • 原始資料搜集(Extract)
  • 資料處理(Transform)
  • 存入 Data Warehouse(Load)
  • 資料應用:dashboard 呈現、機器學習模型訓練等

Workflow in Airflow

在 Airflow 的概念中,Workflow 與 DAG 是指同一個概念,並不一定包含上述我們所描述的資料工程 Workflow 的整個環節,而是可以將所有需要排程(在設定的時間執行)的任務聚集在一起,透過Airflow 這個工具去管理,所以可以是上述 Workflow 的一部分。

簡單來說 Workflow 與 DAG 在 Airflow 的觀念裡就是管理一系列任務的執行時間、上下游關係等等。

可以搭配稍後的 DAG 介紹閱讀會比較完整。

什麼是 DAG?

DAG(Directed Acyclic Graph) 有向無環圖
從中文名稱可以很好理解:
「有向」:這裡的有向指的是 Airflow 的任務排序具有方向性,可以自行設定任務間的上下游關係,實際執行之後會按照設定的順序執行任務。
「無環」:無環的意思是無循環,執行完一次指定的任務後並不會再重頭執行一遍。
DAG 負責管理一個 Workflow 內任務之間的依賴關係,還有所有運行任務所需的參數。
這些設定在 DAG 創建之後都會存在 Metadata Database 內(稍後詳細說明)。

接下來會介紹 Airflow 中各個 Component,這裡放上一張 Airflow 架構圖,讀者可以時不時回來搭配這張圖閱讀本文

https://airflow.apache.org/docs/apache-airflow/2.0.1/concepts.html

Webserver

這是 Airflow 的 User Interface 介面,可以在這個介面上創造、管理、監控 DAGs,也可以將 workflow 視覺化,查看目前每個任務的狀態等,適合用來 debug 用。

可以看出現在有哪些 DAG 被創建以及 DAG 目前的狀態
點進去其中一個 DAG 可以看到目前有三個任務,三個任務依序執行順序為 get_name > get_age > greet,從右上角的狀態可以發現,目前曾經運行過的任務都是 success 的狀態(綠色)

Scheduler

負責監控排定的 Workflow,將預計執行的 Task 送到 Executor,確保任務在對的時間、對的上下游關係下執行。
Scheduler 會去解析存放 DAG 檔案的資料夾,將 DAG 中設定的參數更新至 Metadata Database 中。

Executor

負責任務的調度。Executor 從 Scheduler 接收到接下來要運行的任務訊息 ,Executor 會啟動進程或是容器去執行任務,要注意的是,Executor 只負責任務的啟動,不負責執行。
常見的 Executor 有 LocalExecutor、CeleryExecutor、KubernetesExecutor。

DAG Processor

DAG Processor 負責解析 DAG 資料夾內的檔案 ,並且將定義 DAG 的參數設定更新至 Metadata Database 中。
等等,這個是不是跟 Scheduler 很像?是的, DAG Processor 是 Scheduler 中的一部分,當 DAG Processor 存在時,Scheduler 就不會直接解析 DAG 檔案。
按照官網的說法,會將 DAG processor 獨立出來是希望 Scheduler 專心做調度任務的工作,這樣設計可以確保擴充性與安全性高,進階的用法就讓讀者自行嘗試了。

Worker

負責實際執行任務。是由 Executor 發起的進程或是容器,負責 run Task 內所設定的程式碼、Script 等。

Task 任務

在 DAG 裡可以設定多個任務,任務之間有可以有上下游關係,任務可以做的事情很多,包含資料處理、資料存取,由開發者自行設定,基本上就是將你預計執行的步驟切分成多個任務,便可以在指定的時間點依序運行任務。
任務間的上下游關係有多種設定方式,也可以指定同時執行多個任務再執行下一個任務,稍後在實作篇會再多做說明。

在 Airflow 中會用不同狀態來紀錄任務當下的進度,可以搭配下圖作參考:

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html
  • none: 尚未加入排程的任務
  • scheduled: 已加入排成的任務,等待
  • queued: 任務已經分配給 Executor,等待 Worker 執行
  • running: 正在執行中的任務
  • success: 執行成功的任務
  • restarting: 任務在運行時被外部請求重啟
  • failed: 失敗的任務
  • skipped: 被跳過的摁物
  • upstream_failed: 上游的任務失敗,導致該任務中止
  • up_for_retry: 任務失敗,但未達到 retry 的上限
  • up_for_reschedule: 預計重新排程的任務
  • deferred: 該任務被延後
  • removed: 在 DAG run 時任務消失了

Metadata Database

用來存放 DAGs 的參數設定與任務的所有資訊,包含 DAGs 的實例、目前的運行狀態,每個任務的起始運行時間、間隔運行時間、目前的狀態等等,基本上所有操作的資訊都會存放在這裡。

note:要啟動 Airflow 最少需要以下的 components,而其他 components 則是 optional 的:

  • Scheduler
  • Webserver
  • 存放 DAG 檔案的資料夾
  • Metadata Database

那實際上 Airflow 內各個 Components 是如何彼此合作的?

  1. Scheduler 定期掃描 DAG 的資料夾,檢查 Metadata Database 中是否已經有 DAG Run 實例,如果沒有就創建一個,如果有就辨認是哪一個實例。
  2. Scheduler 查看 DAG 任務間的 dependencies,將任務加到 work queue 中,並且更新 Metadata Database 中 Taskinstance 的狀態為 queue
  3. Scheduler 將 queue 中的任務分配給 Executor,Executor 接收到任務後會啟動 Worker 去執行任務。
  4. 當 Worker 執行任務時, Taskinstance 的狀態改為 running
  5. 根據執行任務的結果將 Taskinstance 的狀態改為 failedsuccess 等。
  6. 一旦所有 queued 的任務都執行完成,Scheduler 會根據目前 DAG Run 所有任務的執行結果,將 DAG 的狀態改為 failedsuccess,如果任務還沒有執行完,那 DAG Run 的狀態會維持 running

以上是我對 Airflow 概念的基本介紹,如果對你的學習有幫助,也請幫我拍手👏

參考資料:

https://leemeng.tw/a-story-about-airflow-and-data-engineering-using-how-to-use-python-to-catch-up-with-latest-comics-as-an-example.html#Airflow-變數以及-Jinja-模板

https://medium.com/@dustinstansbury/understanding-apache-airflows-key-concepts-a96efed52b1a

https://www.youtube.com/watch?v=K9AnJ9_ZAXE&ab_channel=coder2j

https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html

--

--