Unleashing the Power of Datasets in Airflow: Revolutionize Your DAG Scheduling
In the realm of modern data engineering, orchestrating intricate workflows seamlessly and efficiently is paramount. Apache Airflow, a stalwart in the field, has been empowering developers with its robust capabilities. Now, the introduction of datasets as a core feature brings an unprecedented level of reactivity and intelligence to your Directed Acyclic Graphs (DAGs). Imagine a world where your data pipelines adapt dynamically to changes in data availability or updates, minimizing unnecessary executions and maximizing resource utilization. Welcome to the era of using datasets in Airflow.
In this blog post, we embark on a journey to uncover the boundless potential of leveraging datasets in Airflow. We’ll explore the nuts and bolts of data-aware scheduling, configure dataset sensors, orchestrate DAGs, and unlock the true power of adaptive workflows. Prepare to elevate your data engineering endeavors as we dive into the realm of datasets, where your DAGs evolve from static routines into dynamic and responsive data engines. The link to the official Airflow documentation serves as a guiding beacon throughout this exciting exploration. Let’s revolutionize the way we schedule and trigger DAGs with datasets in Airflow.
What is Data-Aware Scheduling in Airflow?
The game-changing aspect of datasets is their ability to trigger DAGs in response to changes. Imagine a scenario where you’re processing user logs, and a new log file becomes available. Traditionally, you might run your pipeline on a schedule, regardless of whether new data exists. With data-aware scheduling, your pipeline can be primed to execute only when the dataset — in this case, the new log file — is updated. This eliminates redundant processing, ensuring your computational resources are allocated optimally.
Let’s understand how Datasets can be useful.
Datasets can help resolve common issues. For example, consider a data engineering team with a DAG that creates a dataset and an analytics team with a DAG that analyses the dataset. Using datasets, the data analytics DAG runs only when the data engineering team’s DAG publishes the dataset.
Datasets pave the way for advanced coordination among DAGs. By configuring DAGs to listen for changes in specific datasets, you can craft master DAGs that orchestrate the flow of tasks intelligently. When a producer task updates a dataset, the master DAG can trigger consumer DAGs that depend on this fresh data. This orchestrated handover ensures that tasks are executed only when prerequisites are met, fostering efficient data flow across your pipelines.
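To make this handover concrete before the full walkthrough below, here is a minimal sketch of a producer/consumer pair; the URI, DAG IDs, and task bodies are placeholders, and each piece is unpacked in the sections that follow.
from airflow import DAG, Dataset
from airflow.decorators import task
from datetime import datetime

user_logs = Dataset("s3://my-bucket/user_logs/latest.json")  # placeholder URI

with DAG("producer_dag", start_date=datetime(2023, 1, 1), schedule="@hourly") as producer:
    @task(outlets=[user_logs])  # marks the dataset as updated when this task succeeds
    def publish_logs():
        ...  # write the new log file here

    publish_logs()

with DAG("consumer_dag", start_date=datetime(2023, 1, 1), schedule=[user_logs]) as consumer:
    @task
    def process_logs():
        ...  # runs only after the producer has updated the dataset

    process_logs()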
Understanding Datasets in Airflow
An Airflow dataset is a stand-in for a logical grouping of data. Datasets may be updated by upstream “producer” tasks, and dataset updates contribute to scheduling downstream “consumer” DAGs.
A dataset is defined by a Uniform Resource Identifier (URI):
from airflow.datasets import Dataset

# s3://dataset-bucket/example.csv is the path/URI that will be used for scheduling.
example_dataset = Dataset("s3://dataset-bucket/example.csv")
Airflow makes no assumptions about the content or location of the data represented by the URI. It is treated as a string, so any use of regular expressions (e.g. input_\d+.csv) or file glob patterns (e.g. input_2022*.csv) as an attempt to create multiple datasets from one declaration will not work.
We must pass the specific path of the file that we want to be modified once the Parent DAG succeeds. If the file doesn’t exist, the producer task in our example will create it at the location passed in the URI (Airflow itself never reads or writes the file; it only tracks updates to it).
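If you do need to track several files, a common workaround (shown here as a rough sketch; the bucket and naming pattern are placeholders) is to declare one Dataset per concrete URI:
from airflow.datasets import Dataset

# One Dataset per concrete URI -- Airflow will not expand globs or regexes for you.
monthly_inputs = [
    Dataset(f"s3://dataset-bucket/input_2022{month:02d}.csv")
    for month in range(1, 13)
]
Each of these can then be updated by its own producer task and referenced individually by downstream DAGs.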
Why use datasets?
Datasets allow you to define explicit dependencies between DAGs and updates to your data (see the sketch after this list). This helps you to:
- Standardize communication between teams. Datasets can function like an API to communicate when data in a specific location has been updated and is ready for use.
- Reduce the amount of code necessary to implement cross-DAG dependencies. Even if your DAGs don’t depend on data updates, you can create a dependency that triggers a DAG after a task in another DAG updates a dataset.
- Get better visibility into how your DAGs are connected and how they depend on data. The Datasets tab in the Airflow UI shows a graph of all dependencies between DAGs and datasets in your Airflow environment.
- Reduce costs, because datasets do not use a worker slot in contrast to sensors or other implementations of cross-DAG dependencies.
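As an example of such an explicit dependency, a single DAG can wait on several upstream datasets at once; it runs only after every dataset in the list has been updated. A minimal sketch with illustrative URIs and IDs (not part of the walkthrough below):
from airflow import DAG, Dataset
from datetime import datetime

orders = Dataset("s3://dataset-bucket/orders.csv")  # placeholder URIs
customers = Dataset("s3://dataset-bucket/customers.csv")

with DAG(
    "combined_report_dag",
    start_date=datetime(2023, 1, 1),
    schedule=[orders, customers],  # runs once both datasets have been updated
) as dag:
    ...  # tasks that join the two inputs go here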
Putting Theory into Practice: Step-by-Step Implementation
Follow the steps below to schedule your DAG using Datasets.
If you have not yet installed Airflow on your Windows computer, click this link to follow the instructions, and then connect to an Airflow instance using Visual Studio Code as shown below.
1. Let’s create our first DAG, parent_dataset_dag, under the dags folder. I have a source file source_file.txt at the path “/home/raj/airflow”; we can use any file as per our use case.
Example: if the purpose of your parent DAG is to retrieve data from an API, you can write that data to a file and use the file’s location as the dataset URI so that, once the file is updated, the child DAGs can be triggered.
2. Copy the code below into the parent_dataset_dag.py file.
from airflow import Dataset, DAG
from airflow.decorators import task
from datetime import datetime
import requests

# Dataset pointing at the source file path. We could store this location in an
# Airflow Variable or a secret for more security.
my_file = Dataset('/home/raj/airflow/source_file.txt')

default_args = {
    'owner': 'Rajesh',
    'retries': 1,
    'start_date': datetime(2023, 1, 2),
    'email': 'routr5953@gmail.com',
    'email_on_failure': False,
}

with DAG(
    'Parent_Dataset_DAG',
    default_args=default_args,
    description='Parent DAG for Datasets',
    schedule=None,
    tags=['Datasets'],
) as dag:

    @task
    def print_task():
        print('This is my first task')

    # OUTLETS - this parameter indicates that the task will update the dataset
    # once it completes successfully.
    @task(outlets=[my_file])
    def get_data_from_api():
        url = 'https://api.publicapis.org/entries'  # public API
        response = requests.get(url).json()
        # my_file.uri is the URI string of the Dataset (here, the source file path)
        with open(my_file.uri, "a+") as file:
            file.write(str(response))  # updating the source file
        print('\nData from the API is written in the Source file\n')

    print_task() >> get_data_from_api()
In the above code:
- outlets — You can reference the dataset in a task by passing it to the task’s outlets parameter. outlets is part of BaseOperator, so it is available to every Airflow operator. It indicates that the task it is assigned to will update the dataset once it executes successfully.
- my_file.uri — This is the URI string of the Dataset object (here, the path of the source file); .uri is an attribute of the Dataset, not a file extension.
The DAG code above retrieves data from a public API, parses the response as JSON, and appends it to the source file. The outlets argument we passed indicates that the dataset will be marked as updated after the get_data_from_api() task executes successfully.
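Because outlets is defined on BaseOperator, classic operators can act as producers too. A minimal sketch, assuming we keep the my_file dataset from above and use a BashOperator whose command simply appends a line to the same file (the command is illustrative only):
from airflow.operators.bash import BashOperator

# Inside the parent DAG's "with DAG(...) as dag:" block, alongside the other tasks:
update_source = BashOperator(
    task_id='update_source_file',
    bash_command='echo "new data" >> /home/raj/airflow/source_file.txt',
    outlets=[my_file],  # the dataset is marked as updated when this task succeeds
)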
To schedule the dependent DAGs, we created a dataset with the Dataset class and pointed it at the file that the parent DAG updates; that same dataset is what the dependent DAGs are scheduled on.
3. Save the file and refresh the Airflow UI to see the DAG.
4. Similarly, we will create another DAG — Child_Dataset_DAG.
5. Copy the code below into the child_dataset_dag.py file.
from airflow import Dataset, DAG
from airflow.decorators import task
from datetime import datetime

# The same dataset (same file path) that the Parent DAG updates.
my_file = Dataset('/home/raj/airflow/source_file.txt')

default_args = {
    'owner': 'Rajesh',
    'retries': 1,
    'start_date': datetime(2023, 1, 2),
    'email': 'routr5953@gmail.com',
    'email_on_failure': False,
}

with DAG(
    'Child_Dataset_DAG',
    default_args=default_args,
    description='A Child DAG for Datasets',
    schedule=[my_file],  # the [my_file] dataset is the schedule for the Child DAG
    tags=['Datasets'],
) as dag:

    @task
    def print_task():
        print('This is my child task')

    @task
    def read():
        # Read back what the Parent DAG wrote to the source file
        with open(my_file.uri, "r+") as file:
            print(file.read())

    print_task() >> read()
schedule=[my_file] — The child DAG is scheduled on the dataset we produced; it runs only when that dataset is updated or modified.
In the above code, we created the same dataset with the same file location, because the child DAG is scheduled on the dataset updates made by the parent DAG. In the schedule parameter, we pass the Dataset object itself (inside a list), not just a name.
In both the Parent and Child DAGs, we need to define the dataset before using it, as shown below.
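One optional way to avoid declaring the same URI twice (not something the setup above requires, just a convenient pattern) is to define the Dataset once in a shared module inside the dags folder and import it from both DAG files. A rough sketch, assuming a helper file named shared_datasets.py:
# dags/shared_datasets.py (hypothetical helper module)
from airflow.datasets import Dataset

SOURCE_FILE = Dataset('/home/raj/airflow/source_file.txt')

# In parent_dataset_dag.py and child_dataset_dag.py:
# from shared_datasets import SOURCE_FILE
Airflow adds the dags folder to sys.path, so this plain import resolves without any extra packaging.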
6. Save the file and refresh the Airflow UI to see both DAGs.
For the Child_Dataset_DAG, scheduling is based on the Dataset, as shown below.
Child_Dataset_DAG will be executed after Parent_Dataset_DAG has modified the file/Dataset. We can define a schedule for the Parent DAG based on our needs; at the moment, I have it set to None.
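If the parent should also run on a timetable rather than only on manual triggers, a minimal sketch (the daily cadence is just an example, not part of this walkthrough) would swap schedule=None for a preset:
with DAG(
    'Parent_Dataset_DAG',
    default_args=default_args,
    description='Parent DAG for Datasets',
    schedule='@daily',  # example only: run the producer once a day
    tags=['Datasets'],
) as dag:
    ...  # the print_task and get_data_from_api tasks stay the same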
7. Now let’s trigger the Parent DAG and see how its run triggers the Child DAG.
The Child DAG was also triggered by the modification of the dataset, as shown below.
For the Child DAG, we can see the logs for the read() task as well.
To check the dependencies between DAGs, click on Datasets in the Airflow UI.
In the above figure, we can see that the Parent DAG runs first and modifies the Dataset on successful execution. That modification of the file then leads to the execution of the Child DAG.
In conclusion, this blog has taken us on a journey through the remarkable world of datasets in Apache Airflow, shedding light on the revolutionary concept of data-aware scheduling for Directed Acyclic Graphs (DAGs). We’ve explored how this feature empowers data engineers to build workflows that are not only robust and efficient but also responsive and adaptive.
By leveraging datasets, we have learned how to orchestrate data pipelines that dynamically respond to changes in data availability, reducing unnecessary executions and maximizing resource utilization. This shift from static routines to dynamic, intelligent DAGs opens up a new realm of possibilities in the field of data engineering.
Stay Connected on LinkedIn
For more exciting updates, discussions, and deeper dives into the world of data engineering and technology, I invite you to connect with me on LinkedIn. Follow me on LinkedIn for the latest news, articles, and discussions on data engineering, Airflow, and related topics. Thank you!