Using Cloud Workflows to load Cloud Storage files into BigQuery
In this article, we will orchestrate and automate Google Cloud services with serverless workflows. We will create a Cloud Workflow that loads data from Cloud Storage into BigQuery: a complete guide to working with workflows, connecting Google Cloud APIs, using subworkflows and arrays, extracting segments from file names, and calling BigQuery load jobs.
There are various ways to load Cloud Storage files into BigQuery: a Cloud Function, Eventarc triggers invoking Cloud Run services, the relatively new BigQuery CREATE EXTERNAL TABLE statement, or the good old bq CLI tool. All of these require you to keep a function, a container, a library, or an SDK up to date, which means ongoing maintenance.
We are going to use Cloud Workflows to connect the Cloud Storage API with the BigQuery Jobs API to load files into tables. The techniques covered here give you a foundation for building any kind of serverless automation in Cloud Workflows, in YAML syntax, without maintenance.
Note: To get started with Cloud Workflows, check out my introductory presentation: Serverless orchestration with Cloud Workflows.
What is Cloud Workflows?
In a nutshell, Workflows allows you to connect services together, anything that has a public API.
- Workflow orchestration (step engine as a service)
- Integrate any Google Cloud API, SaaS API, or private APIs
- Out of the box authentication support for Google Cloud products
- Fully managed service — requires no infrastructure or capacity planning
- Serverless with Pay-per-use pricing model
- Declarative workflow language using YAML syntax
- Developer friendly built-in error handling with retries
Let’s assume all our source files are in Cloud Storage. Files are organized in buckets and folders, and may be versioned. Our workflow definition will have multiple steps. We start by calling the Cloud Storage API to list the files in a bucket, using a folder path as a filter. For each file, we then extract segments from the filename to build the BigQuery destination table name. The workflow’s last step loads the Cloud Storage file into that BigQuery table.
Cloud Storage file structure:
File contents are three columns:
mydate:timestamp, col1:float, col2:float.
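For illustration, a file such as table_12_2020.csv might contain rows like this (the values are hypothetical):

```
mydate,col1,col2
2020-12-01 00:00:00,1.25,2.50
2020-12-01 01:00:00,3.75,4.00
```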
Subworkflow to read Cloud Storage objects
As you can see in the subworkflow named GCSObjectsList, we have two input params: the bucket name and the prefix, which is the folder path we use as a filter. This subworkflow has two steps. The first, readItem, issues an HTTP GET call to the Cloud Storage API. We have indicated that the call should use OAuth2 authentication, which is built in and works out of the box; behind the scenes, the default (or explicitly configured) service account executing the workflow needs permission to read from Cloud Storage.
As the call step ends in a result assignment, we need a second step to return the relevant content. Because Cloud Workflows has built-in JSON conversion, the return statement simply returns the items array.
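Based on that description, the subworkflow might look like the following sketch. The endpoint URL and field names follow the Cloud Storage JSON API; they are my assumptions, not necessarily the author's exact code:

```yaml
# Sketch of the GCSObjectsList subworkflow described above.
GCSObjectsList:
  params: [bucket, prefix]
  steps:
    - readItem:
        call: http.get
        args:
          url: ${"https://storage.googleapis.com/storage/v1/b/" + bucket + "/o"}
          query:
            prefix: ${prefix}
          auth:
            type: OAuth2
        result: listResult
    - returnItems:
        # Built-in JSON conversion lets us return the parsed items array directly.
        return: ${listResult.body.items}
```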
Looping an array with Cloud Workflows
In the previous step, we received an items array whose entries have two keys: bucket and name. Since items is an array, we need to loop through it and call the ProcessItem subworkflow for each item. To loop over an array in Cloud Workflows, you essentially need four steps: init, check_condition, iterate, and exit_loop. Each is easy to understand just by reading the YAML definition.
Now we have looped through the items, and we called ProcessItem subworkflow with project and gcsPath parameters.
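The four-step loop described above could be sketched like this (the subworkflow and variable names are illustrative assumptions):

```yaml
# Sketch: iterate over the items array with init / check_condition /
# iterate / exit_loop steps.
LoopItems:
  params: [project, items]
  steps:
    - init:
        assign:
          - i: 0
    - check_condition:
        switch:
          - condition: ${i < len(items)}
            next: iterate
        next: exit_loop
    - iterate:
        steps:
          - processItem:
              call: ProcessItem
              args:
                project: ${project}
                gcsPath: ${"gs://" + items[i].bucket + "/" + items[i].name}
          - increment:
              assign:
                - i: ${i + 1}
        next: check_condition
    - exit_loop:
        return: ${i}
```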
An intermediate step: parsing and extracting a segment from the URI
A sample URI is gs://bucket/folder/table_12_2020.csv. From this we need to extract the segment 12, as we want to name our new table table_12. You could take this further and derive the BigQuery dataset or table name from the file name by adopting a specific naming convention.
Since we want to use a segment from the filename in the BigQuery destination table name, we need to extract it from the URI path.
As of 2020, Cloud Workflows does not yet have built-in string parsing or regular expression support.
We are going to use BigQuery query syntax to parse the URI, extract the segments, and return them as a single-row result. Along the way, this gives us an intermediate lesson in querying BigQuery and processing the results.
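As a sketch, the parsing query could be assembled in an assign step. This particular SPLIT-based query is my assumption of one way to do it, not necessarily the author's original:

```yaml
# Sketch: build a BigQuery query that extracts "12" from
# gs://bucket/folder/table_12_2020.csv; adjust to your naming convention.
- buildParseQuery:
    assign:
      - gcsPath: "gs://bucket/folder/table_12_2020.csv"
      # Take the last path segment, split it on "_", keep the second part.
      - parseQuery: ${"SELECT SPLIT(ARRAY_REVERSE(SPLIT('" + gcsPath + "', '/'))[OFFSET(0)], '_')[OFFSET(1)] AS segment"}
```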
The above query will output:
Workflow to execute synchronous BQ query
The above example introduces a couple of new concepts. A standalone Cloud Workflows definition needs a main subworkflow as its entry point. You will also notice the use of try/except blocks in the BQJobsQuery subworkflow. The last subworkflow, BQParseResults, has a switch block to handle the case where no rows were found.
At the end, in the returning section, we read the first row (the only row in our example) with rows[0]. The BigQuery API returns a structure with f (fields) and v (value) keys to read out the values, so to get the first column's value you use rows[0].f[0].v. To read the fifth column of a result you would use rows[0].f[4].v, and so on.
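Putting those pieces together, the synchronous query workflow might be sketched as follows. The jobs.query endpoint and response shape follow the BigQuery REST API; the project handling and names are assumptions:

```yaml
# Sketch: main entry point, try/except around jobs.query,
# and a switch for empty results.
main:
  steps:
    - runQuery:
        call: BQJobsQuery
        args:
          project: "your-project-id"   # hypothetical project
          query: "SELECT 'table_12' AS segment"
        result: queryResult
    - parseResult:
        call: BQParseResults
        args:
          bqResult: ${queryResult}
        result: segment
    - returnSegment:
        return: ${segment}

BQJobsQuery:
  params: [project, query]
  steps:
    - jobsQuery:
        try:
          call: http.post
          args:
            url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/" + project + "/queries"}
            body:
              useLegacySql: false
              query: ${query}
            auth:
              type: OAuth2
          result: queryResult
        except:
          as: e
          steps:
            - reraise:
                raise: ${e}
    - returnResult:
        return: ${queryResult.body}

BQParseResults:
  params: [bqResult]
  steps:
    - checkEmpty:
        switch:
          - condition: ${not ("rows" in bqResult)}
            next: emptyResult
        next: returnFirstValue
    - emptyResult:
        return: ""
    - returnFirstValue:
        # rows[0] is the first (and only) row; f[0].v is the first column's value.
        return: ${bqResult.rows[0].f[0].v}
```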
Workflow to load Cloud Storage files into BigQuery
A simplified workflow definition, with some syntax omitted (it is in the final example), loads a CSV file from Cloud Storage into BigQuery.
You will notice we call the BigQuery API Jobs endpoint with a configuration for LOAD-type jobs. We set the sourceUris and the destination table from params. The schema is defined in the configuration object exactly as it would be in a Python or Node.js script.
This call is asynchronous, and in this reduced example we do not poll for the result of the import; in production you could add a log-emitting step for errors. Polling in Workflows can be done with the sys.sleep call, retrying the status check between sleeps.
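A sketch of such a load step, modeled on the BigQuery jobs.insert REST API; the subworkflow name, params, and load options such as skipLeadingRows and writeDisposition are my assumptions:

```yaml
# Sketch: insert a LOAD-type BigQuery job for one CSV file.
BQJobsInsertLoad:
  params: [project, gcsPath, datasetId, tableId]
  steps:
    - insertLoadJob:
        call: http.post
        args:
          url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/" + project + "/jobs"}
          body:
            configuration:
              load:
                sourceUris:
                  - ${gcsPath}
                destinationTable:
                  projectId: ${project}
                  datasetId: ${datasetId}
                  tableId: ${tableId}
                sourceFormat: CSV
                skipLeadingRows: 1
                writeDisposition: WRITE_TRUNCATE
                # Schema matching the three-column files described earlier.
                schema:
                  fields:
                    - name: mydate
                      type: TIMESTAMP
                    - name: col1
                      type: FLOAT
                    - name: col2
                      type: FLOAT
          auth:
            type: OAuth2
        result: loadResult
    - returnResult:
        return: ${loadResult.body}
```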
Deploying and executing Cloud Workflows
To deploy your workflow, you need the source YAML file, included at the end of the article. You can deploy from the Cloud Console, via the API, or with the gcloud command-line tool.
We recommend VS Code, where you can set up the GCP Project Switcher extension and define IDE tasks to automate deploying, executing, and describing executions.
Once your workflow is deployed, you need to trigger an execution whenever you want to process your files. This could be a Cloud Scheduler job, a manual execution triggered by your script, or an event observer set up with Eventarc or a Cloud Functions trigger.
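With the gcloud CLI, deploying and running could look like this (the workflow name, file name, and region are hypothetical placeholders):

```shell
# Deploy the workflow definition from the local YAML file
gcloud workflows deploy gcs-to-bq \
  --source=workflow.yaml \
  --location=us-central1

# Trigger one execution and wait for its result
gcloud workflows run gcs-to-bq --location=us-central1
```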
We have covered how to define multiple steps in Cloud Workflows to list Cloud Storage files, extract a segment from each filename, and import the file into a dynamically named BigQuery destination table. We saw workflow definitions for assignments and for HTTP-based GET and POST requests. Being able to leverage the built-in authentication and JSON parsing was a huge advantage, and in some examples we even used try/except syntax.
Our workflow is defined in YAML and does not depend on any SDK or library version, so its maintenance burden is essentially zero: it can be left in place for years, and we are not forced to update it every couple of years when old runtimes are deprecated. Being serverless, it scales to zero and runs on demand.
Cloud Workflows is super handy for everyday developers and for DevOps engineers who want to automate the cloud by interconnecting REST-based services. Because it is defined in YAML, it is also a good starting point for juniors getting started with automation.
In the meantime, if you want to check it out, here are some links:
- [slides] Serverless orchestration and automation with Cloud Workflows
- Workflows overview
- Workflows docs
- Sample VSCode/.tasks.json file for deploying to Workflows
- [video] Serverless orchestration and automation with Cloud Workflows
Complete YAML workflow definition.