一般直覺的 python全域變數使用
# 會印出 5
def test():
global a
a = 5
test()
print(a)
但在 Airflow中執行類似的 Code會出錯
# 會告知 a並未被賦值
def test():
global a
a = 5def output():
print(f"a={a}")with models.DAG(
...
) as dag:
declare = PythonOperator(
task_id="decalre",
python_callable=test,
)
print_a = PythonOperator(
task_id="print_a",
python_callable=output,
)
那我們賦值總可以了吧!
# 會印出 a=1
# 換句話說就是 test這個 function被執行後並沒辦法改變 a的值a = 1
def ㄏ():
global a
a = 5def output():
print(f"a={a}")with models.DAG(
...
) as dag:
declare = PythonOperator(
task_id="decalre",
python_callable=test,
)
print_a = PythonOperator(
task_id="print_a",
python_callable=output,
)
到這裡可能有些人已經抓狂,沒事,我們有 Xcom
# 最後會印出 test:test_variable
def xcom_set(**context):
context['ti'].xcom_push(key='test', value="test_variable")def xcom_get(**context):
test = context['ti'].xcom_pull(key='test')
print(f"test:{test}")with models.DAG(
...
) as dag:
variable_setting = PythonOperator(
task_id="variable_setting",
python_callable=xcom_set,
)
variable_getting = PythonOperator(
task_id="variable_getting",
python_callable=xcom_set,
)
那如果是要跨不同的 file傳遞參數呢? 由於 Xcom內容是每個 file獨立的,因此不適用。
我們需要將變數內容推到 Variable之中,Variable需要在 Airflow的環境建構時就宣告,你會在 variable.json之中找到類似以下的內容。
額外宣告一個你要拿來儲存的變數,需要注意的是,這裡宣告的變數只能是 STRING格式。