First Airflow DAG: Getting started

Lakshay Gupta
Simpplr Technology
5 min read · Apr 10, 2024


Introduction

In this tutorial we will learn how to develop a simple DAG with a group of tasks using different operators.

Terminologies

Task

The smallest unit of work in an Airflow DAG is called a task. In the Airflow UI, a task is visualized as a node in the graph.

Task Instance

A particular run of a task is called a task instance. An instance is uniquely identified by the combination of DAG, task, and execution timestamp. A task instance can be in one of the following states.

  1. success
  2. failed
  3. skipped
  4. running
  5. up for retry

Operators

An operator defines the behaviour of a task; using an operator we decide what the task actually does when it runs. Following are some common types of operators (a short sketch follows the list).

  • BashOperator - runs a bash command
  • PythonOperator - pass control to a python method
  • EmailOperator - sends an email
  • SimpleHttpOperator - sends an HTTP request
  • MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator - runs a SQL command
  • Sensor - waits for a certain time, file, database row, S3 key, etc…
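
As a quick illustration, here is a minimal, self-contained sketch of the first two operators from the list; the DAG name ‘operator_demo’, the task ids and the greet function are made up for this example:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator


def greet():
    # The PythonOperator hands control to this Python callable
    print("hello from python")


with DAG("operator_demo", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    # The BashOperator runs a shell command when the task executes
    hello_bash = BashOperator(task_id="hello_bash", bash_command="echo 'hello from bash'")
    hello_python = PythonOperator(task_id="hello_python", python_callable=greet)

    hello_bash >> hello_python  # run the bash task first, then the python one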

XComs

Passing large amounts of data between tasks is not recommended, but Airflow gives you the flexibility to share small pieces of data between two tasks/operators via XComs (cross-communications). They let tasks communicate by exchanging messages: an XCom can be pushed or pulled (sent or received).
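
Here is a minimal sketch of pushing and pulling an XCom between two tasks; the DAG name ‘xcom_demo’, the task ids and the ‘greeting’ key are illustrative only:

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator


def push_value(ti):
    # Push a small message into XCom under an explicit key
    ti.xcom_push(key="greeting", value="expecto patronum")


def pull_value(ti):
    # Pull the message that the upstream task pushed
    greeting = ti.xcom_pull(task_ids="push_task", key="greeting")
    print(f"received: {greeting}")


with DAG("xcom_demo", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    push_task = PythonOperator(task_id="push_task", python_callable=push_value)
    pull_task = PythonOperator(task_id="pull_task", python_callable=pull_value)
    push_task >> pull_task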

DAG

A directed acyclic graph (DAG) is a group of tasks organised through directed relationships with no cycles; it defines the order in which the tasks run.
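
For instance, a tiny sketch of how directed relationships shape a graph; the DAG name ‘dag_shape_demo’ and the task ids are made up, and EmptyOperator is used only as a placeholder:

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator


# 'start' fans out to two tasks, and both feed into 'end'; the arrows are the
# directed edges you will see as the graph in the Airflow UI
with DAG("dag_shape_demo", start_date=datetime(2024, 1, 1), schedule=None, catchup=False):
    start = EmptyOperator(task_id="start")
    left = EmptyOperator(task_id="left")
    right = EmptyOperator(task_id="right")
    end = EmptyOperator(task_id="end")

    start >> [left, right] >> end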

First Airflow DAG

To develop a DAG, we first need to set up Airflow locally. Read my other story here on how to set up an Airflow development environment.

Step 1: Open the airflow.cfg file and check the location of dags_folder. You will find that the dags folder doesn’t exist yet. Go ahead and create a new ‘python package’ at that location under the airflow folder.
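
If you prefer to check the configured location from code instead of opening the file, here is a minimal sketch (run it in the same Python environment where Airflow is installed):

from airflow.configuration import conf

# Prints the folder where the scheduler looks for DAG files,
# as resolved from airflow.cfg and any environment overrides
print(conf.get("core", "dags_folder"))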

Step 2: Start Airflow using ‘airflow standalone’ and set an Airflow Variable to be used in our DAG.

  1. Go to Admin -> Variables
  2. Click ‘Add a new record’
  3. Add a variable with key ‘spell’ (the DAG below reads it; for example, value ‘alohomora’) and click ‘Save’. Alternatively, set it from code as shown after this list.
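
If you would rather create the variable from code than through the UI, here is a minimal sketch; the value ‘alohomora’ is simply the one the DAG below checks for:

from airflow.models import Variable

# Equivalent to adding the record via Admin -> Variables in the UI;
# the DAG below reads it back with Variable.get("spell")
Variable.set("spell", "alohomora")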

Step 3: Create a new Python file in the dags folder named ‘harry_potter.py’ and paste the code below. Ideally, the Airflow scheduler should pick up the newly created DAG automatically; however, if you do not see your DAG, restart Airflow using the ‘airflow standalone’ command.

from datetime import datetime, timedelta
from airflow.models import Variable
from airflow import DAG
from airflow.models.param import Param
from airflow.operators.empty import EmptyOperator
from airflow.operators.python import PythonOperator, BranchPythonOperator

"""
Variables, while testing locally, are read from environment variables that we set in
IntelliJ, or from Airflow Variables when deployed in production.

In this case we are reading a variable called 'spell'; we have to set it in the environment as
'AIRFLOW_VAR_SPELL' so that the Airflow runtime can identify it as a variable for the DAG.
"""
spell: str = Variable.get("spell")

"""
Create a set of default args to pass when declaring a DAG.
"""
args = {
    "depends_on_past": False,
    "email_on_failure": False
}

"""
There are different ways to declare a DAG, which we will see in another tutorial;
here we choose the easiest one.
"""
dag = DAG(
    "SPELLS",  # DAG name
    default_args=args,
    schedule=timedelta(days=1),  # Runs once a day
    start_date=datetime(2024, 1, 1),  # Start the DAG from Jan 01st, 2024
    catchup=False,  # Do not backfill past runs if deploying later than the start_date declared above
    tags=["fiction"],  # Create tags to differentiate between DAGs
    params={
        # Pass run-time variables and set a default value
        "message": Param("The Chamber of Secrets", type="string", maxLength=30)
    }
)


def cast_spell(**context) -> str:
    """
    cast_spell is a method that will be executed by a BranchPythonOperator; it returns the
    task_id of the branch to follow next.
    :param context:
    :return: str
    """
    if spell == 'alohomora':
        return "aparecium"  # the spell is correct, follow the 'aparecium' branch
    return "access_denied"


def aparecium(**context):
    """
    Another method that reads from the context of the DAG run and prints a message in the log.
    The context variable is used to access params passed at run time.
    :param context:
    :return: None
    """
    message = context["params"]["message"]
    print(f"You have unlocked {message}")


with dag:
    cast_spell = BranchPythonOperator(
        task_id='cast_spell',
        python_callable=cast_spell,
    )

    # task_id has to be made of alphanumeric characters, dashes, dots and underscores exclusively
    access_denied = EmptyOperator(task_id="access_denied")

    aparecium = PythonOperator(
        task_id="aparecium",
        python_callable=aparecium,
        op_kwargs={"action": "Disapparate"},
    )

    cast_spell.set_downstream([access_denied, aparecium])

if __name__ == "__main__":
    dag.test(run_conf={"message": 'The Goblet of Fire'})

Step 4: Open http://localhost:8080 and filter the DAGs by the tag ‘fiction’. By default the DAG is paused; click the toggle on the left of the DAG to unpause it. Click the play button on the right-hand side under Actions to trigger the DAG.

Step 5: Click on the SPELLS hyperlink to open the DAG and view the graph.

Step 6: View logs in IntelliJ to debug any issues.

Conclusion:

Try out this hands-on guide to build your first Airflow DAG, and reach out in the comments or on LinkedIn with any questions. You can also drop me an email at lakshg08[at]gmail[dot]com.

Want to discuss the world of Harry Potter? You know where to find me.

Happy Learning!!!


Lakshay Gupta
Simpplr Technology

Developer at heart, Tester in mind, Data Engineering Manager/Architect at job