[Apache Airflow Fundamentals] สร้าง data pipeline ที่ยืดหยุ่นขึ้นด้วย Skip + Branching + ShortCircuit
ต่อจากบทความที่แล้ว เราเล่าพื้นฐานการทำงานของ Scheduler + และการ Trigger บน Airflow เชื่อว่าหลายๆคนน่าจะพอสามารถเอาหลักการการใช้
trigger_rule
ไปใช้สร้าง และควบคุมการทำงาน workflow ในแต่ละ pipeline ได้พอสมควรอย่างไรก็ดีหากเราอยากออกแบบ pipelineให้มีความยืดหยุ่นมากขึ้น สามารถมีเงื่อนไขในการทำงานที่มากขึ้น เช่นลัดหรือข้ามบางขั้นตอนของ DAG เพื่อป้องกันการนำ data ที่ไม่ต้องการเข้าไปยัง downstream หรือสร้างรูปแบบ pipeline ที่ซับซ้อนมากขึ้น บทความนี้ขอมาตอบโจทย์สิ่งนี้ค่ะ
#Table of Contents
1. AirflowSkipException
2. Branching
3. Short Circuit
4. ส่งท้าย :)
แน่นอนว่า Data Engineer ทุกคนน่าจะทราบอยู่แล้วว่าการออกแบบ Pipeline ของ Airflow นั้นจะเป็นแบบ Directed Acyclic Graph คือแต่ละ task จะไม่มีการ flow ลูปย้อนหลัง จะมีแต่ flow ไปข้างหน้าจาก upstream ไปยัง downstream อย่างไรก็ดี ถ้าเราอยากข้าม หรือลัด task บางขั้นตอนให้ pipeline ทำงานได้มีคุณภาพ รับมือกับรูปแบบ data ที่หลากหลายได้จะทำยังไง? คำตอบคือการ skip บาง task ซึ่ง technique ที่เราหยิบออกมาใช้ และทำได้หลายวิธี!
1. AirflowSkipException
การ skip ใน Airflow สามารถทำได้ ผ่านการ skip การทำงานของ task ที่สร้างจาก PythonOperator
โดยเราจะใช้การประกาศ exception ที่ชื่อว่า AirflowSkipException
เมื่อ task ถูก execute มาถึง line ที่มีการประกาศ exception นี้ taks ดังกล่าวก็จะถูกข้ามการทำงานและขึ้น status เป็น skipped
from airflow.exceptions import AirflowSkipException # import library ที่ใช่ในการ skip task
#################################################################
# วีธีการเขียน condition check ใน PythonOperator เพื่อประกาศ skip exception
if condition:
raise AirflowSkipException
#################################################################
โดยจากตัวอย่างด้านล่างเราสามารถ skip task ได้ด้วยการเขียน condition ใน python เช่น row_count
ของ upstream มีค่าเป็น 0 ให้ข้าม stage ในการ transfrom และ upload ข้อมูลจาก local ไปยัง downstream ที่ GCS
import pandas
from airflow import DAG
from datetime import datetime
from airflow.decorators import task
from airflow.providers.google.cloud.transfers.gcs_to_local import GCSToLocalFilesystemOperator
from airflow.providers.google.cloud.transfers.local_to_gcs import LocalFilesystemToGCSOperator
from airflow.exceptions import AirflowSkipException
with DAG(
'sample_skip_if_empty_upload',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
) as dag:
ingest = GCSToLocalFilesystemOperator(
task_id="ingest",
bucket="YOUR_GSC_BUCKET_ID",
object_name="sample.csv",
filename="sample.csv",
)
@task
def transfrom():
ingest_data = pandas.read_csv("sample.csv")
if len(ingest_data) == 0:
raise AirflowSkipException
else:
ingest_data['type'] = "DE"
ingest_data.to_parquet("sample.parquet")
load = LocalFilesystemToGCSOperator(
task_id="load",
src="sample.parquet",
dst="serving/sample.parquet",
bucket="YOUR_GSC_BUCKET_ID",
)
ingest >> transfrom() >> load
เราจะเห็นว่า task แรกจะเป็นการอ่าน file จาก Google Cloud Storage (GCS) ลงมาที่ local และ task ที่2 เป็นการ transform ข้อมูล ก่อนที่จะ upload ไฟล์จาก local ไปยัง GCS โดยในระหว่างที่ transform หากอ่าน file ได้ค่า row count = 0
จะมี condition ในการตรวจสอบ และทำการ skip ซึ่งจะเป็นการป้องกัน fault alarm ถ้า DAG failed และไม่ให้เอาข้อมูลที่ผิดพลาดส่งไป serving
2. Branching
Branching หรือการแยก task flows ออกเป็นหลายๆสาขานั้นก็มีประโยชน์มากหากเราต้องการรัน pipeline ที่มีการทำงานหลายรูปแบบที่ขึ้นอยู่กับ condition ของข้อมูล หรือรายละเอียดงานที่เราจะต้องการทำ โดยเราสามารถใช้ operator ที่ชื่อว่า BranchPythonOperator
ในการตรวจสอบเงื่อนไงในการทำงาน และหากตรงกับเงื่อนไขใด เงื่อนไขหนึ่ง pipeline DagRun
ก็จะทำงานใน branch ที่ตรงกับเงื่อนไขนั้นๆ และ skip การทำงานของ branch ที่ไม่ตรงกับเงื่อนไขใน DagRun
นั้นๆ
จากรูปด้านล่าง จะว่าไปแล้วก็เหมือนกับเป็นท่อที่เราสามารถปรับรูปแบบการส่งข้อมูลตาม conditions ในการบริหารจัดการ pipeline ซึ่ง BranchPythonOperator
เป็นเหมือนวาล์วน้ำ ถ้าไม่มีทุกท่อที่แยกออกก็จะไหลพร้อมๆกัน แต่ถ้ามีวาล์วก็จะสามารถควบคุมได้ว่าให้ไหลไปตามทิศทางใดบ้าง
โดยการใช้งาน BranchPythonOperator
จะทำโดยการสร้าง function ในการตัดสินใจการแยกแยะในกระบวนการทำงานของ Dag Run
ในรอบนั้นๆ
# วิธีการใช้งาน BranchPythonOperator
#################################################################
def branching_condition(result: int = 0): # python condition ในการเลือกทางที่จะให้ DAG ทำงาน
if result > 0.5:
return 'task_a' # return ค่าเป็น task_id ที่ต้องการ
return 'task_b'
branching = BranchPythonOperator(
task_id='branching',
python_callable=branching_condition #เรียก function ที่ตัดสนใจในการเลือก branch
)
from airflow.models.dag import DAG
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import BranchPythonOperator
from airflow.utils.edgemodifier import Label
import random
from pendulum import datetime
with DAG(
dag_id='branch_python_operator_example',
start_date=datetime(2024, 1, 1),
schedule="@daily"
) as dag:
start = EmptyOperator(task_id='start')
# ตัวเลือกในการตัดสินใจมี 3 branch คือ a,b,c
options = ['branch_a', 'branch_b', 'branch_c']
branching = BranchPythonOperator( #เรียกใช้ BranchPythonOperator ในการแยกรันงานตาม condition
task_id='branching',
python_callable=lambda: random.choice(options), # สุ่มค่า option จากการรันต่อละรอบของ pipeline ด้วย lambda function
)
end = EmptyOperator(task_id='end' ,trigger_rule="none_failed_min_one_success")
start >> branching
for option in options: #วนลูปตาม options ที่มีเพื่อสร้าง task flow
option_task = EmptyOperator(task_id=option)
option_follow_task = EmptyOperator(task_id='follow_' + option)
# สร้าง task flow ของแต่ละ branch
branching >> Label(option) >> option_task >> option_follow_task >> end
BranchPythonOperator
จากตัวอย่างด้านบนจะเห็นว่าเราสร้าง DAG ที่เวลาทำงานจะตัดสินในได้ 3 รูปแบบในแต่ละช่วงของการรัน pipeline ซึ่งหาก function ในการตัดสินใจได้ค่าใดค่าหนึ่งก็จะ ทำการรันแค่ task flow ที่ตรงกับ condition นั้น ดังนั้นเราจึงเห็นว่าเมื่อตัดสินใจว่าจะทำ branch_a
ก็จะทำการ skip การทำงานของ branch_b
และ branch_c
และจากตัวอย่างนี้ถ้าหากไม่มี BranchPythonOperator
ทั้งสาม branch จะรัน parallel กัน
กรณี use caseในการนำ
BranchPythonOperator
ไปใช้งานนั้นก็น่าสนใจ เราสามารถไปประยุกต์สร้าง pipeline ที่ยืดหยุ่นและซับซ้อนได้ตาม python_callableในการตัดสิน flowในแต่ละ running interval
โดยผู้เขียนแนะนำ use case สี่แบบในการเอา BranchPythonOperator
ไปใช้ในการตัดสินใจการทำงานใน pipeline มานำเสนอดังนี้
- ตัดสินใจตามสถานะของข้อมูล / ตามสถานะงาน — เช่นสถานะของข้อมูล หรือสถานะของงานของ upstream จากการดึงค่า Xcom ของ task ก่อนหน้ามาทำการตรวจสอบ
def branching(**kwargs):
data_status = kwargs['ti'].xcom_pull(task_ids='extract_data')
# ใช้ XCom ดึงข้อมูลการ Extract ข้อมูลจาก upstream task แล้วตัดสินใจ
if data_status == 'success': return 'data_available_branch'
elif data_status == 'incompleted': return 'handle_incompleted_data_branch'
elif data_status == 'invalid': return 'handle_invalid_data_branch'
else: return 'no_data_branch'
branch_task = BranchPythonOperator(
task_id='branching',
python_callable=branching
)
2. ตัดสินใจตามเงื่อนไขเวลา — เช่นเวลาช่วงที่แตกต่างกันตามวันหยุด หรือเวลาที่แตกต่างกันใน interval ที่ pipeline ถูกตั้งให้ทำงานระหว่างวัน (เช้า เที่ยง เย็น)
def branching(**kwargs): # function ตัดสินใจเพื่อระบุ branch ที่จะทำงานในแต่ละ interval
current_day = kwargs['execution_date'].weekday()
if current_day < 5: # วันจันทร์ถึงวันศุกร์
return 'weekday_branch'
else: return 'weekend_branch' #วันเสาร์อาทิตย์
branch_task = BranchPythonOperator(
task_id='branching',
python_callable=branching
)
3. ตัดสินใจตามการวิเคราะห์ข้อมูล/ การทำนายผลของ ML model — เช่นการตรวจสอบความถูกต้องในการวิเคราะห์ข้องมูลเพื่อทำ data quality หรือผลจากการทำนายค่าอ้างอิงจากข้อมูลที่ได้จาก XCom ของ task ก่อนหน้า
def branching(**kwargs):
# ฟังก์ชันนี้ให้ค่าตัดสินใจเพื่อระบุทางที่จะไปตามการวิเคราะห์ข้อมูล
analysis_result = kwargs['ti'].xcom_pull(task_ids='analyze_data_task')
if analysis_result == 'positive': return 'positive_analysis_branch'
else: return 'negative_analysis_branch'
branch_task = BranchPythonOperator(
task_id='branching',
python_callable=branching
)
4. ตัดสินใจตาม DAG Run Type — เช่นในการรันแบบ manual หรือ scheduled เราต้องการให้ pipeline ทำงานแตกต่างกัน เช่นเมื่อรัน manual อาจจะเป็นการซ่อมข้อมูล และการรันแบบ scheduled เป็นการเอาข้อมูลเข้าแบบ incremental
def branching(**kwargs):
# ฟังก์ชันนี้ให้ค่าตัดสินใจเพื่อระบุทางที่จะไปตามการวิเคราะห์ข้อมูล
run_id = kwargs["dag_run"].run_id
is_manual = run_id.startswith("manual__")
is_scheduled = run_id.startswith("scheduled__")
if is_manual: return "process_triggered_ingest"
elif is_scheduled: return "process_scheduled_ingest"
branch_task = BranchPythonOperator(
task_id='branching',
python_callable=branching
)
*** ปล. หมายเหตุตัวโตๆ — จริงๆแล้วนอกเหนือจาก BranchPythonOperator
ก็ยังมี Branching Operator ตัวอื่นๆในสังกัดที่น่าสนใจที่แนะนำให้ศึกษาเพิ่ม เช่น
- BranchSQLOperator: เลือกตัดสินใจด้วยการเขียน SQL และให้ผลลัทธ์ เป็น
true
หรือfalse
- BranchDayOfWeekOperator: เลือกตัดสินใจตามวันที่
Dag Run
โดยการระบุค่าweek_day
parameter. - BranchDateTimeOperator: เลือกตัดสินใจตามเวลา ณ ปัจจุบันที่ DAG ทำงาน โดยมีค่าระหว่างเวลา สองช่วงคือ
target_lower
และtarget_upper
3. Short Circuit
“Short Circuit” หรือที่แปลว่า “ลัดวงจร” ในภาษาไทย คือการที่เราสามารถมีเงื่อนไขในการตัดสินใจของ pipeline ว่าจะลัดขั้นตอนบาง tasks หรือไม่ โดยเราจะใช้ operator ที่ชื่อว่าShortCircuitOperator
เพื่อควบคุมว่า pipeline ทำงานต่อหรือไม่ตามเงื่อนไขที่ DE เขียนใน python_callable
ฟังก์ชัน โดยถ้า function ที่ตรวจสอบเงื่อนไขในการตัดสินใจได้ค่าเป็น true
pipeline ทำงานต่อจนจบ แต่ถ้าค่าเป็น false
ก็จะถูก short-circuited หรือทำให้ลัดวงจรตามที่กำหนดไว้
from airflow.operators.python_operator import ShortCircuitOperator
from airflow.operators.empty import EmptyOperator
from airflow.operators.python_operator import PythonOperator
from datetime import datetime
from airflow import DAG
with DAG(
dag_id='example_short_circuit',
start_date=datetime(2024, 1, 21),
schedule="@daily"
) as dag:
start = EmptyOperator(task_id='start')
end = EmptyOperator(task_id='end', trigger_rule="all_done")
task1 = EmptyOperator(task_id='task1')
task1_follow = EmptyOperator(task_id='task1_follow')
task2 = EmptyOperator(task_id='task2')
task2_follow = EmptyOperator(task_id='task2_follow')
task2_print = PythonOperator(
task_id="task2_print",
python_callable=lambda: print("task2_print"),
trigger_rule='always'
)
short_circuit_task1 = ShortCircuitOperator(
task_id='short_circuit_task1',
python_callable=lambda: True,
)
short_circuit_task2 = ShortCircuitOperator(
task_id='short_circuit_task2',
python_callable=lambda: False,
ignore_downstream_trigger_rules=False
)
start >> short_circuit_task1 >> task1 >> task1_follow >> end
start >> short_circuit_task2 >> task2 >> task2_follow >> task2_print >> end
ShortCircuitOperator
ถูกใช้เพื่อทำการตรวจสอบเงื่อนไขที่กำหนดไว้และหากเงื่อนไขถูกต้องคืนค่า True
และทำงานตาม task ที่กำหนด แต่ถ้าไม่ตรงเงื่อนไขจะคืนค่า False
และลัดวงจร หรือข้ามงานทั้งหมด ซึ่งจากตัวอย่าง python ด้านบนจะเป็นสอง flows ที่รัน parallel กัน โดยเส้นแรกจะเป็นตัวอย่างที่เงื่อนไขของ short circuit คืนค่าเป็น True
ทำให้ pipeline ทำงานสำเร็จทั้ง flow ส่วนเส้นที่สองจะเป็นตัวอย่างค่าเป็น False
โดยค่าปกติเมื่อ python_callable
ได้ค่าเป็น False
จะลัดวงจรและข้ามการทำงานของ downstream ทั้งหมด แต่จาก python code ด้านบนมีการระบุว่าให้ใช้ trigger_rule
ของ downstram มาควบคุมการทำงานของ pipeline ด้วยคำสั่ง ignore_downstream_trigger=False
จึงลัดวงจรมาทำ task2_print
ที่มีค่า trigger_rule='all_done'
ต่อ ซึ่งจะเห็นว่าเราสามารถใช้ทั้ง trigger_rule
และShortCircuitOperator
ร่วมกันมาออกแบบ pipeline ได้ตามโจทย์ที่ต้องการ!
ShortCircuitOperator แตกต่างกันตามการควบคุม Trigger Rule เพื่อลัดทั้งวงจร หรือ ข้ามแค่บางส่วน
เราสามารถใช้ ShortCircuitOperator
ไปประยุกต์ในการออกแบบ pipeline ที่ซับซ้อนมากขึ้นได้เช่น ตัวอย่าง use case ที่ผู้เขียนขอนำเสนอ ดังต่อไปนี้
- ตรวจสอบการอัปเดตข้อมูลก่อน upsert — เช่นเราอาจจะไปประยุกต์ในกรณีที่ต้องการ check ข้อมูล ทำ data reconciliation / data validation ว่าข้อมูลใน ingest เป็นข้อมูลล่าสุดไหม ถูกต้องตาม schema ที่กำหนดไหมก่อนที่จะ update หรือ insert ข้อมูลเข้า downstream
# ตรวจสอบการอัปเดตข้อมูลก่อน upsert
data_reconciliation_check_task = ShortCircuitOperator(
task_id='data_reconciliation_check',
python_callable=data_reconciliation_check,
provide_context=True,
)
2. ตรวจสอบค่าตามความต้องการของ Business Domain — เช่นตรวจสอบว่าค่าที่ได้รับจาก task ปัจจุบัน หรือก่อนหน้า ตรงกับค่ามาตรฐานของธุรกิจที่สอดคล้องกับ data model ที่ pipeline ส่งออกไปยัง serving หรือไม่
# ตรวจสอบค่าตามความต้องการของ Business Domain
check_business_logic_task = ShortCircuitOperator(
task_id='check_business_logic',
python_callable=retail_logic_check,
provide_context=True,
)
3. ตรวจสอบสถานะระบบ external system — เช่น API provider, Database ว่ามีความพร้อมในการให้บริการไหม ถ้าระบบที่เป็น upstream ขัดข้อง จะต้องทำการลัดวงจร และมี task ที่ทำการแจ้งเตือนไปยังเจ้าของ system นั้นต้นทางให้แก้ไข
# ตรวจสอบสถานะระบบ external system
system_status_check_task = ShortCircuitOperator(
task_id='external_system_status_check_task',
python_callable=check_system_status,
provide_context=True,
)
4. ส่งท้าย :)
ในบทความนี้เสนอการสร้าง pipeline ที่มีความยืดหยุ่นผ่านการข้ามงาน หรือแยก branch ของ task flow ซึ่งผู้เขียนเองก็มีโอกาสได้นำเทคนิคเหล่านี้ไปใช้จริงในการสร้าง data pipeline ของตนเองเช่นกัน :) หวังว่าบทความนี้จะมีประโยชน์กับ เพื่อนๆพี่ๆน้องๆ DE ทุกท่านที่กำลังศึกษา หรืออาจจะทำงานบน Apache Airflow อยู่แล้วและสามารถนำไปต่อยอดหรือประยุกต์กับ pipeline ของทุกคนได้ค่ะ
🔥 หากบทความนี้มีประโยชน์อย่าลืม กดปรบมือ กดแชร์ กดติดตาม ให้ด้วยนะคะ 🤩ผู้เขียนตั้งใจว่าจะปล่อยบทความเกี่ยวกับพื้นฐานงาน DE อีกในปีนี้ 🔥🔥
ทุกกดปรบมือคือกำลังใจหลักเลยค่ะ :) ขอให้ทุกคนสนุกกับงาน data pipeline 🤩
Reference
- https://docs.astronomer.io/learn/airflow-branch-operator
- https://airflow.apache.org/docs/apache-airflow/stable/howto/operator/python.html#howto-operator-shortcircuitoperator
- https://registry.astronomer.io/providers/apache-airflow/versions/latest/modules/shortcircuitoperator
- https://towardsdatascience.com/airflow-skip-task-a5a6ab319378
- https://marclamberti.com/blog/airflow-trigger-rules-all-you-need-to-know/