Orchestrating Data Pipelines with Prefect on GCP Infrastructure

Paul Nwosu
Feb 17, 2023

So far, we have explored a variety of tools and methods for building data pipelines, allowing us to learn the fundamental principles of this important process. For the fifth installment of my “Building Data Applications on Google Cloud” series, I delved into the world of Prefect — a data orchestration tool that is rapidly gaining popularity in the industry.

In this project, we will use Prefect to orchestrate the data pipeline we built in the previous project. We will then package the pipeline into a Docker image and deploy it on Google's Compute Engine infrastructure.

Project Structure

  • Introduction to Orchestration
  • Introduction to Prefect
  • Setting up your Prefect Cloud Environment
  • Writing your First Flow
  • Deploying the Flow on Google Cloud Environment
  • Summary

Introduction to Orchestration

In the context of data pipelines, orchestration refers to the process of managing and coordinating the various tasks and dependencies required to move data from source to destination. Orchestration involves designing, scheduling, and executing a series of automated data processing steps to ensure that the data is transformed, enriched, and delivered to the desired destination in a timely and reliable manner. Data orchestration involves integrating different technologies, platforms, and tools into a cohesive workflow to streamline data processing, increase efficiency, and reduce errors.

Here are some tools commonly used for data orchestration and a brief description of each:

  1. Apache Airflow: A popular open-source platform for creating, scheduling, and monitoring complex workflows. Airflow is highly customizable and integrates with a wide range of data processing and storage technologies.
  2. Apache NiFi: An easy-to-use, open-source platform for building data pipelines. NiFi features a visual interface for designing and monitoring data flows, as well as support for data enrichment, filtering, and routing.
  3. Apache Beam: An open-source platform for building batch and streaming data processing pipelines. Beam provides a unified programming model for processing data in different languages and on different execution engines, including Apache Spark, Google Cloud Dataflow, and Apache Flink.
  4. AWS Step Functions: A fully managed service for building serverless workflows. Step Functions lets you design, test, and deploy workflows that integrate with a wide range of AWS services, including AWS Lambda, AWS Batch, and Amazon SageMaker.
  5. Azkaban: An open-source workflow manager for Hadoop. Azkaban provides a web-based interface for creating and managing workflows, as well as support for scheduling and parallel execution.
  6. Luigi: An open-source Python framework for building data pipelines. Luigi provides a simple interface for defining dependencies between tasks, as well as support for scheduling and error handling.

Introduction to Prefect

Prefect is another tool for data orchestration that has gained popularity in recent years. It is an open-source workflow automation system that enables users to build, schedule, and monitor complex data pipelines. Prefect is designed to be flexible and easy to use, with a focus on developer productivity.

One of the key features of Prefect is its ability to define workflows as code, using a simple and intuitive Python-based API. This makes workflows easy to version-control and test, and enables developers to iterate quickly and make changes. Prefect also provides a web-based dashboard for monitoring and managing workflows, with detailed logging and visualization capabilities.
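To illustrate the workflows-as-code idea, here is a minimal sketch of a flow and a task (the names hello_flow and say_hello are illustrative and not part of the project code):

from prefect import flow, task

@task
def say_hello(name: str) -> str:
    return f"Hello, {name}!"

@flow(log_prints=True)
def hello_flow(name: str = "Prefect"):
    # calling a task from inside a flow creates a tracked task run
    greeting = say_hello(name)
    print(greeting)

if __name__ == "__main__":
    hello_flow()

Running this script executes the flow locally and records the flow and task runs, which you can then inspect in the Prefect UI.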

Prefect supports a wide range of data processing and storage technologies, including databases, messaging systems, cloud storage, and more. It also has built-in support for distributed computing, allowing workflows to be executed across multiple machines or cloud instances.

In addition to the open-source version, Prefect offers a commercial platform with additional features such as advanced monitoring, collaboration tools, and enterprise-level support. Overall, Prefect is a powerful and flexible tool for data orchestration, suitable for organizations of all sizes and industries.

You can read more on Prefect here: https://docs.prefect.io/

Setting up your Prefect Cloud Environment

Before we can begin using Prefect, it’s important to first understand what Prefect Cloud is. Prefect Cloud is a platform that provides an easy, powerful, and scalable way to automate and monitor managed workflows built with Prefect, without the need to worry about moving your data. Essentially, it’s an Orchestration as a Service tool that enables you to deploy your pipelines and monitor their progress in real-time.

To prepare your environment for Prefect workloads, follow these steps:

Create a Workspace

Log into https://app.prefect.cloud/ preferably with your Google Account. A workspace is an isolated environment within Prefect Cloud for your flows and deployments. You can use workspaces to organize or compartmentalize your workflows.

The above link will help you get started in no time.

Installing Prefect

To get started with Prefect, you can follow the installation instructions provided in the link above. It’s worth noting that this is part of my Google Cloud Series, and we’ll be working with a virtual machine hosted on Google Cloud.

For the purpose of this tutorial, I’ll be installing Prefect within a virtual environment on my remote machine, as the tool can be easily installed using pip.

To begin, simply git clone the project repository using the following command:

$ git clone https://github.com/paulonye/prefect_gcp.git

Navigate into the cloned project folder and create a virtual environment using python -m venv env. This will create an env/ folder.

To activate the virtual environment, use:

$ source env/bin/activate

To confirm that the virtual environment is active, check that its name appears at the beginning of your terminal prompt.

Install the libraries and modules (including Prefect and all of its dependencies) in the virtual environment using:

$ pip install -r requirements.txt

To confirm that Prefect has been installed in the virtual environment, run the command prefect --version.

Log into Prefect Cloud from the Terminal

Use the prefect cloud login Prefect CLI command to log into Prefect Cloud from your environment.

$ prefect cloud login

The prefect cloud login command, used on its own, provides an interactive login experience. Using this command, you may log in with either an API key or through a browser. Select the API key.

To set an API key:

  • Log into your Prefect cloud account
  • Click on the icon found in the lower left corner, and select the settings symbol.
  • Click on the API Key tab and follow the instructions to create a key.

Once you have the API key, go back to your terminal and paste it in. Prefect automatically saves this key in the .prefect folder located in your home directory so that you can log into your Prefect workspace at any time.

Writing your First Flow

A flow is the most basic Prefect object: a container for workflow logic that lets you interact with and understand the state of a workflow. Flows are like functions: they take inputs, perform work, and return an output. We can start by adding the @flow decorator to the main function.

from prefect import flow, task

Flows contain tasks, which are created by adding the @task decorator to a function. Tasks are not required in a flow, but they are special because they receive metadata about upstream dependencies and the state of those dependencies before they run, which lets you have a task wait on the completion of another task before executing.

@flow(name='Flow-Postgres-ETL')
def main(url: str, tablename: str):
    data = capture_data(url)
    df = trans_df(data)
    batch(df, tablename)

The code snippet above is taken from the pipeline.py file in the Git repository. The flow in this file is named “Flow-Postgres-ETL”, and the main function serves as its entry point.

In this file, three tasks are defined as separate functions. By dividing the workflow into multiple functions, we gain greater visibility into the tasks that are running and can quickly identify any that cause failures. This approach simplifies troubleshooting and debugging during the pipeline’s execution; a sketch of the first two tasks follows the list below.

  • Extract the Data from Yahoo Finance Website (task 1)
  • Transform the Data by cleaning up certain columns (task 2)
  • Load the Data into a Postgres Database (task 3)
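
As an illustration, here is a hedged sketch of how the first two tasks might be decorated. The function bodies below are simplified placeholders; the actual implementations live in pipeline.py.

import pandas as pd
from prefect import task

@task(retries=3)
def capture_data(url: str) -> pd.DataFrame:
    # illustrative extract step: read the first HTML table found at the URL
    return pd.read_html(url)[0]

@task
def trans_df(df: pd.DataFrame) -> pd.DataFrame:
    # illustrative transform step: normalize the column names
    df.columns = [col.strip().lower().replace(" ", "_") for col in df.columns]
    return df

The retries argument is optional; it simply tells Prefect to retry the task a few times if the data source is temporarily unavailable.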

Getting Started with Prefect Blocks

The third task in our pipeline involves loading data into a Postgres database, which requires the integration of a Prefect Block.

Blocks are a fundamental component of Prefect: they enable the storage of configuration and provide an interface for interacting with external systems. There are several types of blocks available, and users can even create their own. Block names are immutable, which allows them to be reused across multiple flows. Additionally, blocks can be built upon or installed as part of integration collections, which are pre-built tasks and blocks that can be installed via pip.

For our project, we’ll be using the SqlAlchemy block to set our database configuration. This block allows us to set up the configuration for our database without having to input the details directly into our code or as environment variables, as we did in our previous tasks.
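
To give a sense of how this looks in code, here is a sketch of how the load task might consume the block; the exact body in pipeline.py may differ.

import pandas as pd
from prefect import task
from prefect_sqlalchemy import SqlAlchemyConnector

@task
def batch(df: pd.DataFrame, tablename: str):
    # load the stored database configuration instead of hard-coding credentials
    connection_block = SqlAlchemyConnector.load("postgres-connector")
    with connection_block.get_connection(begin=False) as engine:
        df.to_sql(name=tablename, con=engine, if_exists="append", index=False)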

To create a SqlAlchemy block, you need to do the following:

  • Navigate to your Prefect Cloud Workspace
  • Click on the Name of your Workspace
  • Navigate to the Blocks Tab, and click on the + icon to create a new block
  • Prefect provides a list of blocks available for use, scroll down to find the SqlAlchemy block.
  • Click on Add +, and input the necessary details such as the external IP address of the database, the port, username, password, and database name. You will also need to select the postgres+psycopg2 sync driver. Name the block postgres-connector, as it is referenced by that name in pipeline.py.

The final configuration should look like this:

Note: You should not expose your Postgres credentials to anyone to prevent unwanted access to your database.

You can also create a block from a Python file using code. Prefect provides extensive documentation that you can use to build your data pipeline and integrate it with other tools.
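
For example, a block roughly equivalent to the one configured above could be registered from Python along these lines (a sketch; the credentials shown are placeholders you would replace with your own):

from prefect_sqlalchemy import SqlAlchemyConnector, ConnectionComponents, SyncDriver

connector = SqlAlchemyConnector(
    connection_info=ConnectionComponents(
        driver=SyncDriver.POSTGRESQL_PSYCOPG2,  # the postgres+psycopg2 sync driver
        username="postgres",                    # placeholder credentials
        password="your-password",
        host="your-db-external-ip",
        port=5432,
        database="your-database",
    )
)

# save under the name referenced in pipeline.py
connector.save("postgres-connector", overwrite=True)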

In case you don’t find the SqlAlchemy block on the blocks page, you can create it from the terminal by running:

prefect block register -m prefect_sqlalchemy

Check out the Prefect documentation to learn how to use more Prefect blocks.

Go through the pipeline.py script in detail to understand how the code works. It’s pretty straightforward to read and understand.

Test that the code works by running python pipeline.py in the terminal.

Deploying the Flow on Google Cloud Environment

In Prefect, a deployment is a server-side concept that encapsulates a flow, enabling it to be scheduled and triggered via API. The deployment stores metadata about the location of your flow’s code and how it should be executed.

There are several methods to create a deployment. You can build the deployment on the command line or deploy using a Python file. For our project, we’ll be using the latter approach.

You should be able to find a deployment.py file in the Git repository. This file contains the code we’ll use to create a deployment. Here’s a snippet:

from prefect.deployments import Deployment
from pipeline import main
from prefect.infrastructure.docker import DockerContainer

docker_block = DockerContainer.load("zoom")

docker_dep = Deployment.build_from_flow(
    flow=main,
    name="docker-flow",
    infrastructure=docker_block,
)

if __name__ == "__main__":
    docker_dep.apply()

Before running the deployment file, we need to configure how we want our deployment to look.

Setting up the Infrastructure

Prefect uses infrastructure to create the environment for a user’s flow to execute. Infrastructure can only be used with a Deployment.

There are various tools we can use as our Prefect infrastructure; for this project, we will use a Docker container to host our code and its dependencies.

A) Build the Docker image using the Dockerfile:

docker build -t prefectdocker:tag1 .

B) Create a block for your Docker container: go to your Prefect Cloud workspace UI and navigate to the Blocks tab to add a new Docker Container block.

C) Set the configuration details for the Docker Container block: set the block name to ‘zoom’, and add the name of the Docker image you built in the Image field. Leave all other settings as default and create the block.

Your final block should look like this:
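
If you prefer to create this block from code rather than the UI, a sketch along these lines should produce an equivalent block (the image_pull_policy and auto_remove values are assumptions, not settings taken from the walkthrough above):

from prefect.infrastructure.docker import DockerContainer

docker_block = DockerContainer(
    image="prefectdocker:tag1",          # the image built in step A
    image_pull_policy="IF_NOT_PRESENT",  # assumed: use the locally built image rather than pulling
    auto_remove=True,                    # assumed: clean up the container after each run
)

docker_block.save("zoom", overwrite=True)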

Build the Deployment

You can review the deployment.py file to gain a better understanding of how the Docker block is used. Once you're ready to deploy, simply run the command python deployment.py in your terminal.

To check your deployment, navigate to the “Deployments” tab in your Prefect workspace. If you haven’t made any changes to the deployment.py file, you should see a deployment named “Flow-Postgres-ETL:docker-flow”. Click on the “docker-flow” portion to access the details.

To configure the deployment, click the three dots at the top-right corner of the UI and select “Edit”. Scroll down to the “Parameters” section and enter the default URL and table name found in the pipeline.py file. Note that these parameters can be updated at any time. Finally, save your changes.
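
Alternatively, default parameters can be baked into the deployment when it is built. Here is a sketch that extends the deployment.py snippet above; the url and tablename values are hypothetical placeholders, not the real defaults from pipeline.py:

from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from pipeline import main

docker_block = DockerContainer.load("zoom")

docker_dep = Deployment.build_from_flow(
    flow=main,
    name="docker-flow",
    infrastructure=docker_block,
    # hypothetical defaults; the real URL and table name live in pipeline.py
    parameters={"url": "https://example.com/data", "tablename": "my_table"},
)

if __name__ == "__main__":
    docker_dep.apply()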

To start the deployment, you will need to configure a Prefect agent, which will automatically pick up the scheduled deployment runs. You should see the run status change from “Scheduled” to “Running” in the Flow Runs tab.

Running the Deployment

Running a deployment is quite different from running a flow from a Python script. To run a deployment, you need to launch a Prefect agent in your environment. The agent polls a work queue in your Prefect workspace, picks up scheduled flow runs, and executes them on your machine.

In order to run our deployment conveniently, we can create a TMUX session. TMUX is a terminal multiplexer that allows you to manage multiple terminal sessions within a single window or SSH session. With TMUX, you can also detach from a session and reattach later, which makes it possible to start a long-running process in a TMUX window, detach from the session, and later reattach to check on its progress.

Once you’ve created a TMUX session, you can deploy your Prefect agent on the terminal. Follow the commands below to get started with TMUX.

tmux
#this opens a new terminal window

source env/bin/activate
#activate the virtual environment where you installed prefect

prefect agent start -q "default"
#deploy the prefect agent to pickup workqueues using the default tag

Ctrl+B, then D
#detach from the tmux window

tmux ls
#list the tmux sessions currently running

tmux attach -t 0
#reattach to the tmux session

tmux kill-session -t 0
#kill the session

Once you have configured your agent in your running environment, go back to your Prefect workspace to monitor the workflow.

Note: Tmux usually comes preinstalled with your remote instance

For more information on deployments, be sure to check out the Prefect docs.

Note: Authentication is a crucial aspect of your workflows for both Prefect Cloud and Google Cloud Environments, and it is essential to ensure that your environments are authenticated properly.

For Prefect, you will need to configure agents and work pools that can create flow runs for deployments in remote environments. To achieve this, you must ensure that the PREFECT_API_URL is correctly set for the environment in which your agent is running. If you want the agent to communicate with Prefect Cloud from a remote execution environment such as a VM or Docker container, it is crucial to configure the PREFECT_API_URL in that environment. Use the prefect config view command to view your credentials.

For your Google Cloud environment, you need to make sure that you can access your cloud resources from your terminal. You can confirm that you are in the correct project by using the gcloud config list command. You may also need to authenticate to allow access from third-party applications such as Python by running gcloud auth application-default login.

Scheduling the Deployment

Go to your Prefect workspace and navigate to the Deployment tab to check your deployment. Click on the “docker-flow” portion and locate the three dots at the upper right corner of the UI, then select “Edit”.

Scroll down to the “Schedule” section, click on “Add”, select the “Cron” tab, and set your cron job according to your desired schedule. After setting your cron job, remember to save your changes.
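
If you would rather define the schedule in code than in the UI, Deployment.build_from_flow also accepts a schedule. The sketch below assumes a daily run at 05:00 UTC and the Prefect 2 import path; adjust both to your needs:

from prefect.deployments import Deployment
from prefect.infrastructure.docker import DockerContainer
from prefect.orion.schemas.schedules import CronSchedule
from pipeline import main

docker_block = DockerContainer.load("zoom")

docker_dep = Deployment.build_from_flow(
    flow=main,
    name="docker-flow",
    infrastructure=docker_block,
    # hypothetical schedule: run every day at 05:00 UTC
    schedule=CronSchedule(cron="0 5 * * *", timezone="UTC"),
)

if __name__ == "__main__":
    docker_dep.apply()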

Summary

In this project, we set up an ETL pipeline using Prefect, a workflow management system. We created a Prefect project, defined a flow in a Python file, and deployed it to a Docker container in a remote environment. We then authenticated our environment to ensure secure access to both our Prefect Cloud and Google Cloud resources. Finally, we scheduled our deployment to run at specific intervals using a cron job in the Prefect UI. This project provides a good starting point for building more complex data pipelines using Prefect.

In conclusion, building automated ETL pipelines is essential for efficient and effective data analysis. Prefect and Docker provide powerful tools to simplify the creation of such pipelines. With Prefect, you can create, schedule, and monitor your workflows with ease, while Docker enables you to package your code and its dependencies into standardized units for easy deployment. By combining Prefect and Docker, you can build robust, reliable, and scalable ETL pipelines that can handle a wide range of data sources and transformations.
