Airflow XComs คืออะไร

Patcharachart Ji
3 min readJan 28, 2023

--

Airflow เป็น Open Source Platform ที่ช่วยจัดการ Workflow ด้วยการสร้าง Data Pipeline สามารถเขียนโปรแกรมภาษา python เพื่อควบคุม และจัดการกับข้อมูลมหาศาลได้ โดย Airflow มีการเขียน Workflow เป็น DAG (Directed Acyclic Graph) ซึ่ง DAG ประกอบไปด้วยหลายๆ Task ที่เชื่อมต่อกันและในแต่ละ Task ก็จะมีกระบวนการทำงานที่ต่างกันไปแล้วสงสัยกันไหมครับว่า Task แต่ละส่วนสื่อสารกันได้ยังไง 🤔 วันนี้ผมจะมาอธิบายข้อสงสัยนี้ครับ

Airflow XComs

Airflow XComs (cross-communications) เป็นเครื่องมือที่ช่วยแบ่งข้อมูลกันระหว่าง DAG หรือระหว่าง Task ก็ได้โดยจัดเก็บข้อมูลไว้รวมกันที่ศูนย์กลางที่เรียกว่า XComs ทุก Task สามารถเข้าถึง XComs ได้เหมือนทำให้ตัวแปร Local ย้ายไปอยู่ที่ Global🌍 สามารถส่งข้อมูลถึงกันได้โดยไม่จำเป็นต้องเป็น Task ที่อยู่ติดกัน

Figure 1: XComs workflow example

XComs จะเป็นตัวกลางในการแลกเปลี่ยนข้อมูลระหว่าง Task โดยมีตัวแปรอ้างอิง 4 ตัวคือ

  1. dag_id ชื่อของ DAG ใช้อ้างอิงในกรณีที่ต้องการดึงข้อมูลจาก DAG อื่น
  2. task_idsชื่อของ Task ที่ต้องการดึงข้อมูล
  3. keyชื่อตัวแปรที่ต้องการดึงข้อมูล
  4. valueค่าที่เก็บอยู่ในตัวแปร

วันนี้เราจะมาอธิบายการสื่อสารกันระหว่าง Task ใน DAG เดียวกันตัวอย่างต่อไปนี้จึงไม่มี dag_id ใน code นะครับ

ฟังก์ชั่นที่ใช้

xcom_push เป็นฟังก์ชั่นที่ใช้ส่งค่าไปยัง XComs

xcom_pull เป็นฟังก์ชั่นที่ใช้ดึงค่าจาก XComs มาใช้

ต่อไปจะเป็นตัวอย่าง code ครับมาเริ่มกันเลย 🚀

วิธีรับและส่งข้อมูลผ่าน XComs

การส่งข้อมูลไปเก็บไว้ที่ XComs มี 2 วิธีคือ

1. Return Value ออกมาจากฟังก์ชั่นวิธีนี้ก็เหมือนกันส่งค่าออกมาจากฟังก์ชันที่เราคุ้นเคยกันอยู่แล้วโดย Airflow จะนำค่าที่ Return ออกมาไปเก็บไว้ที่ XComs โดยส่งค่า key ชื่อว่า return_value ให้อัตโนมัติ

@task
def push_by_returning(ti=None):
value_1 = {"a": "b"}
return value_1

2. ใช้ฟังก์ชั่น xcom_push โดยต้องมีพารามิเตอร์สองตัวคือ key และ value ในการส่งข้อมูล

@task
def push(ti=None):
value_2 = [1, 2, 3]
ti.xcom_push(
key="push_key",
value=value_2
)

เมื่อ Run 2 Task นี้ก็จะมีข้อมูลส่งไปที่ XComs ซึ่งสามารถเข้าไปดูได้ที่แถบเมนูใน UI ของ Airflow เลือก Admin ➜ XComs

Figure 4: Show the XComs UI

หรือเข้าไปที่ UI ของแต่ละ Task จะมี XCom อยู่ซึ่งจะแสดงแค่เฉพาะตัวแปรที่ Task นั่นๆส่งไปที่ XComs

Figure 5: Show the XCom UI

ตัวอย่างข้อมูลที่ถูกส่งขึ้นมาใน XComs

Figure 6: Show data in XComs

แล้วเราจะดึงข้อมูลจาก XComs มาใช้ได้ยังไง ?

หลังจาก push ข้อมูลไปยัง XComs เราสามารถดึงข้อมูลได้ด้วยฟังก์ชั่น xcom_pull โดยต้องมีพารามิเตอร์หนึ่งตัวที่จำเป็นต้องใส่คือ task_ids

ส่วนตัวแปร keyนั่นถ้าไม่กำหนดค่าจะดึงค่าจากตัวแปรที่ชื่อว่า return_value ออกมาโดย default

เดียวเราจะลองมาดูตัวอย่างการ pull ข้อมูลกันนะครับ

ตัวอย่างแรก pulled_value_1 เราจะ pull จาก Task push_by_returning โดยไม่ใส่ key จะได้ผลลัพธ์เป็น {‘a’: ‘b’} เพราะ Task push_by_returning ใช้ฟังก์ชั่น return เมื่อไม่ใส่ key, xcom_pull จะเรียกตัวแปร return_value ออกมา

ตัวอย่างที่สอง pulled_value_2 เราจะ pull จาก Task push โดยใส่ key เป็น push_key เพื่อดึงข้อมูลจากตัวแปร push_key ใน XComs ออกมาจะได้ผลลัพธ์เป็น [1, 2, 3]

ตัวอย่างที่สาม pulled_value_3 คราวนี้เราลองไม่ใส่ key ในการดึงข้อมูลจาก Task push บ้างข้อมูลจะออกมาเป็น [1, 2, 3] เหมือน pulled_value_2 ไหมก็ดึงข้อมูลจาก Task เดียวกันนี่หน่า ผลลัพธ์ที่ได้คือ None เพราะว่าเมื่อเราไม่ใส่ key ฟังก์ชั่น xcom_pull จะเรียกตัวแปร return_value ออกมาแต่เนื่องจาก Task push ของเราไม่ได้ใช้ฟังก์ชั่น return จึงไม่มีตัวแปรที่ชื่อว่า return_value นั่นเองครับ

ตัวอย่าง code

@task
def pull_data_from_xcom(ti=None):
pulled_value_1 = ti.xcom_pull(
task_ids="push_by_returning"
)

pulled_value_2 = ti.xcom_pull(
task_ids="push", key="push_key"
)

pulled_value_3 = ti.xcom_pull(
task_ids="push",
)
print(f"pulled_value_1 : {pulled_value_1}")
print(f"pulled_value_2 : {pulled_value_2}")
print(f"pulled_value_3 : {pulled_value_3}")

ซึ่งจะได้ผลลัพธ์จากการรันตามรูปนี้

[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_1 : {'a': 'b'}
[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_2 : [1, 2, 3]
[2023-01-28, 16:48:48 UTC] {logging_mixin.py:137} INFO - pulled_value_3 : None

ดูตัวอย่าง code ทั้งหมดได้ที่ 👇🏼

github: https://github.com/Infanna/airflow-task-xcom-example

ข้อจำกัดของ XComs

XComs สามารถเก็บข้อมูลได้หลายประเภทเช่น ข้อความ, ตัวเลข ขนาดเล็กได้แต่ไม่ควรส่งข้อมูลขนาดใหญ่เช่น dataframes เพราะอาจทำให้หน่วยความจำเต็มโดยข้อจำกัดของหน่วยความจำขึ้นอยู่กับฐานข้อมูลที่เลือกใช้

Figure 6: data base example

Reference

https://airflow.apache.org/docs/apache-airflow/stable/concepts/xcoms.html

--

--