Unleashing the Power of Datasets in Airflow: Revolutionize Your DAG Scheduling

Rajesh Ku. Rout

In the realm of modern data engineering, orchestrating intricate workflows seamlessly and efficiently is paramount. Apache Airflow, a stalwart in the field, has been empowering developers with its robust capabilities. Now, the introduction of datasets as a core feature brings an unprecedented level of reactivity and intelligence to your Directed Acyclic Graphs (DAGs). Imagine a world where your data pipelines adapt dynamically to changes in data availability or updates, minimizing unnecessary executions and maximizing resource utilization. Welcome to the era of using datasets in Airflow.

In this blog post, we embark on a journey to uncover the boundless potential of leveraging datasets in Airflow. We’ll explore the nuts and bolts of data-aware scheduling, configure dataset sensors, orchestrate DAGs, and unlock the true power of adaptive workflows. Prepare to elevate your data engineering endeavors as we dive into the realm of datasets, where your DAGs evolve from static routines into dynamic and responsive data engines. The link to the official Airflow documentation serves as a guiding beacon throughout this exciting exploration. Let’s revolutionize the way we schedule and trigger DAGs with datasets in Airflow.

What is Data-Aware Scheduling in Airflow?

The game-changing aspect of datasets is their ability to trigger DAGs in response to changes. Imagine a scenario where you’re processing user logs, and a new log file becomes available. Traditionally, you might run your pipeline on a schedule, regardless of whether new data exists. With data-aware scheduling, your pipeline can be primed to execute only when the dataset — in this case, the new log file — is updated. This eliminates redundant processing, ensuring your computational resources are allocated optimally.
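
To make this concrete, here is a minimal sketch (not from the article) contrasting a time-based schedule with a data-aware one. It assumes Airflow 2.4+, and the S3 path and DAG ids are hypothetical; only the schedule argument differs.

from airflow import DAG
from airflow.datasets import Dataset
from datetime import datetime

# Hypothetical dataset representing the new log file.
user_logs = Dataset("s3://my-bucket/logs/user_logs.csv")

# Time-based: runs every hour whether or not new data arrived.
with DAG("process_logs_hourly", start_date=datetime(2023, 1, 1),
         schedule="@hourly", catchup=False):
    ...

# Data-aware: runs only after a producer task updates the dataset.
with DAG("process_logs_on_update", start_date=datetime(2023, 1, 1),
         schedule=[user_logs], catchup=False):
    ...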

Let’s understand how datasets can be useful.

Datasets can help resolve common issues. For example, consider a data engineering team with a DAG that creates a dataset and an analytics team with a DAG that analyses the dataset. Using datasets, the data analytics DAG runs only when the data engineering team’s DAG publishes the dataset.

Datasets pave the way for advanced coordination among DAGs. By configuring DAGs to listen for changes in specific datasets, you can craft master DAGs that orchestrate the flow of tasks intelligently. When a producer task updates a dataset, the master DAG can trigger consumer DAGs that depend on this fresh data. This orchestrated handover ensures that tasks are executed only when prerequisites are met, fostering efficient data flow across your pipelines.

Understanding Datasets in Airflow

An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream “producer” tasks, and dataset updates contribute to scheduling downstream “consumer” DAGs.

A dataset is defined by a Uniform Resource Identifier (URI):

from airflow.datasets import Dataset
# s3://dataset-bucket/example.csv is the path of the file that will be used
# for scheduling.
example_dataset = Dataset("s3://dataset-bucket/example.csv")

Airflow makes no assumptions about the content or location of the data represented by the URI; it is treated as an opaque string. This means regular expressions (e.g. input_\d+.csv) or file glob patterns (e.g. input_2022*.csv) cannot be used to create multiple datasets from a single declaration.
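
If you need to track several files, the workaround is simply to declare each dataset explicitly. Here is a small sketch with illustrative paths:

from airflow.datasets import Dataset

# Globs and regexes are not expanded, so declare one Dataset per file.
daily_inputs = [
    Dataset(f"s3://dataset-bucket/input_2022_{i}.csv") for i in range(1, 4)
]

# A consumer DAG can then be scheduled on all of them:
# with DAG("consumer_dag", ..., schedule=daily_inputs):
#     ...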

We must pass the specific path of the file that we want the parent DAG to update. Note that Airflow itself neither checks nor creates this file; in our example, the producer task creates it if it does not already exist, because it opens the file in append mode.

Why use datasets?

Datasets allow you to define explicit dependencies between DAGs and updates to your data. This helps you to:

  • Standardize communication between teams. Datasets can function like an API to communicate when data in a specific location has been updated and is ready for use.
  • Reduce the amount of code necessary to implement cross-DAG dependencies. Even if your DAGs don’t depend on data updates, you can create a dependency that triggers a DAG after a task in another DAG updates a dataset.
  • Get better visibility into how your DAGs are connected and how they depend on data. The Datasets tab in the Airflow UI shows a graph of all dependencies between DAGs and datasets in your Airflow environment.
  • Reduce costs, because datasets do not use a worker slot, in contrast to sensors or other implementations of cross-DAG dependencies (see the sketch below).
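
To illustrate the last point, here is a hedged sketch comparing the two approaches; the DAG and task names are hypothetical. With ExternalTaskSensor, a waiting task has to run inside the downstream DAG, whereas with a dataset the scheduler itself starts the DAG once the dataset is updated.

from datetime import datetime

from airflow import DAG
from airflow.datasets import Dataset
from airflow.sensors.external_task import ExternalTaskSensor

# Sensor-based cross-DAG dependency: a task inside the downstream DAG waits
# for a task in the upstream DAG to finish.
with DAG("analytics_with_sensor", start_date=datetime(2023, 1, 1),
         schedule="@daily", catchup=False):
    wait_for_upstream = ExternalTaskSensor(
        task_id="wait_for_upstream",
        external_dag_id="data_engineering_dag",
        external_task_id="publish_dataset",
    )

# Dataset-based dependency: no waiting task at all; the DAG is triggered when
# the dataset is updated by its producer.
published = Dataset("s3://dataset-bucket/published.csv")
with DAG("analytics_with_dataset", start_date=datetime(2023, 1, 1),
         schedule=[published], catchup=False):
    ...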

Putting Theory into Practice: Step-by-Step Implementation

Follow the steps below to schedule your DAG using datasets.

If you have not yet installed Airflow on your Windows computer, follow the instructions in this link and connect to your Airflow instance using Visual Studio Code, as shown below.

Connected to Ubuntu using remote server
  1. Let's create our first DAG, parent_dataset_dag, under the dags folder.
Create parent_dataset_dag.py file

I have a source file, source_file.txt, at the path “/home/raj/airflow”. You can use any file that suits your use case.

For example, if your parent DAG retrieves data from an API, you can write that data to a file and use the file's location as the dataset URI, so that the child DAGs are triggered once the file is updated.

2. Copy the code below into the parent_dataset_dag.py file.

from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime
import requests

# Dataset pointing to the source file path. We can use Airflow Variables or a
# secrets backend to store the location of the file for more security.
my_file = Dataset('/home/raj/airflow/source_file.txt')

default_args = {
    'owner': 'Rajesh',
    'retries': 1,
    'start_date': datetime(2023, 1, 2),
    'email': 'routr5953@gmail.com',
    'email_on_failure': False,
}

with DAG(
    'Parent_Dataset_DAG',
    default_args=default_args,
    description='Parent DAG for Datasets',
    schedule=None,
    tags=['Datasets'],
) as dag:

    @task
    def print_task():
        print('This is my first task')

    # outlets - this parameter indicates that the task will update the
    # dataset once it completes successfully.
    @task(outlets=[my_file])
    def get_data_from_api():
        url = 'https://api.publicapis.org/entries'  # public API

        response = requests.get(url).json()

        # my_file.uri returns the URI string we passed to Dataset()
        with open(my_file.uri, "a+") as file:
            file.write(str(response))  # updating the source file

        print('\nData from the API is written in the Source file\n')

    print_task() >> get_data_from_api()

In the above code:

  • outlets: You can reference the dataset in a task by passing it to the task's outlets parameter. outlets is part of the BaseOperator, so it is available to every Airflow operator, not just @task functions (see the sketch after this list). This parameter indicates that the task it is assigned to will update the dataset once it executes successfully.
  • my_file.uri: .uri is an attribute of the Dataset object; it returns the URI string we passed when defining the dataset (here, the path of the source file).
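
As a small aside (not part of the article's DAGs), here is a hedged sketch showing outlets on a classic operator such as BashOperator; the DAG id is hypothetical:

from datetime import datetime

from airflow import DAG, Dataset
from airflow.operators.bash import BashOperator

my_file = Dataset('/home/raj/airflow/source_file.txt')

with DAG('bash_producer_dag', start_date=datetime(2023, 1, 2),
         schedule=None, catchup=False):
    # outlets works on classic operators too, not only on @task functions.
    write_file = BashOperator(
        task_id='write_file',
        bash_command=f'date >> {my_file.uri}',  # my_file.uri returns the URI string
        outlets=[my_file],
    )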

The DAG code above retrieves data from a public API, parses the JSON response, and writes it to the source file. The outlets argument we passed indicates that the dataset will be considered updated after the get_data_from_api() task executes successfully.

To schedule dependent DAGs, we created a dataset with the Dataset class and pointed it at the file that the parent DAG will update.

Dataset created — my_file

3. Save the file and refresh the Airflow UI to see the DAG.

Parent Dataset Dag

4. Similarly, we will create another DAG, Child_Dataset_DAG.

Create child_dataset_dag.py file

5. Copy the code below into the child_dataset_dag.py file.

from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime

# The same dataset (same file path) that the Parent DAG updates.
my_file = Dataset('/home/raj/airflow/source_file.txt')

default_args = {
    'owner': 'Rajesh',
    'retries': 1,
    'start_date': datetime(2023, 1, 2),
    'email': 'routr5953@gmail.com',
    'email_on_failure': False,
}

with DAG(
    'Child_Dataset_DAG',
    default_args=default_args,
    description='A Child DAG for Datasets',
    schedule=[my_file],  # the [my_file] dataset is the schedule for the Child DAG
    tags=['Datasets'],
) as dag:

    @task
    def print_task():
        print('This is my child task')

    @task
    def read():
        # Read back whatever the Parent DAG wrote to the source file.
        with open(my_file.uri, "r+") as file:
            print(file.read())

    print_task() >> read()

  • schedule=[my_file]: the DAG is scheduled on the dataset we defined above, so it runs only when that dataset is updated or modified.

In the above code, we created the same dataset with the same file location, because the child DAG is scheduled on the updates the parent DAG makes to that dataset.

In the schedule parameter, we pass the dataset:

schedule = [my_file]
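
Note that schedule also accepts several datasets. As a hedged sketch (the second file below is hypothetical), a DAG scheduled on a list of datasets is triggered only after every dataset in that list has been updated at least once since the DAG's last run:

from airflow import Dataset

file_a = Dataset('/home/raj/airflow/source_file.txt')
file_b = Dataset('/home/raj/airflow/another_file.txt')  # hypothetical second file

# The child DAG would then wait until BOTH datasets have been updated since
# its last run before it is triggered.
# with DAG('Child_Dataset_DAG', ..., schedule=[file_a, file_b]) as dag:
#     ...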

In both the parent and child DAGs, we need to define the dataset before using it, as shown in the code above.

6. Save the file and refresh the Airflow UI to see both DAGs.

For the Child_Dataset_DAG, scheduling is based on Dataset as shown below.

Child DAG schedule is set to Dataset

Child_Dataset_DAG will be executed after Parent_Dataset_DAG has modified the file/Dataset. We can define a schedule for the Parent DAG based on our needs. At the moment, I have it set to None.

Parent DAG schedule set to None

7. Now let's trigger the parent DAG and see how it triggers the child DAG.

Parent DAG triggered at 19:17:16

Child DAG also got triggered due to the modification of the dataset as shown below.

Child DAG executed at 19:17:22

For the child DAG, we can also see the logs for the read() task.

It reads the API data from the source file we created at the beginning.

To check the dependency of DAGs, click on Datasets in the Airflow UI.

Dependency of DAGs with Dataset

In the above figure, we can see that the parent DAG executes first and modifies the dataset on successful completion. The modification of the file then triggers the child DAG.

In conclusion, this blog has taken us on a journey through the remarkable world of datasets in Apache Airflow, shedding light on the revolutionary concept of data-aware scheduling for Directed Acyclic Graphs (DAGs). We’ve explored how this feature empowers data engineers to build workflows that are not only robust and efficient but also responsive and adaptive.

By leveraging datasets, we have learned how to orchestrate data pipelines that dynamically respond to changes in data availability, reducing unnecessary executions and maximizing resource utilization. This shift from static routines to dynamic, intelligent DAGs opens up a new realm of possibilities in the field of data engineering.

Stay Connected on LinkedIn

For more exciting updates, discussions, and deeper dives into the world of data engineering and technology, I invite you to connect with me on LinkedIn. Follow me on LinkedIn for the latest news, articles, and discussions on data engineering, Airflow, and related topics. Thank you!
