
Airflow DAG — Best Practices

In this blog post, we will look at some best practices for authoring DAGs. Let's start.

DAG as configuration file

The Airflow scheduler scans and compiles DAG files at each heartbeat. If DAG files are heavy and contain a lot of top-level code, the scheduler will spend a lot of resources and time processing them at each heartbeat. So it is advised to keep DAGs light, more like a configuration file. A good step forward is to have a YAML/JSON-based definition of the workflow and then generate the DAG from it. This has a double advantage. 1. Since the DAGs are generated programmatically, they will be consistent and reproducible at any time. 2. Non-Python users will also be able to use them.
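A minimal sketch of config-driven generation, assuming a hypothetical workflows.yaml with dag_id, schedule, and tasks keys (the file layout, paths, and task list are illustrative, not a fixed schema):

# dags/generate_dags.py
import yaml
from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Light top-level work: just read the workflow configuration.
with open("/usr/local/airflow/dag_configs/workflows.yaml") as f:
    configs = yaml.safe_load(f)

for cfg in configs["dags"]:
    dag = DAG(
        dag_id=cfg["dag_id"],
        schedule_interval=cfg["schedule"],
        start_date=datetime(2020, 1, 1),
        catchup=False,
    )
    for task in cfg["tasks"]:
        BashOperator(task_id=task["task_id"], bash_command=task["command"], dag=dag)
    # Expose each generated DAG at module level so the scheduler discovers it.
    globals()[cfg["dag_id"]] = dag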

We can keep non-configuration code blocks outside the DAG definition and use the template_searchpath attribute to point to them. For example, if you are connecting to an RDS instance and executing a SQL command, that SQL command should be loaded from a file, and the location of the file should be listed in template_searchpath. The same goes for Hive queries (.hql).
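As a hedged sketch (the connection id, search path, and daily_report.sql file are assumptions for illustration):

from datetime import datetime

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator

# Airflow resolves templated files (.sql, .hql, ...) from these directories.
dag = DAG(
    dag_id="rds_report",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
    template_searchpath=["/usr/local/airflow/sql"],
)

# daily_report.sql lives under /usr/local/airflow/sql; only its name appears
# in the DAG file, keeping the SQL itself out of the DAG definition.
load_report = PostgresOperator(
    task_id="load_report",
    postgres_conn_id="rds_default",
    sql="daily_report.sql",
    dag=dag,
)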

Invest in Airflow plugin system

Custom hooks and operators are a great way to make your pipelines easier to create, maintain, and debug. So it is good to have a proper plugin repository and maintain in it the custom plugins your organization needs. While creating a plugin, keep it generic so that it is reusable across use cases. This helps with versioning and keeps workflows clean, holding mostly configuration details rather than implementation logic. Also, don't perform heavy work while initializing the class; push operations (like fetching variables and connections) into the execute method.
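A minimal sketch of that pattern, assuming Airflow 1.10-style imports (the operator name and fields are hypothetical): keep __init__ limited to storing arguments, and resolve connections and variables inside execute.

from airflow.hooks.base_hook import BaseHook
from airflow.models import BaseOperator, Variable
from airflow.utils.decorators import apply_defaults


class GenericTransferOperator(BaseOperator):
    template_fields = ("source_path", "dest_path")

    @apply_defaults
    def __init__(self, source_path, dest_path, conn_id="my_conn", *args, **kwargs):
        super(GenericTransferOperator, self).__init__(*args, **kwargs)
        # Only store configuration here -- no connections, no Variable.get().
        self.source_path = source_path
        self.dest_path = dest_path
        self.conn_id = conn_id

    def execute(self, context):
        # Heavy lookups happen at execution time, not at DAG parse time.
        conn = BaseHook.get_connection(self.conn_id)
        chunk_size = Variable.get("transfer_chunk_size", default_var=1000)
        self.log.info("Transferring %s -> %s via %s (chunk=%s)",
                      self.source_path, self.dest_path, conn.host, chunk_size)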

Do not perform data processing in DAG files.

Since DAGs are Python-based, we will definitely be tempted to use pandas or similar libraries in the DAG file, but we should not. Airflow is an orchestrator, not an execution framework. All computation should be delegated to a dedicated target system. Follow the fire-and-track approach: use an operator to start the task and a sensor to track its completion. Airflow is not designed for long-running tasks. For example, submit a Spark step to EMR and then poll for it:

# Airflow 1.10-style imports for the EMR operator and sensor
from airflow.contrib.operators.emr_add_steps_operator import EmrAddStepsOperator
from airflow.contrib.sensors.emr_step_sensor import EmrStepSensor

# Fire: submit the Spark step to an existing EMR cluster
emr_add_step = EmrAddStepsOperator(
    task_id='demo_step',
    job_flow_id='<EMR_CLUSTER_ID>',
    aws_conn_id='aws_default',
    steps=<SPARK_STEPS>
)

# Track: poll the submitted step until it completes
emr_step_status_checker = EmrStepSensor(
    task_id='check_step',
    job_flow_id='<EMR_CLUSTER_ID>',
    step_id="{{ task_instance.xcom_pull(task_ids='demo_step', key='return_value')[0] }}",
    aws_conn_id='aws_default'
)

Delegate API or DB calls to operators

This is somewhat similar to the first point. API calls or DB connections made at the top level of DAG files overload the scheduler and webserver, because anything defined outside an operator is executed on every heartbeat. So it is advisable to push these calls down into an operator (a PythonOperator works well) or a shared util/common module.
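A hedged before/after sketch (the variable name and callable are illustrative, and the dag object is assumed to be defined elsewhere in the file):

from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator

# Bad: runs on every scheduler parse of this file.
# api_token = Variable.get("api_token")


def call_api(**context):
    # Better: the lookup runs only when the task actually executes.
    api_token = Variable.get("api_token")
    # ... make the API/DB call with api_token here ...


call_api_task = PythonOperator(
    task_id="call_api",
    python_callable=call_api,
    provide_context=True,
    dag=dag,  # assumes a DAG object defined earlier in the file
)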

Make DAGs/Tasks idempotent

A DAG should produce the same data on every run. Read from a partition and write to a partition, and treat both as immutable. Handle partition creation and deletion explicitly to avoid unexpected errors.
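One common way to achieve this is to key reads and writes on the templated execution date, so re-running a DAG run rewrites exactly the same partition (the bucket, script, and flags below are illustrative, and dag is assumed to be defined elsewhere):

from airflow.operators.bash_operator import BashOperator

# Re-running this task for the same DAG run always rebuilds the same
# dt={{ ds }} partition, so the output does not depend on how many times it runs.
rebuild_partition = BashOperator(
    task_id="rebuild_partition",
    bash_command=(
        "spark-submit rebuild_partition.py "
        "--input s3://my-bucket/events/dt={{ ds }}/ "
        "--output s3://my-bucket/aggregates/dt={{ ds }}/ "
        "--overwrite"
    ),
    dag=dag,
)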

Use single variable per DAG

Every time we access an Airflow variable, a connection to the metadata DB is created. This may overload the DB if multiple DAGs are running and each calls multiple variables. It is better to use a single variable per DAG containing a JSON object; this results in a single connection, and we can parse the JSON to get the desired key-value pairs.

from airflow.models import Variable

# Combine the keys (e.g. from SSM or the Airflow UI) into a single variable
# named "dag_specific_variable" whose value is a JSON document:
# {
#     "variable_1": "value_1",
#     "variable_2": "value_2",
#     "variable_3": "value_3"
# }

# Single call to get all three variables
dag_specific_params = Variable.get("dag_specific_variable", deserialize_json=True)

# Accessing an individual key via the template
{{ var.json.dag_specific_variable.variable_1 }}

Tag the DAG

Having tags on a DAG helps in filtering and grouping DAGs. Keep them consistent with your infrastructure's current tagging system, for example tags based on BU, project, app category, etc.
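For example (tags are supported on the DAG object in recent Airflow versions, 1.10.8 onwards; the tag names below are illustrative):

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="finance_daily_report",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
    # Align tag names with your organization's tagging scheme.
    tags=["bu-finance", "project-reporting", "category-batch"],
)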

Don’t Abuse XCom

XCom acts as a channel between tasks for sharing data, and it uses the backend DB to do so. Hence we should not pass large amounts of data through it, as that will overload the metadata DB.

Use intermediate storage between tasks.

If the data to be shared between two tasks is huge, store it in an intermediate storage system and pass only a reference to it to the downstream task.
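A rough sketch of the pattern, assuming S3 as the intermediate store (the bucket, paths, and callables are illustrative, and dag is assumed to be defined elsewhere): push only the storage path through XCom, never the data itself.

from airflow.operators.python_operator import PythonOperator


def extract(**context):
    # Write the (potentially large) dataset to intermediate storage here,
    # e.g. via an S3 hook or the processing job itself.
    output_path = "s3://my-bucket/staging/{}/data.parquet".format(context["ds"])
    # Only the small reference goes through XCom (key 'return_value').
    return output_path


def transform(**context):
    # Pull the reference and read the data from storage, not from XCom.
    input_path = context["task_instance"].xcom_pull(task_ids="extract")
    print("Reading input from %s" % input_path)


extract_task = PythonOperator(task_id="extract", python_callable=extract,
                              provide_context=True, dag=dag)
transform_task = PythonOperator(task_id="transform", python_callable=transform,
                                provide_context=True, dag=dag)
extract_task >> transform_task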

Use the power of Jinja templating

Many operators support template_fields. This tuple attribute defines which fields will be rendered with Jinja.

class PythonOperator(BaseOperator):
    template_fields = ('templates_dict', 'op_args', 'op_kwargs')

When writing your own custom operator, override this template_fields attribute.

class CustomBashOperator(BaseOperator):
    template_fields = ('file_name', 'command', 'dest_host')

In the above example, the fields 'file_name', 'command', and 'dest_host' will be available for Jinja templating.

You can access Airflow variables with templates as well.

{{ var.value.key_name }}

Params can also be templated, like below:

# Define params on the DAG or task
params={
    "param_key_1": "param_value_1",
    "param_key_2": "param_value_2"
}

# Access a param using the template
{{ params.param_key_1 }}

Implement DAG Level Access control

Leverage Flask AppBuilder views to implement DAG-level access control. Set the DAG owner to the correct Linux user, and create custom roles to decide who can take DAG/task actions.
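With the RBAC UI enabled, per-DAG permissions can also be declared on the DAG object itself via access_control (the role name is illustrative, and the permission names shown are the Airflow 1.10 ones; they differ slightly in later versions):

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="finance_daily_report",
    start_date=datetime(2020, 1, 1),
    schedule_interval="@daily",
    # Grants the custom 'finance_team' role read/edit rights on this DAG only.
    access_control={"finance_team": {"can_dag_read", "can_dag_edit"}},
)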

Use static start_date

A static DAG start_date helps in correctly populating DAG runs and the schedule.
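In other words, prefer a fixed datetime over a moving value such as datetime.now():

from datetime import datetime

from airflow import DAG

dag = DAG(
    dag_id="demo_static_start",
    # Static: every scheduler parse sees the same value, so run dates stay stable.
    start_date=datetime(2020, 1, 1),
    # Avoid dynamic values like datetime.now() or days_ago(1), which shift on
    # every parse and make scheduling and backfills unpredictable.
    schedule_interval="@daily",
)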

Rename DAGs in case of structural change

Until the DAG versioning feature is implemented, rename the DAG whenever you make a structural change to it. This will create a new DAG, and all the run history of the previous DAG version will remain intact without any inconsistency.

Some other best practices:

  • Set retries at the DAG level
  • Use consistent file structure
  • Choose a consistent method for task dependencies
  • Have a notification strategy for failures

Keep an eye on the upcoming enhancements to Airflow:

  • Functional DAG
  • DAG Serialization
  • Scheduler HA
  • Production grade REST APIs
  • Smart Sensors
  • Task Groups

Have fun with DAGs.
