Dataform and Terraform: Automate SQL pipelines in production

Automate the orchestration of SQL pipelines in Google Cloud Platform using Dataform, Terraform, and Workflows.

Alejandro De La Cruz López
Astrafy

--

What is Dataform

Dataform, in a nutshell, allows you to orchestrate SQL workflows. This helps transform raw data into reliable datasets for analytics. It also provides a collaborative platform for managing SQL workflows, testing data quality, and documenting datasets.

Google acquired Dataform in 2020 to integrate it with BigQuery, and since then it has been developed exclusively for GCP. With its latest major release, 3.0, Dataform no longer supports data warehouses or databases other than BigQuery and has become an integrated part of the GCP platform: you can access it directly from the BigQuery service.

This tight integration with BigQuery makes Dataform a compelling option for companies that already store their data in BigQuery.

How should I orchestrate Dataform

If you are using dbt, you need either an orchestrator, such as Airflow, or dbt Cloud to execute the queries in your dbt models. This is a real concern: running your own orchestration infrastructure costs time and money, while dbt Cloud is quite expensive. Neither option is optimal.

Dataform, by contrast, is integrated into GCP (and it is free), which gives it direct access to GCP orchestration tools. Dataform also ships with built-in scheduling for its pipelines. However, this feature is not customizable enough for most cases, for one main reason: it is not possible to run the pipelines with dynamic variables such as a date.

In the real world, most pipelines are incremental, since every day you ingest the new data from your sources. The straightforward approach is to use CURRENT_DATE() in the SQL query. However, if a run fails, re-executing it later means running a different query because the date may have changed. The recommended approach is to use a variable such as date: when re-executing, the pipeline ingests the data for the date provided as a variable rather than a date relative to the current time.
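
For illustration, this is roughly what such a filter looks like in a Dataform SQLX model (the same pattern appears in the full model later in this article):

-- Filter on a compilation variable instead of CURRENT_DATE(), so that
-- re-runs and backfills for a given date are deterministic.
${when(incremental(), `WHERE DATE(_PARTITIONTIME) = "${dataform.projectConfig.vars.date}"`)}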

For this reason, Dataform's built-in automated workflows are not suitable in most cases. However, for projects that do not need dynamic variables, this approach has clear advantages:

  • Really simple and integrated inside Dataform
  • Scheduled queries on a cron interval
  • Can be fully automated with IaC (Terraform)

For a simple project, this is all we need, and getting orchestration that easily is really convenient.
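
For illustration, here is a minimal sketch of what such a scheduled execution could look like when managed with Terraform, using the google-beta Dataform resources (the repository name and the release config reference are hypothetical):

resource "google_dataform_repository_workflow_config" "daily" {
  provider = google-beta

  project        = "my-project"
  region         = "europe-west1"
  repository     = "dataform_demo"
  name           = "daily_run"
  # Assumes a google_dataform_repository_release_config.daily defined elsewhere
  release_config = google_dataform_repository_release_config.daily.id
  cron_schedule  = "0 8 * * *"
  time_zone      = "Europe/Paris"
}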

The other options for orchestrating Dataform are Airflow (or any other orchestrator) and Google Workflows. The first option is ideal in terms of features but, as discussed above, it is expensive in both time and money. The second is the one we recommend, and it is the one explained in this article.

Orchestrating Dataform

There are several ways to execute Dataform. You can check a very informative comparison below from the article Mastering Dataform Execution in GCP: A Practical Guide with CI/CD Example.

Possibilities to run Dataform

Ideally, the best approach is to use an orchestrator. Dataform does not provide a UI where you can easily review each day's run and its results, which makes it harder to debug which dates a model ran for and can lead to issues. An orchestrator also lets you run custom code before, between, or after your SQL pipelines.

Still, we recommend leveraging Dataform's integration with GCP and orchestrating it with Google Workflows. This lets you automate the whole deployment with Terraform using a few simple scripts. Besides, Google provides documentation and a ready-made workflow for this. This solution strikes a balance between simplicity, cost, features, and maintainability. Here is the resulting architecture.

Production Dataform Architecture

Demo: FinOps data product

To showcase the Dataform production environment, we will build a simple data product related to FinOps. It consists of two models and one source. The source is the Google Cloud billing export table, which contains the cost of every resource used in our Google Cloud organization. You can check the code for the Dataform repository here.

Bootstrapping Dataform repository

You can check the official guidelines for the CLI here.

  • Before installing the Dataform CLI, install npm.
  • Make sure you are inside the Git folder you created with your repository provider (GitLab, GitHub, etc.).
# Install dataform
npm i -g @dataform/cli@^3.0.0-beta.4

# Initialize the repository in the current directory
dataform init . my-project EU

This will create a folder structure with a workflow_settings.yaml file like the following.

  • You can remove the defaultDataset entry and rely on the schema you set in each model
  • Set the project you want to use to store your models
  • Set the location of your BigQuery datasets
  • Set the defaultAssertionDataset to a dataset where you want to store your tests. Example: bqdts_finops_tests
dataformCoreVersion: 3.0.0-beta.4
defaultProject: my-project
defaultLocation: EU
defaultDataset: dataform
defaultAssertionDataset: dataform_assertions
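
Since the models below reference a compilation variable called date, it is also worth declaring a default value for it. Assuming the standard vars block of workflow_settings.yaml, this could look like:

vars:
  date: "2024-01-01" # Default value; the orchestration workflow overrides it at run time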

The final structure is the following. The folders dm, int, sources, and stg are created manually to host the models for each layer.
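
Roughly, the layout looks like this (a sketch: dataform init also creates a definitions folder and a .gitignore, and we assume the layer folders live under definitions):

.
├── definitions/
│   ├── dm/
│   ├── int/
│   ├── sources/
│   └── stg/
├── workflow_settings.yaml
└── .gitignore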

We will then build int_billing_data.sqlx and net_cost_grouped.sqlx.

Declaring sources and models

We declare the source as follows.

config {
  type: "declaration",
  database: dataform.projectConfig.defaultProject,
  schema: "bqdts_finops_lz",
  name: "finops_lz",
}

Then, you can access it from your models with the ref function: ${ref("finops_lz")}.

Then, we build the models. Here are a few best practices we recommend:

  • Set the tag of the stage (int, stg, dm) so that it is possible to run a stage using the tag.
  • Set the type. It can be incremental, table, or view.
  • Set the schema where the model will be written
  • Set the partitioning and clustering if needed
  • Set the assertions; these are data quality tests. You can see the documentation here.
  • In pre_operations, delete the partition you are about to insert so that data is not duplicated in the final table. This mimics dbt's behavior for incremental models.

Here is an example of the model int_billing_data.sqlx. The final model net_cost_grouped.sqlx uses the same columns but adds up cost and discount to get the net_cost (a sketch of it follows the example below). This way, we have a simple, production-ready FinOps data product.

config {
  tags: ["int"],
  type: "incremental",
  schema: "bqdts_finops_int",
  bigquery: {
    partitionBy: "export_date",
    clusterBy: ["project_id", "service_name", "service_description"],
  },
  assertions: {
    nonNull: ["export_date"],
    rowConditions: [
      `service_name is not null`
    ]
  }
}

pre_operations {
  ${when(incremental(), `DELETE FROM ${self()} WHERE export_date = "${dataform.projectConfig.vars.date}"`)}
}

SELECT
  CAST(export_time AS DATE) AS export_date,
  project.id AS project_id,
  service.description AS service_name,
  sku.description AS service_description,
  currency AS currency,
  SUM(cost) AS cost,
  IFNULL(SUM(
    (
      SELECT SUM(credit.amount)
      FROM UNNEST(credits) credit
      WHERE credit.type != 'PROMOTION'
    )
  ), 0) AS discount
FROM ${ref("finops_lz")}
${when(incremental(), `WHERE DATE(_PARTITIONTIME) = "${dataform.projectConfig.vars.date}"`)}
GROUP BY export_date, project_id, service_name, service_description, currency
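
As a reference, here is a minimal sketch of what net_cost_grouped.sqlx could look like (the tag, dataset, and partitioning choices are assumptions that follow the conventions above):

config {
  tags: ["dm"],
  type: "incremental",
  schema: "bqdts_finops_dm",
  bigquery: {
    partitionBy: "export_date",
  },
}

pre_operations {
  ${when(incremental(), `DELETE FROM ${self()} WHERE export_date = "${dataform.projectConfig.vars.date}"`)}
}

SELECT
  export_date,
  project_id,
  service_name,
  service_description,
  currency,
  SUM(cost) AS cost,
  SUM(discount) AS discount,
  SUM(cost) + SUM(discount) AS net_cost
FROM ${ref("int_billing_data")}
${when(incremental(), `WHERE export_date = "${dataform.projectConfig.vars.date}"`)}
GROUP BY export_date, project_id, service_name, service_description, currency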

Then, we are ready to deploy our infrastructure in Google Cloud to run our data product.

Deploying Dataform with Terraform

The Terraform repository contains a top-level folder in which shared resources are created, plus a module inside the dataform folder. This module contains all the resources that need to be deployed per environment (dev, stg, prd). You can find all the code shown in this section in the following repository.

Terraform repository structure

To build the desired architecture, we need the following components.

1. Project and folder infrastructure

Starting with the basics, we create a folder with three projects: operations, development, and production. Ideally, we would also have a staging project, but we omit it for this demo. The folder and the operations project are created in the top-level Terraform folder, while the development and production projects are created inside the dataform module.

Project and folder structure
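
As a minimal sketch, the folder itself could be created as follows (the organization ID is a placeholder; the demo repository uses its own project module to create the three projects):

resource "google_folder" "dataform_demo" {
  display_name = "dataform-demo"
  parent       = "organizations/123456789012" # Placeholder organization ID
}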

2. External repository

Dataform connects to an external Git repository to read the queries and other Dataform metadata. You can read the official documentation here. In a nutshell, you need to store a token that grants access to the repository in Secret Manager. This is done in the ops project.

resource "google_secret_manager_secret" "gitlab_token" {
  project   = module.project_ops_demo.project_id
  secret_id = "gitlab_token"

  replication {
    user_managed {
      replicas {
        location = "europe-west1"
      }
    }
  }
}

resource "google_secret_manager_secret_version" "gitlab_token" {
  secret = google_secret_manager_secret.gitlab_token.id

  secret_data = var.dataform_demo_gitlab_token
}

3. BigQuery datasets

You also need the BigQuery datasets in which Dataform will create its tables, if they are not yet provisioned. Dataform can create datasets by itself, but it is a best practice to manage them with Terraform. We suggest a landing zone (lz), staging (stg), intermediate (int), and data mart (dm) layering, plus a dataset for Dataform tests.

locals {
  bq_datasets = toset(["bqdts_finops_lz", "bqdts_finops_stg", "bqdts_finops_int", "bqdts_finops_dm", "bqdts_finops_tests"])
}

resource "google_bigquery_dataset" "bq_datasets" {
  for_each      = local.bq_datasets
  dataset_id    = each.value
  friendly_name = each.value
  location      = "EU"
  project       = module.project.project_id
}

4. Service accounts & IAM

You need two service accounts for dev and two for production. One executes the Google Workflow that triggers the Dataform pipeline; the other is the service account Dataform uses to run the pipeline against BigQuery.

resource "google_service_account" "dataform_runner" {
  account_id   = "dataform-${var.env}-runner"
  display_name = "Dataform ${var.env} runner"
  project      = module.project.project_id
  description  = "Default account which Dataform pipelines will run with"
}

resource "google_service_account" "workflow_dataform" {
  project      = module.project.project_id
  account_id   = "${var.env}-dataform-workflow-runner"
  display_name = "${var.env} Dataform Workflow Runner"
  description  = "Runs dataform workflows in ${var.env}"
}

The permissions needed are the following (a Terraform sketch of two of these bindings follows the list).

  • Secret Manager secret accessor for the Dataform default service agent, so that it can read the repository token
  • Service account token creator for the Dataform default service agent on the Dataform runner SA, so that Dataform can run pipelines as that account
  • BigQuery jobUser and dataEditor for the Dataform runner SA
  • Workflows invoker and Dataform editor for the Workflow runner SA
  • Service account token creator for the Workflow runner SA on the Dataform runner SA, so that the workflow can trigger the Dataform pipeline with it
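
Below is a minimal sketch of two of these bindings (resource names are illustrative; the roles match the list above):

resource "google_project_iam_member" "dataform_runner_bq_job_user" {
  project = module.project.project_id
  role    = "roles/bigquery.jobUser"
  member  = "serviceAccount:${google_service_account.dataform_runner.email}"
}

resource "google_service_account_iam_member" "workflow_impersonates_dataform_runner" {
  service_account_id = google_service_account.dataform_runner.name
  role               = "roles/iam.serviceAccountTokenCreator"
  member             = "serviceAccount:${google_service_account.workflow_dataform.email}"
}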

5. Dataform repository

This resource connects to the Git repository where your SQL files are stored. You will also be able to see this repository in the Dataform UI, with its runs and the result of each one.

Dataform UI for workflow execution logs

resource "google_dataform_repository" "dataform_repository" {
  provider = google-beta

  name         = "dataform_demo"
  display_name = "dataform_demo"

  project = module.project.project_id
  region  = "europe-west1"

  service_account = google_service_account.dataform_runner.email

  git_remote_settings {
    url                                 = "https://gitlab.com/demos2261901/dataform-demo.git"
    default_branch                      = "main"
    authentication_token_secret_version = var.gitlab_secret.id
  }

  workspace_compilation_overrides {
    default_database = module.project.project_id
  }
}

6. Workflow that triggers Dataform

The date variable is a Dataform compilation variable used to run the pipeline incrementally. This way, the SQL queries know which data they need to retrieve and process. It defaults to yesterday's date in the format YYYY-MM-DD.

All the variables in the init step can be overridden when triggering the workflow. Here is a description of each of them:

  • date: Date to use in your incremental queries
  • project_id: Overwrites the default project.
  • git_commitish: Selects the git branch to use.
  • dataform_tags: Runs only a selection of tags.
  • dataform_targets: Runs only a selection of actions (models).
  • include_dependencies / include_dependents: Runs dependencies/dependents actions of selected tags/actions.
  • dataform_service_account: Dataform will perform all operations on BigQuery using this service account. If it is an empty string, the default Dataform SA specified in your repository settings is used.
  • fully_refresh_incremental_tables: Recreates incremental tables from scratch.
  • wait_for_dataform_status_check: If true, the workflow waits until the Dataform transformations finish and returns an error if Dataform fails. If false, it sends the request to Dataform to execute the transformations and returns success regardless of Dataform's result.
  • compile_only: Compiles the Dataform code without executing it. This can be useful in a CI/CD pipeline to check that there are no Dataform compilation errors before applying changes.

locals {
  branch = var.env == "prd" ? "main" : var.env
}

resource "google_workflows_workflow" "dataform" {
  project         = module.project.project_id
  name            = "dataform_workflow"
  region          = "europe-west1"
  description     = "Trigger dataform workflow"
  service_account = google_service_account.workflow_dataform.id
  call_log_level  = "LOG_ERRORS_ONLY"

  source_contents = <<-EOF
  main:
    params: [args]
    steps:
      - init:
          assign:
            - date: $${default(map.get(args, "date"), text.substring(time.format(sys.now() - 86400), 0, 10))} # Yesterday's date in format YYYY-MM-DD
            - project_id: ${module.project.project_id}
            - git_commitish: $${default(map.get(args, "git_commitish"), "${local.branch}")}
            - dataform_region: europe-west1
            - dataform_repository_name: ${google_dataform_repository.dataform_repository.name}
            - dataform_repository_id: ${google_dataform_repository.dataform_repository.id}
            - dataform_api_version: v1beta1
            - dataform_tags: $${default(map.get(args, "dataform_tags"), [])}
            - dataform_targets: $${default(map.get(args, "dataform_targets"), [])}
            - dataform_service_account: ${google_service_account.dataform_runner.email}
            - include_dependencies: $${default(map.get(args, "include_dependencies"), false)}
            - include_dependents: $${default(map.get(args, "include_dependents"), false)}
            - fully_refresh_incremental_tables: $${default(map.get(args, "fully_refresh_incremental_tables"), false)}
            - wait_for_dataform_status_check: $${default(map.get(args, "wait_for_dataform_status_check"), true)}
            - compile_only: $${default(map.get(args, "compile_only"), false)}
      - createCompilationResult:
          try:
            call: http.post
            args:
              url: $${"https://dataform.googleapis.com/" + dataform_api_version + "/" + dataform_repository_id + "/compilationResults"}
              auth:
                type: OAuth2
              body:
                git_commitish: $${git_commitish}
                codeCompilationConfig:
                  vars: { "date": "$${date}" }
                  defaultDatabase: $${project_id}
            result: compilationResult
          retry:
            predicate: $${http.default_retry_predicate}
            max_retries: 2
            backoff:
              initial_delay: 10
              max_delay: 60
              multiplier: 2
      - earlyStopBeforeDataformWorkflowInvocation:
          switch:
            - condition: $${"compilationErrors" in compilationResult.body}
              raise:
                message: $${"Error while compiling Dataform repository :" + " " + compilationResult.body.name}
                compilationErrors: $${compilationResult.body.compilationErrors}
            - condition: $${compile_only}
              return: "Dataform compilation successfully done. No errors found."
      - createWorkflowInvocation:
          call: http.post
          args:
            url: $${"https://dataform.googleapis.com/" + dataform_api_version + "/" + dataform_repository_id + "/workflowInvocations"}
            auth:
              type: OAuth2
            body:
              compilationResult: $${compilationResult.body.name}
              invocationConfig:
                includedTags:
                  - $${dataform_tags}
                includedTargets:
                  - $${dataform_targets}
                transitiveDependenciesIncluded: $${include_dependencies}
                transitiveDependentsIncluded: $${include_dependents}
                fullyRefreshIncrementalTablesEnabled: $${fully_refresh_incremental_tables}
                serviceAccount: $${dataform_service_account}
          result: workflowInvocation
      - earlyStopBeforeDataformStatusCheck:
          switch:
            - condition: $${not wait_for_dataform_status_check}
              return: $${"Dataform workflow invocation successfully created :" + " " + workflowInvocation.body.name}
      - getInvocationResult:
          call: http.get
          args:
            url: $${"https://dataform.googleapis.com/" + dataform_api_version + "/" + workflowInvocation.body.name}
            auth:
              type: OAuth2
          result: invocationResult
      - waitForResult:
          call: sys.sleep
          args:
            seconds: 10
          next: checkInvocationResult
      - checkInvocationResult:
          switch:
            - condition: $${invocationResult.body.state == "RUNNING"}
              next: getInvocationResult
            - condition: $${invocationResult.body.state == "SUCCEEDED"}
              return: $${"Dataform workflow invocation finished with status 'succeeded' :" + " " + invocationResult.body.name}
            - condition: $${invocationResult.body.state == "CANCELLED" or invocationResult.body.state == "FAILED" or invocationResult.body.state == "CANCELING"}
              steps:
                - raiseException:
                    raise: $${"Error while running Dataform workflow :" + " " + invocationResult.body.name + " " + invocationResult.body.state}
  EOF
}

Here’s an overview of what each step of the pipeline does:

  1. createCompilationResult: This initial step compiles the Dataform source code from the designated Git branch.
  2. earlyStopBeforeDataformWorkflowInvocation: If any errors are detected during the compilation phase, the workflow halts, displaying the encountered errors. Also, if compile_only is set to true, the workflow stops here.
  3. createWorkflowInvocation: Triggers an invocation of the compiled Dataform code.
  4. earlyStopBeforeDataformStatusCheck: If you do not care about the Dataform transformation outputs, this step will stop the workflow execution before the status check.
  5. getInvocationResult: This step retrieves the status of the executed Dataform invocation. The possible results include “RUNNING”, “SUCCEEDED”, “FAILED”, or “CANCELLED”.
  6. waitForResult + checkInvocationResult: These steps poll the status of the Dataform invocation every 10 seconds until it changes from “RUNNING”, and finally return its definitive state.
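
For ad-hoc runs or backfills, you can also trigger the workflow manually and override any of the init variables. For example, with gcloud (the project ID is a placeholder):

# Backfill 2024-01-01, running only the models tagged "int" and their dependents
gcloud workflows run dataform_workflow \
  --project=my-prd-project \
  --location=europe-west1 \
  --data='{"date": "2024-01-01", "dataform_tags": ["int"], "include_dependents": true}'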

7. Scheduler

To run the pipeline automatically, a Cloud Scheduler job triggers the workflow every day at 8 am.

resource "google_cloud_scheduler_job" "daily_dataform" {
  project          = module.project.project_id
  name             = "daily_dataform"
  region           = "europe-west1"
  description      = "Gets today's date and trigger dataform daily workflow"
  schedule         = "0 8 * * *"
  time_zone        = "Europe/Paris"
  attempt_deadline = "320s"

  http_target {
    http_method = "POST"
    uri         = "https://workflowexecutions.googleapis.com/v1/${google_workflows_workflow.dataform.id}/executions"
    body = base64encode(
      jsonencode({
        "argument" : jsonencode({}),
        "callLogLevel" : "CALL_LOG_LEVEL_UNSPECIFIED"
      })
    )

    oauth_token {
      service_account_email = google_service_account.workflow_dataform.email
      scope                 = "https://www.googleapis.com/auth/cloud-platform"
    }
  }
}

Infrastructure deployed

We now have a Cloud Scheduler job that triggers the workflow once a day at 8:00, as shown below (the first execution was a test).

Google Workflow that triggers Dataform

The Dataform workflow invocation is triggered right after by the Google Workflow.

Dataform workflow execution logs

We see the tables populated in the BigQuery production project.

BigQuery production project

Conclusion

Because Dataform is integrated into Google Cloud, it is really easy to use alongside other services, which makes it a great fit for organizations with data in BigQuery. Automating its deployment with Terraform is fairly easy, as we have seen, and maintenance requires little to no time thanks to these integrations.

While there are other tools on the market that perform similar tasks, Dataform is definitely a strong choice for orchestrating SQL pipelines in Google Cloud.

Thank you

Thank you for taking the time to read the article! I’d love to hear from you if you have any questions, comments, or suggestions.

We will keep writing about data topics on Google Cloud. Stay tuned for our next articles at https://medium.astrafy.io/

If you are looking for support on your Google Cloud data & ML use cases, feel free to reach out.
