[Apache Airflow Fundamentals] สร้าง data pipeline ที่ยืดหยุ่นขึ้นด้วย Skip + Branching + ShortCircuit

Punsiri Boonyakiat
CJ Express Tech (TILDI)
5 min readJan 28, 2024

ต่อจากบทความที่แล้ว เราเล่าพื้นฐานการทำงานของ Scheduler + และการ Trigger บน ​ Airflow เชื่อว่าหลายๆคนน่าจะพอสามารถเอาหลักการการใช้ trigger_rule ไปใช้สร้าง และควบคุมการทำงาน workflow ในแต่ละ pipeline ได้พอสมควร

อย่างไรก็ดีหากเราอยากออกแบบ pipelineให้มีความยืดหยุ่นมากขึ้น สามารถมีเงื่อนไขในการทำงานที่มากขึ้น เช่นลัดหรือข้ามบางขั้นตอนของ DAG เพื่อป้องกันการนำ data ที่ไม่ต้องการเข้าไปยัง downstream หรือสร้างรูปแบบ pipeline ที่ซับซ้อนมากขึ้น บทความนี้ขอมาตอบโจทย์สิ่งนี้ค่ะ

#Table of Contents
1. AirflowSkipException
2. Branching
3. Short Circuit
4. ส่งท้าย :)

แน่นอนว่า Data Engineer ทุกคนน่าจะทราบอยู่แล้วว่าการออกแบบ Pipeline ของ Airflow นั้นจะเป็นแบบ Directed Acyclic Graph คือแต่ละ task จะไม่มีการ flow ลูปย้อนหลัง จะมีแต่ flow ไปข้างหน้าจาก upstream ไปยัง downstream อย่างไรก็ดี ถ้าเราอยากข้าม หรือลัด task บางขั้นตอนให้ pipeline ทำงานได้มีคุณภาพ รับมือกับรูปแบบ data ที่หลากหลายได้จะทำยังไง? คำตอบคือการ skip บาง task ซึ่ง technique ที่เราหยิบออกมาใช้ และทำได้หลายวิธี!

1. AirflowSkipException

การ skip ใน Airflow สามารถทำได้ ผ่านการ skip การทำงานของ task ที่สร้างจาก PythonOperatorโดยเราจะใช้การประกาศ exception ที่ชื่อว่า AirflowSkipException เมื่อ task ถูก execute มาถึง line ที่มีการประกาศ exception นี้ taks ดังกล่าวก็จะถูกข้ามการทำงานและขึ้น status เป็น skipped

from airflow.exceptions import AirflowSkipException # import library ที่ใช่ในการ skip task

#################################################################
# วีธีการเขียน condition check ใน PythonOperator เพื่อประกาศ skip exception
if condition:
raise AirflowSkipException
#################################################################

โดยจากตัวอย่างด้านล่างเราสามารถ skip task ได้ด้วยการเขียน condition ใน python เช่น row_count ของ upstream มีค่าเป็น 0 ให้ข้าม stage ในการ transfrom และ upload ข้อมูลจาก local ไปยัง downstream ที่ GCS

import pandas
from airflow import DAG
from datetime import datetime
from airflow.decorators import task
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.exceptions import AirflowSkipException

with DAG(
'sample_skip_if_empty_upload',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
) as dag:

ingest = GCSToLocalFilesystemOperator(
task_id="ingest",
bucket="YOUR_GSC_BUCKET_ID",
object_name="sample.csv",
filename="sample.csv",
)

@task
def transfrom():
ingest_data = pandas.read_csv("sample.csv")
if len(ingest_data) == 0:
raise AirflowSkipException
else:
ingest_data['type'] = "DE"
ingest_data.to_parquet("sample.parquet")

load = LocalFilesystemToGCSOperator(
task_id="load",
src="sample.parquet",
dst="serving/sample.parquet",
bucket="YOUR_GSC_BUCKET_ID",
)

ingest >> transfrom() >> load

เราจะเห็นว่า task แรกจะเป็นการอ่าน file จาก Google Cloud Storage (GCS) ลงมาที่ local และ task ที่2 เป็นการ transform ข้อมูล ก่อนที่จะ upload ไฟล์จาก local ไปยัง GCS โดยในระหว่างที่ transform หากอ่าน file ได้ค่า row count = 0จะมี condition ในการตรวจสอบ และทำการ skip ซึ่งจะเป็นการป้องกัน​ fault alarm ถ้า DAG failed และไม่ให้เอาข้อมูลที่ผิดพลาดส่งไป serving

AirflowSkipException

2. Branching

Branching หรือการแยก task flows ออกเป็นหลายๆสาขานั้นก็มีประโยชน์มากหากเราต้องการรัน pipeline ที่มีการทำงานหลายรูปแบบที่ขึ้นอยู่กับ condition ของข้อมูล หรือรายละเอียดงานที่เราจะต้องการทำ โดยเราสามารถใช้ operator ที่ชื่อว่า BranchPythonOperator ในการตรวจสอบเงื่อนไงในการทำงาน และหากตรงกับเงื่อนไขใด เงื่อนไขหนึ่ง pipeline DagRun ก็จะทำงานใน branch ที่ตรงกับเงื่อนไขนั้นๆ และ skip การทำงานของ branch ที่ไม่ตรงกับเงื่อนไขใน DagRun นั้นๆ

จากรูปด้านล่าง จะว่าไปแล้วก็เหมือนกับเป็นท่อที่เราสามารถปรับรูปแบบการส่งข้อมูลตาม conditions ในการบริหารจัดการ pipeline ซึ่ง BranchPythonOperator เป็นเหมือนวาล์วน้ำ ถ้าไม่มีทุกท่อที่แยกออกก็จะไหลพร้อมๆกัน แต่ถ้ามีวาล์วก็จะสามารถควบคุมได้ว่าให้ไหลไปตามทิศทางใดบ้าง

โดยการใช้งาน BranchPythonOperator จะทำโดยการสร้าง function ในการตัดสินใจการแยกแยะในกระบวนการทำงานของ Dag Run ในรอบนั้นๆ

# วิธีการใช้งาน BranchPythonOperator
#################################################################
def branching_condition(result: int = 0): # python condition ในการเลือกทางที่จะให้ DAG ทำงาน
if result > 0.5:
return 'task_a' # return ค่าเป็น task_id ที่ต้องการ
return 'task_b'

branching = BranchPythonOperator(
task_id='branching',
python_callable=branching_condition #เรียก function ที่ตัดสนใจในการเลือก branch
)
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.edgemodifier import Label

import random
from pendulum import datetime

with DAG(
dag_id='branch_python_operator_example',
start_date=datetime(2024, 1, 1),
schedule="@daily"
) as dag:

start = EmptyOperator(task_id='start')
# ตัวเลือกในการตัดสินใจมี 3 branch คือ a,b,c
options = ['branch_a', 'branch_b', 'branch_c']
branching = BranchPythonOperator( #เรียกใช้ BranchPythonOperator ในการแยกรันงานตาม condition
task_id='branching',
python_callable=lambda: random.choice(options), # สุ่มค่า option จากการรันต่อละรอบของ pipeline ด้วย lambda function
)

end = EmptyOperator(task_id='end' ,trigger_rule="none_failed_min_one_success")

start >> branching
for option in options: #วนลูปตาม options ที่มีเพื่อสร้าง task flow

option_task = EmptyOperator(task_id=option)
option_follow_task = EmptyOperator(task_id='follow_' + option)

# สร้าง task flow ของแต่ละ branch
branching >> Label(option) >> option_task >> option_follow_task >> end
BranchPythonOperator

จากตัวอย่างด้านบนจะเห็นว่าเราสร้าง DAG ที่เวลาทำงานจะตัดสินในได้ 3 รูปแบบในแต่ละช่วงของการรัน pipeline ซึ่งหาก function ในการตัดสินใจได้ค่าใดค่าหนึ่งก็จะ ทำการรันแค่ task flow ที่ตรงกับ condition นั้น ดังนั้นเราจึงเห็นว่าเมื่อตัดสินใจว่าจะทำ branch_a ก็จะทำการ skip การทำงานของ branch_b และ branch_c และจากตัวอย่างนี้ถ้าหากไม่มี BranchPythonOperator ทั้งสาม branch จะรัน parallel กัน

กรณี use caseในการนำ BranchPythonOperator ไปใช้งานนั้นก็น่าสนใจ เราสามารถไปประยุกต์สร้าง pipeline ที่ยืดหยุ่นและซับซ้อนได้ตาม python_callableในการตัดสิน flowในแต่ละ running interval

โดยผู้เขียนแนะนำ use case สี่แบบในการเอา BranchPythonOperator ไปใช้ในการตัดสินใจการทำงานใน pipeline มานำเสนอดังนี้

  1. ตัดสินใจตามสถานะของข้อมูล / ตามสถานะงาน — เช่นสถานะของข้อมูล หรือสถานะของงานของ upstream จากการดึงค่า Xcom ของ task ก่อนหน้ามาทำการตรวจสอบ
def branching(**kwargs):
data_status = kwargs['ti'].xcom_pull(task_ids='extract_data')
# ใช้ XCom ดึงข้อมูลการ Extract ข้อมูลจาก upstream task แล้วตัดสินใจ
if data_status == 'success': return 'data_available_branch'
elif data_status == 'incompleted': return 'handle_incompleted_data_branch'
elif data_status == 'invalid': return 'handle_invalid_data_branch'
else: return 'no_data_branch'

branch_task = BranchPythonOperator(
task_id='branching',
python_callable=branching
)

2. ตัดสินใจตามเงื่อนไขเวลา — เช่นเวลาช่วงที่แตกต่างกันตามวันหยุด หรือเวลาที่แตกต่างกันใน interval ที่ pipeline ถูกตั้งให้ทำงานระหว่างวัน (เช้า เที่ยง เย็น)

def branching(**kwargs): # function ตัดสินใจเพื่อระบุ branch ที่จะทำงานในแต่ละ interval
current_day = kwargs['execution_date'].weekday()
if current_day < 5: # วันจันทร์ถึงวันศุกร์
return 'weekday_branch'
else: return 'weekend_branch' #วันเสาร์อาทิตย์

branch_task = BranchPythonOperator(
task_id='branching',
python_callable=branching
)

3. ตัดสินใจตามการวิเคราะห์ข้อมูล/ การทำนายผลของ ML model — เช่นการตรวจสอบความถูกต้องในการวิเคราะห์ข้องมูลเพื่อทำ data quality หรือผลจากการทำนายค่าอ้างอิงจากข้อมูลที่ได้จาก XCom ของ task ก่อนหน้า

def branching(**kwargs):
# ฟังก์ชันนี้ให้ค่าตัดสินใจเพื่อระบุทางที่จะไปตามการวิเคราะห์ข้อมูล
analysis_result = kwargs['ti'].xcom_pull(task_ids='analyze_data_task')
if analysis_result == 'positive': return 'positive_analysis_branch'
else: return 'negative_analysis_branch'

branch_task = BranchPythonOperator(
task_id='branching',
python_callable=branching
)

4. ตัดสินใจตาม DAG Run Type — เช่นในการรันแบบ manual หรือ scheduled เราต้องการให้ pipeline ทำงานแตกต่างกัน เช่นเมื่อรัน manual อาจจะเป็นการซ่อมข้อมูล และการรันแบบ scheduled เป็นการเอาข้อมูลเข้าแบบ incremental

def branching(**kwargs):
# ฟังก์ชันนี้ให้ค่าตัดสินใจเพื่อระบุทางที่จะไปตามการวิเคราะห์ข้อมูล
run_id = kwargs["dag_run"].run_id
is_manual = run_id.startswith("manual__")
is_scheduled = run_id.startswith("scheduled__")
if is_manual: return "process_triggered_ingest"
elif is_scheduled: return "process_scheduled_ingest"

branch_task = BranchPythonOperator(
task_id='branching',
python_callable=branching
)

*** ปล. หมายเหตุตัวโตๆ — จริงๆแล้วนอกเหนือจาก BranchPythonOperator ก็ยังมี Branching Operator ตัวอื่นๆในสังกัดที่น่าสนใจที่แนะนำให้ศึกษาเพิ่ม เช่น

  • BranchSQLOperator: เลือกตัดสินใจด้วยการเขียน SQL และให้ผลลัทธ์ เป็น true หรือfalse
  • BranchDayOfWeekOperator: เลือกตัดสินใจตามวันที่ Dag Run โดยการระบุค่า week_day parameter.
  • BranchDateTimeOperator: เลือกตัดสินใจตามเวลา ณ ปัจจุบันที่ DAG ทำงาน โดยมีค่าระหว่างเวลา สองช่วงคือtarget_lower และ target_upper

3. Short Circuit

“Short Circuit” หรือที่แปลว่า “ลัดวงจร” ในภาษาไทย คือการที่เราสามารถมีเงื่อนไขในการตัดสินใจของ pipeline ว่าจะลัดขั้นตอนบาง tasks หรือไม่ โดยเราจะใช้ operator ที่ชื่อว่าShortCircuitOperator เพื่อควบคุมว่า pipeline ทำงานต่อหรือไม่ตามเงื่อนไขที่​ DE เขียนใน python_callable ฟังก์ชัน โดยถ้า function ที่ตรวจสอบเงื่อนไขในการตัดสินใจได้ค่าเป็น true pipeline ทำงานต่อจนจบ แต่ถ้าค่าเป็น falseก็จะถูก short-circuited หรือทำให้ลัดวงจรตามที่กำหนดไว้

from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow import DAG

with DAG(
dag_id='example_short_circuit',
start_date=datetime(2024, 1, 21),
schedule="@daily"
) as dag:

start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end', trigger_rule="all_done")

task1 = EmptyOperator(task_id='task1')
task1_follow = EmptyOperator(task_id='task1_follow')

task2 = EmptyOperator(task_id='task2')
task2_follow = EmptyOperator(task_id='task2_follow')
task2_print = PythonOperator(
task_id="task2_print",
python_callable=lambda: print("task2_print"),
trigger_rule='always'
)

short_circuit_task1 = ShortCircuitOperator(
task_id='short_circuit_task1',
python_callable=lambda: True,
)
short_circuit_task2 = ShortCircuitOperator(
task_id='short_circuit_task2',
python_callable=lambda: False,
ignore_downstream_trigger_rules=False
)

start >> short_circuit_task1 >> task1 >> task1_follow >> end
start >> short_circuit_task2 >> task2 >> task2_follow >> task2_print >> end

ShortCircuitOperator ถูกใช้เพื่อทำการตรวจสอบเงื่อนไขที่กำหนดไว้และหากเงื่อนไขถูกต้องคืนค่า Trueและทำงานตาม task ที่กำหนด แต่ถ้าไม่ตรงเงื่อนไขจะคืนค่า False และลัดวงจร หรือข้ามงานทั้งหมด ซึ่งจากตัวอย่าง python ด้านบนจะเป็นสอง flows ที่รัน parallel กัน โดยเส้นแรกจะเป็นตัวอย่างที่เงื่อนไขของ short circuit คืนค่าเป็น True ทำให้ pipeline ทำงานสำเร็จทั้ง flow ส่วนเส้นที่สองจะเป็นตัวอย่างค่าเป็น False โดยค่าปกติเมื่อ python_callable ได้ค่าเป็น Falseจะลัดวงจรและข้ามการทำงานของ downstream ทั้งหมด แต่จาก python code ด้านบนมีการระบุว่าให้ใช้ trigger_rule ของ downstram มาควบคุมการทำงานของ pipeline ด้วยคำสั่ง ignore_downstream_trigger=False จึงลัดวงจรมาทำ task2_print ที่มีค่า trigger_rule='all_done' ต่อ ซึ่งจะเห็นว่าเราสามารถใช้ทั้ง trigger_rule และShortCircuitOperator ร่วมกันมาออกแบบ pipeline ได้ตามโจทย์ที่ต้องการ!

ShortCircuitOperator แตกต่างกันตามการควบคุม Trigger Rule เพื่อลัดทั้งวงจร หรือ ข้ามแค่บางส่วน

เราสามารถใช้ ShortCircuitOperator ไปประยุกต์ในการออกแบบ pipeline ที่ซับซ้อนมากขึ้นได้เช่น ตัวอย่าง use case ที่ผู้เขียนขอนำเสนอ ดังต่อไปนี้

  1. ตรวจสอบการอัปเดตข้อมูลก่อน upsert — เช่นเราอาจจะไปประยุกต์ในกรณีที่ต้องการ check ข้อมูล ทำ data reconciliation / data validation ว่าข้อมูลใน ingest เป็นข้อมูลล่าสุดไหม ถูกต้องตาม schema ที่กำหนดไหมก่อนที่จะ update หรือ insert ข้อมูลเข้า downstream
# ตรวจสอบการอัปเดตข้อมูลก่อน upsert
data_reconciliation_check_task = ShortCircuitOperator(
task_id='data_reconciliation_check',
python_callable=data_reconciliation_check,
provide_context=True,
)

2. ตรวจสอบค่าตามความต้องการของ Business Domain — เช่นตรวจสอบว่าค่าที่ได้รับจาก task ปัจจุบัน หรือก่อนหน้า ตรงกับค่ามาตรฐานของธุรกิจที่สอดคล้องกับ data model ที่ pipeline ส่งออกไปยัง serving หรือไม่

# ตรวจสอบค่าตามความต้องการของ Business Domain
check_business_logic_task = ShortCircuitOperator(
task_id='check_business_logic',
python_callable=retail_logic_check,
provide_context=True,
)

3. ตรวจสอบสถานะระบบ external system — เช่น API provider, Database ว่ามีความพร้อมในการให้บริการไหม ถ้าระบบที่เป็น upstream ขัดข้อง จะต้องทำการลัดวงจร และมี task ที่ทำการแจ้งเตือนไปยังเจ้าของ system นั้นต้นทางให้แก้ไข

# ตรวจสอบสถานะระบบ external system
system_status_check_task = ShortCircuitOperator(
task_id='external_system_status_check_task',
python_callable=check_system_status,
provide_context=True,
)

4. ส่งท้าย :)

ในบทความนี้เสนอการสร้าง pipeline ที่มีความยืดหยุ่นผ่านการข้ามงาน หรือแยก branch ของ task flow ซึ่งผู้เขียนเองก็มีโอกาสได้นำเทคนิคเหล่านี้ไปใช้จริงในการสร้าง data pipeline ของตนเองเช่นกัน :) หวังว่าบทความนี้จะมีประโยชน์กับ เพื่อนๆพี่ๆน้องๆ DE ทุกท่านที่กำลังศึกษา หรืออาจจะทำงานบน Apache Airflow อยู่แล้วและสามารถนำไปต่อยอดหรือประยุกต์กับ pipeline ของทุกคนได้ค่ะ

🔥 หากบทความนี้มีประโยชน์อย่าลืม กดปรบมือ กดแชร์ กดติดตาม ให้ด้วยนะคะ 🤩ผู้เขียนตั้งใจว่าจะปล่อยบทความเกี่ยวกับพื้นฐานงาน DE อีกในปีนี้ 🔥🔥

ทุกกดปรบมือคือกำลังใจหลักเลยค่ะ :) ขอให้ทุกคนสนุกกับงาน data pipeline 🤩

Reference

--

--

Punsiri Boonyakiat
CJ Express Tech (TILDI)

Senior Data Engineer | Google Cloud Certified Professional Data Engineer| Traveller | A Mom | Learner