Published in Analytics Vidhya

Airflow XCom pull and push under the hood: multiple values, from different DAGs, and more

Some time ago one of my colleagues asked me a question that is not difficult, but it looks like its scope is not covered well enough in the official documentation.

The question was: “How can I get an XCom that was pushed by a task of another DAG?”

If you are here, I assume you know what XCom is in Apache Airflow. If not, read the official documentation first: https://airflow.apache.org/concepts.html?highlight=xcom#xcoms

XCom in the Apache Airflow UI

First, let’s understand what XCom is in Python terms: how it is defined, how by default it finds only the values that belong to your DAG Run, and so on.

XCom as a Python class

XCom is a data model defined as a SQLAlchemy class with some additional methods. If you open the Apache Airflow sources and look at https://github.com/apache/airflow/blob/v1-10-stable/airflow/models/xcom.py#L41 you will see that I’m not lying to you :)

Now step away from this code and think about how you, as a developer, pull and push XComs. Usually you either call the same-named methods in Jinja templates in operators, like '{{ ti.xcom_push(…) }}', or you call them in a Python function (callable object) with PythonOperator or something similar, like context['ti'].xcom_pull(key=None, task_ids='push_values'). Both ways use the same methods under the hood: the task instance’s xcom_pull and xcom_push methods.

There is no magic here either. Let’s go to the TaskInstance class.

Task Instance methods
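As a rough illustration of the PythonOperator style described above, here is a sketch of two callables, one pushing and one pulling. The names push_values, pull_values and some_key, and the _StubTI class, are made up for this example. The stub exists only so the snippet runs without an Airflow installation; in a real DAG Airflow passes the actual task instance in context['ti'].

```python
class _StubTI:
    """Hypothetical stand-in for a task instance (not Airflow code)."""

    def __init__(self):
        self.store = {}

    def xcom_push(self, key, value, execution_date=None):
        # The stub keeps values in a dict instead of the metadata database.
        self.store[key] = value

    def xcom_pull(self, task_ids=None, dag_id=None, key="return_value",
                  include_prior_dates=False):
        # The stub ignores everything except the key.
        return self.store.get(key)


def push_values(**context):
    # Explicit push under a custom key.
    context["ti"].xcom_push(key="some_key", value="some_value")


def pull_values(**context):
    # Pull what the 'push_values' task stored under 'some_key'.
    return context["ti"].xcom_pull(task_ids="push_values", key="some_key")


ti = _StubTI()
push_values(ti=ti)
result = pull_values(ti=ti)
print(result)
```

In a DAG file these two functions would be passed as python_callable to two PythonOperator tasks.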

xcom_push

Here we are: https://github.com/apache/airflow/blob/v1-10-stable/airflow/models/taskinstance.py#L1485

This is the task instance’s xcom_push method, which you use to send values to the XCom table in the Apache Airflow backend database. As you can see in this part of the code, copied from the sources:

XCom.set(
    key=key,
    value=value,
    task_id=self.task_id,
    dag_id=self.dag_id,
    execution_date=execution_date or self.execution_date)

It just calls the ‘set’ method of the XCom class with these parameters. You can look at this method yourself (the link to the XCom sources is above); it does not matter much right now.

What matters is that execution_date always exists as a field of your XCom record, and this is important. Second, you can pass an execution date different from the current one manually: yes, you can provide execution_date as an argument to xcom_push.

context['ti'].xcom_push(key='some_key', value='some_value', execution_date=other_execution_date)  # xcom_push with a different execution date; other_execution_date is a placeholder for a datetime you supply

xcom_pull

Now, xcom_pull time. Let’s go to the sources:

https://github.com/apache/airflow/blob/v1-10-stable/airflow/models/taskinstance.py#L1517

As you can see, this code always passes execution_date to the underlying XCom lookup, together with dag_id. This is why you get XComs only from the current DAG Run: by default, xcom_pull searches only for records whose execution date and dag_id exactly match those of the current run.

Splitting the question “How can I get an XCom that was pushed by a task of another DAG?”

So, to answer this question, we need answers to two others:

  1. How can I get XComs with an execution_date that is different from the current run’s?
  2. How can I get XComs from a different DAG (a different dag_id)?

Then we combine the answers to both questions.
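To make the default matching concrete, here is a simplified in-memory model of it. This is an illustration only, not Airflow’s code: ROWS, default_pull and the DAG name read_from_xcom are invented for the example, and the real lookup is a SQL query against the metadata database.

```python
from datetime import datetime

# Simplified stand-in for rows of the XCom table (illustrative data only).
ROWS = [
    {"dag_id": "write_to_xcom", "task_id": "push_values",
     "execution_date": datetime(2019, 12, 3, 22), "value": "pushed at 22:00"},
    {"dag_id": "write_to_xcom", "task_id": "push_values",
     "execution_date": datetime(2019, 12, 3, 23), "value": "pushed at 23:00"},
]


def default_pull(rows, dag_id, task_id, execution_date):
    """Return the value whose dag_id, task_id and execution_date all match."""
    for row in rows:
        if (row["dag_id"] == dag_id
                and row["task_id"] == task_id
                and row["execution_date"] == execution_date):
            return row["value"]
    return None


run_date = datetime(2019, 12, 3, 23)
# A task inside write_to_xcom sees only the record of its own run.
same_dag = default_pull(ROWS, "write_to_xcom", "push_values", run_date)
# A task inside another DAG searches under its own dag_id and finds nothing.
other_dag = default_pull(ROWS, "read_from_xcom", "push_values", run_date)
print(same_dag)
print(other_dag)
```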

Get XComs for previous runs

Now take a look at the xcom_pull arguments:

https://github.com/apache/airflow/blob/v1-10-stable/airflow/models/taskinstance.py#L1524

You can see that we can pass several arguments to xcom_pull, among them:

dag_id: Optional[str] = None  # answers our question #2
include_prior_dates: bool = False  # answers our question #1

The dag_id argument is straightforward: it takes one string, the dag_id of the DAG that pushed the XCom you want to obtain.

include_prior_dates is a flag. If it is True, Airflow searches for records with the same execution date and also with earlier ones. So if your execution_date is 2019-12-03 23:00:00 and you set include_prior_dates=True, it can find an XCom record that was pushed earlier, for example, if the DAG runs hourly, by the previous run at 2019-12-03 22:00:00.
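The effect of the flag can be sketched with a small model of the date comparison, using the hourly dates from the paragraph above. The function pick_xcom and the sample rows are assumptions made for illustration; Airflow itself does this filtering in a database query.

```python
from datetime import datetime


def pick_xcom(rows, execution_date, include_prior_dates=False):
    """Pick a value from (execution_date, value) rows of one task and key.

    Without the flag only an exact date match counts; with it, any row
    dated at or before execution_date counts and the most recent one wins.
    """
    if include_prior_dates:
        candidates = [r for r in rows if r[0] <= execution_date]
    else:
        candidates = [r for r in rows if r[0] == execution_date]
    if not candidates:
        return None
    return max(candidates, key=lambda r: r[0])[1]


# Only the previous hourly run has pushed something so far.
rows = [(datetime(2019, 12, 3, 22, 0), "pushed at 22:00")]
current_run = datetime(2019, 12, 3, 23, 0)

strict = pick_xcom(rows, current_run)
relaxed = pick_xcom(rows, current_run, include_prior_dates=True)
print(strict)
print(relaxed)
```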

When can this be useful? For example, if you have a task at the start of your DAG Run that takes an XCom from the previous run to get the left border of the data you want to process.

By default, the task instance’s xcom_pull method returns the latest value that matches the request. But sometimes you want more than one value, and for that you need the get_many method of the XCom class. Yes, you can use the XCom class methods directly, just by providing the necessary arguments, for example:
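Combining both arguments, a callable that fetches such a left border from another DAG might look roughly like the sketch below. The function name get_left_border, the _FakeTI stub and the canned value are hypothetical. The DAG and task names are borrowed from the other snippets in this article. The stub only records the arguments so the snippet runs without Airflow.

```python
class _FakeTI:
    """Hypothetical stand-in that records the arguments it receives."""

    def __init__(self, canned_value):
        self.canned_value = canned_value
        self.calls = []

    def xcom_pull(self, **kwargs):
        self.calls.append(kwargs)
        return self.canned_value


def get_left_border(**context):
    # dag_id points at the DAG that pushed the value;
    # include_prior_dates=True also accepts records from earlier runs.
    return context["ti"].xcom_pull(
        dag_id="write_to_xcom",
        task_ids="push_values",
        include_prior_dates=True,
    )


ti = _FakeTI("2019-12-03 22:00:00")
left_border = get_left_border(ti=ti)
print(left_border)
```

If dag_id is omitted, xcom_pull falls back to the current DAG, which covers the same-DAG case from the paragraph above.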

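from datetime import datetime

from airflow.models import XCom
from airflow.utils.timezone import make_aware

# XComs pushed by the write_to_xcom DAG at or before the given date
XCom.get_many(
    execution_date=make_aware(datetime(2019, 12, 3, 0, 51, 0, 0)),
    dag_ids=["write_to_xcom"],
    include_prior_dates=True)

In the XCom class you can also find other methods that may be useful to you.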

Be aware of the ‘key’ XCom argument

Pay attention to this description in the docstring of the xcom_pull method. By default, XCom searches for records with key = ‘return_value’, so if you want records with any key, i.e. to remove the key filter, you must set key=None! Don’t forget about it.

The default key is 'return_value', also
available as a constant XCOM_RETURN_KEY. This key is automatically
given to XComs returned by tasks (as opposed to being pushed
manually). To remove the filter, pass key=None.
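The behaviour described in the docstring can be pictured with a tiny model of the key filter. The function filter_by_key and the sample rows are invented for this sketch; only the default key name and the XCOM_RETURN_KEY constant come from the docstring.

```python
XCOM_RETURN_KEY = "return_value"


def filter_by_key(rows, key=XCOM_RETURN_KEY):
    """Filter (key, value) rows; key=None switches the filter off."""
    if key is None:
        return list(rows)
    return [row for row in rows if row[0] == key]


rows = [
    ("return_value", 1),         # value returned by a task
    ("some_key", "some_value"),  # value pushed manually
]

default_hits = filter_by_key(rows)        # only the returned value
all_hits = filter_by_key(rows, key=None)  # every record, any key
print(default_hits)
print(all_hits)
```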

Code samples

I prepared code samples showing this usage in DAGs (one DAG writes, the second reads, plus a sample of how to use XCom.get_many). You can get them here: https://github.com/xnuinside/airflow_examples/blob/master/xcom_diff_dag_and_multiply/xcom_dag.py

read and write xcom in different DAGs

Click ‘claps’ if this was helpful (it is really the only way for me to understand whether it was useful or not, and whether I should maybe stop writing such articles), and leave a comment if you have something to add.
