Published in The Startup

Photo by Deva Darshan on Unsplash

Airflow DAG — Best Practices

DAG as configuration file

Invest in Airflow plugin system

Do not perform data processing in DAG files.

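emr_add_step = EmrAddStepsOperator(
    task_id="demo_step",  # remaining arguments (job_flow_id, steps, ...) are omitted here
)
emr_step_status_checker = EmrStepSensor(
    step_id="{{ task_instance.xcom_pull(task_ids='demo_step', key='return_value')[0] }}",  # other arguments omitted
)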

Delegate API or DB calls to operators
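The scheduler re-parses DAG files regularly, so an API or DB call placed at module level runs on every parse, not just when a task executes. The sketch below illustrates the difference using only the standard library. `fetch_remote_config` and `load_config_task` are hypothetical names, and the call counter merely stands in for a real network or database round trip.

```python
# Hypothetical stand-in for an expensive API or DB call.
call_count = 0


def fetch_remote_config():
    """Pretend to hit an external service and count how often that happens."""
    global call_count
    call_count += 1
    return {"table": "events"}


# Anti-pattern (left commented out): this would run on every DAG file parse.
# config = fetch_remote_config()


def load_config_task():
    """Callable meant to run inside an operator, i.e. only at task execution time."""
    config = fetch_remote_config()
    return config["table"]


# Loading this module makes no external call; the call happens only when
# the task body actually runs.
parse_time_calls = call_count
table_name = load_config_task()
```

In a real DAG, `load_config_task` would typically be handed to an operator (or replaced by a purpose-built operator or hook) so that the call is deferred to the worker.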

Make DAGs/Tasks idempotent
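One common way to keep a task idempotent is to write its output to a location derived from the logical date and overwrite it on every run, so a retry or backfill produces the same result instead of duplicates. The following is a minimal sketch under that assumption. `write_partition` and the file layout are hypothetical, and `ds` mirrors Airflow's `{{ ds }}` template variable (the logical date as `YYYY-MM-DD`).

```python
import json
import tempfile
from pathlib import Path


def write_partition(base_dir, ds, rows):
    """Write rows for one logical date, replacing any earlier output for that date."""
    partition = Path(base_dir) / f"ds={ds}"
    partition.mkdir(parents=True, exist_ok=True)
    target = partition / "part-0000.json"
    # Overwrite rather than append so a retry or backfill cannot duplicate rows.
    target.write_text(json.dumps(rows))
    return target


base = tempfile.mkdtemp()  # stands in for a data lake or warehouse location
rows = [{"id": 1}, {"id": 2}]
first = write_partition(base, "2021-01-01", rows)
second = write_partition(base, "2021-01-01", rows)  # simulated retry
stored = json.loads(second.read_text())
```

Running the task twice for the same date leaves exactly one file with the same contents, which is the property to aim for with database writes as well (for example, delete-then-insert or upsert per partition).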

Use a single variable per DAG

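from airflow.models import Variable

# Combining SSM keys in one JSON value, stored as the Airflow Variable "dag_specific_variable"
dag_specific_variable = '''{
"variable_1" : "value_1",
"variable_2" : "value_2",
"variable_3" : "value_3"
}'''
# Single call to get all three variables
dag_specific_params = Variable.get("dag_specific_variable", deserialize_json=True)
# Using these in a templated field (the key after var.json is the Variable name)
{{ var.json.dag_specific_variable.variable_1 }}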

Tag the DAG

Don’t Abuse XCom

Use intermediate storage between tasks.
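XCom is meant for small pieces of metadata, so a typical pattern is to write the dataset to external storage and pass only its location between tasks. The sketch below shows that pattern with plain functions and a temporary directory. `extract`, `transform`, and the CSV layout are hypothetical, and the temporary directory stands in for S3, HDFS, or a shared volume.

```python
import csv
import tempfile
from pathlib import Path


def extract(output_dir):
    """Write the dataset to storage and return only its location."""
    path = Path(output_dir) / "extract.csv"
    with path.open("w", newline="") as handle:
        writer = csv.writer(handle)
        writer.writerow(["id", "amount"])
        writer.writerows([[1, 10], [2, 32]])
    # In Airflow, a returned value like this short string is what gets pushed to XCom.
    return str(path)


def transform(input_path):
    """Read the dataset back from storage using the reference passed between tasks."""
    with open(input_path, newline="") as handle:
        return sum(int(row["amount"]) for row in csv.DictReader(handle))


workdir = tempfile.mkdtemp()  # stands in for S3, HDFS, or a shared volume
reference = extract(workdir)
total = transform(reference)
```

Only `reference` would travel through XCom; the rows themselves never touch the metadata database.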

Limit the use of PythonOperator

Use the power of Jinja templating

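# Built-in operators declare which of their arguments are templated
class PythonOperator(BaseOperator):
    template_fields = ('templates_dict', 'op_args', 'op_kwargs')

# A custom operator can declare its own templated arguments the same way
class CustomBashOperator(BaseOperator):
    template_fields = ('file_name', 'command', 'dest_host')

# Access an Airflow Variable in a template
{{ var.value.key_name }}

# Define params (passed as the params argument of a DAG or an operator)
params = {
    "param_key_1": "param_value_1",
    "param_key_2": "param_value_2"
}
# Access a param using a template
{{ params.param_key_1 }}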

Implement DAG-level access control

Use a static start_date
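A `start_date` computed with `datetime.now()` is re-evaluated each time the DAG file is parsed, which makes scheduling hard to reason about; a fixed date stays the same across parses. The sketch below contrasts the two using only the standard library. `STATIC_START`, `dynamic_start`, and the `owner` value are hypothetical, and the `default_args` dictionary is shown in the shape usually passed to a DAG (the DAG construction itself is left out).

```python
from datetime import datetime, timedelta

# Static: identical on every parse of the DAG file.
STATIC_START = datetime(2021, 1, 1)


def dynamic_start():
    """Anti-pattern: the value moves each time the file is parsed."""
    return datetime.now() - timedelta(days=1)


# Hypothetical default_args, in the shape usually passed to a DAG.
default_args = {
    "owner": "data-team",  # placeholder owner
    "start_date": STATIC_START,
}

# Two simulated parses: the dynamic value drifts with the clock,
# while default_args["start_date"] stays fixed.
first_parse = dynamic_start()
second_parse = dynamic_start()
```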

Rename DAGs in case of structural change

Use AsyncOperators in newer versions

  • Set retries at the DAG level
  • Use a consistent file structure
  • Choose a consistent method for task dependencies
  • Have a notification strategy on failure
  • Functional DAG
  • DAG Serialization
  • Scheduler HA
  • Production grade REST APIs
  • Smart Sensors
  • Task Groups
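For the retry and failure-notification items in the list above, one possible approach is to put both settings in `default_args` so that every task in the DAG inherits them. This is a sketch under assumptions: `notify_failure` and its message format are hypothetical, and a real callback would forward the message to Slack, email, or a pager. `retries`, `retry_delay`, and `on_failure_callback` are standard task arguments in Airflow.

```python
from datetime import timedelta


def notify_failure(context):
    """Build a failure message from the task context Airflow passes to callbacks."""
    ti = context["task_instance"]
    message = f"Task {ti.task_id} in DAG {ti.dag_id} failed"
    # A real callback would send this to Slack, email, or a pager (not shown).
    return message


# Hypothetical default_args: passing this dictionary to the DAG applies
# the same retry and notification policy to every task in it.
default_args = {
    "retries": 2,
    "retry_delay": timedelta(minutes=5),
    "on_failure_callback": notify_failure,
}
```

Individual tasks can still override any of these values by passing the same argument directly to the operator.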


