Airflow: 使用 Xcom / Variable傳遞參數

許博淳
數據共筆
Published in
Jun 19, 2022

Airflow 雖然是以 Python撰寫,在傳遞參數上卻不像 Python的 function那麼簡單,甚至全域變數和區域變數的使用也十分詭異,因此以下做一些我的統整,同時感謝網路上許多先進。

懶人包

同一個 DAG內傳遞變數:Xcom

跨 DAG傳遞變數:Variable

一般直覺的 python全域變數使用

# 會印出 5
def test():
global a
a = 5
test()
print(a)

但在 Airflow中執行類似的 Code會出錯

# 會告知 a並未被賦值
def test():
global a
a = 5
def output():
print(f"a={a}")
with models.DAG(
...
) as dag:
declare = PythonOperator(
task_id="decalre",
python_callable=test,
)
print_a = PythonOperator(
task_id="print_a",
python_callable=output,
)

那我們賦值總可以了吧!

# 會印出 a=1
# 換句話說就是 test這個 function被執行後並沒辦法改變 a的值
a = 1
def ㄏ():
global a
a = 5
def output():
print(f"a={a}")
with models.DAG(
...
) as dag:
declare = PythonOperator(
task_id="decalre",
python_callable=test,
)
print_a = PythonOperator(
task_id="print_a",
python_callable=output,
)

到這裡可能有些人已經抓狂,沒事,我們有 Xcom

# 最後會印出 test:test_variable
def xcom_set(**context):
context['ti'].xcom_push(key='test', value="test_variable")
def xcom_get(**context):
test = context['ti'].xcom_pull(key='test')
print(f"test:{test}")
with models.DAG(
...
) as dag:
variable_setting = PythonOperator(
task_id="variable_setting",
python_callable=xcom_set,
)
variable_getting = PythonOperator(
task_id="variable_getting",
python_callable=xcom_set,
)

那如果是要跨不同的 file傳遞參數呢? 由於 Xcom內容是每個 file獨立的,因此不適用。

我們需要將變數內容推到 Variable之中,Variable需要在 Airflow的環境建構時就宣告,你會在 variable.json之中找到類似以下的內容。

額外宣告一個你要拿來儲存的變數,需要注意的是,這裡宣告的變數只能是 STRING格式。

--

--