Airflow Tricks — Xcom and SubDAG
I have been a data engineer for 2 years and have been maintaining most of our data pipelines with Airflow. It has a very intuitive interface and makes it simple to organize data pipelines. In this post, I would like to share a few tricks that I have been using in my Airflow DAGs.
Trick #1 — Xcom
Xcom, an abbreviation of “cross-communication”, lets you pass messages between tasks. Although tasks are supposed to be atomic and not share resources with each other, sometimes exchanging messages between them is useful. Xcom has two sides: a pusher and a puller. The pusher sends a message and the puller receives it.
Example (BashOperator for both pusher and puller):
# Get modified files in a folder since the last execution time
pusher = BashOperator(
    task_id='get_files',
    bash_command='files=$(find "$(cd .; pwd)" -newermt {{ prev_ds }} ! -newermt {{ ds }}); echo $files',
    xcom_push=True,  # This will push the last line of stdout to Xcom.
    dag=dag)

# Pull Xcom from a task, "get_files"
puller = BashOperator(
    task_id='process_files',
    bash_command='python process.py --files "{{ task_instance.xcom_pull(task_ids=\'get_files\') }}"',
    dag=dag)
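As a side note, {{ prev_ds }} and {{ ds }} are Airflow template variables that render to the previous and current execution dates. To see what the pusher's templated bash_command boils down to after rendering, here is a standalone sketch with hard-coded dates and made-up file names (nothing Airflow-specific; assumes GNU find and touch):

```shell
# Simulate one run where {{ prev_ds }} rendered to 2019-11-02 and
# {{ ds }} rendered to 2019-11-03; directory and file names are made up.
dir=$(mktemp -d)
touch -d "2019-11-01 12:00" "$dir/old.txt"   # modified before the window
touch -d "2019-11-02 12:00" "$dir/new.txt"   # modified inside the window
files=$(find "$dir" -newermt 2019-11-02 ! -newermt 2019-11-03)
echo $files   # only new.txt falls in the window
```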
Trick #2 — SubDAG
A SubDAG is a pluggable DAG module that can be inserted into a parent DAG. This is useful when you have repeating processes within a DAG or across DAGs.
Example:
1. Create a SubDAG (subdag.py). A SubDAG should be a function that returns a DAG object. The SubDAG's dag_id must be formatted as {parent dag id}.{SubDagOperator's task id}.
def dag_preprocess(dag_id, schedule_interval, start_date, table_name):
    dag = DAG(
        dag_id=dag_id,
        schedule_interval=schedule_interval,
        start_date=start_date)

    preprocess_files = BashOperator(
        task_id='preprocess_files',
        bash_command='python preprocess_files.py',
        dag=dag)

    preprocess_db_data = BashOperator(
        task_id='preprocess_db',
        bash_command='python preprocess_db.py --table {{ params.table }}',
        params={
            'table': table_name
        },
        dag=dag)

    return dag
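The required dag_id convention can be expressed as a tiny helper (hypothetical, not part of Airflow; shown only to make the naming rule explicit):

```python
# Hypothetical helper: build a SubDAG id following Airflow's required
# '{parent dag id}.{SubDagOperator task id}' convention.
def subdag_id(parent_dag_id, subdag_task_id):
    return '%s.%s' % (parent_dag_id, subdag_task_id)

print(subdag_id('parent_dag', 'preprocess'))  # parent_dag.preprocess
```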
2. Create a parent DAG.
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator
from airflow.operators.subdag_operator import SubDagOperator

from subdag import dag_preprocess

DAG_ID = 'parent_dag'
default_args = {
    'owner': 'emmasuzuki',
    'depends_on_past': False,
    'start_date': datetime(2019, 11, 1)
}
dag = DAG(DAG_ID, default_args=default_args, schedule_interval='@daily')

SUBDAG_TASK_ID = 'preprocess'
preprocess = SubDagOperator(
    task_id=SUBDAG_TASK_ID,
    subdag=dag_preprocess('%s.%s' % (DAG_ID, SUBDAG_TASK_ID),
                          dag.schedule_interval,
                          default_args['start_date'],
                          'mytable'),
    dag=dag)

postprocess = BashOperator(
    task_id='postprocess',
    bash_command='bash postprocess.sh',
    dag=dag)

preprocess >> postprocess
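The last line, preprocess >> postprocess, uses Airflow's overloaded >> operator to declare that postprocess runs after preprocess. A minimal sketch of the idea (a toy class of my own, not Airflow's actual implementation):

```python
# Toy illustration of how '>>' can wire a downstream dependency;
# Airflow's real operators do this with much more bookkeeping.
class Task:
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def __rshift__(self, other):
        # 'self >> other' makes 'other' run after 'self'.
        self.downstream.append(other)
        return other  # returning 'other' allows chaining: a >> b >> c

preprocess = Task('preprocess')
postprocess = Task('postprocess')
preprocess >> postprocess

print([t.task_id for t in preprocess.downstream])  # ['postprocess']
```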
This parent_dag would look like this in the Airflow UI.
If you click on the preprocess task, you will see an additional menu item, “Zoom into Sub DAG”, in the popup.
Clicking that button reveals the SubDAG's contents. The UI and menus should look the same as in a regular DAG.
One note about SubDAGs: by default, the SubDagOperator uses the SequentialExecutor, which means all tasks run one at a time even when there is no dependency between them. As the Airflow documentation explains:
It is possible to specify an executor for the SubDAG. It is common to use the SequentialExecutor if you want to run the SubDAG in-process and effectively limit its parallelism to one. Using LocalExecutor can be problematic as it may over-subscribe your worker, running multiple tasks in a single slot.
ref: https://airflow.apache.org/docs/stable/concepts.html?highlight=subdag
It is safe to go with the default; however, if your SubDAG is very simple and this pitfall does not apply, you can specify an executor like so:
from airflow.executors.local_executor import LocalExecutor

...

preprocess = SubDagOperator(
    task_id=SUBDAG_TASK_ID,
    subdag=dag_preprocess('%s.%s' % (DAG_ID, SUBDAG_TASK_ID),
                          dag.schedule_interval,
                          default_args['start_date'],
                          'mytable'),
    executor=LocalExecutor(),
    dag=dag)
A SubDAG is useful for simplifying DAGs by extracting common processes and increasing their reusability.
This is a very short list of Airflow tricks, but Airflow doesn't really need tricks. Most Airflow features are straightforward and work quite well for my data needs. I am pleased with the low cost of pipeline monitoring.