[Apache Airflow Fundamentals] ทำความเข้าใจ Scheduler & Trigger

Punsiri Boonyakiat
CJ Express Tech (TILDI)
9 min readJan 26, 2024

บทความนี้จะมาเล่าเรื่องพื้นฐานที่ Data Engineer หลายคนที่ทำงานอยู่บน open-source ที่ใช้ในนการจัดการ data pipeline อย่าง Apache Airflow อยู่น่าจะคุ้ยเคยกันดีอยู่แล้ว นั้นก็คือเรื่องของ Scheduler และ​ Trigger นั่นเอง เรียกว่า Basic สุดๆ แต่เนื่องจากพื้นฐานความรู้ที่แข็งแรงสำคัญมาก บทความนี้จึงขอมาทบทวนกับทุกคนค่ะ โดยมีหัวข้อดังนี้

# Table of Contents
1. Airflow Scheduler
2. Types of Airflow Triggers
2.1 Methods to Trigger Airflow DAGs
2.2 Manual Trigger
2.3 External Trigger - TriggerDagRunOperator
3. Airflow Trigger Rules
4.ส่งท้าย

1. Airflow Scheduler

Scheduler หรือตัวตั้งเวลานั้นมีความสำคัญมากในงาน Data Pipeline โดยใน Apache Airflow โดย Scheduler จะทำหน้าที่ตรวจสอบความสัมพันธ์ของ tasks และ DAGs (dependency) ที่กำหนดในระบบ และตรวจสอบ schedule_interval หรือช่วงเวลาที่ถูกตั้งของแต่ละ workflow นั้นว่ามีความถี่อย่างไร โดยเมื่อถึงช่วงเวลาในการทำงาน ตัวตั้งเวลาจะเริ่มสร้าง instance ของ Dag Run ที่มีการทำงานของ DAG และทำการ trigger ให้ task instance ทำงานตามช่วงเวลาที่กำหนด เพื่อให้ทำงานตาม workflow ที่กำหนดไว้

Basic Airflow Architecture

จากรูปด้านบนคือโครงสร้างพื้นฐานของ Apache Airflow โดยจะเห็นว่า Data Engineer นั้นจะมีหน้าที่ในการสร้าง DAGs หรือ workflow ในรูปของโค้ด Python เพื่อที่กำหนดขั้นตอนในการทำงานของ workflow และเข้ามาใช้งานผ่าน User Interface ในการบริหารจัดการ และ monitor การทำงานของ workflows โดย Airflow Architecture จะมีองค์ประกอบพื้นฐานดังนี้

  1. Metadata Database: Airflow ใช้ฐานข้อมูล RDBMS เช่น Postgres หรือ MySQL ในเก็บข้อมูล metadata สำคัญที่เกี่ยวกับ data pipeline ที่ทำให้ระบบ Airflow ทำงานได้ เช่น วันที่เริ่มทำงาน (Execution dates), ความสำพันธ์ของขั้นตอนการทำงาน (Task dependencies), การตั้งค่าต่างๆ (Configuration settings) รวมถึง สถานะของ pipeline และผลลัพธ์ของการทำงาน เช่น กำลังทำงาน, เสร็จสิ้น, หรือล้มเหลว (Task status and logs )
  2. Web Server: เว็บเซิร์ฟเวอร์ของ Airflow ทำให้ Data Engineer สามารถบริหาจัดการ รวมถึงตรวจสอบการทำงานของแต่ละ pipelines ได้ผ่าน UI ที่ออกแบบมาให้ใช้งานได้ง่าย
  3. Scheduler: ตัวตั้งเวลาทำหน้าที่ตรวจสอบและสั่งการในการเริ่มทำงานของ pipeline ตามระยะเวลาที่ระบุเมื่อ DAG ถูกสร้าง โดยจะมีการ sync กับ metadata DB เพื่อให้ได้ข้อมูลที่ถูกต้องในการเริ่มให้ task ทำงาน
  4. Executor: Executor ทำหน้าที่รับคำสั่งต่อจากตัวตั้งเวลา หรือ scheduler เพื่อบอกว่า Task ไหนจะต้องทำงาน และจะทำการส่ง Task Instances นั้นๆไปยัง Workers เพื่อที่จะทำงานจริง โดยหากมี concurrence task ที่ต้องทำงานพร้อมกัน Excecuter ก็จะกระจายงานที่ถูกกำหนดใน DAGs ไปยัง workers เพื่อที่จะทำงานไปพร้อมๆกันได้

หลังจากที่ Task ถูก Schedule หรือส่งไปทำงานนั้นสิ่งที่เกิดขึ้น คือจะมี stage ของแต่ละ task เกิดขึ้นตามขั้นตอนและผลลัพธ์ของการ execute เช่น กำลังรัน ทำงานสำเร็จ หรือ ทำงานผิดพลาด ตามรูปด้านล่าง

Example Task Stage in Apache Airflow

จากรูปด้านบนเป็นการบอกถึงวงจรชีวิตของ task งานใน Apache Airflow ซึ่งเป็น happy task flow ว่าเมื่อ DAG เริ่มทำงานจะมี status ของ task ต่างๆเกิดขึ้นดังนี้

  1. No status — เมื่อ scheduler ทำการสร้าง task instance
  2. Scheduled — เมื่อถึงเวลาที่ task ต้องทำงาน และตัวตั้งเวลา
  3. Queued — เมื่อ scheduler ส่ง task ไปยัง executor เพื่อต่อคิวให้ worker รัน
  4. Running — เมื่อ worker เริ่มทำงานใน task นั้นๆ
  5. Success — เมื่อ task ทำงานสำเร็จ
  6. Failed — เมื่อ task ทำงานไม่สำเร็จ

สรุป Airflow Scheduler ทำงานยังไง?

จากภาพ life cycle และ architecture ด้านบนทำให้เราสามารถเชื่อมโยงเห็นถึง stage การทำงานของ Scheduler ชัดเจน คือ scheduler จะดำเนินการตรวจสอบ DAGs ทั้งหมดในโฟลเดอร์ และ sync ข้อมูลลง metadata database ซึ่งจะเก็บข้อมูล schuedule_interval, start_date, end_date รวมถึงข้อมูลอื่นๆ หลัง และเมื่อถึงเวลาทำงานก็จะสร้าง Dag Run สำหรับ Task ที่ต้องทำการ execute ตอนนี้ก็จะมี status เป็น scheduled ซึ่งหลังจากนั้นจะถูกส่งต่อให้ executors ไปตรวจสอบความพร้อมของทรัพยากรในการทำงาน ตอนนี้จะเป็นสถานะเป็น queued หลังจาก worker ดึงงานออกจากคิวไปประมวลผล จะมีสถานะเป็น running และเมื่องานเสร็จก็จะเป็นสถานะ success/failed

จากที่ผ่านมาจะเห็นว่ามีคำศัทพ์ที่เกี่ยวข้องในการทำงานของ Scheduler ดังนี้

  1. DAG Run: Object ที่แทนการทำงานของ DAG
  2. Schedule_interval: การกำหนดค่าความถี่ในการทำงานซึ่ง อยู่รูปของ cron expression หรือ cron presets เช่น @hourly, @weekly, @monthly คือค่าตั้งต้นที่ Airflow กำหนดไว้เพื่อช่วยให้เราตั้ง schedule ได้ง่ายขึ้น แต่หากเราต้องการเวลาเป๊ะๆ เช่นรันทุก จันทร์ พุธ ศุกร เวลา 05:00 ก็สามารถ ใช้ cron trab expression ได้เช่น 0 5 * * 1,3,5 ถ้าใครยังมือใหม่สามารถไปลงกด random ในเว็ป cron guru เพื่อศึกษาได้ค่ะ
  • '@hourly' เท่ากับ '0 * * * *': รันทุกชั่วโมงตอนนาทีที่ 00
  • '@daily' เท่ากับ '0 0 * * *': รันทุกวันตอนช่วงเที่ยงคืนของแต่ละวัน
  • '@weekly' เท่ากับ '0 0 * * 0': รันสัปดาห์ในช่วงเที่ยงคืนของวันอาทิตย์
  • '@monthly' เท่ากับ '0 0 1 * *': รันทุกเดือน ณ เวลาเที่ยงคืนตรงของวันที่ 1
ตัวอย่าง cron expression จาก https://crontab.guru/

3. Data Interval: เป็นช่วงที่ Dag Runเริ่มทำงานจริง เช่นถ้า pipeline ถูกตั้งเวลาให้ทำงานทุกชั่วโมง @hourly Data Interval จะเริ่มต้นที่จุดเริ่มต้นของนาที (นาที 0) และสิ้นสุดที่สุดของชั่วโมง (นาที 59) เพื่อให้สามารถนำค่า data interval ไปคำนวณในการ ETL upsert ตรวจสอบ ข้อมูลในช่วงที่ประมวลผลได้ถูกต้อง

  • data_interval_start = วันที่เริ่มต้นของ Data Interval = วันที่ทำงาน
  • data_interval_end = วันที่สิ้นสุดของ Data Interval ที่ Dag Run รอบนั้นๆ

4. Catchup: อีกค่าเริ่มต้น Airflow Scheduler ที่สำคัญคือ catchup (ค่า defualt เป็น FALSE) ซึ่งหากเราตั้งค่าเป็น TRUEจะช่วยตรวจสอบ start_dateหรือวันเริ่มต้นของ DAG จนถึงปัจจุบัน และทำการสร้าง Dag Run มา exexute workflow ให้ครบทุกรอบในการทำงานจนถึงวันปัจจุบันตาม ช่วง หรือ interval ที่เรากำหนดไว้ใน schedule_interval ด้วย cron expression

2. Type of Airflow Trigger

จากหัวข้อแรกถ้าพูดถึงการทำงานของ Scheduler แล้วจะไม่พูดถึงการ Trigger ไม่ได้เลย ในการทำงานของ Airflow เราสามารถสร้าง DAG ขึ้นมาเป็น flow การทำงานแต่งานที่สัมพันธ์กัน จากที่เล่าด้านบน การทำงานของ DAG ถูก trigger ให้สร้าง DAG Runขึ้นอยู่กับตารางเวลาที่ DE ระบุใน python ผ่านค่าschedule_interval เมื่อถึงช่วงที่ DAG ต้องทำงาน scheduler ก็จะ trigger ให้ pipeline รันในช่วงที่กำหนด ยกตัวอย่างจาก DAG ด้านล่างก็จะถูก scheduler มา trigger ให้รันทั้ง 2 tasks ตามลำดับทุกทุกวันตอนช่วงเที่ยงคืนของแต่ละวัน

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.decorators import task
from datetime import datetime

dag = DAG(
'example_manual_trigger_dag_with_params',
schedule_interval='@daily',
start_date=datetime(2024, 1, 1),
tags=["example"],
params={
"param1": "Hello!",
"param2": "Medium!",
},
)

task1 = DummyOperator(task_id='task1', dag=dag)

@task
def task2(**context):
print(context["params"]["param1"], context["params"]["param2"])

task1 >> task2()

อย่างไรก็ดีนอกเหนือจากการ trigger ตามกลไกลของ scheduler เรายังสามารถสั่งให้ DAGs ทำงานได้หลายๆรูปแบบ ไม่ว่าจะเป็น manual trigger หรือ external trigger ซึ่ง part นี้จะมาขยายความในส่วนนี้กัน

2.1 Methods to Trigger Airflow DAGs

จากการศึกษาการทำงาน Airflow ของผู้เขียนเราสามารถที่จะ trigger หรือสั่งให้ DAGs ทำงานได้ 3 รูปแบบดังนี้

  1. Trigger on a Schedule: เมื่อเราสร้าง DAG เรียบร้อยแล้วสามารถให้ ถูก trigger อัตโนมัติ ตามเวลาที่เรากำหนดใน python code ผ่านการทำงานของ scheduler
  2. Manual Trigger : เราสามารถที่จะสั่งให้ DAG ทำงานผ่านการกด manual triger ที่หน้า UI หรือเขียน code ไม่ว่าจะผ่าน API หรือ CLI
  3. External Trigger: เราสามารถที่จะสร้าง DAGs ที่มีความสัมพันธ์กัน 2 DAGs และเมื่อ parent pipeline/ controller pipeline ทำงานเสร็จให้ trigger อีก DAG ให้ทำงานต่อเนื่องกันไปผ่าน TriggerDagRunOperator

2.2 Manual Trigger

อ้างอิงจาก python code ด้านบน เราสามารถทำการ trigger DAGs ให้รัน manual ได้หลายวิธี เช่น ผ่าน UI, CLI, API ในหัวข้อนี้จะเป็นตัวอย่างในการ manual trigger ของ Airflow DAG ในวิธีต่างๆ

ตัวอย่างการ Trigger DAG ผ่าน UI:

โดยวิธีแรกคือการสั่ง trigger ผ่าน User Interface ของ Airflow ซึ่งเราสามารถกดที่เครื่องหมาย ▶ ️สามเหลี่ยมมุมขวาของ DAG นั้นๆ โดยจะมีให้เลือก 2 แบบ คือ Trigger DAG และ Tigger DAG w/ config โดยหากเรื่องแบบ Trigger เฉยๆก็จะสร้าง Dag Run และทำงานของ pipeline อ้างอิงจากเวลาปัจจุบัน จากรูปด้านล่างจะเห็นว่าลักษณะของ Dag Run ที่ถูก trigger manual จะมีความแตกต่างกับการถูก scheduled และจะมี Run type และ Prefix ของ Dag Run ID แตกต่างกันด้วย โดย Dag ที่ถูก​ manual trigger จะมี Run type = manual และ​​ Prefix ของ​ ID เป็น manual_XXX ตามด้วย runing data_interval_start

ตัวอย่างการ Trigger DAG ผ่าน UI
ตัวอย่างการ Trigger DAG w/ config ผ่าน UI

จากด้านบนเป็นตัวอย่างการ Trigger ด้วย config ซึ่งข้อดีคือ เราสามารถปรับ logical date หรือ start_interval_date ที่ DAG Run จะอ้างอิงไปประมวลผลในการทำงานของ pipeline ได้ รวมถึงหากเราเขียน Parse DAG​ parameter เราสามารถเขียนแก้ทับ หรือ Overide ได้ ทำให้ Dag Run ที่เรา trigger นั้นรันตามค่า config ที่เราอยากได้ ซึ่งสำหรับรายละเอียดเรื่อง params สามารถตามอ่านได้ที่นี่

ตัวอย่างการ Trigger DAG ผ่าน CLI:

ในการใช้งาน Apache Airflow การใช้งานผ่าน Command Line Interface นั้นสามารถทำได้ครอบคลุม รวมถึงการ manual trigger เช่นกันซึ่งเราสามารถรัน command ดังต่อไปนี้ได้ airflow dag trigger -c 'PARAM_DICT' <DAG_ID>

airflow dags trigger -c '{"param1":"Hello!!", "param2":"Kitty"}' example_manual_trigger_dag_with_params

ตัวอย่างการ Trigger DAG ผ่าน API:

อีกหนึ่งช่องทางในการบริหารจัดการ data pipeline ใน Apache Airflow ที่ทำให้สะดวกมากขึ้นคือ Airflow REST API ซึ่งเรียกว่า API services ของ Airflow นั้นสามารถทำได้ตั้งแต่สากกระเบือยันเรือรบ ไม่ว่าจะ update, delete DAG, จัดการ Variables, Xcom และอีกมากมาย รวมถึงการ Trigger DAG Run ใหม่ ซึ่งข้อดีคือทำให้เราสามารถบริหารจัดการ pipelines จาก external system ได้อย่างง่ายดาย

ตัวอย่าง API Swagger ของ Airflow REST API
ตัวอย่างการยิงทดสอบ Trigger DAG ผ่านแอป Postman
curl --location 'http://localhost:8080/api/v1/dags/example_manual_trigger_dag_with_params/dagRuns' \
--header 'Content-Type: application/json' \
--header 'Authorization: Basic YOUR_AIRFLOW_AUTH' \
--data '{
"dag_run_id": "example_manual_trigger_dag_with_params",
"logical_date": "2024-01-24T14:15:22Z",
"execution_date": "2024-01-24T14:15:22Z",
"conf": {}
}'

จากตัวอย่างด้านบนเป็นการทดสอบยิง Trigger DAG ผ่าน REST API ด้วยเครื่องมือในการทดสอบ API ที่ชื่อว่า Postman ซึ่งในการตั้งค่าสำหรับเริ่มใช่งาน REST API สามารถดูต่อได้ที่นี่

2.3 External Trigger — TriggerDagRunOperator

นอกจากการ manual trigger ในส่วนนี้ขอพูดถึง External Trigger ที่ออกแบบโดย Apache Airflow ซึ่งจะเป็นการทำงานที่สอดคล้องกันของ 2 pipeline โดย TriggerDagRunOperator จะเป็น task ที่สามารถทำให้การทำงานใน pipeline หลักสามารถออกคำสั่งไป trigger ให้อีก DAG ทำงานได้

โดยในตัวอย่างนี้จะมีการสร้าง DAGs 2 pipline ดังนี้

  1. DAG ID: example_trigger_controller_dag: pipline ตั้งต้นที่จะมีหนึ่งใน task ประกอบไปด้วย TriggerDagRunOperatorเพื่อใช้ในการสั้งการให้ pipeline เป้าหมายทำงานต่อเนื่องกัน
  2. DAG ID: example_trigger_target_dag: pipeline เป้าหมายที่จะทำงานหลังจากที่ ได้รับการ trigger มาจาก controller pipeline

ตัวอย่างการใช้ TriggerDagRunOperator:

import pendulum
from airflow.models.dag import DAG
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.trigger_dagrun import TriggerDagRunOperator

with DAG(
dag_id="example_trigger_controller_dag",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
catchup=False,
schedule="@hourly",
tags=["example"],
) as dag:

# ระบุชื่อ DAG ที่ต้องการ Trigger
dag_id_to_trigger = "example_trigger_target_dag"

task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag)

trigger = TriggerDagRunOperator( # Operator ที่ใช่ในการ trigger pipeline อื่น
task_id="test_trigger_dagrun",
trigger_dag_id=dag_id_to_trigger, # ID ของ DAG เป้าหมายที่ต้องการจะ Trigger
conf={"message": "Hello World"}, # เราสามารถส่ง paramter ข้าม pipeline ไปรันใน pipeline ที่ถูก trigger
)

task1 >> task2 >> task3 >> trigger
import pendulum
from airflow.models.dag import DAG
from airflow.operators.bash import BashOperator


with DAG(
dag_id="example_trigger_target_dag",
start_date=pendulum.datetime(2024, 1, 1, tz="UTC"),
catchup=False, # ตั้งค่า catchup เป็น False ไม่ต้องสร้าง Dag Run ย้อนหลังนับตั้งแต่ start_date จนถึงวันปัจจุบัน
schedule=None, # ตั้งค่า schedule_interval เป็น None เพื่อปิดการทำงานตามตารางเวลา
tags=["example"],
) as dag:

# Pipeline ปลายทางที่ถูก trigger สามารถรับค่า parameter ที่ส่งจาก parent pipeline มาทำงาต่อได้
bash_task = BashOperator(
task_id="bash_task",
bash_command='echo "Here is the message: $message"',
env={"message": '{{ dag_run.conf.get("message") }}'},
)
DAG Dependencies

จากด้านบนเป็น Flow ที่มาจากหน้า DAG Dependencies ซึ่งทำให้เห็นว่าเมื่อ pipeline แรกรัน tasks ครบแล้วเราสามารถเพิ่ม TriggerDagRunOperatorเพื่อให้ไปสั่งการให้ pipeline ที่สองทำงานต่อเนื่องกันได้เลย!

3. Airflow Trigger Rule

Apache Airflow มีตัวแปรที่ชื่อว่า trigger_rule ซึ่ง DE สามารถไปปรับแก้ให้การ trigger ของ task-level ของ pipeline ทำงานได้ตามโจทย์ความต้องการในการออกแบบ workflow ของ DAGs โดย trigger_ruleสามารถใช้กำหนดเงื่อนไขว่าเมื่อไหร่ที่ Task นึงจะถูกเรียกให้ทำงาน โดยปัจจุบันมี trigger_rule ตาม Airflow document หลายตัว เราขอเลือกเอา rule ที่ถูกหยิบมาใช้บ่อยมาอธิบายดังนี้

  1. all_success: Task ที่ถูกกำหนด all_success จะถูกเรียกให้ทำงานเมื่อ task ทุกตัวที่เป็น dependency ทั้งหมด ถูกเรียกให้ทำงานสำเร็จ โดยปกติหากเราไม่ define trigger_rule ค่าพื้นฐานจะเป็น all_success คือ task ก่อนหน้าต้องทำงานสำเร็จ task ถัดไปจึงจะทำงาน
from airflow import DAG
from airflow.decorators import task
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator


dag = DAG(
'test_trigger_rule_all_success',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
)

task1 = DummyOperator(task_id='task1', dag=dag)
task3 = DummyOperator(task_id='task3', dag=dag, trigger_rule='all_success')
@task
def task2(**context):
print("task2")
assert 1==2, "Expect task to fail"

task1 >> task2() >> task3
trigger_rule=’all_success’

2. all_failed: Task ที่ถูกกำหนด all_failed จะถูกเรียกให้ทำงานเมื่อ Task ทุกตัวที่เป็น dependency ทั้งหมดทำงานล้มเหลว

from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
'test_trigger_rule_all_failed',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
)

def expect_to_fail():
raise Exception("This line fails.")

task1 = PythonOperator(
task_id='task1',
python_callable=expect_to_fail,
dag=dag,
)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = PythonOperator(
task_id='task3',
python_callable=lambda: print("Hello from lambda function!"),
dag=dag,
trigger_rule="all_failed"
)

task1 >> task2 >> task3
trigger_rule=’all_failed’

3. all_done: Task ที่ถูกกำหนด all_done จะถูกเรียกให้ทำงานเมื่อ Task ทุกตัวที่เป็น dependency ทั้งหมดทำงานเสร็จสิ้น ไม่ว่าจะเป็นสำเร็จหรือล้มเหลว

from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
'test_trigger_rule_all_done',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
)

def expect_to_fail():
raise Exception("This line fails.")

task1 = DummyOperator(task_id='task1', dag=dag)
task2 = PythonOperator(
task_id='task2',
python_callable=expect_to_fail,
dag=dag,
)
task3 = PythonOperator(
task_id='task3',
python_callable=lambda: print("test all_done_trigger_rule!"),
dag=dag,
trigger_rule="all_done"
)

task1 >> task2 >> task3
trigger_rule=”all_done”

4. one_success: Task ที่ถูกกำหนด one_success จะถูกเรียกให้ทำงานเมื่อ Task ที่เป็น dependency ของมันมีอย่างน้อยหนึ่งตัวที่ทำงานเสร็จสิ้น จากตัวอย่างด้านล่างจะเห็นว่า task ที่เป็น upstream ของ task3 ทั้งหมดไม่มีตัวไหนทำงานสำเร็จเลย task3 จึงได้ status upstream_failed

from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
'test_trigger_rule_one_success',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
)

def expect_to_fail():
raise Exception("This line fails.")


task1 = PythonOperator(
task_id='task1',
python_callable=expect_to_fail,
dag=dag,
)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = PythonOperator(
task_id='task3',
python_callable=lambda: print("test one_success!"),
dag=dag,
trigger_rule="one_success"
)

task1 >> task2 >> task3
trigger_rule=”one_success”

5. one_failed: Task ที่ถูกกำหนด one_failed จะถูกเรียกให้ทำงานเมื่อ Task ที่เป็น dependency ของมันมีอย่างน้อยหนึ่งตัวที่ทำงานล้มเหลว จากด้านล่างจะเห็นว่า task3 สามารถทำงานได้สำเร็จเนื่องจากมี task1 ที่ทำงานล้มเหลว 1 task

from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
'test_trigger_rule_one_failed',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
)

def expect_to_fail():
raise Exception("This line fails.")


task1 = PythonOperator(
task_id='task1',
python_callable=expect_to_fail,
dag=dag,
)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = PythonOperator(
task_id='task3',
python_callable=lambda: print("test one_failed!"),
dag=dag,
trigger_rule="one_failed"
)

task1 >> task2 >> task3
trigger_rule=”one_failed”

6. none_failed: Task ที่ถูกกำหนด none_failed จะถูกเรียกให้ทำงานเมื่อ Task ทุกตัวที่เป็น dependency ทั้งหมดทำงานเสร็จสิ้น, ไม่มีตัวไหนทำงานล้มเหลว จากรูปด้านล่างจะเห็นว่า task3 สามารถทำงานได้สำเร็จเนื่องจาก upstrams ไม่มี task ไหนล้มเหลวเลย

from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator

dag = DAG(
'test_trigger_rule_none_failed',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
)

task1 = DummyOperator(task_id='task1', dag=dag)
task2 = DummyOperator(task_id='task2', dag=dag)
task3 = PythonOperator(
task_id='task3',
python_callable=lambda: print("test one_failed!"),
dag=dag,
trigger_rule="none_failed"
)

task1 >> task2 >> task3
trigger_rule=”none_failed”

7. none_skipped: Task ที่ถูกกำหนด none_skipped จะถูกเรียกให้ทำงานเมื่อ Task ที่เป็น upstream ไม่ถูก skipped จากตัวอย่างด้านล่างจะเห็นว่า task3 ถูก skipped ไปด้วยตาม upstream

from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.exceptions import AirflowSkipException

dag = DAG(
'test_trigger_rule_none_skipped',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
)

def expect_to_skip():
raise AirflowSkipException


task1 = DummyOperator(task_id='task1', dag=dag)
task2 = PythonOperator(
task_id='task2',
python_callable=expect_to_skip,
dag=dag,
)
task3 = PythonOperator(
task_id='task3',
python_callable=lambda: print("test none_failed_or_skipped!"),
dag=dag,
trigger_rule="none_skipped"
)

task1 >> task2 >> task3

8. always: Task ที่ถูกกำหนดalways จะถูกเรียกทุกครั้งไม่ว่า status ของ dependency จะเป็นอย่างไร จากตัวอย่างด้านล่างจะเห็นว่า task4 นั้นทำงานต่อแม้ว่า upstream จะมีทั้งที่ skipped และ failed

from airflow import DAG
from datetime import datetime
from airflow.operators.dummy_operator import DummyOperator
from airflow.operators.python_operator import PythonOperator
from airflow.exceptions import AirflowSkipException

dag = DAG(
'test_trigger_rule_always',
schedule_interval='@daily',
start_date=datetime(2024, 1, 25),
)

def expect_to_skip():
raise AirflowSkipException


task1 = DummyOperator(task_id='task1', dag=dag)
task2 = PythonOperator(
task_id='task2',
python_callable=expect_to_skip,
dag=dag,
)
def expect_to_fail():
raise Exception("This line fails.")

task3 = PythonOperator(
task_id='task3',
python_callable=expect_to_fail,
dag=dag,
trigger_rule="all_done"
)
task4 = PythonOperator(
task_id='task4',
python_callable=lambda: print("test always!"),
dag=dag,
trigger_rule="always"
)

task1 >> task2 >> task3 >> task4

4 . ส่งท้าย 🤩

บทความนี้ผู้เขียนตั้งใจรอบรวมเนื้อหาในการสร้าง pipline บน Apache Ariflow 🎈 ทำความเข้า scheduler + trigger ที่เป็นพื้นฐานสำคัญ ผู้เขียนหวังว่าจะมีประโยชน์กับพี่ๆ น้องๆที่ทำงาน บน Apache Airflow ไม่มากก็น้อย หากชอบอย่าลืมปรบมือ + แชร์ต่อเป็นกำลังใจให้ด้วยนะค่ะ ในปีนี้ผู้มีความตั้งใจที่จะเขียนบทความเกี่ยวกับความรู้ในงาน DE ให้มากขึ้นค่ะ 😍

Reference

--

--

Punsiri Boonyakiat
CJ Express Tech (TILDI)

Senior Data Engineer | Google Cloud Certified Professional Data Engineer| Traveller | A Mom | Learner