GCP Dataform Orchestration With Google Workflows

Denis Zaboronsky
6 min read · Sep 5, 2023


Disclaimer: all opinions are my own and don’t necessarily reflect the opinions of my employer.

Motivation

With Dataform being released on GCP, I wanted to write a brief article on how to interact with the new API endpoints via Google Cloud Workflows. I find this an excellent way to set up a lightweight orchestration pipeline, especially if you have multiple GCP services linked together.

Note: this article assumes base familiarity with either the old or new Dataform objects.

Key Concepts

Release Configurations

The release configurations are set up in the Dataform console and are similar to environments in the legacy version of the product. They are incredibly useful if you want Dataform to write to different projects or datasets once you merge your changes to a specific branch.

A release configuration is linked to a branch in your repository and can contain compilation overrides for properties such as the project. In this example, the release configuration writes to a different project than the default and points to the master branch, meaning only merged code will be run from this configuration.

Tags

Tags may be applied to each object in Dataform and can then be passed at invocation time to run only part of the project. This becomes very useful if your Dataform project has SQLX objects that only need to run some of the time, particularly when you are triggering Dataform from a workflow that only impacts some of the objects in the project.

IAMs

To interact with the Dataform API, the identity needs the Dataform Editor role (roles/dataform.editor) in the appropriate project. The creator will also need the Workflows Editor role (roles/workflows.editor) to create the workflow. If you need a third identity to invoke the workflow, it will need the Workflows Invoker role (roles/workflows.invoker).

You will notice in each step below that authentication is done using the OAuth2 method, so these permissions can be utilised at each step.

Google Cloud Workflows

Google Cloud Workflows is a lightweight HTTP orchestrator that is particularly good for linking interactions between Google services. It is YAML/JSON based and has the benefit of being deployable via your choice of infrastructure as code alongside other GCP services.

While Workflows is not a full-fat orchestrator like Airflow, it is incredibly lightweight, cheap to run and fully serverless. This means that if you have short extract/load jobs running in something like Cloud Functions or Cloud Run, it is perfect for chaining the subsequent Dataform SQL transforms onto these. I personally find this combination cheap to run and easy to maintain.

The Pipeline

This section will go through the end-to-end process I use when interacting with Dataform from Workflows. There are other ways to do this, which I will highlight where appropriate, as well as flagging some gotchas I came across during development.

My suggested implementation follows these steps: set up the repository string, (optionally) refresh the release configuration, start the execution, then loop on status checks until the run finishes.

Set Up Repository

First of all, it helps to set up the Dataform repository string. This takes the form projects/${project_id}/locations/${location}/repositories/${repository_name} and will look like:

"dataform": {
  "params": ["input"],
  "steps": [
    {
      "init": {
        "assign": [
          {
            "repository": "projects/${project_id}/locations/${location}/repositories/${repository_name}"
          }
        ]
      }
    },
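If you are building this resource name outside the workflow (for example in CI or a client script), the same format applies. A minimal Python sketch, with the function name being my own:

```python
def dataform_repository(project_id: str, location: str, repository_name: str) -> str:
    """Build the fully qualified Dataform repository resource name."""
    return f"projects/{project_id}/locations/{location}/repositories/{repository_name}"

print(dataform_repository("my-project", "europe-west2", "my-repo"))
# projects/my-project/locations/europe-west2/repositories/my-repo
```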

Refresh Release Config (Optional)

There are multiple ways to interact with the release config. The default is currently to set up a scheduled refresh of the config in Dataform, or to refresh it manually. I personally don’t like these approaches, since they mean that running a Dataform job straight after a merge risks not picking up the changes. As such, the two approaches I considered were:

  • Refresh the config when starting the Dataform run
  • On merges to the main branch in CI refresh the config

In general I prefer the latter approach, as it minimises how often you refresh the config; however, for the sake of speed and to reduce tooling I actually used the former.

To do this you can call the compilationResults endpoint with a POST request. You will need the repository string set up from the previous step and to pass through the release configuration name. This will look like:

{
  "createCompilationResult": {
    "call": "http.post",
    "args": {
      "url": "${\"https://dataform.googleapis.com/v1beta1/\" + repository + \"/compilationResults\"}",
      "auth": {
        "type": "OAuth2"
      },
      "body": {
        "releaseConfig": "${repository + \"/releaseConfigs/\" + releaseConfigName}"
      }
    },
    "result": "compilationResult"
  }
}
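A rough Python equivalent of this step, split so the request construction is separate from the authenticated HTTP call. The helper names are my own; the endpoint and body shape follow the Dataform REST API:

```python
DATAFORM_API = "https://dataform.googleapis.com/v1beta1"

def compilation_request(repository: str, release_config_name: str) -> tuple[str, dict]:
    """Return the (url, body) pair for a createCompilationResult call."""
    url = f"{DATAFORM_API}/{repository}/compilationResults"
    body = {"releaseConfig": f"{repository}/releaseConfigs/{release_config_name}"}
    return url, body

url, body = compilation_request(
    "projects/my-project/locations/europe-west2/repositories/my-repo", "production"
)
# To actually send it you would POST this with OAuth2 credentials, e.g. via
# google.auth and an AuthorizedSession (omitted here to keep the sketch runnable).
print(url)
```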

Note: I have marked this step as optional as you may choose to have your release config refreshed on a schedule or on merge. In that case you will need to replace this step with fetching the latest compilation result.

IMPORTANT NOTE: I have my Dataform set up with a GitHub integration. Unfortunately this API call is flaky, so I have to run it with a retry predicate on it (otherwise I used to see frequent pipeline failures). This makes the interaction look like:

{
  "createCompilationResult": {
    "try": {
      "call": "http.post",
      "args": {
        "url": "${\"https://dataform.googleapis.com/v1beta1/\" + repository + \"/compilationResults\"}",
        "auth": {
          "type": "OAuth2"
        },
        "body": {
          "releaseConfig": "${repository + \"/releaseConfigs/\" + env}"
        }
      },
      "result": "compilationResult"
    },
    "retry": {
      "predicate": "${custom_predicate}",
      "max_retries": 3,
      "backoff": {
        "initial_delay": 2,
        "max_delay": 60,
        "multiplier": 2
      }
    }
  }
},
...
"custom_predicate": {
  "params": ["e"],
  "steps": [
    {
      "what_to_repeat": {
        "switch": [
          {
            "condition": "${e.code == 400}",
            "return": true
          }
        ]
      }
    },
    {
      "otherwise": {
        "return": false
      }
    }
  ]
}
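The retry block above retries up to 3 times with exponential backoff: an initial delay of 2 seconds, doubling each attempt, capped at 60 seconds. A quick sketch of the resulting delay schedule (the function is my own illustration, not part of Workflows):

```python
def backoff_delays(initial_delay: float, multiplier: float, max_delay: float, max_retries: int) -> list[float]:
    """Delay before each retry attempt, mirroring the Workflows retry policy."""
    delays = []
    delay = initial_delay
    for _ in range(max_retries):
        delays.append(min(delay, max_delay))
        delay *= multiplier
    return delays

print(backoff_delays(2, 2, 60, 3))  # [2, 4, 8]
```

With only 3 retries the 60-second cap is never reached; it only matters if you raise max_retries.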

Start Execution

Now that you have the compilationResult from the previous step, you can trigger the run. This is an asynchronous request and accepts arguments such as tags. It looks like:

{
  "createWorkflowInvocation": {
    "call": "http.post",
    "args": {
      "url": "${\"https://dataform.googleapis.com/v1beta1/\" + repository + \"/workflowInvocations\"}",
      "auth": {
        "type": "OAuth2"
      },
      "body": {
        "compilationResult": "${compilationResult.body.name}",
        "invocationConfig": {
          "includedTags": ["${input.tag}"]
        }
      }
    },
    "result": "workflowInvocation"
  }
},

Loop Execution Checks

Since the execution of the Dataform job is asynchronous, if you want to see how the run finishes you will need to poll its status. In Workflows this can be achieved with a switch:

{
  "checkWorkflowInvocationState": {
    "call": "http.get",
    "args": {
      "url": "${\"https://dataform.googleapis.com/v1beta1/\" + workflowInvocation.body.name}",
      "auth": {
        "type": "OAuth2"
      }
    },
    "result": "invocationState"
  }
},
{
  "sleepGate": {
    "switch": [
      {
        "condition": "${invocationState.body.state == \"RUNNING\"}",
        "next": "sleep"
      },
      {
        "condition": "${invocationState.body.state == \"SUCCEEDED\"}",
        "next": "complete"
      }
    ],
    "next": "raiseRunError"
  }
},
{
  "sleep": {
    "call": "sys.sleep",
    "args": {
      "seconds": 3
    },
    "next": "checkWorkflowInvocationState"
  }
},
{
  "raiseRunError": {
    "raise": "dataformFinishException"
  }
},
{
  "complete": {
    "return": "${invocationState.body.state}"
  }
}

This will check the status of the Dataform job every 3 seconds, and then either sleep again, return a successful status, or raise an exception based on the response from the Dataform API.
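The same control flow in Python form, with the status check injected as a callable so it can be exercised without calling the API (the function and exception message are my own):

```python
import time
from typing import Callable

def wait_for_invocation(get_state: Callable[[], str], poll_seconds: float = 3.0) -> str:
    """Poll until the invocation leaves RUNNING, mirroring the sleepGate switch."""
    while True:
        state = get_state()
        if state == "RUNNING":
            time.sleep(poll_seconds)
            continue
        if state == "SUCCEEDED":
            return state
        # Any other terminal state is treated as a failure, like raiseRunError.
        raise RuntimeError(f"dataformFinishException: invocation ended in state {state}")

# Simulate a run that reports RUNNING twice before succeeding.
states = iter(["RUNNING", "RUNNING", "SUCCEEDED"])
print(wait_for_invocation(lambda: next(states), poll_seconds=0))
# SUCCEEDED
```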

Final Pipeline

The final pipeline will look something like:

{
  "dataform": {
    "params": ["input"],
    "steps": [
      {
        "init": {
          "assign": [
            {
              "repository": "projects/${project_id}/locations/${location}/repositories/${repository_name}"
            }
          ]
        }
      },
      {
        "createCompilationResult": {
          "try": {
            "call": "http.post",
            "args": {
              "url": "${\"https://dataform.googleapis.com/v1beta1/\" + repository + \"/compilationResults\"}",
              "auth": {
                "type": "OAuth2"
              },
              "body": {
                "releaseConfig": "${repository + \"/releaseConfigs/production\"}"
              }
            },
            "result": "compilationResult"
          },
          "retry": {
            "predicate": "${custom_predicate}",
            "max_retries": 3,
            "backoff": {
              "initial_delay": 2,
              "max_delay": 60,
              "multiplier": 2
            }
          }
        }
      },
      {
        "createWorkflowInvocation": {
          "call": "http.post",
          "args": {
            "url": "${\"https://dataform.googleapis.com/v1beta1/\" + repository + \"/workflowInvocations\"}",
            "auth": {
              "type": "OAuth2"
            },
            "body": {
              "compilationResult": "${compilationResult.body.name}",
              "invocationConfig": {
                "includedTags": ["${input.tag}"]
              }
            }
          },
          "result": "workflowInvocation"
        }
      },
      {
        "checkWorkflowInvocationState": {
          "call": "http.get",
          "args": {
            "url": "${\"https://dataform.googleapis.com/v1beta1/\" + workflowInvocation.body.name}",
            "auth": {
              "type": "OAuth2"
            }
          },
          "result": "invocationState"
        }
      },
      {
        "sleepGate": {
          "switch": [
            {
              "condition": "${invocationState.body.state == \"RUNNING\"}",
              "next": "sleep"
            },
            {
              "condition": "${invocationState.body.state == \"SUCCEEDED\"}",
              "next": "complete"
            }
          ],
          "next": "raiseRunError"
        }
      },
      {
        "sleep": {
          "call": "sys.sleep",
          "args": {
            "seconds": 3
          },
          "next": "checkWorkflowInvocationState"
        }
      },
      {
        "raiseRunError": {
          "raise": "dataformFinishException"
        }
      },
      {
        "complete": {
          "return": "${invocationState.body.state}"
        }
      }
    ]
  },
  "custom_predicate": {
    "params": ["e"],
    "steps": [
      {
        "what_to_repeat": {
          "switch": [
            {
              "condition": "${e.code == 400}",
              "return": true
            }
          ]
        }
      },
      {
        "otherwise": {
          "return": false
        }
      }
    ]
  }
}

Useful Reads

The getting started docs on Dataform were the basis for this article and are a great read: https://cloud.google.com/dataform/docs/schedule-executions-workflows

Also useful is the Dataform API reference: https://cloud.google.com/dataform/reference/rest

Cloud Workflows docs: https://cloud.google.com/workflows/docs


Denis Zaboronsky

Senior Data Engineer at Oxwash, passionate about data engineering and the environment