Revolutionize Data Workflow with Airflow ETL Pipeline in Python

Chirag K
Simform Engineering
9 min readApr 19, 2023

Explore how to design scalable workflows with Airflow in Python to manage the ETL pipeline.

Python ETL pipeline in AIrflow

Unlock the power of programmable and scalable workflows with Airflow! Say goodbye to the headache of managing ETL pipelines and data workflows manually. With Airflow, defining tasks and dependencies is a breeze — and scheduling and monitoring their execution is effortless. You’ll love the simplicity and ease of use of this powerful platform!

Overview:

  • Install airflow
  • Prepare ETL code
  • Dockerise Airflow
  • Schedule DAG
  • Airflow Variables & Connections
  • Airflow Trigger Rules
  • Task Instance
  • Airflow Use Cases
  • Airflow Advantages & Disadvantages

Install Airflow

1. Using pip: Airflow can be easily installed using pip, the Python package manager. The command to install Airflow using pip is pip install apache-airflow

2. Using conda: Airflow can also be installed using conda, the package manager for Anaconda. The command to install Airflow using conda is conda install -c conda-forge airflow

3. From source: Airflow can also be installed by downloading the source code and installing it manually. The steps to install Airflow from a source are as follows:

  • Clone the Airflow repository from GitHub
  • Install the required dependencies
  • Run the command python setup.py install

4. Using Docker: Airflow can also be installed using a pre-configured Docker image. The steps to install Airflow using Docker are as follows:

  • Pull the Airflow Docker image
  • Start the Docker container
  • Configure and run the Airflow service

5. Using Helm: Airflow can also be installed using Helm, a package manager for Kubernetes. The steps to install Airflow using Helm are as follows:

  • Add the Airflow Helm repository
  • Install the Airflow chart
  • Configure and run the Airflow service

Note: Please ensure your system meets the requirement for Airflow before installing it. Check the prerequisites from Here.

Prepare ETL code

An ETL pipeline in Airflow typically consists of several tasks, defined as Operators in Airflow, strung together to form a Directed Acyclic Graph (DAG) of tasks.

Here is an example of an ETL pipeline:

  • Extracts data from a source system (e.g. a database or API)
  • Transforms the data
  • Loads it into a target system (e.g. a data warehouse or data lake)
from datetime import datetime, timedelta
from airflow.decorators import dag, task

default_args = {
'owner': 'me',
'retries': 1,
'retry_delay': timedelta(minutes=5),
}

@dag(
default_args=default_args,
schedule_interval="@daily",
start_date=datetime(2023, 1, 1),
catchup=False
)
def sample_etl_pipeline():

@task()
def extract_data():
"""
Extraction part of ETL pipeline
We can extract data from many sources such as S3, MYSQL, postgres,
DynamoDB, etc.
"""
return True

@task()
def transform_data(extracted_data_path):
"""
This is the transformation part of any pipeline
We can apply any transformation to the data retrieved by extraction
"""
return True

@task()
def load_data(transformed_data_path):
"""
Load data.
We can load the data to any database as per your requirements
"""
return True

extracted_data_path = extract_data()
transformed_data = transform_data(extracted_data_path)
load_data(transformed_data)
dag_run = sample_etl_pipeline()

Dockerise Airflow

Docker-compose.yaml

download the latest docker-compose file from airflow.

DockerFile

FROM apache/airflow:2.5.0
USER 0
User airflow
RUN pip install numpy \
&& pip install pandas \
&& pip install datetime \
&& pip install airflow-code-editor

USER airflow

Run Airflow Server

To run the airflow server using Docker

docker-compose up airflow-init

airflow-init is the initialization service that is used to run database migrations and create the first user account.

You can define the username _AIRFLOW_WWW_USER_USERNAME and password _AIRFLOW_WWW_USER_PASSWORD in docker-compose.yaml file as below

docker-compose build

It will build the services in the docker-compose.yaml file.

docker-compose up

Docker command to start and run an entire app on a standalone host that contains multiple services. It also creates volumes and networks and attaches to the containers that are defined in a file called ‘docker-compose.yam

It contains multiple services like:

airflow-scheduler: It Monitors all tasks and DAGs, triggering the task instances once their dependencies are complete

airflow-webserver: It is the User Interface (UI) of Airflow, which can be used to get an overview of the overall health of various Directed Acyclic Graphs (DAG) as well as assist in visualizing different components and states of each DAG. The web server is available at http://localhost:8080.

airflow worker: The worker that executes the tasks given by the scheduler.

flower: The Flower app for monitoring the environment. It’s available at http://localhost:5555.

Postgres: The database.

redis: The Redis broker that forwards messages from the scheduler to the worker

For verification of running containers, you can use docker ps -a which will list all the running containers:

After the execution of all of the above steps, you will get the server running on http://localhost:8080

Enter the username and password which you have defined in your docker-compose.yaml file Once you log in to this page you will be redirected to the home page. Your DAG/DAGs will be present on the same page as shown below:

To start this DAG you have to enable the trigger button so that the DAG will start executing its tasks.

Schedule DAG

  • We can provide the start_date and schedule_interval for scheduling the dag.
  • Airflow provides many convenient names like ‘@daily’, ‘@hourly ’, ‘@once’ etc.
  • You can also provide the cron jobs for scheduling jobs at a specific time

Airflow Variables

Airflow variables are the best approach to saving and retrieving various contents.

You can store and modify different kinds of content like JSON, credentials, and plain text.

In addition, JSON settings files can be uploaded through the UI using the import button (Admin -> Variables)

Variables create a connection to the metadata database every time you call the Variable so it would be great if your DAG has lesser Variables. If you have more content to store in the Variable then it is recommended to store all DAG content/ configurations within a single airflow Variable.

E.g. We have three variables:var1 = {val:[“v1”,”v2”]}, var2 = [“a”,”b”,”c”] and var3 = ”data_value”.

If we want to call them in a normal way as var1 = Variable. get(“var1”), var2 = Variable. get(“var2”), and var3 = Variable. get(“var3”), so in this manner when you call a variable it will make the Database connection.

For these three variables, it will create three variables in DB. To optimize these we can create one Variable as main_var = {var1:{val:[“v1”,”v2”]}, var2:[“a”,”b”,”c”], var3:”data_value”} and call them as variable_data = Variable.get(“main_var”, deserialize_json = True).

So using this approach we can reduce the DB connections from 3 to 1. we can access all three variables using var1 = variable_data[“var1”], var2 = variable_data[“var2”], and var3 = variable_data[“var3”]

Airflow Connections

Airflow connections are used to connect with other tools in the data ecosystem

You need to provide connection_id/ connection_type/ hostname/ username/ password/ port/ schema etc.

Passwords in connections will be stored encrypted in a connection table

Airflow even provides the functionality to test the connection so that you can verify that the connection you made is correct or if any issues are there with the connection.

You can access the connections by specifying the relevant conn_id.

Airflow Trigger Rules

Airflow will wait for all the parent/upstream tasks for successful completion before it runs that task. However, this is just the default behavior of the Airflow, and you can control it using the trigger_rule the argument to a Task.

Task Instance

Airflow Task Instances are defined as a representation for, “a specific run of a Task”

Each Airflow Task Instance has a follow-up loop that indicates which state the Airflow Task Instance falls upon.

Some of the states of Task Instances are as follows:

Two Methods to Trigger Airflow DAGs

1. Trigger DAG using schedule: You have to specify the schedule Trigger so that it can run automatically using that.

2. Trigger DAG manually: Trigger DAG manually using Airflow UI or with the CLI

  • In Airflow web interface click on DAG page, enable the toggle button of Trigger DAG
  • We can also specify the DAG run with configuration

Airflow Use Cases

  • ETL processes(Extract, Transform, Load): Airflow is used to schedule and automate ETL processes from various sources and systems.
  • Reporting and analytics: Schedule and automate report generation and data analysis tasks.
  • Data pipelines: Airflow can be used for building and maintaining complex data pipelines, which involve multiple steps, such as extracting data from various sources, cleaning and transforming it, and loading it into a data warehouse or other storage systems.
  • Machine learning workflows: Airflow can be used to orchestrate and manage machine learning workflows, such as model training, evaluation, and deployment.
  • Customer analytics: Airflow can be used to automate the process of collecting and analyzing customer data, such as web analytics, customer feedback, and customer support data. Workflows can be created to extract and transform the data, and to generate reports and visualizations for analysis.
  • Marketing campaigns: Airflow can be used to automate marketing campaigns, such as ad targeting, email marketing, social media promotions, and ad targeting.
  • Social media monitoring: Airflow can be used to automate the process of monitoring social media channels for mentions of a brand or product. Workflows can be created to trigger alerts or other actions based on certain keywords or phrases.

Airflow advantages

  • A large number of hooks: extensibility and simple integrations of hooks.
  • Concurrent: Task concurrency and multiple schedulers
  • Open-source approach: Active and continuously growing community
  • Monitoring: Apache Airflow has a great UI, where you can see the status of your DAG, check run times, check logs, re-run tasks, and much more.
  • Community Support: Airflow has a large and active community, which means that users can benefit from the knowledge and expertise of other users.

Airflow disadvantages

  • No versioning of workflows: When you delete tasks from your DAG script, they disappear from Airflow UI as well as all the metadata about them. You won’t be able to roll back to the previous modification of your pipeline.
  • Debugging: Debugging Airflow workflows can be challenging, and it may take some time to locate and fix errors in the workflow.
  • No native Windows support: It is not straightforward to run Airflow natively on Windows. But this can be done by using Docker.

Takeaways

  • Airflow’s DAG-based architecture and intuitive user interface make it easy to create, schedule, and monitor your workflows. Airflow provides a wide range of built-in operators and integrations with popular data tools, such as Apache Spark and AWS.
  • Airflow’s Python API allows you to programmatically define and execute your workflows, making it easy to integrate Airflow into your existing data workflows.
  • Airflow provides a number of advanced features, such as dynamic task generation, trigger rules, and SLA monitoring, that can help you optimize your data workflows and ensure high-quality data processing.

Here are some reference links you might find helpful after learning Airflow with Python:

  1. Airflow Plugins: https://airflow.apache.org/docs/apache-airflow/stable/plugins.html
  2. Building a Custom Airflow Operator: https://towardsdatascience.com/building-a-custom-airflow-operator-4f13266d6e7c
  3. Airflow Python API reference: https://airflow.apache.org/docs/apache-airflow/stable/_api/index.html
  4. Airflow Extending: https://airflow.apache.org/docs/apache-airflow/stable/extending.html

Conclusion

Airflow is a powerful tool for managing ETL workflows in Python. With the ability to define tasks as Python functions or call external Python scripts, Airflow provides a seamless way to integrate your existing Python code into your data workflows.

Airflow also provides a range of built-in operators and integrations for popular Python libraries such as Pandas and NumPy, making it easy to incorporate these tools into your workflows. Additionally, Airflow’s ability to dynamically generate tasks and handle complex task dependencies makes it well-suited for more complex ETL workflows in Python. Overall, Airflow’s Python API and seamless integration with Python make it a powerful tool for managing ETL workflows in Python.

Get the edge you need to succeed — follow Simform Engineering for crucial updates on tools and technologies!

--

--