[Data] Data Pipeline 101(二) — Stage 和 Job

Bryan Yang
A multi hyphen life
5 min readMar 17, 2020

--

前人種樹後人乘涼

如果把 Data Pipeline 是資料處理系統的總稱,這個系統中的最小單位是 Stage/ Task,一個或多個 Stage/ Task 會組成一條基本的處理資料的 Job。有了這個基本認知後,再接下來細看每個東西是在幹嘛的。

Stage/ Task

每個 Stage 代表了單一的工作,裡面可以做很多事,不管是讀取資料、過濾資料、把資料寫到某個地方之類,Input 的資料經過這個 Stage 產生出某種 Output。

以 Airflow 的設計來說,每個 Operator 就是一個獨立的 task

t1 = BashOperator(
task_id='print_date',
bash_command='date'
)

t2 = BashOperator(
task_id='sleep',
depends_on_past=False,
bash_command='sleep 5',
retries=3)

每個 task 很明顯就是一個 Bash 的操作。不過當然你也可以在一個 Bash Operator 裡面插入一個非常複雜的 Bash 操作。

Job

通常一個資料處理任務不會只有一個 Stage,通常會有多個不同的 Stage 組合在一起。以一個基本 Aggregation 的任務為例,這些 Stage 「可能」包含了以下東西:

  1. 一個負責監聽資料源或是上游 Job
  2. 一個負責建立最終的表格
  3. 一個負責讀取資料+處理資料
  4. 一個負責把處理完的資料寫到某個地方
  5. 可能會有個任務清除中間的暫存表
  6. 可能會有個任務負責統計處理完的結果
  7. 可能有個任務負責表示以上任務都順利完成

這些 Stages 會有強烈的上下游依賴關係,通常一定要上游成功,才會進入下游的 Stage;有些 Stage 會負責做成功失敗的判斷,比如偵測到失敗就會寄送 Slack 訊息。

一個 Job 在 Airflow 裡面設定為一個 Dag,一個 Dag 裡面會有些常用的設定,不管你是不是用 Airflow 都可以參考實作一樣的設定。這些設定包括:

default_args = {
'owner': 'airflow', # Job 的所有者,壞掉要寄信給他
'depends_on_past': False, # 過去的 Job 如果壞掉,今天要不要繼續跑
'start_date': days_ago(2), # Job 要從幾號開始定期執行
'email': ['airflow@example.com'],
'email_on_failure': False,
'email_on_retry': False, # Email 成功失敗要寄給誰
'retries': 1, # 壞了要重做幾次
'retry_delay': timedelta(minutes=5),
# 'on_failure_callback': some_function, # 如果 job 成功或失敗要不要 call out 其他的 function
# 'on_success_callback': some_other_function,
# 'on_retry_callback': another_function,
# 'sla_miss_callback': yet_another_function,
# 'trigger_rule': 'all_success'
}

Q & A

  1. 一個 Stage 到底要多大?
    通常就是以重新執行時,可以接受的最小單位為主。例如部署一個模型,會有讀取資料、訓練資料、預測資料、寫入資料庫等幾個步驟。當然一個 Job 要成功都是要做完這幾件事,但是訓練和預測都需要不少的時間,而且之間的關聯是獨立的,這時候我就會選擇拆開成為不同的 Stage。
  2. 什麼時候要切成一個 Job?Job 之間的 Dependency 怎麼處理?
    我自己「通常」是以一個「產出」當成一個 Job。這些產出可能包括報表、可能包括一個Table,或某個寫入 s3 的資料。在這種情況下,當「產出」之間有依賴關係時(例如有個 TableB,依賴 TableA 才能計算),處理起來就很麻煩。
    那另外一種方式就是把所有有 Dependency 的產出都放在同一個 Job 裡,這個 Job 就是一個超級大的 Job,彼此之間的關係一目瞭然。
    依賴一定要處理好,要確保當上游任務延遲或是重跑時,下游任務能仍正常被處理。
  3. Data pipeline 中的關係非常重要也非常難處理。特別是當任務一多,先前任務對於資料的處理邏輯,都會影響到下游每一個任務。當任務關聯越複雜,每個任務之間的影響越難被追蹤跟管理。

--

--

Bryan Yang
A multi hyphen life

Data Engineer, Data Producer Manager, Data Solution Architect