Moving Fast with Airflow

Simple Guide To Get Started With Apache Airflow

Karthik R
The Pythoneers


Photo by oğuz can on Unsplash

What is Airflow?

Apache Airflow is an open-source platform for orchestrating complex workflows and data pipelines. It allows users to define, schedule, and monitor workflows, making managing and automating data processing tasks easier.

In simple terms, you define pipelines in Python, and those pipelines are made up of tasks. A task can execute a script, or it can connect to and integrate with other services such as AWS EC2, S3 buckets, Spark, and Lambda, as well as Azure services and many others. To view the complete list of integrations, visit here.

What makes Airflow run

You interact with Airflow through a web server (UI) to monitor your pipelines or trigger them. These pipelines are workflows, or DAGs, written in Python.

As you can see in the diagram, the DAG author writes code to construct pipelines; once the DAG files are written, they reach the operations user, who is the end user.

Scheduler:

The scheduler schedules the execution of Directed Acyclic Graphs (DAGs) and their individual tasks. It continuously monitors the metadata database to determine when a DAG or task should run based on its schedule (daily, hourly, or date-based). It creates task instances and places them into the queue, making them ready for execution by the workers.
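
To make the scheduling part concrete, here is a minimal sketch of how a schedule is declared on a DAG. The dag_id and dates are made up for illustration; a cron string such as "0 6 * * *" or a timedelta would work in place of the "@daily" preset.

from datetime import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="schedule_example",        # hypothetical DAG name
    start_date=datetime(2024, 1, 1),  # earliest date the scheduler considers
    schedule_interval="@daily",       # run once per day, after each interval ends
    catchup=False,                    # don't back-fill missed intervals
) as dag:
    EmptyOperator(task_id="noop")     # placeholder task so the DAG is valid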

Triggerer:

The triggerer initiates the execution of a DAG run or a specific task. Triggers can be scheduled, manual, or externally driven: once a schedule point is reached (as we saw with the scheduler above), the run is handed to the executor; manual runs are triggered by you; and sensors can act as externally driven triggers. The triggerer component is also what runs deferrable tasks, which wait asynchronously for an external event without occupying a worker slot.
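
As a minimal sketch of what the triggerer handles in practice, here is a hypothetical DAG with a deferrable sensor: TimeDeltaSensorAsync releases its worker slot and waits inside the triggerer process until the delay has passed. The names and dates are illustrative.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.sensors.time_delta import TimeDeltaSensorAsync

with DAG(
    dag_id="deferrable_example",      # hypothetical DAG name
    start_date=datetime(2024, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    # Deferrable sensor: instead of blocking a worker, it defers itself
    # and the triggerer wakes it up after 10 minutes.
    wait = TimeDeltaSensorAsync(task_id="wait_10_min", delta=timedelta(minutes=10))

    done = BashOperator(task_id="after_wait", bash_command="echo the wait is over")

    wait >> done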

Executors:

Let’s talk about the main ones, which cover the basics of how executors work. An executor is where a task, as a unit of work, runs. Executors can be local or remote, and they provide parallelism so that multiple tasks can run at the same time. SequentialExecutor is a local executor with parallelism fixed at 1. LocalExecutor can use multiple worker processes to achieve parallelism on the local machine, but it can't scale horizontally. CeleryExecutor is used in production systems to achieve a high degree of efficiency and can handle larger workloads by scaling horizontally.

It uses Celery, a distributed task queue system, to distribute and parallelize the execution of tasks across multiple worker nodes.

Metadata DB:

The metadata database tracks the state, status, and history of DAG runs and task executions. It stores essential information about DAGs, tasks, DAG runs, task instances, task logs, and other related metadata. This information is used for tracking the state of workflow executions, recording historical data, and providing visibility into the execution history.

Apache Airflow supports multiple relational database management systems (RDBMS) as the backend for the metadata database. Commonly used databases include PostgreSQL, MySQL, SQLite, and Microsoft SQL Server.

Webserver UI:

The web server allows users to visualize Directed Acyclic Graphs (DAGs), trigger workflow runs, monitor task execution, and gain insights into the status and history of their workflows.

Users can visualize the structure of DAGs, see the dependencies between tasks, and review the status of previous and ongoing DAG runs. The DAG view in the UI provides an interactive representation of the workflow. It shows the tasks, their dependencies, and the status of each task. Users can drill down into individual tasks to view logs, metadata, and other details.

In the UI you can see many such DAGs, each of which is a workflow: a collection of tasks. You can turn them on or off as you wish. Once turned on, they run at their scheduled interval.

Let's set up Airflow using Docker

Up to this point, we have briefly discussed how Airflow works behind the scenes. Now we will write a simple workflow and understand it, using Docker.

Before you get started, install Docker for your OS; visit here if you don't have it. We also need VS Code. Next, create a sample project directory, open it in VS Code, and save the following as docker-compose.yaml.

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
#

# Basic Airflow cluster configuration for CeleryExecutor with Redis and PostgreSQL.
#
# WARNING: This configuration is for local development. Do not use it in a production deployment.
#
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.8.0
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed.
# Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Use this option ONLY for quick checks. Installing requirements at container
# startup is done EVERY TIME the service is started.
# A better way is to build a custom image or extend the official image
# as described in https://airflow.apache.org/docs/docker-stack/build.html.
# Default: ''
#
# Feel free to modify this file to suit your needs.
---
x-airflow-common:
  &airflow-common
  # In order to add custom dependencies or upgrade provider packages you can use your extended image.
  # Comment the image line, place your Dockerfile in the directory where you placed the docker-compose.yaml
  # and uncomment the "build" line below, Then run `docker-compose build` to build the images.
  image: ${AIRFLOW_IMAGE_NAME:-apache/airflow:2.8.0}
  # build: .
  environment:
    &airflow-common-env
    AIRFLOW__CORE__EXECUTOR: LocalExecutor
    AIRFLOW__DATABASE__SQL_ALCHEMY_CONN: postgresql+psycopg2://airflow:airflow@postgres/airflow
    AIRFLOW__CORE__FERNET_KEY: ''
    AIRFLOW__CORE__DAGS_ARE_PAUSED_AT_CREATION: 'true'
    AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
    AIRFLOW__API__AUTH_BACKENDS: 'airflow.api.auth.backend.basic_auth,airflow.api.auth.backend.session'
    # yamllint disable rule:line-length
    # Use simple http server on scheduler for health checks
    # See https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/logging-monitoring/check-health.html#scheduler-health-check-server
    # yamllint enable rule:line-length
    AIRFLOW__SCHEDULER__ENABLE_HEALTH_CHECK: 'true'
    # WARNING: Use _PIP_ADDITIONAL_REQUIREMENTS option ONLY for a quick checks
    # for other purpose (development, test and especially production usage) build/extend Airflow image.
    _PIP_ADDITIONAL_REQUIREMENTS: ${_PIP_ADDITIONAL_REQUIREMENTS:-}
  volumes:
    - ${AIRFLOW_PROJ_DIR:-.}/dags:/opt/airflow/dags
    - ${AIRFLOW_PROJ_DIR:-.}/logs:/opt/airflow/logs
    - ${AIRFLOW_PROJ_DIR:-.}/config:/opt/airflow/config
    - ${AIRFLOW_PROJ_DIR:-.}/plugins:/opt/airflow/plugins
  user: "${AIRFLOW_UID:-50000}:0"
  depends_on:
    &airflow-common-depends-on
    postgres:
      condition: service_healthy

services:
  postgres:
    image: postgres:13
    environment:
      POSTGRES_USER: airflow
      POSTGRES_PASSWORD: airflow
      POSTGRES_DB: airflow
    volumes:
      - postgres-db-volume:/var/lib/postgresql/data
    healthcheck:
      test: ["CMD", "pg_isready", "-U", "airflow"]
      interval: 10s
      retries: 5
      start_period: 5s
    restart: always

  airflow-webserver:
    <<: *airflow-common
    command: webserver
    ports:
      - "8080:8080"
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8080/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-scheduler:
    <<: *airflow-common
    command: scheduler
    healthcheck:
      test: ["CMD", "curl", "--fail", "http://localhost:8974/health"]
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-triggerer:
    <<: *airflow-common
    command: triggerer
    healthcheck:
      test: ["CMD-SHELL", 'airflow jobs check --job-type TriggererJob --hostname "$${HOSTNAME}"']
      interval: 30s
      timeout: 10s
      retries: 5
      start_period: 30s
    restart: always
    depends_on:
      <<: *airflow-common-depends-on
      airflow-init:
        condition: service_completed_successfully

  airflow-init:
    <<: *airflow-common
    entrypoint: /bin/bash
    # yamllint disable rule:line-length
    command:
      - -c
      - |
        if [[ -z "${AIRFLOW_UID}" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: AIRFLOW_UID not set!\e[0m"
          echo "If you are on Linux, you SHOULD follow the instructions below to set "
          echo "AIRFLOW_UID environment variable, otherwise files will be owned by root."
          echo "For other operating systems you can get rid of the warning with manually created .env file:"
          echo "    See: https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#setting-the-right-airflow-user"
          echo
        fi
        one_meg=1048576
        mem_available=$$(($$(getconf _PHYS_PAGES) * $$(getconf PAGE_SIZE) / one_meg))
        cpus_available=$$(grep -cE 'cpu[0-9]+' /proc/stat)
        disk_available=$$(df / | tail -1 | awk '{print $$4}')
        warning_resources="false"
        if (( mem_available < 4000 )) ; then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough memory available for Docker.\e[0m"
          echo "At least 4GB of memory required. You have $$(numfmt --to iec $$((mem_available * one_meg)))"
          echo
          warning_resources="true"
        fi
        if (( cpus_available < 2 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough CPUS available for Docker.\e[0m"
          echo "At least 2 CPUs recommended. You have $${cpus_available}"
          echo
          warning_resources="true"
        fi
        if (( disk_available < one_meg * 10 )); then
          echo
          echo -e "\033[1;33mWARNING!!!: Not enough Disk space available for Docker.\e[0m"
          echo "At least 10 GBs recommended. You have $$(numfmt --to iec $$((disk_available * 1024 )))"
          echo
          warning_resources="true"
        fi
        if [[ $${warning_resources} == "true" ]]; then
          echo
          echo -e "\033[1;33mWARNING!!!: You have not enough resources to run Airflow (see above)!\e[0m"
          echo "Please follow the instructions to increase amount of resources available:"
          echo "   https://airflow.apache.org/docs/apache-airflow/stable/howto/docker-compose/index.html#before-you-begin"
          echo
        fi
        mkdir -p /sources/logs /sources/dags /sources/plugins
        chown -R "${AIRFLOW_UID}:0" /sources/{logs,dags,plugins}
        exec /entrypoint airflow version
    # yamllint enable rule:line-length
    environment:
      <<: *airflow-common-env
      _AIRFLOW_DB_MIGRATE: 'true'
      _AIRFLOW_WWW_USER_CREATE: 'true'
      _AIRFLOW_WWW_USER_USERNAME: ${_AIRFLOW_WWW_USER_USERNAME:-airflow}
      _AIRFLOW_WWW_USER_PASSWORD: ${_AIRFLOW_WWW_USER_PASSWORD:-airflow}
      _PIP_ADDITIONAL_REQUIREMENTS: ''
    user: "0:0"
    volumes:
      - ${AIRFLOW_PROJ_DIR:-.}:/sources

  airflow-cli:
    <<: *airflow-common
    profiles:
      - debug
    environment:
      <<: *airflow-common-env
      CONNECTION_CHECK_MAX_COUNT: "0"
    # Workaround for entrypoint issue. See: https://github.com/apache/airflow/issues/16252
    command:
      - bash
      - -c
      - airflow

# You can enable flower by adding "--profile flower" option e.g. docker-compose --profile flower up
# or by explicitly targeted on the command line e.g. docker-compose up flower.
# See: https://docs.docker.com/compose/profiles/

volumes:
  postgres-db-volume:

I have modified the docker-compose file to not use Redis and to use the LocalExecutor, so that everything runs locally. You can look at the original file and read all about setting up with Docker in detail here.

mkdir -p ./dags ./logs ./plugins ./config

Next, run the above command in your root directory to create the folders Airflow mounts. (On Linux, you should also set AIRFLOW_UID in a .env file, as the warning in the init script above explains.)

Let's write a simple DAG and run it using Docker

In your dags folder, create a Python file and use the code below.

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator


with DAG(
    dag_id='my_dag',
    description='my first basic dag',
    start_date=datetime(2024, 1, 20, 2),  # first schedulable date
    schedule_interval='@daily'            # run once per day
) as dag:
    task1 = BashOperator(
        task_id='first_task',
        bash_command="echo hello from first task"
    )

    task2 = BashOperator(
        task_id='second_task',
        bash_command="echo hello from second task"
    )

    task3 = BashOperator(
        task_id='third_task',
        bash_command="echo hello from third task"
    )

    # task2 and task3 run in parallel after task1 completes
    task1 >> [task2, task3]

That's how you declare a DAG; you can read more about it here.

In the above code, we give the DAG a basic name and description, and we define tasks inside it. As discussed before, there are many operators, including custom ones, that we can use to write our code; here we are using the BashOperator. The workflow is scheduled daily and starts from 20th Jan 2024 (if you run it when the date is already 24th Jan 2024, it'll catch up and run 3 times up to the current date, so change the start date if you want fewer runs). You can customize it according to your requirements.
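
BashOperator is only one of many operators; as a hedged sketch, the same kind of task could run Python code instead, either with the classic PythonOperator or with the TaskFlow API's @task decorator. The DAG and function names here are made up for illustration.

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.python import PythonOperator


def say_hello():
    # plain Python callable wrapped by PythonOperator
    print("hello from a PythonOperator task")


with DAG(
    dag_id="my_python_dag",               # hypothetical DAG name
    start_date=datetime(2024, 1, 20, 2),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    classic = PythonOperator(task_id="classic_style", python_callable=say_hello)

    @task(task_id="taskflow_style")
    def taskflow_hello():
        print("hello from a TaskFlow task")

    # the TaskFlow task runs after the classic-style task
    classic >> taskflow_hello()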

There are 3 tasks defined in total.

task1 >> task2 >> task3

Defining it like this makes the tasks run sequentially, one after the other.

task1 >> [task2, task3]

Defining it like this makes task 2 and task 3 run in parallel after task 1 completes.
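
Equivalently, the same dependencies can be declared with the chain helper; this hedged alternative goes inside the same with DAG(...) block, after the tasks are defined.

from airflow.models.baseoperator import chain

# chain(task1, [task2, task3]) is equivalent to task1 >> [task2, task3]:
# task2 and task3 both start after task1 completes.
chain(task1, [task2, task3])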

How the project demo looks in VS Code.

docker-compose up -d

Run this next in your root directory and let Docker set everything up automatically. Wait a few minutes, run docker ps, and verify that all the containers have started. Then visit localhost:8080 and log in with airflow as both username and password, if you have not changed that part in the docker-compose file.
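
If you prefer scripting to clicking, you can also unpause and trigger the DAG through Airflow's stable REST API, since the docker-compose file above enables the basic-auth backend. This is a minimal sketch assuming the defaults used so far (localhost:8080, username and password airflow) and that the requests library is installed.

import requests

BASE = "http://localhost:8080/api/v1"
AUTH = ("airflow", "airflow")  # default credentials from the compose file

# Unpause the DAG (equivalent to flipping the toggle in the UI).
requests.patch(f"{BASE}/dags/my_dag", json={"is_paused": False}, auth=AUTH)

# Trigger an ad-hoc DAG run; an empty body lets Airflow pick the logical date.
resp = requests.post(f"{BASE}/dags/my_dag/dagRuns", json={}, auth=AUTH)
print(resp.json()["state"])   # usually "queued" right after triggering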

You can see the DAG completed 3 runs when it was enabled on 24th Jan 2024. It's scheduled daily, so it will keep running once a day from then on.

Clicking on the DAG, you can select the Graph view to see its structure. Since we set up Task 2 and Task 3 to run in parallel after Task 1 completes, the graph looks like this. You can also get more insights, such as the execution time of each task, the logs it generated, and when the task started.

You can select each task from the graph and check the logs for that particular task instance. Here you can see the log of the first task's instance.

So that’s it: a basic overview of Airflow and how it works.

See you again. Feel free to ask questions. Thanks for reading.
