一週內打造易管理ETL系統,Airflow就決定是你了!

Che-Yu Lin
AlfredCamera Team Blog
21 min readDec 30, 2021

Airflow是由Airbnb開源內部開發的工作流程管理系統,使用者能夠透過精簡的介面監控每日需要執行的工作狀態,並且模組化的程式架構讓開發人員能夠簡單透過程式碼新增自己需要的每日/每週/每月的固定執行流程,同時也能夠有效的記錄流程失敗的狀態與原因
接下來就看看為什麼阿福需要這套系統以及我們如何在一個相對限縮的時間內快速導入系統試用,一路上所做的抉擇與踩過的大坑吧!

嗨,我是Cheyu,目前在阿福管家擔任Solution Team Lead,前身是專門負責機器學習研究/落地的ML團隊,不過在研究之餘,我們也很常協助解決公司內部的流程優化或產出產品PoC,故改名成Solution Team,大概就是銀魂萬事屋的角色。
本篇內容不包含基礎架設及Airflow詳細DAG內容解析,但請務必來看看我們有趣且緊湊的開發日常。

為什麼阿福需要Airflow

Day0-從痛點開始

阿福是一間非常仰賴數據科學自動化的公司,為此我們的數據分析師花了相當大的時間與力氣部署大量ETL工作流程消化並處理我們產品每天產出的數據及各式指標。
起初我們透過crontab+腳本的管理這些工作流程,隨著流程越來越多,我們也越來越難管理與確定資料是否有在正確的時間內去到該去的地方。

總歸來說,我們在資料流管理上遇到的痛點會是:

  • 新增/監控/管理任務需要多團隊協作,很常因為Sprint繁忙導致延遲
  • 每日資料流任務資源分散,一出問題需要四處(GCP/AWS/實體爬蟲機)排查原因
  • 時常有任務跑不完的情況產生,需要後續再補資料,且時常沒發現
  • 實作方式都以 “可運作” 為優先,僅少數有迭代成易維護的架構

與此同時,盤點了目前有多少任務是在crontab/cloud function的管理內:

  • 爬取各廣告商資料
  • 提取Firebase相關資料
  • 與Firebase資料相關的後處理任務
  • 爬蟲任務
  • 報表任務

如果能夠將上述任務都集中在Airflow的管理介面內,同時加上即時的失敗回報系統,那麼將能夠大大的減少Data團隊處理意外的時長,讓團隊更專注在資料與洞見本身。

Script + Cron, save the world
100x Script + Cron, disaster

Day1–2-選擇與實踐

經團隊評估後,發現Airflow應該能夠很好的解決我們遇到的問題,一開始設定能夠透過以下方式解決上述痛點:

  • 透過模組化並且統一的DAG Code Template,讓團隊成員都能夠花費較小的工作量完成排程的上架
  • 透過單一平台管理每日資料流狀態,並同時記錄工作日誌與推播通知當責成員
  • 制定明確命名及寫作規範(部分歸功於Airflow DAG模組化設計),並確保上架前會經過大部分團隊成員Review
  • 與叢集式架構整合,在資源稀缺的情況自動開啟新機器運行,閒置時自動關閉節省成本

最後還是希望在一個可控的範圍內,對系統進行最快速的迭代,以避免未知性召喚出更大顆的隕石,我們一開始還是選擇了用單機器(Single Instance)的方式進行開發與部署,確認這套系統是不是能夠符合我們對ETL工作流程管理上的預期。

原則上所有新增/轉移的工作流程,在單機器(Single Instance)或叢集式(Cluster)的架構底下都能夠通用,所以不需要擔心重工的部分實屬萬幸。

一不小心,就會召喚隕石

有賴於網路資源與Airflow的發展成熟,單機器Airflow的架設並沒有太多阻礙,我們也依照與Data團隊討論的ETL優先級進行轉移,定義任務模板,讓未來Data團隊成員能夠自主處理小規模的任務上架需求。

單機版本測試與轉移任務概覽

自建Airflow與服務商的成本抉擇

決定採納新架構/系統,當然也需要估算一下雲端服務費用成本會增加多少,同時比較包套服務成本,如果服務成本便宜又好用,那何苦自己再造一次輪子呢?

Day3-科技巨頭真方便,但好貴

Airflow已經被各大企業及公司行號使用許久,當然會有雲端服務商推出包套的託管服務,付出相應的成本,就能夠省去服務管理層面的麻煩,以下就先針對兩大雲端服務商端上桌的服務進行成本分析。

Google Cloud Composer

Cloud Composer是Google Cloud提供的Airflow完整架設服務
方便的地方是,能夠自動幫使用者生成需要的環境,包含K8S更新,連結Cloud Storage,免除掉99%的Infra管理的麻煩,但代價就是較高的收費
Cloud Composer主要收費是以叢集硬體費用為主
硬體費用
Web Server = $4.70 / 日 * 31 = $145.7
SQL Server = $7.82 / 日 * 31 = $242.42
Cloud Storage = $1 / Month
總計收費 $389.12 USD / Month

Amazon Managed Workflows for Apache Airflow (MWAA)

與Cloud Composer不同,AWS計價是以環境+任務數量x執行時間為計價標準
以小型環境24小時全月無休計算,附帶25個任務流程管理
環境架設費用
機器計價 (小時計費) = 31天 x 24 小時/日 = 744 小時
x $0.49 (每小時收費金額)
= $364.56

Worker Node費用
機器計價 (小時計費) = 31 日 x 1 小時/日 x 25 額外任務機器 = 775 小時
x $0.055 (任務機器每小時收費)
= $42.625

資料庫額外費用
10 GB or storage x $0.10 GB-month = $1.00
總收費 $408.185 USD / Month

Self Organized Instances

假設是自主架設的機器,則只會計算到機器開啟費用
單機器架構
以Google Cloud N2-Standard-2機器開啟一個月為例
$56.71

叢集架構
以Google Cloud 2台N2-standard-2 + 其他先佔機器
$113.42~/month

詳細優劣比較
簡易的收費比較

從上述的比較可以發現Managed Service有諸多好處(就是無腦又方便)
雖然說Composer的定價文件會強調:”如果使用以上架構N天N小時後刪除,收費僅有xx元“,以目前使用目的來說,根本不可能使用完就馬上把環境刪除,撈紀錄麻煩倒還好,腳本沒有成功刪除被額外收費可是很痛的。
衡量之後發現其實有部分好處是我們目前在架構轉型時享受不到的,同時卻要付出幾倍的成本,可能還不適合現在的我們,於是確定了剩下的日子就開始我們的踩坑之路。

自建Airflow部署之路

Day4-總之先搞懂叢集多了什麼

基本上單機器的環境相對單純,好好跟著官方文件的步驟安裝及設定資料庫,並啟動Airflow裡面幾個重要的元件:WebServer / Scheduler / DB,基本上就已經做好預設的環境架設,剩下只需要調整airflow.cfg內的參數,就能夠滿足大部分資料流轉移的需求了。
在架設完成實際用了幾天之後,我們還是認為這次專案不能停在這裡,單機器版本雖然運作得很好,但其他團隊沒辦法把DAG丟在Cloud Storage Bucket就坐等上架,其實跟我們原先設定「方便」的目標還是有些差距。

簡單說一下組成Airflow的整體架構,基本上Airflow包含了:

  • Airflow-Core
    核心內容,這邊可以進行平行數量/工作者數量調整,也針對DAG預設執行準則內容(是否預設重試,補足所有時間段資料等)定義
  • WebServer
    管理網頁元件頁面以及網域連結/SSL等資訊設定
  • Scheduler
    管理排程相關參數,包含殭屍排程/健康檢查/排程心跳及DAG檔案檢查時間定義
  • DB for metadata / DAG
    搜集所有排程執行成果與相應內容(包含Code本身與執行時間等),供WebServer介面呈現相關紀錄與分析

在切分明確的情況下,其實不難勾勒出叢集式架構的長相,把相應的服務設定轉換成 Service / Deployment / Pod,原則上就能夠各司其職,但是中間設定與轉換太過複雜,我們決定不自己處理這邊的問題,而是直接站在Bitnami的肩膀上。

從單機到叢集的改變,怕

Day5–6-叢集的部署之路

參考Bitnami的樣板文件後,便立刻著手進行相關的部署與配置。

我們把這整個過程拆分成三個步驟:

  • 開啟叢集
    第一步,當然先把硬體準備好,不然部署的所有Docker只會對你傻笑 “資源不足”
    以我們的需求來說,大概只需要兩台2vcpu 8gram的機器足矣,當然,如果你需要AutoScaler,也可以自行設定水平擴展機制,在系統用不到的時候把不需要的機器關掉,但我們採用了其他的作法與方案,這就是後話了
自動召喚Worker工作,便宜又省力,敬請期待第二季故事…
  • helm chart使用及部署相應容器化服務
    Helm Chart — Bitnami Airflow安裝及使用手冊
    首先根據Bitnami Dockerfile內容調整過後的Image,會先被我們推送到Container Registry作部署準備
    bitnami/airflow:2.1.2-debian-10-r16
    └── asia.gcr.io/somewhere-to-your-registry/bitnami-airflow
    bitnami/airflow-worker:2.1.1-debian-10-r9
    └── asia.gcr.io/somewhere-to-your-registry/bitnami-airflow-worker
    bitnami/airflow-scheduler:2.1.2-debian-10-r13
    └── asia.gcr.io/somewhere-to-your-registry/bitnami-airflow-scheduler
    接著根據設定的腳本去做相應部署動作,同時設定好身份相關內容設定

以下將有限地寫出腳本與相應部署yaml file,若有其他問題也歡迎在Medium下方留言討論。

Cluster ConfigMap安裝相關 k8s-setup.sh

# Python相關Package打包進configmap
kubectl create configmap requirements --from-file requirements.txt
# 系統預設DAG內容,從local端打包進configmap
kubectl create configmap dag-map --from-file dags

針對helm chart — bitnami airflow所使用的values.yaml

dagGKEDisk: "airflow-dag-bucket-here"
service:
type: LoadBalancer
loadBalancerIP: "your-cluster-ip"
metrics:
enabled: true
# 利用csiGcs連結cloud storage bucket當作儲存空間
csiGcs:
dagBucket: airflow-dag-bucket
dataBucket: airflow-data-bucket
containerSecurityContext:
enabled: true
web:
# 選定特定Node Pool掛載Workload
nodeSelector:
cloud.google.com/gke-nodepool: default-pool
image:
registry: asia.gcr.io
repository: somewhere-to-your-registry/bitnami-airflow
tag: latest
pullPolicy: Always
resources:
requests:
cpu: 300m
extraVolumeMounts:
# 掛載Cloud Storage服務的PV mount設定
- name: gcs-pvc-dag
mountPath: /opt/bitnami/airflow/dags/gcs
readOnly: true
extraVolumes:
# 掛載Cloud Storage服務的PV
- name: gcs-pvc-dag
persistentVolumeClaim:
claimName: gcs-pvc-dag
worker:
tolerations:
- key: "dedicated"
operator: "Equal"
value: "worker"
effect: "NoSchedule"
image:
registry: asia.gcr.io
repository: somewhere-to-your-registry/bitnami-airflow-worker
pullPolicy: Always
tag: latest
extraVolumeMounts:
# 相應Python Requirments預載
- name: requirements
mountPath: /bitnami/python/
# 掛載Cloud Storage服務的PV mount設定
- name: gcs-pvc-dag
mountPath: /opt/bitnami/airflow/dags/gcs
readOnly: true
extraVolumes:
# 從自行創建的Configmap提取相應Library
- name: requirements
configMap:
name: requirements
# 掛載Cloud Storage服務的PV
- name: gcs-pvc-dag
persistentVolumeClaim:
claimName: gcs-pvc-dag
resources:
requests:
cpu: 500m
memory: 8Gi
scheduler:
nodeSelector:
cloud.google.com/gke-nodepool: default-pool
image:
registry: asia.gcr.io
repository: somewhere-to-your-registry/bitnami-airflow-scheduler
tag: latest
pullPolicy: Always
extraVolumeMounts:
# 相應Python Requirments預載
- name: requirements
mountPath: /bitnami/python/
# 掛載Cloud Storage服務的PV mount設定
- name: gcs-pvc-dag
mountPath: /opt/bitnami/airflow/dags/gcs
readOnly: true
extraVolumes:
# 從自行創建的Configmap提取相應Library
- name: requirements
configMap:
name: requirements
# 掛載Cloud Storage服務的PV
- name: gcs-pvc-dag
persistentVolumeClaim:
claimName: gcs-pvc-dag
resources:
requests:
cpu: 500m
extraEnvVars:
# Log位置參數設定,詳細辛酸血淚史請參見細數大坑
- name: AIRFLOW__SCHEDULER__DAG_DIR_LIST_INTERVAL # dag_dir_list_interval
value: "60"
- name: AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL
value: "60"
- name: AIRFLOW__LOGGING__REMOTE_LOGGING
value: "True"
- name: AIRFLOW__LOGGING__REMOTE_BASE_LOG_FOLDER
value: gs://somewhere-to-your-data-bucket/logs

helm_install.sh

# bash helm_install.sh
# 使用此腳本前請先確定有取得特定Cluster正確連線
# e.g. gcloud container clusters get-credentials $CLUSTER_NAME --zone $ZONE --project $PROJECT
RELEASE=${1:-"my-release"}
PACK="./bitnami/airflow"
# 設定相關yaml file
VAL_OVER=${2:-"values.yaml"}
echo "$RELEASE, $PACK, $VAL_OVER"# Export Secret相關資訊,不建議寫在yaml file當中
export AIRFLOW_PASSWORD=$自行設定用戶密碼
export AIRFLOW_FERNETKEY=$自行設定相應FernetKey
export POSTGRESQL_PASSWORD=$自行設定PGSQL資料庫
export REDIS_PASSWORD=$自行設定Redis密碼
# 進行helm chart安裝
helm install \
--set redis.auth.password=$REDIS_PASSWORD \
--set auth.password=$AIRFLOW_PASSWORD \
--set auth.fernetKey=$AIRFLOW_FERNETKEY \
--set postgresql.postgresqlPassword=$POSTGRESQL_PASSWORD \
-f $VAL_OVER \
$RELEASE $PACK

sleep 3
kubectl get services | grep $RELEASE-airflow
# echo ">>> helm get values $RELEASE"
# helm get values $RELEASE針對儲存空間以及參數的細部設定
部署完成後的Services
  • 參數調整與Cloud Storage掛載
    Airflow內含許多重要參數可依照使用者需求調整,相應內容請參閱此文件
    由於我們希望跟Cloud Composer一樣方便,預期是直接掛載指定的Storage Bucket達到DAG的存取/Log紀錄/爬蟲原始資料儲存等目的,需要借助一些第三方Kubernetes工具的力量,選擇採用 csi-gcs 這個套件,詳細設定相關請參閱社群文件

到這邊的設定大致完成,剩下比較多需要因應團隊使用或設備資源調整的參數未來有機會再跟大家分享。

細數大坑

Day7-並不完美,仍須努力

  • GCS-CSI連結Cloud Storage,帳單爆炸
    由於新版本Airflow會定期將DAG序列化成固定格式放在資料庫供Worker存取,又加上我們是使用Cloud Storage Bucket當作我們的DAG存放位置,不難想像會有相應的操作費用,但出乎我們意料的是,操作次數的計算跟我們想像得不一樣,導致服務剛上線前幾天帳單內的Class A Operation(對Bucket File內做非Get指令操作,詳細請閱讀此文件)次數暴增一千多萬次,帳單資費飛速上漲,為此我們做了兩次處理:
    1. 調整檔案存取區間,直接性的降低次數
    AIRFLOW_SCHEDULER_MIN_FILE_PROCESS_INTERVAL 30秒 > 60秒
    原則上會直接將操作次數砍半
    2. 針對Airflow內讀取檔案機制打上補丁,對沒有改變的檔案不做處理
    進行上述兩項調整後,單日帳單金額從$20快速消退到小於$0.2,足足省了一百倍,可喜可賀。
last_finish_time = self.get_last_finish_time(file_path)
if (last_finish_time is not None
and (now - last_finish_time).total_seconds() < self._file_process_interval
and not (is_mtime_mode and file_modified_time and (file_modified_time > last_finish_time))):
file_paths_recently_processed.append(file_path)
+L1072,L1074
# 針對未更動檔案不進行處理,降低執行費用
if last_finish_time is not None:
if is_mtime_mode and file_modified_time and (file_modified_time <= last_finish_time):
file_paths_recently_processed.append(file_path)
新版Airflow DAG Serialization機制
看那精美的Storage Access Count
  • 設定相關處處是坑,得用心爬文件
    Airflow相關設定千百種,時常會運行得不如預期,這時候就必須快速確定問題點,並好好看官方文件給出的指引進行修正,會是一條漫漫長路。

後續優化項目

寫到這邊,第一階段的冒險(踩坑)差不多結束了,實際使用後發現,阿福團隊內部除了Data之外,還有不少團隊也有可能需要用到這套系統,但資源需求度卻是不同維度的,例如:將使用者回報問題進行機器標記類別,資料整理並自動訓練等機器學習相關日常工作,以我們目前開啟的機器類型可能也不適合執行這類任務,於是第二階段的冒險就是要嘗試解決上述提到的資源需求不同的問題了。

每日狀態通報系統
即時失敗錯誤通報

結語與工作日常

與Data以及Server團隊合作將Airflow上線後,第一個體驗到的方便性在於一大早只要打開Slack就可以看到昨天相關ETL任務的執行狀態,以及任務失敗當下就會即時被通知轟炸,另外也透過撰寫模組化DAG與相關教學文件讓Data團隊能夠快速在測試環境試錯與上架新任務,最後當然是盡可能把失散各地的工作集中到單一的Dashboard上監控,才能夠最快的排查問題/補足資料,應該還算是個成功的專案吧!
希望大家還喜歡我們分享緊湊但有趣的開發日常,Solution Team在阿福內部有時穿梭於專案之間,有時又躲回自己研究的小角落,沒有完全固定的工作項目,除了研究之外,近期比較常在協助團隊內的大家解決工作流程優化問題,雖然說每次遇到的題目不一定是自己熟悉的內容,但只要團隊間有適當的討論與推演,都能夠找出適合我們目前使用的解決方法,假如喜歡這種有挑戰性的工作日常,也歡迎來投遞履歷,加入我們!

👏 如果喜歡這篇文章,可以幫阿福拍手
🤗 想要加入阿福管家,可以查看我們的職缺
🙌 歡迎追蹤我們的 MediumFacebook 關注阿福管家的團隊資訊喔!

--

--