A Beginner’s Guide to Apache Airflow with GCP Composer
An effective approach to orchestrating data workflows.
Table of Contents
- Who Should Read This Guide
- Introduction to Apache Airflow
- The need for Apache Airflow
- Basic Terminology in Apache Airflow
- Types of Tasks
- Airflow Deployment Architecture
- Setup and Installation
- Creating a Simple ETL Workflow with Airflow
- Conclusion
- References
Who Should Read This Guide?
If you are reading this, you likely fall into one of three categories:
- A data professional already familiar with Apache Airflow who would not mind another perspective.
- A data professional experienced with traditional schedulers, curious about Apache Airflow, and eager to explore its capabilities.
- A lifelong learner, always seeking to expand your knowledge and excited to discover the potential of this powerful framework.
This beginner’s guide will be presented in two parts. The first part introduces the core concepts of Apache Airflow, while the second explores more advanced topics that further highlight why Apache Airflow is a must-have tool in a data professional’s arsenal.
Introduction to Apache Airflow
What is Airflow?
Apache Airflow is an open-source platform designed for orchestrating complex workflows and data pipelines. It allows users to programmatically author, schedule, and monitor workflows using Directed Acyclic Graphs (DAGs). The official documentation defines it as:
An open-source platform for developing, scheduling, and monitoring batch-oriented workflows.
In today’s data-driven world, Apache Airflow stands out as a versatile workflow orchestration tool. By allowing users to define workflows as code, it empowers organizations to create custom, efficient pipelines that precisely match their diverse business needs and complex data processes.
The Need for Apache Airflow
Like every other data product or tool that emerges to solve a particular problem or address the limitations of existing tools, Airflow offers a robust alternative to traditional schedulers for workflow orchestration and scheduling, effectively tackling several key limitations such as:
- Poor error handling.
- Inability to handle complex workflows.
- Untraceable changes.
- Environment lock-in, such as SQL Server Agent being tied to scheduling SSIS packages.
- Difficulty processing historical data.
Basic Terminology in Airflow
- DAG: Short for Directed Acyclic Graph, this is the core concept of Airflow. A DAG represents a workflow, consisting of one or more tasks together with the dependencies and relationships that determine the order in which they run.
- Tasks: The basic unit of execution in Airflow. These building blocks come together to form the high-level workflow represented as a DAG. Tasks have relationships with one another, defined as upstream or downstream dependencies, which determine the order in which the tasks run. In the above image, the tasks are identified as A, B, C, and D.
- Relationships: A crucial aspect of working with tasks in Airflow is defining the relationships between them, referred to as dependencies: a task can be upstream or downstream of another. First you define the tasks themselves, then you establish how they depend on one another.
There are two ways of declaring dependencies between tasks:
1. Using the >> and << (bitshift) operators:
task_a >> task_b >> [task_c, task_d]
# task_a runs before task_b; task_b runs before task_c and task_d, which can run in parallel
2. The more explicit set_upstream and set_downstream methods:
task_a.set_downstream(task_b)  # task_b runs downstream of (after) task_a
task_c.set_upstream(task_b)    # task_b runs upstream of (before) task_c
Types of Tasks
Operators
Airflow provides several types of operators, either directly or via provider packages, such as:
- Action Operators: For performing specific actions, like executing a Bash command (`BashOperator`), running a Python function (`PythonOperator`), or sending an email (`EmailOperator`).
- Transfer Operators: For moving data between systems, such as Azure Blob Storage to GCS (`AzureBlobStorageToGCSOperator`).
- Custom Operators: The functionality of operators can be extended by creating custom operators via plugins to define specific task behaviors (see the sketch below).
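For illustration, here is a minimal sketch of the custom-operator pattern: you subclass `BaseOperator` and override `execute`. The class name and greeting logic are hypothetical and not part of the workflow built later in this guide.
```python
from airflow.models.baseoperator import BaseOperator


class GreetOperator(BaseOperator):
    """Hypothetical custom operator used purely for illustration."""

    def __init__(self, name: str, **kwargs):
        super().__init__(**kwargs)
        self.name = name

    def execute(self, context):
        # execute() holds the task's logic and runs when the scheduler triggers the task
        self.log.info("Hello, %s!", self.name)
```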
Sensors
Sensors are a special type of operator that waits for a certain condition to be met before the workflow moves forward. It could be a FileSensor waiting for a file to appear at a specific location, a TimeSensor waiting to execute at a specified time of day, or an ExternalTaskSensor that monitors the status of a task in another DAG.
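As a rough sketch (the file path, interval, and timeout below are placeholder values), a `FileSensor` could be declared like this:
```python
from airflow.sensors.filesystem import FileSensor

wait_for_file = FileSensor(
    task_id="wait_for_input_file",
    filepath="/data/incoming/sales.csv",  # hypothetical path to poll for
    poke_interval=60,                     # re-check every 60 seconds
    timeout=60 * 60,                      # fail the task after one hour of waiting
)
```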
Airflow Deployment Architecture
Discussing the architecture of Airflow is important because it clarifies what each component does and how the components work together to orchestrate complex workflows.
At a minimum, an Airflow installation consists of the following components:
- Scheduler: This component handles triggering scheduled workflows and submitting the tasks that make up those workflows to the executor. Depending on the deployment architecture, the executor runs inside the scheduler process rather than as a separate component.
- Queue (Message Broker): Serves as the medium through which tasks are dispatched to workers, in the order determined by their schedule and task dependencies.
- Webserver: Provides a user-friendly interface to monitor, trigger, and debug workflows and tasks.
- DAG Folder: This folder contains the DAG files (Python scripts) that define workflows as code. The scheduler parses the files in this folder before they are executed.
- Metadata Database: This is a relational database that serves as the central component that stores information about the state of DAGs, tasks, DAG runs, task instances, and retries. The scheduler updates the metadata database with DAG run details and task statuses. Workers also update the metadata database with task execution results (running, failed or successful). The webserver reads from this database to display information to the user.
In the basic deployment architecture:
- Users author DAG files containing workflow code and upload them to the DAG folder.
- The Scheduler periodically scans the DAG folder for new or updated files.
- The Scheduler parses DAG files, extracting DAG information such as schedule intervals, task parameters, and task dependencies.
- If parsing is successful, the Scheduler updates the Metadata database with DAG information.
- Based on task dependencies, the scheduler queues individual tasks for execution.
- The Worker executes tasks and updates the Metadata database with task states (running, failed, successful). Also, based on the task parameters stored in the Metadata database, the worker can retry a failed task.
- The Web Server accesses the Metadata Database to display workflow and task information to users.
In the above architecture, the Scheduler, Web Server, and Worker run on a single node.
In the multi-node deployment architecture:
- The scheduler and webserver are hosted on a node separate from the workers. This is the major difference between the single-node and multi-node deployment architectures.
- A standalone DAG processor parses the files in the DAG folder and writes DAG information to the metadata database; in this scenario, the scheduler does not access the DAG files directly.
- The scheduler continually checks the metadata database for DAG schedules to determine when to run a workflow and the tasks within the workflow. When a task is ready to be executed based on its schedule or dependencies, the task is placed in the queue.
- Worker nodes listen to the queue for available tasks. In this deployment, multiple worker nodes can be deployed to handle tasks in parallel, providing scalability.
The components serve the same functions in both deployments; however, the multi-node deployment is geared towards isolation for security purposes and scalability for large workflows that require parallelization.
Setup and Installation
Apache Airflow, being an open-source framework, can be installed on your local environment for free using Docker. However, since this post aims to introduce Airflow without dwelling on setup and environment configuration, I will be using GCP’s managed offering of Apache Airflow, a service called Composer.
Before you can spin up a Composer environment in GCP, you must have a Google Cloud account. If you do not have one, that’s okay, as you can quickly create one and get $300 in free credits to try out some of the services offered. Here is the link to creating a Google Cloud account.
After setting up your Google Cloud account, follow the steps below to set up a Composer environment.
- Click on Create Environment. I used Composer 3.
- The next page is for the configuration of the composer environment. Give it a name, go with the default configuration, and click Create.
- When the environment has been created, you will see the following:
- Clicking on the DAGs folder link, you will see the Cloud Storage bucket associated with the Composer environment. This is where you will upload the DAG file, in this case simple_ETL.py. The bucket comes with a default DAG file (airflow_monitoring.py).
- Clicking the Airflow webserver link on the landing page for the Composer environment, you will see the DAGs available based on the DAG files present in the environment’s bucket. The name Daily_CSV_Download is the name given to the DAG in its declaration in simple_ETL.py. More on this later.
As mentioned, Apache Airflow, being open source, can be installed for free on your local environment using Docker; for those interested in that route, I suggest taking a look at this under-ten-minute video on how to do it.
Creating a Simple ETL Workflow with Airflow
I will simulate an ETL scenario to give you a proper understanding of how a workflow defined as code in a DAG is implemented using Airflow.
Overview of the Sample Scenario
To simplify the process of simulating the frontend of an e-commerce application that records daily transactions in a database, I will use a CSV file uploaded to GitHub daily as the primary data source. The workflow then consists of:
- Downloading the file into a Google Cloud Storage (GCS) staging bucket.
- Transforming the retrieved data and loading it into a temporary GCS bucket, where it is held before being transferred to a BigQuery table.
- Creating a table in BigQuery to load the data into.
- Performing housekeeping by moving the raw data from the staging bucket to an archive bucket, and then deleting the temporarily created bucket.
Sample Code
Below is a detailed breakdown of the code sections, to help you understand what the above workflow as code is doing.
Imports
The import section consists of familiar libraries and modules such as requests, pandas, and re. It also includes modules you may not be familiar with: operators that define the behavior and execution logic of a task.
Airflow ships with basic operators such as the PythonOperator, as well as operators supplied by providers such as Google Cloud or Azure. These provider operators are tailored to the services they support, enabling efficient connections and execution; for example, the GCSToGCSOperator transfers objects between two GCS buckets, and the BigQueryCreateEmptyTableOperator creates a table in a specified BigQuery dataset.
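As a rough reconstruction (the exact list in simple_ETL.py may differ slightly), the import block looks something like this:
```python
import re

import pandas as pd
import requests

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.providers.google.cloud.hooks.gcs import GCSHook
from airflow.providers.google.cloud.operators.bigquery import BigQueryCreateEmptyTableOperator
from airflow.providers.google.cloud.operators.gcs import (
    GCSCreateBucketOperator,
    GCSDeleteBucketOperator,
)
from airflow.providers.google.cloud.sensors.gcs import GCSObjectExistenceSensor
from airflow.providers.google.cloud.transfers.gcs_to_bigquery import GCSToBigQueryOperator
from airflow.providers.google.cloud.transfers.gcs_to_gcs import GCSToGCSOperator
```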
Declaring a DAG
A DAG can be declared in three ways, one of which is shown in the code above.
Firstly, using the with statement (context manager), which will add anything inside it to the DAG implicitly, as seen in the code above.
Secondly, as a standard constructor, where the DAG is passed into any operators that define a task in the workflow.
Thirdly, using the “@dag” decorator to turn a function into a DAG generator.
When declaring the DAG, there are important parameters that must be passed to it, such as dag_id, start_date, schedule_interval, and catchup. The default_args parameter is a dictionary of default parameters that can be passed to both DAGs and Operators, which allows for setting common parameters like start_date and retries. A look at the BaseOperator class, from which all operators inherit, shows an overlap in parameters between the DAG and the operators, and some people wrongly assume that the DAG class inherits from BaseOperator.
This overlap in parameters is designed to provide consistency and ease of use when defining workflows and tasks. By allowing similar parameters, Airflow enables users to set global defaults at the DAG level and override them at the operator (task) level as needed. This makes sense, since the DAG is the high-level plan made up of several tasks: parameters set at the DAG level flow down to every task but can be customized at the individual task level. A simple example helps clarify this.
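Here is a minimal sketch, using illustrative values rather than the ones in simple_ETL.py: retries is set once in default_args and applies to every task, while a single task overrides it.
```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator

default_args = {
    "retries": 2,                          # applies to every task by default
    "retry_delay": timedelta(minutes=5),
}

with DAG(
    dag_id="example_defaults",             # hypothetical DAG, not from simple_ETL.py
    start_date=datetime(2024, 9, 1),
    schedule_interval="@daily",
    catchup=False,
    default_args=default_args,
) as dag:
    uses_default = BashOperator(task_id="uses_default", bash_command="echo hello")
    overrides = BashOperator(
        task_id="overrides",
        bash_command="echo world",
        retries=5,                         # task-level override of the DAG-level default
    )
```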
After the DAG has been declared, come the tasks, which are defined by the different operators.
From simple_ETL.py, there are 10 tasks in this DAG, defined with 6 different operators (the PythonOperator is used twice to call two different functions, and the DummyOperator is used twice to define two dummy tasks).
Task 0
Task 0 is defined by a DummyOperator; think of a dummy operator as a placeholder in the workflow. It does not perform any real work but is used mainly to mark a specific point in the workflow, much like a print statement. Here it “prints” the start of the workflow.
Task 1
The next task is defined by a PythonOperator; this operator works by calling a Python function. The function called by this task is download_csv_to_gcs.
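A rough sketch of what this task and its callable might look like, assuming the imports shown earlier and placement inside the DAG’s `with` block; the URL, bucket, and object names are placeholders, not the exact code from simple_ETL.py:
```python
def download_csv_to_gcs():
    # fetch the daily CSV from the GitHub repo (placeholder URL)
    response = requests.get(
        "https://raw.githubusercontent.com/<user>/<repo>/main/daily_sales.csv"
    )
    response.raise_for_status()
    # drop the raw file into the staging bucket (hypothetical bucket name)
    GCSHook().upload(
        bucket_name="staging-bucket",
        object_name="daily_sales.csv",
        data=response.content,
    )


download_task = PythonOperator(
    task_id="download_csv_to_gcs",
    python_callable=download_csv_to_gcs,
)
```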
Task 2
Task 2 is defined by the GCSObjectExistenceSensor; this is an example of a sensor and, in this workflow, it checks that the file is present in the staging bucket before the workflow proceeds.
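A minimal sketch, reusing the hypothetical staging bucket and object names from the previous task:
```python
check_file_exists = GCSObjectExistenceSensor(
    task_id="check_file_exists",
    bucket="staging-bucket",   # hypothetical staging bucket
    object="daily_sales.csv",
    poke_interval=30,          # re-check every 30 seconds
    timeout=300,               # fail the task after 5 minutes of waiting
)
```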
Task 3
Task 3 is defined by the GCSCreateBucketOperator; this operator creates a new bucket, which serves as the temporary bucket that holds the transformed data.
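A minimal sketch, with a hypothetical bucket name and location:
```python
create_temp_bucket = GCSCreateBucketOperator(
    task_id="create_temp_bucket",
    bucket_name="temp-transform-bucket",  # hypothetical temporary bucket
    storage_class="STANDARD",
    location="US",
)
```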
Task 4
This task is defined by another PythonOperator, which calls the transform_csv function. The function applies two simple transformations (cleaning the product_id column and splitting the name column into first name and last name) to the CSV file in the staging bucket and moves the result to the temporary bucket created in Task 3.
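A rough sketch of the callable and task; the column handling is simplified and the bucket and file names are placeholders:
```python
import io


def transform_csv():
    hook = GCSHook()
    # pull the raw file from the staging bucket
    raw_bytes = hook.download(bucket_name="staging-bucket", object_name="daily_sales.csv")
    df = pd.read_csv(io.BytesIO(raw_bytes))

    # simplified transformations: clean product_id and split name into first/last name
    df["product_id"] = df["product_id"].astype(str).str.strip()
    df[["first_name", "last_name"]] = df["name"].str.split(" ", n=1, expand=True)

    # write the transformed file to the temporary bucket created in Task 3
    hook.upload(
        bucket_name="temp-transform-bucket",
        object_name="daily_sales_clean.csv",
        data=df.to_csv(index=False),
    )


transform_task = PythonOperator(task_id="transform_csv", python_callable=transform_csv)
```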
Task 5
This task uses the BigQueryCreateEmptyTableOperator (quite a mouthful, if you ask me) to create an empty BigQuery table, if it does not already exist, into which the CSV file will be loaded. This operator takes parameters such as the table_id, the dataset in which the table will be created, and the schema_fields.
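A minimal sketch; the dataset name and schema fields are assumptions based on the transformations described above:
```python
create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_daily_sales_table",
    dataset_id="sales_dataset",   # hypothetical dataset
    table_id="daily_sales_table",
    schema_fields=[
        {"name": "product_id", "type": "STRING", "mode": "NULLABLE"},
        {"name": "first_name", "type": "STRING", "mode": "NULLABLE"},
        {"name": "last_name", "type": "STRING", "mode": "NULLABLE"},
    ],
)
```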
Task 6
The GCSToBigQueryOperator loads the file from the temporary GCS bucket into the table created in the previous task.
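A minimal sketch, again with placeholder bucket, dataset, and file names; WRITE_APPEND matches the append behavior described at the end of this post:
```python
load_to_bq = GCSToBigQueryOperator(
    task_id="load_csv_to_bigquery",
    bucket="temp-transform-bucket",        # hypothetical temporary bucket
    source_objects=["daily_sales_clean.csv"],
    destination_project_dataset_table="sales_dataset.daily_sales_table",
    source_format="CSV",
    skip_leading_rows=1,                   # skip the CSV header row
    write_disposition="WRITE_APPEND",      # append each daily run to the table
)
```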
Task 7
This task deletes the temporary GCS bucket that holds the transformed data before it is loaded into the BigQuery table. Since the bucket is no longer needed after the data transfer, it is removed to free up resources. The bucket will be recreated during the next run of the DAG.
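A minimal sketch, assuming the same hypothetical temporary bucket:
```python
delete_temp_bucket = GCSDeleteBucketOperator(
    task_id="delete_temp_bucket",
    bucket_name="temp-transform-bucket",  # hypothetical temporary bucket
    force=True,                           # delete even if the bucket still contains objects
)
```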
Task 8
This task uses the GCSToGCSOperator to transfer the raw files from the staging bucket to an archive bucket. This practice is common in ETL processes, as it ensures that raw data is preserved for potential reprocessing and historical record-keeping. By moving files from the staging area to the archive, organizations maintain a reliable backup of the original data.
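A minimal sketch, with placeholder bucket names; move_object=True moves the file rather than copying it:
```python
archive_raw_file = GCSToGCSOperator(
    task_id="archive_raw_file",
    source_bucket="staging-bucket",       # hypothetical staging bucket
    source_object="daily_sales.csv",
    destination_bucket="archive-bucket",  # hypothetical archive bucket
    move_object=True,                     # move rather than copy, emptying the staging bucket
)
```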
Task 9
This uses the DummyOperator to “print” the end of the workflow.
Relationship definition
The order of execution of the tasks and their dependencies is defined as shown below.
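Using the hypothetical task variable names from the sketches above (the exact chain in simple_ETL.py may differ), the relationships could be declared as:
```python
start = DummyOperator(task_id="start")  # Task 0
end = DummyOperator(task_id="end")      # Task 9

start >> download_task >> check_file_exists >> create_temp_bucket >> transform_task
transform_task >> create_table >> load_to_bq >> delete_temp_bucket >> archive_raw_file >> end
```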
From the Airflow webserver page, clicking on the Daily_CSV_Download DAG displays the following page, which provides information about the DAG such as the task names, the order of execution, and so on.
To demonstrate error handling, I manually triggered the DAG with the create_temp_bucket task omitted, meaning there was no bucket to load the transformed file into.
Rerunning the DAG with the create_temp_bucket task now present, as seen below, gives the following:
The green boxes opposite every task indicate that all the tasks were successful. By clicking on the Details tab, you get information such as how many times the DAG has been run, the number of successes or failures, how long the DAG took to run, etc.
The daily_sales_table is populated with the transformed data as shown below, and every time data is uploaded to the Git repo and the DAG runs, that data is appended to this table.
Conclusion
In this article, we’ve explored Apache Airflow as a powerful tool for orchestrating complex workflows and data pipelines, making it an essential asset for data professionals. This guide has introduced you to the fundamental concepts of Airflow, including its architecture, core components, and basic terminology. By understanding how to define workflows as code using Directed Acyclic Graphs (DAGs), you can leverage Airflow to automate and streamline your data processes.
See you in my next blog post, and you can always connect with me on LinkedIn. Always happy to connect. Cheers :)
References
[1] Apache Airflow. Apache Airflow Documentation (Stable). Accessed September 20, 2024. https://airflow.apache.org/docs/apache-airflow/stable/index.html
[2] Apache Airflow. DAGs (Stable). Accessed September 22, 2024. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#dags
[3] Apache Airflow. Tasks (Stable). Accessed September 22, 2024. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#tasks
[4] Apache Airflow. Relationships (Stable). Accessed September 20, 2024. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/tasks.html#relationships
[5] Apache Airflow. Operators (Stable). Accessed August 28, 2024. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/operators.html#operators
[6] Apache Airflow. Sensors (Stable). Accessed September 20, 2024. https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/sensors.html