Airflow Tricks — XCom and SubDAG

Emma Suzuki
Published in Analytics Vidhya · 3 min read · Dec 6, 2019

I have been a data engineer for 2 years and have been maintaining most of our data pipelines with Airflow. It has a very intuitive interface and makes it simple to organize data pipelines. In this post, I would like to share a few tricks that I have been using in my Airflow DAGs.

Trick #1 — XCom

XCom, short for “cross-communication”, lets you pass messages between tasks. Although tasks are meant to be atomic and not share resources, exchanging small messages between them is sometimes useful. XCom has two sides: a pusher and a puller. The pusher sends a message and the puller receives it.

Example (BashOperator for both pusher and puller):

# Get files modified in a folder between the previous and current execution dates
pusher = BashOperator(
    task_id='get_files',
    bash_command='files=$(find "$(cd .; pwd)" -newermt {{ prev_ds }} ! -newermt {{ ds }}); echo $files',
    xcom_push=True,  # Push the last line of stdout to XCom.
    dag=dag)

# Pull the XCom pushed by the "get_files" task
puller = BashOperator(
    task_id='process_files',
    bash_command='python process.py --files "{{ task_instance.xcom_pull(task_ids=\'get_files\') }}"',
    dag=dag)
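
The same mechanism is available outside BashOperator through the task instance. Below is a minimal sketch using PythonOperator, assuming the same Airflow 1.10-style APIs as the rest of this post; the task ids and callables are hypothetical. Returning a value from the callable pushes it to XCom, and the puller reads it back with xcom_pull.

from airflow.operators.python_operator import PythonOperator

def push_files(**context):
    files = ['a.csv', 'b.csv']  # e.g. the result of some scan (hypothetical)
    # The callable's return value is pushed to XCom under the key 'return_value'.
    return files

def pull_files(**context):
    ti = context['ti']
    files = ti.xcom_pull(task_ids='push_files')
    print('Files to process: %s' % files)

pusher_py = PythonOperator(
    task_id='push_files',
    python_callable=push_files,
    provide_context=True,  # Pass the context (including 'ti') to the callable
    dag=dag)

puller_py = PythonOperator(
    task_id='pull_files',
    python_callable=pull_files,
    provide_context=True,
    dag=dag)

pusher_py >> puller_py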

Trick #2 — SubDAG

A SubDAG is a pluggable, modular DAG that can be inserted into a parent DAG. This is useful when you have repeating processes within a DAG or across DAGs.

Example:

1. Create a SubDAG (subdag.py). A SubDAG is defined as a function that returns a DAG object. The SubDAG’s dag_id must be formatted as {parent dag id}.{SubDagOperator's task id}.
from airflow import DAG
from airflow.operators.bash_operator import BashOperator


def dag_preprocess(dag_id, schedule_interval, start_date, table_name):
    dag = DAG(
        dag_id=dag_id,
        schedule_interval=schedule_interval,
        start_date=start_date)

    preprocess_files = BashOperator(
        task_id='preprocess_files',
        bash_command='python preprocess_files.py',
        dag=dag)

    preprocess_db_data = BashOperator(
        task_id='preprocess_db',
        bash_command='python preprocess_db.py --table {{ params.table }}',
        params={
            'table': table_name
        },
        dag=dag)

    return dag
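
Dependencies between the SubDAG’s tasks are set the same way as in any DAG. As a purely illustrative example, if the DB preprocessing should run after the file preprocessing, you could add this line right before return dag:

    preprocess_files >> preprocess_db_data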

2. Create a parent DAG.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

from subdag import dag_preprocess

DAG_ID = 'parent_dag'
SUBDAG_TASK_ID = 'preprocess'

default_args = {
    'owner': 'emmasuzuki',
    'depends_on_past': False,
    'start_date': datetime(2019, 11, 1)
}

dag = DAG(DAG_ID, default_args=default_args, schedule_interval='@daily')

preprocess = SubDagOperator(
    task_id=SUBDAG_TASK_ID,
    subdag=dag_preprocess('%s.%s' % (DAG_ID, SUBDAG_TASK_ID),
                          dag.schedule_interval,
                          default_args['start_date'],
                          'mytable'),
    dag=dag)

postprocess = BashOperator(
    task_id='postprocess',
    bash_command='bash postprocess.sh',
    dag=dag)

preprocess >> postprocess

This parent_dag would look like this in the Airflow UI.

Tasks in parent DAG

If you click on the preprocess task, you will see an additional menu item, “Zoom into Sub DAG”, in the popup.

SubDAGOperator click menu

Clicking on the button reveals the SubDAG’s contents. The UI and menus look the same as for a regular DAG.

Tasks in SubDAG

One note about SubDAG is that by default it uses the SequentialExecutor, which means all of its tasks are executed in sequence even when there are no dependencies between them.

it is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot

ref: https://airflow.apache.org/docs/stable/concepts.html?highlight=subdag

It is safe to go with the default; however, if your SubDAG is very simple and this pitfall does not apply, you can specify an executor as follows.

from airflow.executors.local_executor import LocalExecutor

...

preprocess = SubDagOperator(
    task_id=SUBDAG_TASK_ID,
    subdag=dag_preprocess('%s.%s' % (DAG_ID, SUBDAG_TASK_ID),
                          dag.schedule_interval,
                          default_args['start_date'],
                          'mytable'),
    executor=LocalExecutor(),
    dag=dag)

SubDAG is useful for simplifying DAGs by extracting common processes and increasing their reusability.
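
As a sketch of that reusability, another parent DAG could plug the same dag_preprocess factory in with different arguments; the second DAG id and table name below are hypothetical.

another_dag = DAG('another_parent_dag', default_args=default_args,
                  schedule_interval='@daily')

preprocess_other = SubDagOperator(
    task_id=SUBDAG_TASK_ID,
    subdag=dag_preprocess('%s.%s' % ('another_parent_dag', SUBDAG_TASK_ID),
                          another_dag.schedule_interval,
                          default_args['start_date'],
                          'othertable'),
    dag=another_dag)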

This is a very short list of Airflow tricks, but Airflow doesn’t really need tricks. Most Airflow features are straightforward and work quite well for my data needs. I am pleased with the low cost of pipeline monitoring.
