Unveiling the Power of Named Mapping in Airflow: A Sneak Peek into the Latest Feature
Apache Airflow 2.9.0 has been released with some exciting features, and the one I was waiting for was “Named Mapping”, or “Custom Names for Task Mapping”.
I am currently working on a project where I need to process workflows for multiple countries around the world, so I used dynamic task mapping to generate a task for each country. But dynamic task mapping assigns an integer to each mapped instance, so when I visualize the mapped tasks I have to click on every individual task to find out which country it is processing.
As shown above, we don't have a direct way to identify the country. This gets worse when we have hundreds of dynamic tasks in a DAG run.
Solution:
Here, Named Mapping comes to the rescue. This latest Airflow feature lets us give human-readable, custom names to mapped tasks, and those names are directly visible in the UI. In our case, the name will be the country name.
This is done by providing a Jinja template for the task with map_index_template. This template is rendered with the task context when the task is executed.
BashOperator.partial(
    dag=named_mapping_workflow_dag,
    task_id="print_country_name_with_named_mapping",
    bash_command='echo "Processing country: {{ task.env.country_name }}"',
    map_index_template="{{ task.env.country_name }}",
).expand(env=get_counties.output)
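To get a feel for what the template resolves to, here is a minimal sketch that renders the same template string with plain Jinja2, outside Airflow. The `fake_task` object is a hypothetical stand-in I made up for one mapped task instance; the real task context that Airflow passes in contains much more, so treat this as an illustration of the lookup rather than Airflow's actual rendering code.

```python
from types import SimpleNamespace

from jinja2 import Template

# Hypothetical stand-in for one mapped task instance: only the `env`
# attribute that the template reads is modelled here.
fake_task = SimpleNamespace(env={"country_name": "IND"})

# The template from the snippet above.
map_index_template = "{{ task.env.country_name }}"

# Jinja resolves `task.env` as an attribute and, because `env` is a dict,
# falls back to a key lookup for `country_name`.
label = Template(map_index_template).render(task=fake_task)
print(label)  # IND
```

This is also why the template is written as task.env.country_name: the key after task.env has to match a key in the dictionaries passed to expand(env=...).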
How:
Let's take the same example and use named mapping:
import datetime as dt

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

# named-mapping workflow dag
with DAG(
    dag_id="named-mapping-example",
    schedule=None,
    start_date=dt.datetime(year=2022, month=10, day=27),
    end_date=None,
    tags=["learning", "examples", "named-mapping"],
) as named_mapping_workflow_dag:

    def get_countries():
        # One dict per country; each dict becomes the env of one mapped task instance
        return [
            {"country_name": "IND"}, {"country_name": "USA"}, {"country_name": "CAN"}, {"country_name": "GBR"},
            {"country_name": "FRA"}, {"country_name": "ITA"}, {"country_name": "DEU"}, {"country_name": "AUS"},
            {"country_name": "BRA"}, {"country_name": "MEX"},
        ]

    # Pushes the country list to XCom so the tasks below can expand over it
    get_counties = PythonOperator(
        dag=named_mapping_workflow_dag,
        task_id="get_countries",
        python_callable=get_countries,
        do_xcom_push=True,
    )

    # Mapped task without named mapping: instances are labelled 0, 1, 2, ...
    print_country_name = BashOperator.partial(
        dag=named_mapping_workflow_dag,
        task_id="print_country_name",
        bash_command='echo "Processing country: {{ task.env.country_name }}"',
    ).expand(env=get_counties.output)

    # Mapped task with named mapping: instances are labelled with the country name
    print_country_name_with_named_mapping = BashOperator.partial(
        dag=named_mapping_workflow_dag,
        task_id="print_country_name_with_named_mapping",
        bash_command='echo "Processing country: {{ task.env.country_name }}"',
        map_index_template="{{ task.env.country_name }}",
    ).expand(env=get_counties.output)

    # The stray closing bracket at the end of this line was a syntax error
    get_counties >> [print_country_name, print_country_name_with_named_mapping]
Let's decode the above code:
- get_countries: PythonOperator task that provides the input country list.
- print_country_name: BashOperator task that prints the country name without the named mapping feature.
- print_country_name_with_named_mapping: BashOperator task that prints the country name using the named mapping feature.
Here, for every country provided by “get_countries”, a task instance is created for each of the two Bash operators, and we will see how each one shows its mapped tasks while running.
The “print_country_name” task creates its mapped tasks without named mapping, so an integer is assigned to each task instance, which makes it hard to guess which task is processing which country.
The “print_country_name_with_named_mapping” task creates its mapped tasks labelled with the country being processed. This is how it looks when we use the named mapping feature.
Now this looks much clearer, and we can easily identify each individual task by its country name.
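The difference between the two tasks can be sketched in plain Python, without Airflow. The `simulate_expand` helper below is a hypothetical function of mine, not an Airflow API; it only mimics the idea that expand creates one task instance per list element and numbers the instances from 0.

```python
# Same shape as the list returned by get_countries, shortened to three items.
countries = [{"country_name": "IND"}, {"country_name": "USA"}, {"country_name": "CAN"}]


def simulate_expand(mapped_inputs):
    """Return one (map_index, env) pair per input, mimicking expand(env=...)."""
    return [(map_index, env) for map_index, env in enumerate(mapped_inputs)]


instances = simulate_expand(countries)

for map_index, env in instances:
    # Without a template the label is just the index;
    # with named mapping it is the value the template renders to.
    print(f"default label: {map_index}  named label: {env['country_name']}")
```

With only three countries the integer labels are still manageable, but with hundreds of mapped instances the named labels are what make the grid readable.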
You can find the complete source code here: https://github.com/harshalpagar/airflow-examples/blob/master/dags/named_mapping.py
Feel free to post your queries or suggestions.
Thanks for reading and Happy Learning 👌 !!