A data engineering project with Prefect, Docker, Terraform, Google CloudRun, BigQuery and Streamlit

Orchestrate data pipelines to find late buses in real-time — from raw data to live data visualisation

Ryan Lamb
17 min readMar 16, 2023
Photo by chan lee on Unsplash

This article demonstrates how to set up a data pipeline with Google Cloud Services, orchestrate it with Prefect and present the transformed data to users in a live web app with Streamlit. The transformed data is also consolidated in BigQuery, ready for further analysis.

This workflow is useful because it provides an example of how data professionals, from data engineers to data scientists, can easily create and manage the full end-to-end process in delivering live, actionable insights to end-users. The combination of modern tools used, such as Prefect and Streamlit, makes it accessible to a wide range of skillsets. Even if you’re just starting out and want to learn the ropes, this entire project can be built and run for free (for a fixed period of time) using Google’s generous free trial.

The data used is from the UK Bus Open Data Service and the final app displays late-running buses in real-time on an interactive map. The workflow can, however, be replicated with any data of your choice — I’ve purposely chosen to focus more on the workflow and pipeline development process and less on the data itself.

By the end of this project you’ll have:

  • Created and managed cloud resources with Terraform.
  • Changed ETL functions into tasks and flows that will be orchestrated with Prefect.
  • Created Prefect blocks to manage authentication and interaction with Google Cloud resources.
  • Created deployments to manage flow scheduling.
  • Created a Docker image for the script execution environment.
  • Pushed the Docker image to Google Artifact Registry.
  • Updated the Prefect deployment for Cloud Run to run the scripts.
  • Set up the Prefect agent on a Compute Instance.
  • Created a data visualisation app with Streamlit.

The live app can be viewed here and the full project code here.

Image by author

1a. Project setup: Development environment

I use VS Code as my editor and there are a number of useful tools and libraries to set up before you begin development. Some examples:

  1. Black

Black is a Python code formatter. It helps enforce a consistent code style (especially helpful if you have multiple developers working together), and speeds up development as you can spend less time formatting your code and more time coding.

Once Black is installed, you can simply run it on your Python file or a whole directory:

black {source_file_or_directory}

If you are using VS Code, you can set Black to automatically format your code whenever you save. Once Black is installed, go to VS Code settings, then check Editor: Format on Save:

Image by author

2. Flake8

Flake8 is a code linter that will check your code’s compliance to PEP8 coding conventions.

Note that there may be some conflicts between Black and flake8, so we can create a setup.cfg file to adjust some of flake8’s settings so that it is compatible with Black:

[flake8]
ignore = E203, E266, E501, W503
max-line-length = 88
max-complexity = 18
select = B,C,E,F,W,T4
exclude =
venv
.venv
.git,
__pycache__,
docs/source/conf.py,
old,
build,
dist

3. Pre-commit hooks

Pre-commit is a framework that allows you to run scripts before committing your code. If the script fails then the commit isn’t made. We can run Black and flake8 as pre-commit hooks, so that whenever code is committed, Black will automatically format it and flake8 will check its compliance. If flake8 fails, then the commit is denied.

Once pre-commit is installed, you will need to define your pre-commit config file. Create a .pre-commit-config.yaml that will define the sources where the hooks are taken from. You can also reference local repos which you can read more about here.

repos:
- repo: https://github.com/psf/black
rev: stable
hooks:
- id: black
language_version: python3.6
- repo: https://gitlab.com/pycqa/flake8
rev: 3.7.9
hooks:
- id: flake8

Once you’ve configured your yaml file, run pre-commit install to install your hooks in your git directory. Now whenever you commit your code, both Black and flake8 will first run and you can make amendments as necessary.

4. Gitignore templates

If you aren’t certain about which files to include in your .gitignore file, you can use one of these comprehensive gitignore templates.

5. Pip-tools

Using pip-compile from pip-tools is helpful when generating your requirements.txt file. This will make sure that builds are predictable and deterministic. You create a requirements.in file containing only your direct project dependencies, then running

pip-compile .\requirements.in

will generate a requirements.txt file that will contain the modules listed in your requirements.in file as well as the transitive dependencies of those modules. For example:

Image by author

1b. Project setup: Google Cloud account

For this project we’ll be using the following services from Google Cloud:

  • Artifact Registry to store our Docker image
  • Cloud Run to run our Python scripts
  • Compute Engine to run our Prefect agent
  • Cloud Storage to store our data
  • BigQuery for our serverless data warehouse

Google offers a generous free trial that you can use to replicate this project or create your own using a wide variety of services.

Account setup and authentication

  1. Create a new project with a unique Project ID.

2. In the same project, create a new Service Account under IAM. A service account can be used to manage access to Google Cloud services.

3. In the new service account, click on Actions then Manage keys. Create a new key which will be downloaded as a JSON file — keep it safe!

Image by author

4. Install the Google Cloud SDK

5. Set your credentials environment variable to the location of your JSON key file:

export GOOGLE_APPLICATION_CREDENTIALS="your_key.json"

6. Authenticate with:

gcloud auth application-default login

which will redirect you to a browser login to sign in to your Google account. Once confirmed, you are authenticated and are able to locally interact with your Google Cloud environment.

Permissions and APIs

We need to add permissions to the Service Account so that we can interact with the various services we require. We are going to add broad admin permissions, but typically you would want to make permissions as restrictive as possible e.g. restrict access to a bucket/resource only.

  1. Go to IAM then next to your Service Account click on Edit Principal and assign the following roles:
Image by author

2. We still need to enable the APIs for the services we are going to use so that we can interact with them. Go to each of these APIs and enable them:

2. Infrastructure-as-Code with Terraform

Terraform is an open-source tool from Hashicorp that allows us to set up infrastructure resources from configuration files in a standardised way.

For example, instead of creating a Storage Bucket by clicking through multiple steps in the GCS console, we can simply define our bucket settings in a file and then create or destroy the bucket by running the file with Terraform. This gives us reproducibility and allows us to easily share infrastructure configurations. If we need to change anything in our infrastructure, we just update our file and apply it.

Hashicorp recommend a standard module structure — for our example we are going to have four files in our root Terraform directory:

  • .terraform.version
  • main.tf
  • variables.tf
  • terraform.tfvars

.terraform.version

This just contains the version of Terraform that we have installed e.g. v1.3.7

main.tf

This is the primary entrypoint and is where we define the resources to be created.

We first define the minimum Terraform version required and the backend to be used (where the state snapshot is stored).

Next we define our service provider (Google in this case). This is like importing a library with modules — Terraform will use the ‘Google’ library to import predefined resource types such as the Google Cloud Storage Bucket. We also add the GCS project ID that we created earlier, the region and, optionally, we can provide credentials (we don’t have to in our case as we have already authenticated with GCS).

Terraform registry providers. Image taken by author from Terraform website

We can then define the various resources that we want created. Terraform’s registry provides templates for creating resources with the various cloud providers. For example, here is the template for creating a storage bucket on GCS and here is a template for managing a container in an Azure storage account.

For this project I will only be creating the storage bucket and artifact registry with Terraform. The Compute Engine and BigQuery configuration will be discussed in later steps.

For our storage bucket below, we define its name and region, and for our artifact registry we define its ID, region and format (Docker in our case). These variables are prefixed with var and the actual values for these are declared in our separate tfvars file.

variables.tf

This file contains the declarations for our variables. Each input variable is declared with a variable block:

It contains the name of the variable, the type, a brief description and a default value if required. We can set our variable values in this file but I have put them in a separate .tfvars file.

terraform.tfvars

This is our variable definition file and is just one way of setting variables in the root module.

Terraform execution steps

Once we have our Terraform files set up, we can execute with the following steps. Note that there is a useful Terraform .gitignore template that can be added to your .gitignore file.

  1. terraform init

This initializes the working directory containing configuration files — you need to run this first after creating your Terraform files.

2. terraform plan

This checks your local changes and proposes an execution plan. For example, if you’ve added a new resource to your main.tf file, you should be able to see it by running this command. It essentially tells you what changes it will make, without executing them.

3. terraform apply

Once you’ve reviewed the plan and are happy to proceed with the changes, run this command to approve the proposed plan and apply the changes to your provider account e.g. create the bucket and artifact registry in GCP.

4. terraform destroy

This removes all of our resources that are listed in our configuration files — running this will delete the bucket and artifact registry from GCP (and their contents!).

3. Orchestrating ETL with Prefect

For this project I use data from the UK’s Bus Open Data Service. My goal is to compare live bus location data against the bus timetables in order to see which buses are running late in real-time.

Extract, transform and load data

Typically you would extract and store your raw data first and then transform it (e.g. see the Medallion Architecture), but in this smaller scale project I extracted and transformed the data together in single scripts:

  1. bus_live_locations.py : Pulls the live bus locations from the Open Bus Data GTFS feed for the area specified by the bounding box coordinates (Leeds, UK in this case). The data is saved as a parquet file and uploaded to the Google storage bucket.
  2. bus_timetables.py: Gets the latest bus timetable GTFS data for the relevant region, transforms it and loads the parquet file to the bucket.
  3. compare_bus_times.py: Gets the bus live location and timetable files from the bucket, transforms them and calculates which buses are currently late to arrive at their next bus stop. The output is a small csv file containing a list of late buses and their associated trip information. The csv is uploaded to the bucket.
  4. write_to_bq.py: Gets the late bus csv file and appends the data to our BigQuery table.

The above are relatively straightforward data extraction and transformation steps, and the scripts can be run individually in a local environment, but our aim is to have them run on a set schedule in our cloud environment. This is where our orchestration tool, Prefect, comes in.

Prefect overview and setup

Prefect is an orchestration tool that offers both a locally hosted UI and a cloud based UI (Prefect Cloud) to view and manage your workflows.

Authentication with Prefect Cloud

  1. Install Prefect on your local environment and also install the Prefect GCP module: pip install -U prefect and pip install 'prefect_gcp[cloud_storage]'.
  2. Sign in to Prefect Cloud.
  3. Create a new workspace and generate an API key.
  4. Log in to Prefect Cloud from your local environment: prefect cloud login. You will have the option of authenticating with the above API key or with a browser sign-in.
  5. Now when you run a flow locally (discussed below) you will see the flow execution in your Prefect Cloud UI.

Flows and tasks

In Prefect, flows and tasks are the building blocks of your data workflow. A flow is like a function that can take inputs and produce outputs. Tasks are typically single, discrete pieces of work within a flow. For example, our function to load the bus timetable data to the GCS bucket is a single task. Flows can call tasks or other flows (known as subflows).

The great thing about Prefect is that we can make our existing functions a task or a flow simply by using the task or flow decorator. Below you will see that the load_live_locations_to_gcs function has the @task() decorator and the get_live_bus_locations function is defined as a flow with @flow() — this flow calls the load_live_locations_to_gcs and get_live_gtfs tasks.

That’s it! Now when we run this flow we can observe the function’s execution in Prefect’s UI. Prefect automatically adds logging so that you can view your flow run time and final state.

Image by author

Creating a main flow with subflows

As mentioned, a flow can call another flow (a subflow). I’ve imported the flows from three separate scripts that need to run on the same schedule and are dependent on each other. The parent flow will then call each subflow separately, execute and log it — so we don’t need to explicitly run or schedule the other scripts. The wait_for command can be used to force Prefect to wait for the previous flow run to complete before progressing.

Here we get the live bus location data, compare it against the scheduled bus times and write the results to BigQuery.

Here we can see the three separate functions successfully being executed as subflows as part of the master flow run:

Image by author

Prefect blocks

Prefect blocks are a handy feature that allow us to store configuration data to interact with external systems. For example, authenticating with GCP or interacting with a bucket. Blocks can be managed in the UI or through code.

If a block doesn’t appear in your Prefect UI then you need to register it with the block register command e.g. prefect block register -m prefect_gcp.

Once you’ve created your block you can use it by simply calling it in your script. Here I load the GCS Bucket block and upload my data to the bucket:

The bucket block uses the credentials block for authentication. The same blocks can be used in multiple flows, so instead of having to manage credentials and changes to resource names in our code, we can simply change these details in our blocks.

Below are the six blocks used. The use of Cloud Run and GitHub blocks are discussed in the next section.

Image by author

Prefect deployment

A deployment allows us to trigger and schedule our flows instead of running them manually.

Build the deployment file with:

prefect deployment build path-to-script:flow-function-name

We can use the -n option to specify the name we want to call the deployment. This will create a deployment yaml file. One can provide default parameter values in the deployment file which will then be used by flows associated with this deployment.

Create the deployment:

prefect deployment apply name-of-deployment-yaml

and you will see your new deployment listed in the Prefect UI Deployments section.

Prefect scheduling

Schedules can be set for any flow deployment. The schedule can be defined with the deployment file or in the UI itself:

Image by author

Recap so far

Up to now we have:

  • Created and managed cloud resources with Terraform.
  • Changed our data ETL functions into tasks and flows that will be orchestrated with Prefect.
  • Created Prefect blocks to manage our authentication and interaction with our Google Cloud resources.
  • Created our deployments to manage flow scheduling.

Next steps:

  • Create a Docker image for our script execution environment
  • Push this Docker image to Google Artifact Registry
  • Update our Prefect deployment for Cloud Run to run our scripts
  • Set up the Prefect agent on a VM so that it can run in our cloud environment and not locally
  • Create our data visualisation app with Streamlit

4. Creating the execution environment with Docker and Cloud Run

There are multiple ways to have our scripts running in a cloud environment — we will be using Docker and Google Cloud Run.

Our Docker image will contain our base environment for execution and we’ll use Cloud Run to run the container. It’s important to remember the distinction between the flow source (which is the actual flow code e.g. our bus_live_locations.py script) and the flow dependencies. The flow source code is stored in and referenced from a repository (Github in our case). The flow dependencies are captured in our requirements.txt file and are installed in our base environment in the Docker image.

To put it simply: We store our scripts in a Github repo, we install Python and our required packages in a Docker image and we run them with Cloud Run. We can then update our data pipeline scripts on Github without having to build a new image each time.

Build Docker image and push to Artifact Registry

  1. Our Dockerfile to install Python and project packages. Note that we don’t copy over any of our scripts — just our Python version and our package requirements.
FROM  prefecthq/prefect:2.7.7-python3.9
COPY requirements.txt .
RUN pip install -r requirements.txt --trusted-host pypi.python.org

2. We will store our image in Artifact Registry. Before I can push images to the registry I need to authenticate:

gcloud auth configure-docker europe-west6-docker.pkg.dev

3. Build the Docker image with our registry address:

docker build -t bus-tracker:v1 .

4. Tag the image with the registry name in the format

docker tag SOURCE_IMAGE[:TAG] TARGET_IMAGE[:TAG]

for example:

docker tag bus-tracker:v1 europe-west6-docker.pkg.dev/bus-tracking-376121/bus-tracking/bus-tracker:v1

5. Push the image to the registry in the format

docker push HOST-NAME/PROJECT-ID/REPOSITORY/IMAGE

for example:

docker push europe-west6-docker.pkg.dev/bus-tracking-376121/bus-tracking/bus-tracker:v1

You’ll then see your Docker image available in Artifact Registry.

Setup Cloud Run and GitHub blocks

  1. Create a Cloud Run block and provide it with our Docker image location. The fields that you should complete are:
  • Type: cloud-run-job
  • Image name: This is the full location of your image in artifact registry (same as provided when pushing your image)
  • Region: Use the same region as the resource
  • Credentials: Use your GCP Credentials block

2. Create a GitHub block and in the Repository field put the full URL to your project repo. An access token isn’t required if you are using a public repo.

Update Prefect deployment

We now have to update our deployment to let Prefect know that we will be running our scripts from GitHub and using Cloud Run.

prefect deployment build master_flow.py:master_flow -n master__flow -sb github/bus-tracker  -ib cloud-run-job/bus-tracker-cloudrun -o prefect-master-flow-deployment --apply

where

  • master_flow.py is our flow script
  • master_flow is the name of the flow within the script
  • -sb is the name of the storage block of our flow code i.e. GitHub block
  • -ib is the name of our infrastructure block i.e. Cloud Run block
  • -o is the name of the deployment yaml file

5. Running Prefect agent on Google Compute Engine

The last piece of the puzzle — up until now we have been using our local system to run the Prefect agent. The agent is responsible for checking for flow runs that are ready to run and starting their execution.

In order for us to have the project fully run in the cloud we need to set up our agent on a Google Compute Engine instance. Danilo Drobac has an excellent summary of this process which I have mostly referenced here.

  1. Create the compute instance:
gcloud compute instances create prefect-agent \
--image=ubuntu-2004-focal-v20230104 \
--image-project=ubuntu-os-cloud \
--machine-type=e2-micro \
--zone=europe-west6-a \
--service-account=[SERVICE-ACCOUNT-NAME]

2. SSH into the VM. You can use the direct SSH connection button in the console:

Image by author

3. Create a new shell script to install the required packages — we’ll need to install Python and Prefect, then log in to Prefect with our Prefect API key:

#!/bin/bash
sudo apt-get update -y
sudo apt-get upgrade -y
sudo apt-get install -y \
ca-certificates \
curl \
gnupg \
lsb-release \
software-properties-common \
python3-dateutil
sudo ln-s /usr/bin/python3 /usr/bin/python
wget https://bootstrap.pypa.io/get-pip.py
sudo python3 get-pip.py
PATH="$HOME/.local/bin:$PATH"
exportPATH
pip3 install prefect prefect-gcp
prefect cloud login -k <INSERT_PREFECT_API_KEY>

4. Save the file and make it executable:

sudo chmod+x install_script.sh

5. Execute the script:

./install_script.sh

6. Start the agent. We’ll use tmux so that the agent continues to run after we kill the SSH connection. Open a tmux session with tmux then start the agent

prefect agent start -q default

Press ctrl/cmd+b then d to exit the tmux session

Image by author

7. Confirm that the session is still running with tmux list-sessions

We now have our Prefect agent running on a Compute Engine instance, checking for any new flow runs to update our data.

6. Data visualisation app with Streamlit

Streamlit is an open-source library that lets you rapidly and easily build data web apps in pure Python. You can self-host or use Streamlit’s Cloud service to deploy your app for free. To create the bus map I used the Streamlit-Folium library that integrates Streamlit with the mapping library Folium.

To demonstrate how easy it is — this interactive map app below can be made with 4 lines of code:

Image by author

The final project app has more components however, as it connects directly to the latest bus data csv file in the Google bucket. It allows the user to refresh the map data and view delayed buses in (almost) real-time.

Deploying the app on Streamlit Cloud is as simple as pointing the service to your app code on GitHub and clicking deploy. The live app can be viewed here.

Image by author

7. Go-live steps

To recap, here are the high-level project steps to go from setup to live web app:

  1. Create your Google Cloud account with necessary permissions and APIs enabled
  2. Use Terraform to create a storage bucket and artifact registry on Google Cloud service
  3. Run the Create BigQuery script to create the BigQuery table
  4. Push the latest Docker image to Artifact Registry
  5. Ensure that the Prefect Deployment files are up-to-date and that the latest ETL scripts have been pushed to your GitHub repo
  6. Start the Prefect agent in Compute Instance
  7. Set the flow run schedule(s) in Prefect Cloud
  8. Deploy the Streamlit app

8. Conclusion

This workflow has shown how you can extract value from modern data and infrastructure tools to create end-to-end pipelines that deliver actionable, live insights to users. The tools used, from GCS to Prefect and Streamlit, can be implemented by data professionals with various backgrounds and skill levels. With minimal tweaks, the process can be replicated and applied to any appropriate dataset, making it a beneficial toolset for a wide range of users.

Hope you enjoyed this walkthrough and it helps you in developing your data pipelines and projects. All relevant project files can be found here.

9. Resources

  1. Bus Open Data Service
  2. Black
  3. flake8
  4. pre-commit
  5. Terraform
  6. Prefect
  7. Google Cloud Services
  8. Visual Studio Code
  9. Docker
  10. Streamlit
  11. Project source code

--

--

Ryan Lamb

Business consultant | Data scientist | Chartered Management Accountant (CIMA) — let’s connect! ryanericlamb.com/