Airflow — XCOM chats
Hello Hello!!!
This is the third article of my Airflow series. In the first two I covered dynamic DAG generation and custom plugin writing. I had not originally planned to write one about XCOM, but I know how helpful XCOM variables can be when they are used properly. Hence this article; I hope it will be a worthwhile read.
As we already know (again, assuming the reader has basic knowledge of Airflow), Airflow tasks run in isolation, i.e. tasks can’t communicate directly with each other. And that can be annoying at times.
Let’s take a simple example. Suppose our Airflow DAG is expected to load a file from an AWS S3 bucket into a DynamoDB table. The source bucket and path are fixed, but the file name varies with the date. We can add a file-availability sensor on the path that watches for any file, and then load that file via a data-load operator task. But by default we cannot pass the file name detected by the file-availability sensor on to the data-load operator, so we might need to scan the source bucket/path for the file name a second time. A better solution? XCOM variables.
XCOM variables are meant to hold small, simple pieces of metadata; passing the actual data through XCOM is strongly discouraged.
Airflow provides two simple APIs, xcom_push and xcom_pull, via the task_instance object. One can access the task_instance object from the context that Airflow supplies to the task, as shown below:
task_instance = context['task_instance']
We will study the XCOM communication between the s3_sensor and data_processor tasks of the dynamic DAG generated by the https://bitbucket.org/saumalya75/airflowoncontainer/src/master/airflow_home/dags/configurable_data_pipeline_demo.py file. Here I am only showing the task code:
Push: XCOM stores data as key-value pairs, so whenever you push (store) any data in XCOM, you have to provide a key. Airflow automatically records the entry under the task id of the task that pushed it.
Check out the global method _trigger_file_to_xcom. I am using this method to push several values, like bucket_name and file_name, to XCOM. As you can see, I am using the task_instance.xcom_push method to do so, and this method takes two keyword arguments: key and value.
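To make the push step concrete, here is a minimal sketch of what such a callable could look like. It is not the repository's actual code: the function name trigger_file_to_xcom, the bucket and file names, and the FakeTaskInstance class are all made up for illustration. FakeTaskInstance only imitates the xcom_push(key=..., value=...) call so that the snippet runs without an Airflow installation; inside a real DAG, Airflow supplies the real task_instance through the context.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (illustration only)."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.store = {}

    def xcom_push(self, key, value):
        # Like Airflow, file the value under the id of the pushing task.
        self.store[(self.task_id, key)] = value


def trigger_file_to_xcom(bucket_name, file_name, **context):
    """Push the location of the detected file (metadata only) to XCOM."""
    task_instance = context["task_instance"]
    task_instance.xcom_push(key="bucket_name", value=bucket_name)
    task_instance.xcom_push(key="file_name", value=file_name)
    return True


# Simulate what the sensor task would do once it has found a file.
ti = FakeTaskInstance(task_id="s3_sensor")
trigger_file_to_xcom("demo-bucket", "sales_2020-01-01.csv", task_instance=ti)
print(ti.store)
```

Note that only the bucket and file names are pushed, never the file contents, in line with the advice to keep XCOM values small.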
Pull: It is a little bit trickier to pull (extract) the data from XCOM. While extracting the data from XCOM, you have to pass the task_id of the task from where the data was actually pushed, along with the key.
The logic for pulling data from XCOM is available in the _extract_from_xcom static method of the S3SensorFromXcom class. Notice that we are providing two arguments, task_id and key. The task_id is the id of the sensor task. Notice also that the data_processor task takes xcom_source_task_id as input, which is set to the task id of the s3_sensor task.
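The pull side can be sketched in the same spirit. Again, this is an illustration under stated assumptions and not the code from the repository: extract_from_xcom and FakeTaskInstance are hypothetical names, and the fake class only imitates the xcom_pull(task_ids=..., key=...) call so the snippet runs on its own. In a real DAG the task_instance comes from the context and reads from Airflow's metadata database.

```python
class FakeTaskInstance:
    """Hypothetical stand-in for Airflow's task instance (illustration only)."""

    def __init__(self, pushed):
        # Values "already pushed" upstream: {(task_id, key): value}
        self._pushed = pushed

    def xcom_pull(self, task_ids=None, key="return_value"):
        # Look the value up by the id of the task that pushed it, plus the key.
        return self._pushed.get((task_ids, key))


def extract_from_xcom(task_id, key, **context):
    """Fetch a value that the task with id `task_id` pushed under `key`."""
    task_instance = context["task_instance"]
    return task_instance.xcom_pull(task_ids=task_id, key=key)


# Pretend the sensor task has already pushed the file name.
ti = FakeTaskInstance({("s3_sensor", "file_name"): "sales_2020-01-01.csv"})

# The downstream task is told which task to read from (xcom_source_task_id).
xcom_source_task_id = "s3_sensor"
file_name = extract_from_xcom(xcom_source_task_id, "file_name", task_instance=ti)
print(file_name)
```

Passing the source task id in as a parameter keeps the downstream task reusable: it does not need to hard-code which sensor produced the value.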
As we just saw, XCOM variables can be really handy when they are integrated properly with our application. I always encourage using XCOM to pass small pieces of metadata or flag values.
Credit where it’s due:
Although I published the article, it was a joint venture with Rohith Karnati. He is a Python enthusiast, DevOps ninja and Airflow developer. Kudos to his efforts.
All related code is available in this BitBucket repository. Clone it, Fork it, Use it.
Quick note: I have discussed dynamic DAG creation and custom plugin creation in other articles, which are here and there.
P.S.: Any suggestions will be highly appreciated. I am available at saumalya75@gmail.com and linkedin.com/in/saumalya-sarkar-b3712817b.