
Airflow DAG — Best Practices

Amit Singh Rathore · The Startup · Feb 23, 2021

In this blog post, we will look at some best practices for authoring DAGs. Let’s start.

DAG as configuration file

The Airflow scheduler scans and compiles DAG files at each heartbeat. If DAG files are heavy and contain a lot of top-level code, the scheduler consumes significant resources and time processing them at each heartbeat. So it is advised to keep DAGs light, more like a configuration file. As a step forward, it is a good choice to have a YAML/JSON-based definition of the workflow and then generate the DAG from it. This has a double advantage. 1. Since the DAGs are generated programmatically, they are consistent and reproducible at any time. 2. Non-Python users will also be able to author workflows.
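Below is a minimal sketch of this pattern. The YAML layout, file path, and task fields are illustrative assumptions, not an Airflow API; the loop simply registers one generated DAG per workflow entry.

import yaml  # pyyaml
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# Hypothetical config location; each workflow entry describes one DAG
with open("/opt/airflow/include/workflows.yaml") as f:
    config = yaml.safe_load(f)

for wf in config["workflows"]:
    dag = DAG(
        dag_id=wf["name"],
        start_date=datetime.fromisoformat(wf["start_date"]),
        schedule_interval=wf.get("schedule", "@daily"),
    )
    with dag:
        for task_cfg in wf["tasks"]:
            BashOperator(task_id=task_cfg["id"], bash_command=task_cfg["command"])
    # Expose the generated DAG at module level so the scheduler discovers it
    globals()[wf["name"]] = dag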

We can keep non-configuration code blocks outside the DAG definition and use the template_searchpath attribute to reference them. For example, if you are connecting to an RDS instance and executing a SQL command, that SQL should be loaded from a file, and the file’s location should be included in template_searchpath. The same applies to Hive queries (.hql).
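A hedged sketch of this setup is below; the directory, connection id (rds_default), and file name are assumptions. The operator loads daily_report.sql from the search path and templates it before execution.

from datetime import datetime

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator

with DAG(
    dag_id="rds_report",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    template_searchpath="/opt/airflow/include/sql",  # where .sql/.hql files live
) as dag:
    run_report = PostgresOperator(
        task_id="run_report",
        postgres_conn_id="rds_default",
        sql="daily_report.sql",  # resolved from template_searchpath and Jinja-templated
    )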

Note: While trimming the top-level code, also convert top-level imports to local imports inside the Python callable.
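For example (a sketch; the bucket and connection id are assumptions), the heavy import lives inside the callable so the scheduler never pays for it while parsing:

from airflow.operators.python import PythonOperator

def list_manifests(**context):
    # Local import: loaded only when the task runs on a worker,
    # keeping the top-level DAG file cheap to parse.
    from airflow.providers.amazon.aws.hooks.s3 import S3Hook

    hook = S3Hook(aws_conn_id="aws_default")
    return hook.list_keys(bucket_name="my-bucket", prefix="manifests/")

# (inside a `with DAG(...)` block)
list_manifests_task = PythonOperator(
    task_id="list_manifests",
    python_callable=list_manifests,
)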

Invest in the Airflow plugin system

Custom hooks and operators are a great way to make your pipelines easier to maintain, debug, and create. So it is good to have a proper plugin repo and maintain it to author the custom plugins your organization needs. While creating a plugin, keep it generic so that it is reusable across use cases. This helps with versioning and keeps workflows clean, mostly configuration details rather than implementation logic. Also, don’t perform heavy work while initializing the class; push operations (like fetching variables & connections) into the execute method.
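A minimal sketch of such a custom operator is below. The operator name, fields, and connection/variable keys are assumptions; the point is that __init__ only stores attributes and all lookups happen in execute().

from airflow.hooks.base import BaseHook
from airflow.models import BaseOperator, Variable


class S3ToWarehouseOperator(BaseOperator):  # hypothetical custom operator
    template_fields = ("s3_key", "table")

    def __init__(self, s3_key, table, conn_id="warehouse_default", **kwargs):
        super().__init__(**kwargs)
        # Only store plain attributes at parse time; no DB or API calls here
        self.s3_key = s3_key
        self.table = table
        self.conn_id = conn_id

    def execute(self, context):
        # Connections and variables are resolved at run time, on the worker
        conn = BaseHook.get_connection(self.conn_id)
        batch_size = Variable.get("warehouse_batch_size", default_var=1000)
        self.log.info(
            "Loading %s into %s on %s (batch size %s)",
            self.s3_key, self.table, conn.host, batch_size,
        )
        # actual load logic would be delegated to a hook here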

Do not perform data processing in DAG files

Since DAGs are Python-based, we will definitely be tempted to use pandas or similar libraries in them, but we should not. Airflow is an orchestrator, not an execution framework. All computation should be delegated to a dedicated target system. Follow the fire-and-track approach: use an operator to start the task and a sensor to track its completion. Airflow is not designed for long-running tasks.

# Import paths follow the Amazon provider package and may differ slightly
# depending on the provider version installed.
from airflow.providers.amazon.aws.operators.emr import EmrAddStepsOperator
from airflow.providers.amazon.aws.sensors.emr import EmrStepSensor

emr_add_step = EmrAddStepsOperator(
    task_id='demo_step',
    job_flow_id='<EMR_CLUSTER_ID>',
    aws_conn_id='aws_default',
    steps=<SPARK_STEPS>,
)
emr_step_status_checker = EmrStepSensor(
    task_id='check_step',
    job_flow_id='<EMR_CLUSTER_ID>',
    step_id="{{ task_instance.xcom_pull(task_ids='demo_step', key='return_value')[0] }}",
    aws_conn_id='aws_default',
)
# The operator fires the step; the sensor tracks it to completion
emr_add_step >> emr_step_status_checker

Delegate API or DB calls to operators

This is somewhat similar to the first point. API calls or DB connections made in top-level code in DAG files overload the scheduler & webserver. Calls defined outside of an operator are executed on every heartbeat. So it is advisable to push these down into a util/common operator (which can be a PythonOperator).
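As a small sketch (the variable name is an assumption), compare a top-level call with one pushed into a PythonOperator callable:

from airflow.models import Variable
from airflow.operators.python import PythonOperator

# BAD: runs on every scheduler heartbeat while the file is parsed
# service_token = Variable.get("service_api_token")

def call_service(**context):
    # GOOD: resolved only when the task actually executes on a worker
    service_token = Variable.get("service_api_token")
    return len(service_token)  # placeholder for the real API call

# (inside a `with DAG(...)` block)
call_service_task = PythonOperator(
    task_id="call_service",
    python_callable=call_service,
)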

Give a thought to external systems

While creating tasks, remember that they interact with external systems. When you are backfilling or running multiple concurrent DAG runs, consider the DAG and task concurrency, and their combined effective concurrency, so that they match the capacity the external system can handle.
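A sketch of capping that load is below; the limits, the pool name (rds_pool, which must be created beforehand under Admin > Pools), and the script path are assumptions.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="rds_export",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@hourly",
    max_active_runs=1,  # backfills will not pile up concurrent DAG runs
    concurrency=4,      # max task instances of this DAG at once (max_active_tasks in newer releases)
) as dag:
    export_orders = BashOperator(
        task_id="export_orders",
        bash_command="python /opt/scripts/export_orders.py",
        pool="rds_pool",  # a shared pool caps combined load across DAGs
    )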

Make DAGs/Tasks idempotent

A DAG should produce the same data on every run. Read from a partition and write to a partition; both should be immutable for a given run. Handle partition creation and deletion explicitly to avoid unknown errors.
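A hedged sketch of an idempotent load is below; the table names and the rds_default connection are assumptions. Re-running the task for the same logical date rewrites the same partition, so the output is identical every time.

from airflow.providers.postgres.operators.postgres import PostgresOperator

# (inside a `with DAG(...)` block)
load_daily_partition = PostgresOperator(
    task_id="load_daily_partition",
    postgres_conn_id="rds_default",
    sql="""
        -- clear the partition for this run's date before rewriting it
        DELETE FROM sales_daily WHERE ds = '{{ ds }}';
        INSERT INTO sales_daily (ds, order_id, amount)
        SELECT '{{ ds }}', order_id, amount
        FROM staging_sales
        WHERE order_date = '{{ ds }}';
    """,
)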

Use a single variable per DAG

Every time we access an Airflow variable, a connection to the metadata DB is created. This may overload the DB if multiple DAGs are running and each reads multiple variables. It’s better to use a single variable per DAG holding a JSON object. This results in a single call, and we can parse the JSON to get the desired key-value pairs.

from airflow.models import Variable

# Store one JSON variable per DAG instead of many scalar variables, e.g. set
# "dag_specific_variable" to:
# {
#     "variable_1": "value_1",
#     "variable_2": "value_2",
#     "variable_3": "value_3"
# }

# Single call to get all three variables
dag_specific_params = Variable.get("dag_specific_variable", deserialize_json=True)

# Using a key in a templated field
# {{ var.json.dag_specific_variable.variable_1 }}

Tag the DAG

Having tags on a DAG helps in filtering and grouping DAGs. Make them consistent with your infrastructure’s current tagging system, e.g. tags based on business unit, project, app category, etc.
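For example (illustrative tag values):

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="payments_daily_load",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    tags=["finance", "payments", "tier-1"],  # filterable in the DAGs view
)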

Don’t Abuse XCom

XCom acts as a channel between tasks for sharing data, and it uses the backend DB to do so. Hence we should not pass large amounts of data through it; the bigger the payload, the more the backend DB gets overloaded.

Use intermediate storage between tasks

If the data to be shared between two tasks is large, store it in an intermediate storage system and pass only a reference to it to the downstream task. We can even leverage a custom XCom backend. Remember, Airflow is not a data storage solution.
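A hedged sketch of that pattern: the bucket name, keys, and payload below are assumptions; only the S3 key travels through XCom.

from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

BUCKET = "my-intermediate-bucket"  # hypothetical staging bucket

def extract(**context):
    payload = '{"rows": 1000000}'  # imagine a large extract here
    key = f"staging/{context['ds']}/extract.json"
    S3Hook(aws_conn_id="aws_default").load_string(
        payload, key=key, bucket_name=BUCKET, replace=True
    )
    return key  # only the small reference is pushed to XCom

def load(**context):
    key = context["ti"].xcom_pull(task_ids="extract")
    data = S3Hook(aws_conn_id="aws_default").read_key(key, bucket_name=BUCKET)
    return len(data)

# (inside a `with DAG(...)` block)
extract_task = PythonOperator(task_id="extract", python_callable=extract)
load_task = PythonOperator(task_id="load", python_callable=load)
extract_task >> load_task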

Note: When using the KubernetesPodOperator, prefer @task.kubernetes since it encapsulates the XCom logic. Otherwise we need a script in the Docker image to read the templated task XCom (e.g. env_vars = { "INPUT_DATA": '''{{ ti.xcom_pull(task_ids="extract_data", key="return_value") }}''' }) and to write results as JSON for the sidecar container to read and push to the DB.

Limit the use of PythonOperator

Limit the use of the PythonOperator and prefer built-in operators. If a suitable operator is not available, use the KubernetesPodOperator. This ensures that your operations are performed in a consistent and isolated environment, which can improve the performance and scalability of your pipeline.
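A sketch of that fallback is below; the image, namespace, and command are assumptions, and the import path varies with the cncf.kubernetes provider version.

# Older provider versions expose the operator under .operators.kubernetes_pod,
# newer ones under .operators.pod
from airflow.providers.cncf.kubernetes.operators.kubernetes_pod import KubernetesPodOperator

# (inside a `with DAG(...)` block)
score_model = KubernetesPodOperator(
    task_id="score_model",
    name="score-model",
    namespace="data-jobs",
    image="registry.example.com/ml/scorer:1.4",
    cmds=["python", "score.py"],
    arguments=["--date", "{{ ds }}"],
    get_logs=True,
)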

Use the power of Jinja templating

Many operators support template_fields. This tuple defines which fields will get Jinja-templated.

class PythonOperator(BaseOperator):
    template_fields = ('templates_dict', 'op_args', 'op_kwargs')

While writing your custom operator, override this template_fields attribute.

class CustomBashOperator(BaseOperator):
    template_fields = ('file_name', 'command', 'dest_host')

In the above example, the fields ‘file_name’, ‘command’, and ‘dest_host’ will be available for Jinja templating.

You can access variables through templates as well:

{{ var.value.key_name }}

Params can also be templated, as below:

# Define params
params = {
    "param_key_1": "param_value_1",
    "param_key_2": "param_value_2"
}

# Access a param using the template
# {{ params.param_key_1 }}

Implement DAG Level Access control

Leverage Flask AppBuilder views to have DAG-level access control. Set the DAG owner to the correct user. Create custom roles to decide who can take DAG/Task actions.
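A sketch using the DAG-level access_control argument; the role name and permission names are assumptions and depend on your Airflow/FAB version.

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="restricted_reporting",
    start_date=datetime(2021, 1, 1),
    schedule_interval="@daily",
    # role -> set of DAG-level permissions granted to that role
    access_control={"data_analyst": {"can_read"}},
)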

Use static start_date

A static DAG start date helps in correctly populating DAG runs and the schedule.
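For example (a sketch):

from datetime import datetime
from airflow.utils.dates import days_ago

start_date = datetime(2021, 2, 1)  # GOOD: static, run history stays stable
# start_date = days_ago(1)         # BAD: shifts every time the file is parsed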

Rename DAGs in case of structural change

Until the DAG versioning feature is implemented, rename the DAG whenever you make a structural change to it. This creates a new DAG, and the run history of the old DAG version is preserved without any inconsistency.

Use AsyncOperators in newer versions

Operators consume worker slots for the entire run of the triggered task. If the task is long-running, Airflow will run out of slots even though Airflow itself is not doing much work. To avoid this scenario we should use async (deferrable) operators, which do not hold worker slots while waiting.
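A hedged sketch with a deferrable sensor (available in Airflow 2.2+): while waiting it hands control to the triggerer and frees its worker slot.

from datetime import timedelta

from airflow.sensors.time_delta import TimeDeltaSensorAsync

# (inside a `with DAG(...)` block)
wait_an_hour = TimeDeltaSensorAsync(
    task_id="wait_an_hour",
    delta=timedelta(hours=1),  # waits without occupying a worker slot
)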

Some other best practices:

  • Set retries at the DAG level
  • Use consistent file structure
  • Choose a consistent method for task dependencies
  • Have a notification strategy for failures

Keep an eye on the upcoming enhancements to Airflow:

  • Functional DAG
  • DAG Serialization
  • Scheduler HA
  • Production grade REST APIs
  • Smart Sensors
  • Task Groups

Have fun with DAGs.
