End to End Serverless ELT with Google Cloud, dbt and Terraform

Youness DRISSI SLIMANI
8 min read · May 14, 2024


Introduction

In this article, we will delve into the development of a Serverless Extract, Load, Transform (ELT) pipeline using Google Cloud Services, dbt, and Terraform. The use of these technologies allows us to build a robust, scalable, and cost-effective data pipeline.

Use Case

The ELT pipeline we’ve developed leverages several Google Cloud Services including Google Cloud Storage (GCS), BigQuery, Pub/Sub, Cloud Workflows, Cloud Run, and Cloud Build. We also use dbt for data transformation and Terraform for infrastructure as code.

Google Cloud Services

Google Cloud Storage (GCS)

GCS is used as the initial landing zone for our data. It’s a scalable and durable object storage service, perfect for unstructured data.

BigQuery

BigQuery is Google Cloud’s fully managed, petabyte-scale, and cost-effective analytics data warehouse that lets you run analytics over vast amounts of data in near real-time.

Pub/Sub

Google Cloud Pub/Sub is a messaging service for exchanging event data among services and applications. In our pipeline, it’s used to trigger certain processes when new data arrives.
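For illustration, wiring a bucket’s new-object events to a Pub/Sub topic takes a single gsutil command; the bucket and topic names below are placeholders, and in this project the equivalent wiring is managed with Terraform.

# Publish an OBJECT_FINALIZE event to the topic whenever a new file lands in the bucket
gsutil notification create -t raw-data-topic -f json -e OBJECT_FINALIZE gs://your-landing-bucket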

Cloud Workflows

Cloud Workflows is a fully managed, serverless API offering from Google Cloud that helps in orchestrating and automating Google Cloud and HTTP-based services. It’s used in our pipeline to manage and coordinate tasks.

Cloud Run

Cloud Run is a managed compute platform that enables you to run stateless containers that are invocable via HTTP requests. It’s used in our pipeline for running our dbt transformations.

Cloud Build

Cloud Build is a service that executes your builds on Google Cloud. We use it to automate deployments of our dbt project from GitHub.

Data Transformation with dbt

dbt (data build tool) is a command-line tool that enables data analysts and engineers to transform data in their warehouse more effectively. In our pipeline, dbt is used for transforming the extracted data and loading it into BigQuery.

Infrastructure as Code with Terraform

Terraform is an open-source infrastructure as code software tool that provides a consistent CLI workflow to manage hundreds of cloud services. We use Terraform to automate the creation and management of our Google Cloud resources.
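To give a flavour of what this looks like, two of the resources such a pipeline needs could be declared as follows; the names and locations are placeholders rather than the exact values from the infra folder.

resource "google_storage_bucket" "landing_zone" {
  name     = "your-landing-bucket" # placeholder bucket name
  location = "EU"
}

resource "google_bigquery_dataset" "retail" {
  dataset_id = "retail_dsy" # assumed to match the dbt source name
  location   = "EU"
}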

Structure of the project

Setting up and deploying the project

Create a Service Account

To interact with Google Cloud Services programmatically, you need to create a service account. This account is an identity attached to your project, and it is used by the Google Cloud SDK and other Google Cloud client libraries to authenticate calls to Google Cloud APIs.

Here are the steps to create a service account with an admin role:

  1. Navigate to the Google Cloud Console.
  2. Select your project.
  3. Go to ‘IAM & Admin’ > ‘Service Accounts’.
  4. Click on ‘Create Service Account’.
  5. Enter a name for the service account, and click ‘Create’.
  6. In the ‘Grant this service account access to project’ section, select the ‘Owner’ role. This role provides full access to all resources in the project.
  7. Click ‘Continue’ and then ‘Done’.

Remember to download the JSON key file for this service account and keep it secure. This key file will be used to authenticate your service account when deploying your project.
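If you prefer the command line, the same steps can be scripted with gcloud; this is only a sketch, and the project id, account name, and key file name are placeholders.

# Create the service account (names and project id are placeholders)
gcloud iam service-accounts create elt-pipeline-sa \
  --project=your-gcp-project-id \
  --display-name="ELT pipeline service account"

# Grant it the Owner role on the project
gcloud projects add-iam-policy-binding your-gcp-project-id \
  --member="serviceAccount:elt-pipeline-sa@your-gcp-project-id.iam.gserviceaccount.com" \
  --role="roles/owner"

# Download a JSON key for the account
gcloud iam service-accounts keys create your-key.json \
  --iam-account=elt-pipeline-sa@your-gcp-project-id.iam.gserviceaccount.com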

Setting up the project

After creating your service account and downloading the JSON key, move the key into the infra folder. This is generally done for better organization and easier access.

The provider.tf file is where you specify your Terraform provider - in this case, Google Cloud Platform (GCP). You need to update this file with the path to the JSON key of the service account you just moved. Here's how:

  1. Open the provider.tf file in a text editor.
  2. Locate the credentials field in the provider "google" block.
  3. Update the credentials field with the path to your service account JSON key. It should look something like this (replace your-key.json with your actual file name):
provider "google" {
credentials = file("/path/to/your-infrastructure-project/your-key.json")
project = "your-gcp-project-id"
}

Deploy services in Google Cloud

Navigate to the infra folder; we are going to deploy the project using Terraform:

  1. Initialize your Terraform workspace, which will download the provider plugins for Google Cloud: terraform init
  2. Plan the deployment and review the changes: terraform plan
  3. If everything looks good, apply the changes: terraform apply

N.B: Before deploying your project using Terraform, it’s important to ensure that all the necessary values in your Terraform files are correctly set. This includes the name of your project, location, and any other variables that your infrastructure might depend on.
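For example, if the infra folder exposes variables for the project and location, a terraform.tfvars along these lines keeps them in one place; the variable names here are illustrative, so use the ones actually declared in variables.tf.

# terraform.tfvars (illustrative variable names)
project_id = "your-gcp-project-id"
region     = "europe-west1"
location   = "EU"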

Once you’ve deployed your project using Terraform, you can verify the creation of services in the Google Cloud Console. Here’s how you can check each service:

Google Cloud Storage (GCS): Navigate to the ‘Storage’ section in the Google Cloud Console. Here, you should see your newly created storage buckets.

Cloud Workflows: Go to the ‘Workflows’ section in the Google Cloud Console. You should be able to see your workflows listed here. You can click on each workflow to see its details and status.

BigQuery Dataset and Tables: Navigate to the ‘BigQuery’ section in the Google Cloud Console. On the left-hand side, you should see your project name. Expand it, and you should see your datasets. Click on a dataset to expand it and see the tables within it.

Cloud Run set up

Now we are ready to build and deploy our container with Cloud Build. We can do it straight from Cloud Run. Create a new service:

Choose ‘Set up with Cloud Build’, connect to GitHub, and select the repository containing your dbt project.

Your GitHub repository corresponds to the dbt-etl folder; refer to my GitHub repository.

Enter your branch (^master$ or ^main$) and the location of the Dockerfile, then click “Save”.

The service is created and the Docker image should be deployed. Now we need to provide the credentials that will be accessed inside our image. Remember the profiles.yml file in the dbt-etl folder, where one of the key-value pairs is keyfile: /secrets/dbt-service-keyfile? We need to store the JSON keyfile somewhere, and since we are on GCP, why not use Secret Manager for this? Find the product and enable its API if needed.
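For reference, a minimal profiles.yml for BigQuery might look like the sketch below; only the keyfile path is taken from the project, while the profile name, project id, dataset, and location are assumptions.

# profiles.yml (sketch; profile name, project, dataset and location are assumptions)
dbt_etl:
  target: prod
  outputs:
    prod:
      type: bigquery
      method: service-account
      keyfile: /secrets/dbt-service-keyfile
      project: your-gcp-project-id
      dataset: retail_dsy
      location: EU
      threads: 4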

Upload the saved JSON keyfile:

Now, go back to Cloud Run, click on your created service, then go to ‘Edit & Deploy New Revision’ > ‘Volumes’:

Click Done and then Deploy.
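The same secret mount can also be done from the command line; this is a sketch where the service name, region, and secret name are assumptions.

# Mount the Secret Manager secret as a file at /secrets/dbt-service-keyfile (names are placeholders)
gcloud run services update dbt-run-service \
  --region=europe-west1 \
  --update-secrets=/secrets/dbt-service-keyfile=dbt-service-keyfile:latest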

dbt project

Structure of the project

Data model

Transformation

-- dim_customer.sql

-- Create the dimension table
WITH customer_cte AS (
    SELECT DISTINCT
        {{ dbt_utils.generate_surrogate_key(['CustomerID', 'Country']) }} AS customer_id,
        Country AS country
    FROM {{ source('retail_dsy', 'raw_invoice') }}
    WHERE CustomerID IS NOT NULL
)
SELECT
    t.*,
    cm.iso
FROM customer_cte t
LEFT JOIN {{ source('retail_dsy', 'raw_country') }} cm ON t.country = cm.nicename
-- dim_datetime.sql

-- Create a CTE to extract date and time components
WITH datetime_cte AS (
    SELECT DISTINCT
        InvoiceDate AS datetime_id,
        CASE
            WHEN LENGTH(InvoiceDate) = 16 THEN
                -- Date format: "MM/DD/YYYY HH:MM"
                PARSE_DATETIME('%m/%d/%Y %H:%M', InvoiceDate)
            WHEN LENGTH(InvoiceDate) <= 14 THEN
                -- Date format: "MM/DD/YY HH:MM"
                PARSE_DATETIME('%m/%d/%y %H:%M', InvoiceDate)
            ELSE
                NULL
        END AS date_part
    FROM {{ source('retail_dsy', 'raw_invoice') }}
    WHERE InvoiceDate IS NOT NULL
)
SELECT
    datetime_id,
    date_part AS datetime,
    EXTRACT(YEAR FROM date_part) AS year,
    EXTRACT(MONTH FROM date_part) AS month,
    EXTRACT(DAY FROM date_part) AS day,
    EXTRACT(HOUR FROM date_part) AS hour,
    EXTRACT(MINUTE FROM date_part) AS minute,
    EXTRACT(DAYOFWEEK FROM date_part) AS weekday
FROM datetime_cte
-- dim_product.sql

-- StockCode isn't unique: a product with the same id can have different descriptions and prices
-- Create the dimension table
SELECT DISTINCT
    {{ dbt_utils.generate_surrogate_key(['StockCode', 'Description', 'UnitPrice']) }} AS product_id,
    StockCode AS stock_code,
    Description AS description,
    UnitPrice AS price
FROM {{ source('retail_dsy', 'raw_invoice') }}
WHERE StockCode IS NOT NULL
    AND CAST(UnitPrice AS FLOAT64) > 0
-- fct_invoices.sql

-- Create the fact table by joining the relevant keys from the dimension tables
WITH fct_invoices_cte AS (
    SELECT
        InvoiceNo AS invoice_id,
        InvoiceDate AS datetime_id,
        {{ dbt_utils.generate_surrogate_key(['StockCode', 'Description', 'UnitPrice']) }} AS product_id,
        {{ dbt_utils.generate_surrogate_key(['CustomerID', 'Country']) }} AS customer_id,
        CAST(Quantity AS FLOAT64) AS quantity,
        CAST(Quantity AS FLOAT64) * CAST(UnitPrice AS FLOAT64) AS total
    FROM {{ source('retail_dsy', 'raw_invoice') }}
    WHERE CAST(Quantity AS FLOAT64) > 0
)
SELECT
    invoice_id,
    dt.datetime_id,
    dp.product_id,
    dc.customer_id,
    quantity,
    total
FROM fct_invoices_cte fi
INNER JOIN {{ ref('dim_datetime') }} dt ON fi.datetime_id = dt.datetime_id
INNER JOIN {{ ref('dim_product') }} dp ON fi.product_id = dp.product_id
INNER JOIN {{ ref('dim_customer') }} dc ON fi.customer_id = dc.customer_id
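The models above reference a dbt source named retail_dsy with raw_invoice and raw_country tables. A minimal sources.yml declaring that source could look like the sketch below; the BigQuery project and dataset names are assumptions.

# models/sources.yml (sketch; project and dataset names are assumptions)
version: 2

sources:
  - name: retail_dsy
    database: your-gcp-project-id
    schema: retail_dsy
    tables:
      - name: raw_invoice
      - name: raw_country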

FastAPI to run dbt

I use FastAPI to trigger the dbt run command using Python's subprocess module. If the command succeeds, it returns the output and error (if any) as a JSON response. If the command fails, it raises an HTTPException with a 500 status code and the error message.

import logging
import subprocess

from fastapi import FastAPI, HTTPException

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

app = FastAPI()


@app.get("/run")
def run_dbt():
    logger.info("Running dbt run command")
    # Run the dbt run command and capture the output
    result = subprocess.run(["dbt", "run"], capture_output=True, text=True)

    # Check the return code of the command
    if result.returncode == 0:
        # The command succeeded, return a 200 status code and the output
        logger.info("dbt run command succeeded")
        logger.info("dbt run output: " + result.stdout)
        logger.info("dbt run error: " + result.stderr)

        return {"output": result.stdout, "error": result.stderr}
    else:
        # The command failed, raise an HTTPException with a 500 status code and the error message
        error_message = f"dbt run command failed with error: {result.stderr} stdout: {result.stdout}"
        logger.error(error_message)
        raise HTTPException(status_code=500, detail=error_message)
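To sanity-check the endpoint locally before deploying, you can serve the app with uvicorn and hit the route with curl; this assumes the module is named main.py and that dbt is configured to run locally.

# Assumes the FastAPI app lives in main.py and dbt can run locally
uvicorn main:app --port 8080
# in another terminal:
curl http://localhost:8080/run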

This endpoint is called from Google Cloud Workflows:

# workflow.yaml

# call api in cloud run to trigger dbt
- trigger-dbt:
    call: http.get
    args:
      url: "https://<your-cloud-run-service-url>/run"
    result: dbtResult
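To test the workflow on its own, you can execute it manually from the command line; the workflow name and region below are placeholders.

# Run the workflow and wait for the result (name and region are placeholders)
gcloud workflows run elt-workflow --location=europe-west1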

Testing

Finally, to test the workflow end to end, we can launch the script upload_include_dataset_to_gcs.sh.

Once the files are uploaded to GCS, the Google Cloud Workflow is triggered and the data is loaded into tables in BigQuery.
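The script itself is not shown here, but in essence it copies the dataset files into the landing bucket with gsutil. A minimal sketch, with the bucket name and file paths as placeholders, would be:

#!/bin/bash
# upload_include_dataset_to_gcs.sh - sketch only; bucket and paths are placeholders
gsutil cp include/dataset/*.csv gs://your-landing-bucket/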

Conclusion

This article has provided a comprehensive and practical example of an ELT pipeline utilizing Google Cloud Workflows and dbt.

The benefits of using Workflows are numerous. It is serverless, making it cost-effective and lightweight, which aligns perfectly with the requirements of an ELT pipeline.

For GCP Developers, the learning curve for the Workflow system is relatively short. This is because it operates on API calls and leverages native Google Cloud APIs, making it a familiar environment for developers to work in.

All the code shared in this article is accessible from my GitHub repository.
