Serverless ELT with GCS, BigQuery and Cloud Workflows
1. Explanation of the use case presented in this article
This article presents a complete use case with a Serverless ELT pipeline with Cloud Storage, BigQuery and Cloud Workflows.
We proposed previously the same use case with Airflow and Cloud Composer, and we wanted to revisit it with a Serverless approach.
Google Cloud offers 2 services for tasks and pipeline orchestration :
- Cloud Composer based on a GKE Cluster and Airflow
- Cloud Workflows a serverless component based in a
Yaml
syntaxe and on API calls to interact with Google Cloud services.
For Workflows, developers can check the official documentation to have code samples and also the Google Cloud official documentation for the concerned API, example for the googleapis.bigquery.v2.jobs.insert
:
Workflow :
Google Cloud API :
Both solutions are interesting and teams in projects, can choose the one that best suits their needs.
We illustrate some pros and cons, but we will propose a dedicated article to give more details on this comparison.
Cloud Workflows
Pros
- Present the advantage to be serverless and no need to manage an infra
- Lightweight, we only need to write the
Yaml
code and deploy the workflow with agcloud
command - Cost effective
- Simple and based on API calls (standard Google Cloud APIs)
Cons
- The
Yaml
code of Workflows is not testable easily with unit tests - Currently, we don’t have the possibility to retry a failed task
- Sometimes the
Yaml
code can be verbose, because we need to add polling and other orchestration logic, not provided natively by the concerned APIs - The subworkflows and separate logics can’t be included and shared natively, when placed in external files or other Github repositories
Cloud Composer
Pros
- Based on Apache Airflow, a popular open source library with a large community
- Based on
Python
and easier thatYaml
to write unit tests - Many operators given natively to interact with Google Cloud services
- Easy to write a DAG and orchestration logic based on operators
- Can retry a failed tasks
- Monitoring proposed by default
Cons
- Based on a GKE cluster, we need to configure and manage it
- A GKE cluster with Airflow has a cost with, at minimum, several hundred per month
- By default, the PyPi packages are globals and not isolated per DAG
Here you can see the diagram of this use case :
I also created a video on this topic in my GCP Youtube channel, feel free to subscribe to the channel to support my work for the Google Cloud community :
English version
French version
Some explanations :
- Extract : a input raw file is uploaded to Cloud Storage, this file has a
ndjson
format - Load : the raw file is loaded to a BigQuery raw table
- Transform : the transform part is applied with SQL query in BigQuery and the result is loaded in a final domain table
- Backup processed files : Workflows invokes Cloud Build to move processed files from the input to the destination bucket. The advantage is to have access to the 𝕘𝕔𝕝𝕠𝕦𝕕 𝕔𝕝𝕚, because it’s not possible natively with the storage API to move several files (objects) between buckets.
The deployment and CI CD part are done with Cloud Build.
Firstly the use case is execute with gcloud commands and bash scripts :
- Deploy the workflow
- Run the workflow with runtime arguments
- Create a scheduler and cron job with Cloud Scheduler to launch the workflow with runtime arguments
Then the use case is execute with Terraform :
- Deploy the workflow
- Create a scheduler and cron job with Cloud Scheduler to launch the workflow with runtime arguments
2. Structure of the project
2.2 Execute the ELT Workflow with Cloud Build and gcloud commands
The first part of this use case will deploy/run the resources with gcloud commands :
- Create and deploy the workflow
- Run the workflow directly with runtime arguments
- Create a cron job with Cloud Scheduler, that will execute the workflow with runtime arguments
2.2.1 Workflow code logic
Create and deploy the Workflow
A root folder called workflow
contains the workflow to create : team_league_elt_gcs_file_schema.yaml
The other file serves only to illustrate how using a BigQuery schema directly in the Yaml
We created a configuration file for our Workflow, that will be passed as runtime arguments : workflow/config/workflow_config.json
{
"team_stats_raw_bq_create_disposition": "CREATE_NEVER",
"team_stats_raw_bq_write_disposition": "WRITE_APPEND",
"team_stats_raw_bq_source_format": "NEWLINE_DELIMITED_JSON",
"team_stats_raw_bq_schema_uri": "gs://mazlum_dev/workflows/team_league/schema/team_stats_raw_table_schema.json",
"dataset": "mazlum_test",
"team_stat_table": "team_stat",
"team_stat_raw_table": "team_stat_raw",
"team_stats_raw_files_hot_source_uri": "gs://mazlum_dev/workflows/team_league/elt/hot/*.json",
"team_stats_raw_files_cold_destination_uri": "gs://mazlum_dev/workflows/team_league/elt/cold/"
}
The elements like the source and destination buckets are given (hot/cold). The elements for raw and domain tables are also configured.
The workflow code logic in Yaml
:
main:
params: [ workflowConfig ]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location_id: "global"
- loadTeamStatsRawToBq:
call: googleapis.bigquery.v2.jobs.insert
args:
projectId: ${project_id}
body:
configuration:
load:
destinationTable:
datasetId: ${workflowConfig.dataset}
projectId: ${project_id}
tableId: ${workflowConfig.team_stat_raw_table}
referenceFileSchemaUri: ${workflowConfig.team_stats_raw_bq_schema_uri}
sourceFormat: ${workflowConfig.team_stats_raw_bq_source_format}
sourceUris: ${workflowConfig.team_stats_raw_files_hot_source_uri}
createDisposition: ${workflowConfig.team_stats_raw_bq_create_disposition}
writeDisposition: ${workflowConfig.team_stats_raw_bq_write_disposition}
result: loadTeamStatsRawToBqResult
- runQueryTransformToTeamStatsDomainAndLoadToBQ:
call: googleapis.bigquery.v2.jobs.query
args:
projectId: ${project_id}
body:
useLegacySql: false
query: INSERT INTO `mazlum_test.team_stat`
(
teamName,
teamScore,
teamSlogan,
teamTotalGoals,
topScorerStats,
bestPasserStats,
ingestionDate
)
SELECT
team_stats.teamName,
team_stats.teamScore,
team_slogan.teamSlogan,
sum(scorer.goals) as teamTotalGoals,
ARRAY_AGG(
STRUCT(
scorer.scorerFirstName AS firstName,
scorer.scorerLastName AS lastName,
scorer.goals AS goals,
scorer.games AS games
)
ORDER BY scorer.goals DESC LIMIT 1
)[OFFSET(0)] AS topScorerStats,
ARRAY_AGG(
STRUCT(
scorer.scorerFirstName AS firstName,
scorer.scorerLastName AS lastName,
scorer.goalAssists AS goalAssists,
scorer.games AS games
)
ORDER BY scorer.goalAssists DESC LIMIT 1
)[OFFSET(0)] AS bestPasserStats,
current_timestamp() as ingestionDate
FROM `mazlum_test.team_stat_raw` team_stats
INNER JOIN `mazlum_test.team_slogan` team_slogan ON team_stats.teamName = team_slogan.teamName,
UNNEST(team_stats.scorers) AS scorer
GROUP BY
team_stats.teamName,
team_stats.teamScore,
team_slogan.teamSlogan
result: queryResult
- copyProcessedFilesToColdBucket:
call: googleapis.cloudbuild.v1.projects.builds.create
args:
projectId: ${project_id}
parent: ${"projects/" + project_id + "/locations/" + location_id}
body:
serviceAccount: ${sys.get_env("GOOGLE_CLOUD_SERVICE_ACCOUNT_NAME")}
options:
logging: CLOUD_LOGGING_ONLY
steps:
- name: gcr.io/google.com/cloudsdktool/cloud-sdk:455.0.0-slim
script: ${"gsutil cp " + workflowConfig.team_stats_raw_files_hot_source_uri + " " + workflowConfig.team_stats_raw_files_cold_destination_uri}
result: resultCloudBuildCopy
- returnResult:
return: ${resultCloudBuildCopy}
Some explanations for the code logic in the workflow :
The program start with a top start element called main
Runtime arguments
We can pass runtime arguments with params
, that corresponds to the configuration showed previously :
params: [ workflowConfig ]
List of steps
Then the rest of the code is a list of steps :
steps:
- init:
.....
- loadTeamStatsRawToBq:
.....
Reusable variables
The first step prepares reusable variables accessible in the other steps :
main:
params: [ workflowConfig ]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location_id: "global"
Load raw files to BigQuery raw table
The second step corresponds to the load part of our ELT pipeline, we use the standard API to execute a load job to BigQuery. In this case, the load job will write all the input raw files (objects) to the BigQuery raw table.
A important detail here, we need to pass the schema of the BigQuery table, we can do it in the two following ways :
- Pass the schema directly in the
Yaml
code - Reference a
Json
schema from a Cloud Storage bucket
In this first example, the schema is referenced from Cloud Storage via referenceFileSchemaUri
main:
params: [ workflowConfig ]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location_id: "global"
- loadTeamStatsRawToBq:
call: googleapis.bigquery.v2.jobs.insert
args:
projectId: ${project_id}
body:
configuration:
load:
destinationTable:
datasetId: ${workflowConfig.dataset}
projectId: ${project_id}
tableId: ${workflowConfig.team_stat_raw_table}
referenceFileSchemaUri: ${workflowConfig.team_stats_raw_bq_schema_uri}
sourceFormat: ${workflowConfig.team_stats_raw_bq_source_format}
sourceUris: ${workflowConfig.team_stats_raw_files_hot_source_uri}
createDisposition: ${workflowConfig.team_stats_raw_bq_create_disposition}
writeDisposition: ${workflowConfig.team_stats_raw_bq_write_disposition}
result: loadTeamStatsRawToBqResult
The second example with the schema added directly in the Yaml
:
main:
params: [ workflowConfig ]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location_id: "global"
- loadTeamStatsRawToBq:
call: googleapis.bigquery.v2.jobs.insert
args:
projectId: ${project_id}
body:
configuration:
load:
destinationTable:
datasetId: ${workflowConfig.dataset}
projectId: ${project_id}
tableId: ${workflowConfig.team_stat_raw_table}
schema:
fields:
- name: "teamScore"
type: "INTEGER"
mode: "NULLABLE"
- name: "teamName"
type: "STRING"
mode: "NULLABLE"
- name: "scorers"
type: "RECORD"
mode: "REPEATED"
fields:
- name: "games"
type: "INTEGER"
mode: "NULLABLE"
- name: "goalAssists"
type: "INTEGER"
mode: "NULLABLE"
- name: "goals"
type: "INTEGER"
mode: "NULLABLE"
- name: "scorerLastName"
type: "STRING"
mode: "NULLABLE"
- name: "scorerFirstName"
type: "STRING"
mode: "NULLABLE"
sourceFormat: ${workflowConfig.team_stats_raw_bq_source_format}
sourceUris: ${workflowConfig.team_stats_raw_files_hot_source_uri}
createDisposition: ${workflowConfig.team_stats_raw_bq_create_disposition}
writeDisposition: ${workflowConfig.team_stats_raw_bq_write_disposition}
result: loadTeamStatsRawToBqResult
The Json
format is more standard and shareable between different resources (gcloud commands, Terraform, Airflow…), we prefer using this approach in the rest of the code.
Transform raw to domain data with a SQL query and BigQuery
This step corresponds to the transform part of our ELT pipeline. In this case, the transform part is done with a SQL query executed from BigQuery.
In this first execution, the SQL query was directly added in the YAML, but in the execution with Terraform in the next section, we will pass the query as template string (placeholder).
main:
params: [ workflowConfig ]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location_id: "global"
- loadTeamStatsRawToBq:
...
- runQueryTransformToTeamStatsDomainAndLoadToBQ:
call: googleapis.bigquery.v2.jobs.query
args:
projectId: ${project_id}
body:
useLegacySql: false
query: INSERT INTO `mazlum_test.team_stat`
(
teamName,
teamScore,
teamSlogan,
teamTotalGoals,
topScorerStats,
bestPasserStats,
ingestionDate
)
SELECT
team_stats.teamName,
team_stats.teamScore,
team_slogan.teamSlogan,
sum(scorer.goals) as teamTotalGoals,
ARRAY_AGG(
STRUCT(
scorer.scorerFirstName AS firstName,
scorer.scorerLastName AS lastName,
scorer.goals AS goals,
scorer.games AS games
)
ORDER BY scorer.goals DESC LIMIT 1
)[OFFSET(0)] AS topScorerStats,
ARRAY_AGG(
STRUCT(
scorer.scorerFirstName AS firstName,
scorer.scorerLastName AS lastName,
scorer.goalAssists AS goalAssists,
scorer.games AS games
)
ORDER BY scorer.goalAssists DESC LIMIT 1
)[OFFSET(0)] AS bestPasserStats,
current_timestamp() as ingestionDate
FROM `mazlum_test.team_stat_raw` team_stats
INNER JOIN `mazlum_test.team_slogan` team_slogan ON team_stats.teamName = team_slogan.teamName,
UNNEST(team_stats.scorers) AS scorer
GROUP BY
team_stats.teamName,
team_stats.teamScore,
team_slogan.teamSlogan
result: queryResult
Move processed files from a hot bucket to a cold bucket
In the last step of the pipeline, we want to move the input and processed files from an input to a destination bucket (hot to cold).
The standard storage.api
doesn’t support the move of several objects between buckets.
To solve this issue, we can use a Cloud Build job from Workflows. The advantage is to have access to the gcloud
cli that supports the move of several objects between buckets.
This system is really powerful, because from Cloud Build, we have access to many tools offered by the Docker ecosystem.
In this example, we use a gsutil cp
to copy the objects from the hot to the cold bucket, with the use of a wildcard
: gs://mazlum_dev/workflows/team_league/elt/hot/*.json
main:
params: [ workflowConfig ]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location_id: "global"
- loadTeamStatsRawToBq:
...
- runQueryTransformToTeamStatsDomainAndLoadToBQ:
...
- copyProcessedFilesToColdBucket:
call: googleapis.cloudbuild.v1.projects.builds.create
args:
projectId: ${project_id}
parent: ${"projects/" + project_id + "/locations/" + location_id}
body:
serviceAccount: ${sys.get_env("GOOGLE_CLOUD_SERVICE_ACCOUNT_NAME")}
options:
logging: CLOUD_LOGGING_ONLY
steps:
- name: gcr.io/google.com/cloudsdktool/cloud-sdk:455.0.0-slim
script: ${"gsutil cp " + workflowConfig.team_stats_raw_files_hot_source_uri + " " + workflowConfig.team_stats_raw_files_cold_destination_uri}
result: resultCloudBuildCopy
The last step of the pipeline, returns the result of the Cloud Build operation :
main:
params: [ workflowConfig ]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location_id: "global"
- loadTeamStatsRawToBq:
...
- runQueryTransformToTeamStatsDomainAndLoadToBQ:
..
- copyProcessedFilesToColdBucket:
...
- returnResult:
return: ${resultCloudBuildCopy}
2.2.2 Set environment variables
#!/bin/bash
export PROJECT_ID={{project}}
export LOCATION={{location}}
# Workflow
export WORKFLOW_NAME=team-league-elt-gcs-schema
export WORKFLOW_SOURCE=workflow/team_league_elt_gcs_file_schema.yaml
export WORKFLOW_SA={{sa_email}}
export WORKFLOW_CONFIG_FILE_PATH=./workflow/config/workflow_config.json
# Workflow Scheduler
export WORKFLOW_URI=https://workflowexecutions.googleapis.com/v1/projects/gb-poc-373711/locations/europe-west1/workflows/$WORKFLOW_NAME/executions
export WORKFLOW_SCHEDULER_NAME=team-league-elt-gcs-schema-cron-job
export WORKFLOW_SCHEDULER_INTERVAL="0 0 1 * *"
export WORKFLOW_SCHEDULER_TIME_ZONE=Europe/Paris
export WORKFLOW_SCHEDULER_SA={{sa_email}}
# Terraform
export TF_STATE_BUCKET=gb-poc-terraform-state
export TF_STATE_PREFIX=testmazlum
export GOOGLE_PROVIDER_VERSION="= 5.8.0"
2.2.3 Deploy the Workflows with Cloud Build
In this section we will deploy the Workflow with Cloud Build :
deploy-workflow.yaml
file :
We have a step from the cloud-sdk
Docker image, then we invoke bash
script
steps:
- name: gcr.io/google.com/cloudsdktool/cloud-sdk:455.0.0-slim
entrypoint: 'bash'
args:
- '-c'
- |
./scripts/deploy_workflow.sh
env:
- 'PROJECT_ID=$PROJECT_ID'
- 'LOCATION=$LOCATION'
- 'WORKFLOW_NAME=$_WORKFLOW_NAME'
- 'WORKFLOW_SOURCE=$_WORKFLOW_SOURCE'
- 'WORKFLOW_SA=$_WORKFLOW_SA'
deploy_workflow.sh
script :
#!/usr/bin/env bash
set -e
set -o pipefail
set -u
echo "############# Deploying the workflow... #######################"
gcloud workflows deploy "$WORKFLOW_NAME" \
--source="$WORKFLOW_SOURCE" \
--location "$LOCATION" \
--service-account "$WORKFLOW_SA"
echo "############# The workflow was deployed successfully #######################"
The gcloud workflows deploy
command allows the deployment of the Workflow. We give :
- The Workflow name
- The Workflow source that corresponds the workflow file path :
workflow/team_league_elt_gcs_file_schema.yaml
- The location
- The service account dedicated to the Workflow
After the deployment, the workflow appears in the dedicated menu in Google Cloud :
2.2.4 Run the Workflows with Cloud Build
In this section we will run the Workflow with Cloud Build. The Workflow takes runtime arguments.
run-workflow.yaml
file :
- We have a step from the
cloud-sdk
Docker image - We install a tool called
jq
because we need to parse theJson
config file and get it as string - We invoke the
run_workflow.sh
bash script
steps:
- name: gcr.io/google.com/cloudsdktool/cloud-sdk:455.0.0-slim
entrypoint: 'bash'
args:
- '-c'
- |
apt-get -yq install jq \
&& ./scripts/run_workflow.sh
env:
- 'PROJECT_ID=$PROJECT_ID'
- 'LOCATION=$LOCATION'
- 'WORKFLOW_CONFIG_FILE_PATH=$_WORKFLOW_CONFIG_FILE_PATH'
- 'WORKFLOW_NAME=$_WORKFLOW_NAME'
The run_workflow.sh
script :
#!/usr/bin/env bash
set -e
set -o pipefail
set -u
echo "############# Running the workflow... #######################"
echo "############# The config is #######################"
cat "$WORKFLOW_CONFIG_FILE_PATH"
escaped_json_config=$(jq -c . <"$WORKFLOW_CONFIG_FILE_PATH")
gcloud workflows run "$WORKFLOW_NAME" \
--location "$LOCATION" \
--data="$escaped_json_config"
echo "############# The workflow was run successfully #######################"
jq
allows to read the config file and return it as string in one line, the command line with hardcoded parameters looks like :
gcloud workflows run team-league-elt-gcs-schema \
--location europe-west1 \
--data='{"workflow_name":"team_league_workflow_elt","team_stats_raw_bq_create_disposition":"CREATE_NEVER","team_stats_raw_bq_write_disposition":"WRITE_APPEND","team_stats_raw_bq_source_format":"NEWLINE_DELIMITED_JSON","team_stats_raw_bq_schema_uri":"gs://mazlum_dev/workflows/team_league/schema/team_stats_raw_table_schema.json","dataset":"mazlum_test","team_stat_table":"team_stat","team_stat_raw_table":"team_stat_raw","team_stats_raw_files_hot_source_uri":"gs://mazlum_dev/workflows/team_league/elt/hot/*.json","team_stats_raw_files_cold_destination_uri":"gs://mazlum_dev/workflows/team_league/elt/cold/"}'
When we access to the workflow, we can see all the executions :
We click on the link in the execution ID column and access to the Workflow detail :
- The summary
- The input
- The output
- At the right side, we have access to the graph of the workflow (DAG)
The workflow can also be directly launched from the console with a click on the execute
button :
2.2.5 Create a cron job with a Scheduler, that will execute the workflow
In this section, we will create a cron job with a scheduler that will execute the workflow while passing runtime arguments.
We can also trigger a workflow with an event driven approach, but we will cover this aspect in a dedicated article.
steps:
- name: gcr.io/google.com/cloudsdktool/cloud-sdk:455.0.0-slim
entrypoint: 'bash'
args:
- '-c'
- |
apt-get -yq install jq \
&& ./scripts/deploy_workflow_scheduler.sh
env:
- 'PROJECT_ID=$PROJECT_ID'
- 'LOCATION=$LOCATION'
- 'WORKFLOW_CONFIG_FILE_PATH=$_WORKFLOW_CONFIG_FILE_PATH'
- 'WORKFLOW_URI=$_WORKFLOW_URI'
- 'WORKFLOW_SCHEDULER_NAME=$_WORKFLOW_SCHEDULER_NAME'
- 'WORKFLOW_SCHEDULER_INTERVAL=$_WORKFLOW_SCHEDULER_INTERVAL'
- 'WORKFLOW_SCHEDULER_TIME_ZONE=$_WORKFLOW_SCHEDULER_TIME_ZONE'
- 'WORKFLOW_SCHEDULER_SA=$_WORKFLOW_SCHEDULER_SA'
We install jq
and launch the bash script deploy_workflow_scheduler.sh
#!/usr/bin/env bash
set -e
set -o pipefail
set -u
echo "############# Deploying the scheduler for the workflow... #######################"
echo "############# The config is #######################"
cat "$WORKFLOW_CONFIG_FILE_PATH"
escaped_json_config=$(jq -R -s '. | gsub("[\\n\\t]"; "")' <"$WORKFLOW_CONFIG_FILE_PATH")
gcloud scheduler jobs create http "$WORKFLOW_SCHEDULER_NAME" \
--schedule="$WORKFLOW_SCHEDULER_INTERVAL" \
--location "$LOCATION" \
--uri="$WORKFLOW_URI" \
--message-body="{\"argument\": ${escaped_json_config} }" \
--time-zone="$WORKFLOW_SCHEDULER_TIME_ZONE" \
--oauth-service-account-email="$WORKFLOW_SCHEDULER_SA"
echo "############# The scheduler was deployed successfully... #######################"
Some parameters are passed by the scheduler :
- The scheduler name
- The schedule interval (cron)
- The location
- The Workflow URI
- Message body : the scheduler needs to pass the runtime arguments when launching the workflow. The
Json
config file is parsed and escaped withjq
and passed to thegcloud scheduler
command line - Scheduler timezone
- The service account used for the scheduler
The command line with hardcoded parameters looks like :
gcloud scheduler jobs create http team-league-elt-gcs-schema-cron-job \
--schedule="0 0 1 * *" \
--location europe-west1 \
--uri="https://workflowexecutions.googleapis.com/v1/projects/gb-poc-373711/locations/europe-west1/workflows/team-league-elt-gcs-schema/executions" \
--message-body="{\"argument\": \"{\\\"workflow_name\\\": \\\"team_league_workflow_elt\\\",\\\"team_stats_raw_bq_create_disposition\\\": \\\"CREATE_NEVER\\\",\\\"team_stats_raw_bq_write_disposition\\\": \\\"WRITE_APPEND\\\",\\\"team_stats_raw_bq_source_format\\\": \\\"NEWLINE_DELIMITED_JSON\\\",\\\"team_stats_raw_bq_schema_uri\\\": \\\"gs://mazlum_dev/workflows/team_league/schema/team_stats_raw_table_schema.json\\\",\\\"dataset\\\": \\\"mazlum_test\\\",\\\"team_stat_table\\\": \\\"team_stat\\\",\\\"team_stat_raw_table\\\": \\\"team_stat_raw\\\",\\\"team_stats_raw_files_hot_source_uri\\\": \\\"gs://mazlum_dev/workflows/team_league/elt/hot/*.json\\\",\\\"team_stats_raw_files_cold_destination_uri\\\": \\\"gs://mazlum_dev/workflows/team_league/elt/cold/\\\"}\"}" \
--time-zone="Europe/Paris" \
--oauth-service-account-email="sa-workflows-dev@gb-poc-373711.iam.gserviceaccount.com"
The scheduler appears in the dedicated page in Google Cloud
When we click on the workflow name, we can access to the scheduler detail :
The workflow will be triggered one time per month with the following cron expression : 0 0 1 * *
We can also trigger the workflow manually with the Force run button :
2.3 Execute the ELT Workflow with Cloud Build and Terraform
In this last section, we will create the scheduler and the workflow with Terraform
instead of gcloud
commands.
It is a good approach to manage and automate the creation of elements with a IaC tool like Terraform, that’s why this use case presents a complete IaC code to create the resources.
2.3.1 The Cloud Build part
We have 2 Yaml
files and Cloud Build jobs to launch the IaC part :
Plan : deploy-workflow-scheduler-terraform-plan.yaml
steps:
- name: alpine/terragrunt:1.6.5
script: |
terragrunt run-all init
terragrunt run-all plan --out tfplan.out
terragrunt run-all apply --terragrunt-non-interactive tfplan.out
dir: 'infra'
env:
- 'TF_VAR_project_id=$PROJECT_ID'
- 'TF_VAR_location=$LOCATION'
- 'TF_STATE_BUCKET=$_TF_STATE_BUCKET'
- 'TF_STATE_PREFIX=$_TF_STATE_PREFIX'
- 'TF_VAR_workflow_name=$_WORKFLOW_NAME'
- 'TF_VAR_workflow_source=$_WORKFLOW_SOURCE'
- 'TF_VAR_workflow_uri=$_WORKFLOW_URI'
- 'TF_VAR_workflow_sa=$_WORKFLOW_SCHEDULER_SA'
- 'TF_VAR_workflow_scheduler_name=$_WORKFLOW_SCHEDULER_NAME'
- 'TF_VAR_workflow_scheduler_interval=$_WORKFLOW_SCHEDULER_INTERVAL'
- 'TF_VAR_workflow_scheduler_timezone=$_WORKFLOW_SCHEDULER_TIME_ZONE'
- 'TF_VAR_workflow_scheduler_sa=$_WORKFLOW_SCHEDULER_SA'
- 'GOOGLE_PROVIDER_VERSION=$_GOOGLE_PROVIDER_VERSION'
Apply : deploy-workflow-scheduler-terraform-apply.yaml
steps:
- name: alpine/terragrunt:1.6.5
script: |
terragrunt run-all init
terragrunt run-all plan --out tfplan.out
terragrunt run-all apply --terragrunt-non-interactive tfplan.out
dir: 'infra'
env:
- 'TF_VAR_project_id=$PROJECT_ID'
- 'TF_VAR_location=$LOCATION'
- 'TF_STATE_BUCKET=$_TF_STATE_BUCKET'
- 'TF_STATE_PREFIX=$_TF_STATE_PREFIX'
- 'TF_VAR_workflow_name=$_WORKFLOW_NAME'
- 'TF_VAR_workflow_source=$_WORKFLOW_SOURCE'
- 'TF_VAR_workflow_uri=$_WORKFLOW_URI'
- 'TF_VAR_workflow_sa=$_WORKFLOW_SCHEDULER_SA'
- 'TF_VAR_workflow_scheduler_name=$_WORKFLOW_SCHEDULER_NAME'
- 'TF_VAR_workflow_scheduler_interval=$_WORKFLOW_SCHEDULER_INTERVAL'
- 'TF_VAR_workflow_scheduler_timezone=$_WORKFLOW_SCHEDULER_TIME_ZONE'
- 'TF_VAR_workflow_scheduler_sa=$_WORKFLOW_SCHEDULER_SA'
- 'GOOGLE_PROVIDER_VERSION=$_GOOGLE_PROVIDER_VERSION'
In the 2 Yaml
files, we pass variables to the Terraform
part and execute the IaC resources with the dedicated command line. We use Terragrunt
in this example for simplicity.
2.3.2 The Terraform and IaC part
The root folder containing all the infra code is infra
The resource
folder contains :
- The workflow yaml code
resource/workflow/team_league_elt_gcs_file_schema.yaml
- The SQL query
resource/workflow/config/query/compute_and_insert_team_stats_data.sql
- The workflow runtime configuration
resource/workflow/config/workflow_config.json
The workflow yaml code :
main:
params: [ workflowConfig ]
steps:
- init:
assign:
- project_id: ${sys.get_env("GOOGLE_CLOUD_PROJECT_ID")}
- location_id: "global"
- loadTeamStatsRawToBq:
call: googleapis.bigquery.v2.jobs.insert
args:
projectId: ${project_id}
body:
configuration:
load:
destinationTable:
datasetId: ${workflowConfig.dataset}
projectId: ${project_id}
tableId: ${workflowConfig.team_stat_raw_table}
referenceFileSchemaUri: ${workflowConfig.team_stats_raw_bq_schema_uri}
sourceFormat: ${workflowConfig.team_stats_raw_bq_source_format}
sourceUris: ${workflowConfig.team_stats_raw_files_hot_source_uri}
createDisposition: ${workflowConfig.team_stats_raw_bq_create_disposition}
writeDisposition: ${workflowConfig.team_stats_raw_bq_write_disposition}
result: loadTeamStatsRawToBqResult
- runQueryTransformToTeamStatsDomainAndLoadToBQ:
call: googleapis.bigquery.v2.jobs.query
args:
projectId: ${project_id}
body:
useLegacySql: false
query: "{{sql_query}}"
result: queryResult
- copyProcessedFilesToColdBucket:
call: googleapis.cloudbuild.v1.projects.builds.create
args:
projectId: ${project_id}
parent: ${"projects/" + project_id + "/locations/" + location_id}
body:
serviceAccount: ${sys.get_env("GOOGLE_CLOUD_SERVICE_ACCOUNT_NAME")}
options:
logging: CLOUD_LOGGING_ONLY
steps:
- name: gcr.io/google.com/cloudsdktool/cloud-sdk:455.0.0-slim
script: ${"gsutil cp " + workflowConfig.team_stats_raw_files_hot_source_uri + " " + workflowConfig.team_stats_raw_files_cold_destination_uri}
result: resultCloudBuildCopy
- returnResult:
return: ${resultCloudBuildCopy}
The principle is identical to the previous part with the exception that the SQL query is passed as a placeholder in the workflow code : {{sql_query}}
The SQL query
INSERT INTO `${project_id}.${dataset}.${team_stat_table}`
(
teamName,
teamScore,
teamSlogan,
teamTotalGoals,
topScorerStats,
bestPasserStats,
ingestionDate
)
SELECT
team_stats.teamName,
team_stats.teamScore,
team_slogan.teamSlogan,
sum(scorer.goals) as teamTotalGoals,
ARRAY_AGG(
STRUCT(
scorer.scorerFirstName AS firstName,
scorer.scorerLastName AS lastName,
scorer.goals AS goals,
scorer.games AS games
)
ORDER BY scorer.goals DESC LIMIT 1
)[OFFSET(0)] AS topScorerStats,
ARRAY_AGG(
STRUCT(
scorer.scorerFirstName AS firstName,
scorer.scorerLastName AS lastName,
scorer.goalAssists AS goalAssists,
scorer.games AS games
)
ORDER BY scorer.goalAssists DESC LIMIT 1
)[OFFSET(0)] AS bestPasserStats,
current_timestamp() as ingestionDate
FROM `${project_id}.${dataset}.${team_stat_raw_table}` team_stats
INNER JOIN `${project_id}.${dataset}.team_slogan` team_slogan ON team_stats.teamName = team_slogan.teamName,
UNNEST(team_stats.scorers) AS scorer
GROUP BY
team_stats.teamName,
team_stats.teamScore,
team_slogan.teamSlogan
The query contains dynamic parameters that will be resolved by Terraform
template.
The Terraform variables : infra/variables.tf
:
variable "project_id" {
description = "Project ID, used to enforce providing a project id."
type = string
}
variable "location" {
description = "Location."
type = string
}
variable "workflow_name" {
description = "Workflow name."
type = string
}
.....
This file contains all the Terraform variables. It corresponds to the TF_VAR_
env variables passed by Cloud Build to Terraform.
The Terraform local variables : infra/locals.tf
:
locals {
team_league_workflow_yaml_as_string = file("${path.module}/resource/workflow/team_league_elt_gcs_file_schema.yaml")
team_league_workflow_yaml_config_as_string = file("${path.module}/resource/workflow/config/workflow_config.json")
team_league_workflow_yaml_config = jsondecode(file("${path.module}/resource/workflow/config/workflow_config.json"))
compute_and_insert_team_stats_data_query = file("${path.module}/resource/workflow/config/query/compute_and_insert_team_stats_data.sql")
}
team_league_workflow_yaml_as_string
: load the workflowYaml
file asstring
, via thefile
Terraform operator. Thisstring
content will be then used by the Terraform resource to create the workflowteam_league_workflow_yaml_config_as_string
: load the workflow configJson
file asstring
. We usefile
again here.team_league_workflow_yaml_config
: load the workflow config json file as TerraformMap
. It will be necessary in the Terraform resources.compute_and_insert_team_stats_data_query
: load theSQL
query file asstring
withfile
The Terraform resources : infra/locals.tf
:
data "template_file" "raw_to_domain_query_template" {
template = local.compute_and_insert_team_stats_data_query
vars = {
project_id = var.project_id
dataset = local.team_league_workflow_yaml_config["dataset"]
team_stat_table = local.team_league_workflow_yaml_config["team_stat_table"]
team_stat_raw_table = local.team_league_workflow_yaml_config["team_stat_raw_table"]
}
}
resource "google_cloud_scheduler_job" "job" {
project = var.project_id
region = var.location
name = var.workflow_scheduler_name
description = "Scheduler for team league workflow"
schedule = var.workflow_scheduler_interval
time_zone = var.workflow_scheduler_timezone
attempt_deadline = "320s"
http_target {
body = base64encode(
jsonencode({
"argument" : local.team_league_workflow_yaml_config_as_string,
"callLogLevel" : "CALL_LOG_LEVEL_UNSPECIFIED"
}
))
http_method = "POST"
uri = "https://workflowexecutions.googleapis.com/v1/projects/${var.project_id}/locations/${var.location}/workflows/${var.workflow_name}/executions"
oauth_token {
scope = "https://www.googleapis.com/auth/cloud-platform"
service_account_email = var.workflow_scheduler_sa
}
}
}
resource "google_workflows_workflow" "workflow_elt_team_league" {
depends_on = [
data.template_file.raw_to_domain_query_template
]
project = var.project_id
region = var.location
name = var.workflow_name
description = "Workflow for team league ELT"
service_account = var.workflow_sa
source_contents = replace(local.team_league_workflow_yaml_as_string, "{{sql_query}}", data.template_file.raw_to_domain_query_template.rendered)
}
Terraform template :
The first data
block retrieves the SQL query as string
from locals and resolve parameters with Terraform
template : template_file
Some elements are retrieved from the configuration as Map
provided by locals.
The resource to create the scheduler :
The resource google_cloud_scheduler_job
creates the scheduler. The config as string
is passed to the http_target/body
encoded in base 64.
All the other elements are retrieved from Terraform variables.
The resource to create the scheduler :
The resource google_workflows_workflow
creates and deploys the workflow.
The depend_on
allows adding a dependency between the SQL query prepared by Terraform template and the current resource to create the workflow. So, the elements will be created in the correct order by Terraform.
In the source_contents
we pass the workflow as string
and replace the SQL query :
source_contents = replace(local.team_league_workflow_yaml_as_string, "{{sql_query}}", data.template_file.raw_to_domain_query_template.rendered)
Conclusion
This article showed a complete and real world use case with an ELT pipeline using Cloud Workflows.
Workflows presents the advantage to be serverless, cost effective and lightweight and fit well for an ELT.
GCP Developers can be quickly familiar with the Workflow’s system, because it’s based on API calls and native Google Cloud APIs.
To be as complete as possible, two executions with gcloud commands and Terraform resource were presented in detail.
For more complexe orchestration and industrialisation, developers can test and choose the solution that suits them best, Airflow and Composer could also be a good candidate.
Also, it’s really important to not add complexe logic in the YAML to keep a good maintainability and readability.
All the code shared on this article is accessible from my Github
repository :
If you like my articles, videos and want to see my posts, follow me on :