Using Cloud Workflows to load Cloud Storage files into BigQuery

Márton Kodok
Nov 30, 2020 · 6 min read
Loading Data Into BigQuery From Cloud Storage by using Cloud Workflows.

In this article, we will orchestrate and automate Google Cloud with serverless workflows.

We will create a Cloud Workflow to load data from Cloud Storage into BigQuery. This is a complete guide on how to work with workflows: connecting Google Cloud APIs, using subworkflows, looping over arrays, extracting filename segments, and calling BigQuery load jobs.

There are various ways to get Cloud Storage files into BigQuery: using a Cloud Function, routing Eventarc triggers to Cloud Run services, the relatively new BigQuery create external table statement, or the good old bq CLI tool.

All of these require you to keep a function, a container, a library, or an SDK up to date, which means ongoing maintenance.

We are going to use Cloud Workflows to connect the Cloud Storage API with the BigQuery Jobs API for loading files into tables. Using the techniques covered here, you will have a foundation to build any kind of serverless automation with Cloud Workflows, in YAML syntax, without maintenance.

Note: To get started with Cloud Workflows, check out my introductory presentation, Serverless orchestration with Cloud Workflows.

Cloud Workflows

Cloud Workflows — orchestrate & integrate

In a nutshell, Workflows allows you to connect services together: anything that has a public API.

  • Workflow orchestration (step engine as a service)
  • Integrate any Google Cloud API, SaaS API, or private APIs
  • Out of the box authentication support for Google Cloud products
  • Fully managed service — requires no infrastructure or capacity planning
  • Serverless with Pay-per-use pricing model
  • Declarative workflow language using YAML syntax
  • Developer friendly built-in error handling with retries

Problem definition

Workflow steps to load Cloud Storage files into BigQuery.

Let’s assume we have all our source files in Cloud Storage. Files are organized into buckets and folders, and may be versioned. Our workflow definition will have multiple steps. We will start by using the Cloud Storage API to list files in a bucket, using a folder as a filter. For each file, we will then extract parts of the filename to use in the generated BigQuery table name. The workflow’s last step will load the Cloud Storage file into the indicated BigQuery table.

Cloud Storage file structure:

gs://bucket/folder/file_12_2020.csv
gs://bucket/folder/file_01_2021.csv

Each file contains three columns: mydate:timestamp, col1:float, col2:float.

Subworkflow to read Cloud Storage objects
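The embedded definition from the original post is not reproduced here, so below is a minimal sketch of such a subworkflow. It assumes the Cloud Storage JSON API objects.list endpoint; the second step name and the result variable name are illustrative.

GCSObjectsList:
  params: [bucket, prefix]
  steps:
    - readItem:
        # List objects in the bucket, filtered by the folder prefix,
        # using the Cloud Storage JSON API with built-in OAuth2 auth.
        call: http.get
        args:
          url: ${"https://storage.googleapis.com/storage/v1/b/" + bucket + "/o"}
          query:
            prefix: ${prefix}
          auth:
            type: OAuth2
        result: documentList
    - documentFound:
        # Return only the items array from the parsed JSON response.
        return: ${documentList.body.items}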

As you can see in the subworkflow named GCSObjectsList, we have two input params: the bucket name and the prefix, which is the folder path we use as a filter. This subworkflow has two steps. The first, readItem, issues a call action using HTTP GET to connect to the Cloud Storage API. We have indicated that the workflow will use OAuth2 authentication, which is built in and works out of the box. Behind the scenes, the default (or explicitly specified) service account that executes the workflow needs permission to read from Cloud Storage.

As the call action ends in a result assignment, we need a second step to return the relevant content.

Because Cloud Workflows has built-in JSON conversion, the return statement simply returns only the items array.

Looping an array with Cloud Workflows

In the previous step, we got back the items array, where each entry has bucket and name keys (among others). As items is an array, we need to loop through it and call the ProcessItem subworkflow for each item. To loop over an array in Cloud Workflows, you essentially need four steps: init, check_condition, iterate, and exit_loop. Each is easy to understand just by reading the YAML definition.
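A minimal sketch of this loop, with an illustrative main entrypoint, a hypothetical project id, and the sample bucket and folder from above:

main:
  steps:
    - getList:
        call: GCSObjectsList
        args:
          bucket: "bucket"
          prefix: "folder/"
        result: items
    - init:
        assign:
          - i: 0
    - check_condition:
        switch:
          - condition: ${i < len(items)}
            next: iterate
        next: exit_loop
    - iterate:
        steps:
          - processItem:
              call: ProcessItem
              args:
                project: "my-project"   # hypothetical project id
                gcsPath: ${"gs://" + items[i].bucket + "/" + items[i].name}
          - increment:
              assign:
                - i: ${i + 1}
              next: check_condition
    - exit_loop:
        return: ${"Processed " + string(len(items)) + " files"}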

With these steps we have looped through the items and called the ProcessItem subworkflow with project and gcsPath parameters.

Doing an intermediate step — Parsing and extracting a segment from the URI

A sample URI is gs://bucket/folder/table_12_2020.csv. From it we need to extract the segment 12, because we want to name our new table table_12. You could engineer this further to derive the BigQuery dataset or table name from the file name by using a specific naming convention.

As we want to use a segment of the filename in the BigQuery destination table naming convention, we need to extract it from the URI path.

As of 2020, Cloud Workflows does not yet have built-in string parsing or regular expressions.

We are going to use BigQuery query syntax to parse the URI, extract the segment, and return it as a single-row result. This also gives us an intermediate lesson on how to query BigQuery and process the results.
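The exact query from the original post is not embedded here; a minimal sketch in BigQuery standard SQL, written as a workflow assignment step (as it might appear among the steps of ProcessItem) and with the sample URI hard-coded for illustration, could look like this:

    - ParseSegments:
        assign:
          # Extract the file base name (table_12_2020), split on "_",
          # and take the second segment (12) as a single-row result.
          - query: >-
              SELECT
                SPLIT(
                  REGEXP_EXTRACT('gs://bucket/folder/table_12_2020.csv', r'([^/]+)\.csv$'),
                  '_')[OFFSET(1)] AS segment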

The query returns the extracted segment, 12 for the sample file, as a single-row, single-column result.

Workflow to execute synchronous BQ query
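The embedded definition is not reproduced here, so below is a minimal sketch of the two subworkflows, assuming the synchronous jobs.query REST method of the BigQuery API. Step names, the parameter names, and the error message are illustrative; the main entrypoint (shown in the loop sketch earlier) would call these in turn.

BQJobsQuery:
  params: [project, query]
  steps:
    - runQuery:
        try:
          # Synchronous BigQuery query via the jobs.query REST method.
          call: http.post
          args:
            url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/" + project + "/queries"}
            body:
              useLegacySql: false
              query: ${query}
            auth:
              type: OAuth2
          result: query_result
        except:
          as: e
          steps:
            - queryFailed:
                return: ${"BigQuery query failed: " + e.message}
    - returnResult:
        return: ${query_result.body}

BQParseResults:
  params: [queryResponse]
  steps:
    - checkResults:
        switch:
          - condition: ${int(queryResponse.totalRows) == 0}
            next: noResults
        next: returnFirstValue
    - noResults:
        return: ""
    - returnFirstValue:
        # rows[0] is the first (and only) row; f[0].v is the first column value.
        return: ${queryResponse.rows[0].f[0].v}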

The above example introduces a couple of new concepts. A standalone Cloud Workflows definition needs a main subworkflow as its entrypoint. You will also notice the use of try/except blocks in the BQJobsQuery subworkflow, and the last subworkflow, BQParseResults, has a switch block to handle the case where no rows were found.
At the end, in the return section, we read the first row (the only row in our example) with rows[0]. The BigQuery API returns a structure with f and v keys for the values, so to get the first column value you need the f[0].v key from the returned JSON; to read the fifth column of a result you would use f[4].v, and so on.

Workflow to load Cloud Storage files into BigQuery

Below is a simplified workflow definition, with some syntax omitted (it is all in the final example), to load a CSV file from Cloud Storage into BigQuery.
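The original embedded snippet is not reproduced here; a minimal sketch, assuming the BigQuery jobs.insert REST method, a hypothetical subworkflow name (BQLoadFileJob, which ProcessItem would call), and an assumed writeDisposition:

BQLoadFileJob:
  params: [project, dataset, table, gcsPath]
  steps:
    - loadFile:
        call: http.post
        args:
          url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/" + project + "/jobs"}
          body:
            configuration:
              load:
                sourceUris:
                  - ${gcsPath}
                destinationTable:
                  projectId: ${project}
                  datasetId: ${dataset}
                  tableId: ${table}
                sourceFormat: "CSV"
                # writeDisposition is an assumption; use append instead of
                # replace if that fits your pipeline better.
                writeDisposition: "WRITE_TRUNCATE"
                schema:
                  fields:
                    - name: "mydate"
                      type: "TIMESTAMP"
                    - name: "col1"
                      type: "FLOAT"
                    - name: "col2"
                      type: "FLOAT"
          auth:
            type: OAuth2
        result: loadJob
    - jobInfo:
        # The job reference (project, location, jobId) can be used for polling.
        return: ${loadJob.body.jobReference}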

You will notice we defined a subworkflow for the BigQuery Jobs API endpoint, with the configuration set up for LOAD-type jobs. We set the sourceUris and the destination table based on params. The schema is set in the configuration object exactly as it would be in a Python or Node.js script.

This call is asynchronous, and in this reduced example we do not poll for the result of the import, but in production you could define a log-emitting step to handle errors. Polling in Workflows can be done with a sleep step that retries the status check.
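A minimal polling sketch, assuming the BigQuery jobs.get REST method and a hypothetical subworkflow name; the jobId would come from the jobReference returned by the load call above:

BQWaitForJob:
  params: [project, jobId]
  steps:
    - getStatus:
        call: http.get
        args:
          url: ${"https://bigquery.googleapis.com/bigquery/v2/projects/" + project + "/jobs/" + jobId}
          auth:
            type: OAuth2
        result: job
    - checkDone:
        switch:
          - condition: ${job.body.status.state == "DONE"}
            next: finished
        next: wait
    - wait:
        # Wait before retrying the status check.
        call: sys.sleep
        args:
          seconds: 10
        next: getStatus
    - finished:
        return: ${job.body.status}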

Deploying and executing Cloud Workflows

To deploy your workflow, you need the source YAML file, which is at the end of the article. You can deploy using the Cloud Console, the API, or the gcloud command-line utility.
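For example, with gcloud (at the time of writing these commands sit under gcloud beta; the workflow name here is hypothetical):

# Deploy the workflow from its YAML source.
gcloud workflows deploy my-gcs-loader --source=workflow.yaml --location=us-central1

# Execute it and wait for the result.
gcloud workflows run my-gcs-loader --location=us-central1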

We recommend using VSCode, where you can set up the GCP Project Switcher extension and define IDE tasks to automate deploying, executing, and describing executions.

Once your workflow is deployed, you need to trigger an execution whenever you want to process your files. This could be a Cloud Scheduler job, a manual execution triggered by your script, or an event observer set up with Eventarc or a Cloud Functions trigger.

We have covered how to define multiple steps in Cloud Workflows to list Cloud Storage files, extract a segment from each filename, and import each file into a dynamically named BigQuery destination table. We saw workflow definitions with assignments and with HTTP-based GET and POST requests. It was a huge advantage that we could leverage the built-in authentication and JSON parsing capabilities, and in some examples we even used try/except syntax.

Our workflow is defined in YAML and is not tied to any SDK or library version, so its maintenance is essentially zero: it can be left in place for years, and we are not forced to update it every couple of years when old runtime versions are deprecated. It scales to zero and runs on demand, as it's serverless.

Cloud Workflows is super handy for everyday developers and DevOps engineers who want to automate the cloud by interconnecting REST-based services. As it's defined in YAML, it's also a good starting point for juniors and kids to build up automation knowledge.

Wrap Up

In the meantime, if you want to check it out, here are some links:

Feel free to reach out to me on Twitter @martonkodok or read my previous posts on medium/@martonkodok

Complete YAML workflow definition.
