Build a serverless BigQuery ingestion pipeline using Cloud Workflows

Christian Kravanja
Published in CodeShake · 8 min read · Feb 19, 2021

If you are building a Datalake using BigQuery, there are many ways to implement your ingestion pipeline.

However, GCP Workflows, unveiled at Cloud Next ’20 and now in GA, provides a serverless, cost-effective, and easy way to implement such a pipeline. The product is still new, but it already looks promising.

We will start with a simple workflow that loads a CSV file into BigQuery using a BQ load job. In our case, the ingestion workflow is triggered in a “push” fashion: a data source regularly publishes data as a new file to an input GCS bucket, and we want to update our data lake right after each new file arrives.

We will start by writing the workflow YAML definition, and see later in this article how to trigger the workflow execution.

For those who may want to jump directly to the code, here is the repo link:

Pre-requisites

In this article, we assume that you are familiar with GCP and that you have a Linux / macOS shell with the gcloud SDK installed. We also assume that you have a GCP project at your disposal (named YOUR_PROJECT in this article), with billing enabled and the GCS, BigQuery and Workflows APIs enabled.

Implementing the workflow

The main purpose of the workflow is to execute a succession of steps that load the data into BQ and give execution feedback:

  • launch a BigQuery load job with a GCS input file
  • regularly query the job status, until the job is finished
  • when the job state is “DONE”, get the result
  • if the load succeeded, mark the data object in GCS as loaded
  • return the job result

Workflows are defined in a YAML file as a succession of steps. The syntax offers many more capabilities than we use here; see the syntax reference documentation: https://cloud.google.com/workflows/docs/reference/syntax

Here is our sample workflow outline :

Let’s walk through the main workflow steps :

  • assign_vars: in this first step, we prepare the request body for the BigQuery load job creation request. The request_body variable contains the exact JSON format expected by the BQ API for the load request: we reuse args.bucket and args.object, passed as parameters by the calling Cloud Function, to build the GCS URL of the file to load. We also set the sourceFormat (CSV), enable schema autodetection, and define the nullMarker, the field delimiter, and the target BQ dataset and table.
- assign_vars:
    assign:
      - request_body: {
            "load": {
              "sourceUris": [
                "${ \"gs://\" + args.bucket + \"/\" + args.object}"
              ],
              "destinationTable": {
                "datasetId": "YOUR_DATASET",
                "projectId": "YOUR_PROJECT",
                "tableId": "generated_table"
              },
              "sourceFormat": "CSV",
              "autodetect": "true",
              "nullMarker": "NA",
              "fieldDelimiter": ","
            }
          }

Optionally, we could have specified the schema that the source file should comply with. This can be useful to validate the input data format.
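
To make the shape of this request body easier to experiment with, here is a minimal Python sketch that builds the same load configuration, with an optional explicit schema in place of autodetection. The function name build_load_config and the example field names are hypothetical; the keys follow the JobConfigurationLoad object used above.

```python
import json


def build_load_config(bucket, object_name, schema_fields=None,
                      dataset="YOUR_DATASET", project="YOUR_PROJECT",
                      table="generated_table"):
    """Build the same load configuration as the assign_vars step.

    schema_fields is an optional list of {"name": ..., "type": ...} dicts
    (hypothetical values); when it is given, autodetection is left out.
    """
    load = {
        "sourceUris": [f"gs://{bucket}/{object_name}"],
        "destinationTable": {
            "datasetId": dataset,
            "projectId": project,
            "tableId": table,
        },
        "sourceFormat": "CSV",
        "nullMarker": "NA",
        "fieldDelimiter": ",",
    }
    if schema_fields is None:
        # Same behaviour as the workflow: let BigQuery infer the schema.
        load["autodetect"] = True
    else:
        # Explicit schema: the load fails if the file does not comply.
        load["schema"] = {"fields": schema_fields}
    return {"load": load}


if __name__ == "__main__":
    config = build_load_config(
        "input_data_bucket",
        "hurricanes.csv",
        schema_fields=[{"name": "month", "type": "STRING"}],  # made-up field
    )
    print(json.dumps({"configuration": config}, indent=2))
```

With an explicit schema, a file that does not match it makes the load job fail instead of silently producing an unexpected table layout.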

  • createBigQueryLoadJob: the second step launches the BQ load job. The request is asynchronous: the response only tells us whether the job has been accepted, not the result of the load itself.
- createBigQueryLoadJob:
    call: http.post
    args:
      url: https://bigquery.googleapis.com/bigquery/v2/projects/YOUR_PROJECT_NUMBER/jobs
      body:
        configuration: ${request_body}
      headers:
        Content-Type: "application/json"
      auth:
        type: OAuth2
    result: jobLoadRes

See https://cloud.google.com/bigquery/docs/reference/rest/v2/jobs/insert for more details about BQ job insert API endpoint, and https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#JobConfigurationLoad for details about the JobConfigurationLoad API object.

  • getJobFinalStatus : Now we have to wait for the job to complete. This will be implemented using the subworkflow feature of GCP Workflows. More details about this subworkflow implementation are given later in this article.
- getJobFinalStatus:
    call: sub_getJobFinalStatus
    args:
      joburl: ${jobLoadRes.body.selfLink}
    result: finalStatus
  • checkJobResult: once the job is finished, we check the result of the load job. If there is an errorResult object in the response, we raise an exception. This is important, as we need to ensure that the status of the workflow execution matches the status of the load job itself.
- checkJobResult:
    switch:
      - condition: ${"errorResult" in finalStatus.body.status }
        raise: ${finalStatus.body.status.errors}
  • tagSourceObject: if we reach this step, the job has succeeded. We can now safely update the metadata of the source data file in GCS, adding tags that mark this particular file as already loaded to BQ and keep a trace of the load job id.
- tagSourceObject:
    call: http.put
    args:
      url: "${\"https://storage.googleapis.com/storage/v1/b/\" + args.bucket + \"/o/\" + args.object }"
      body:
        metadata:
          "status": "loaded"
          "loadJobId": ${finalStatus.body.id}
      headers:
        Content-Type: "application/json"
      auth:
        type: OAuth2

See https://cloud.google.com/storage/docs/json_api/v1/objects/update for more details about GCS update API endpoint
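
The URL in this step concatenates args.object as-is, which works for a flat name such as hurricanes.csv. The GCS JSON API expects the object name to be percent-encoded in the path, so a name containing "/" would need encoding first. The following Python sketch shows how the URL and body of the metadata update could be built with that encoding; the helper names object_url and metadata_body are hypothetical.

```python
from urllib.parse import quote


def object_url(bucket, object_name):
    """Return the JSON API URL of a GCS object.

    safe="" makes quote() encode "/" as %2F, so an object stored under a
    "folder" prefix still maps to a single path segment.
    """
    return (
        "https://storage.googleapis.com/storage/v1/b/"
        + quote(bucket, safe="")
        + "/o/"
        + quote(object_name, safe="")
    )


def metadata_body(job_id):
    """Return the same metadata payload as the tagSourceObject step."""
    return {"metadata": {"status": "loaded", "loadJobId": job_id}}


if __name__ == "__main__":
    print(object_url("input_data_bucket", "2021/02/hurricanes.csv"))
    print(metadata_body("YOUR_PROJECT:EU.some_job_id"))  # made-up job id
```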

  • returnResult: if the job is successful, we return the execution result, so that we can later analyze all BQ load job execution outputs in the GCP Workflows console:
- returnResult:
    return:
      "jobStatus": ${finalStatus}
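
The logic of the checkJobResult and returnResult steps can also be expressed in a few lines of Python, which may help when testing the error handling locally. This is only an illustrative sketch: LoadJobError and check_job_result are hypothetical names, and the function takes the job resource (the body of the HTTP response), not the full response object that the workflow handles.

```python
class LoadJobError(Exception):
    """Raised when a finished load job reports an errorResult."""


def check_job_result(job):
    """Mirror checkJobResult and returnResult for a finished job resource."""
    status = job.get("status", {})
    if "errorResult" in status:
        # Like the workflow, surface the full error list when it is present.
        raise LoadJobError(status.get("errors", [status["errorResult"]]))
    return {"jobStatus": job}


if __name__ == "__main__":
    finished = {"id": "YOUR_PROJECT:EU.some_job_id", "status": {"state": "DONE"}}
    print(check_job_result(finished))
```

Raising instead of returning the error keeps the two statuses aligned: a failed load produces a failed execution.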

Implementing the subworkflow

The sub_getJobFinalStatus subworkflow is defined in the same YAML file as the main workflow. Its role is to wait for the given job to complete and return the final result once available. To do that, we poll the job status every 5 seconds: if the state is “DONE”, we return the job status; otherwise, we go back to the sleep step and wait 5 more seconds:

sub_getJobFinalStatus:
  params: [joburl]
  steps:
    - sleep:
        call: sys.sleep
        args:
          seconds: 5
    - getJobCurrentStatus:
        call: http.get
        args:
          url: ${joburl}
          auth:
            type: OAuth2
        result: jobStatusRes
    - isJobFinished:
        switch:
          - condition: ${jobStatusRes.body.status.state == "DONE"}
            return: ${jobStatusRes}
        # else
        next: sleep
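
For readers more used to imperative code, the same sleep / get / check loop can be sketched in Python. The function below is an illustration, not part of the workflow: wait_for_job, fetch_status and max_polls are hypothetical names, and the status fetcher and sleep function are injected so the loop can be tried without calling BigQuery.

```python
import time


def wait_for_job(fetch_status, interval_seconds=5, max_polls=None, sleep=time.sleep):
    """Poll fetch_status() until the returned job reaches the DONE state.

    fetch_status is any callable returning a job resource (a dict with
    status.state). max_polls is an optional safety limit that the YAML
    subworkflow above does not have.
    """
    polls = 0
    while True:
        sleep(interval_seconds)        # the "sleep" step
        job = fetch_status()           # the "getJobCurrentStatus" step
        if job["status"]["state"] == "DONE":
            return job                 # the "isJobFinished" step
        polls += 1
        if max_polls is not None and polls >= max_polls:
            raise TimeoutError(f"job still not DONE after {polls} polls")


if __name__ == "__main__":
    # Simulated job that finishes on the third poll; no real waiting.
    states = iter(["PENDING", "RUNNING", "DONE"])
    job = wait_for_job(lambda: {"status": {"state": next(states)}},
                       sleep=lambda seconds: None)
    print(job)
```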

Uploading the workflow

To create the workflow (along with its subworkflow), we will use the gcloud workflows command (gcloud version ≥ 324.0)

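First, we create a service account for our workflow and grant it the rights to create BigQuery tables and to read and write files in the GCS bucket (we need write permissions for the metadata update). Each role is granted with its own add-iam-policy-binding call, as the command accepts a single role:

gcloud iam service-accounts create sa-workflow --display-name sa-workflow

gcloud projects add-iam-policy-binding YOUR_PROJECT \
  --member serviceAccount:sa-workflow@YOUR_PROJECT.iam.gserviceaccount.com \
  --role roles/bigquery.dataOwner

gcloud projects add-iam-policy-binding YOUR_PROJECT \
  --member serviceAccount:sa-workflow@YOUR_PROJECT.iam.gserviceaccount.com \
  --role roles/storage.objectAdmin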

Security note: to keep this article simple, the IAM roles are granted to the service account at project level. For any real use case, you should follow the principle of least privilege and grant the roles only at bucket and dataset level.

Now we can deploy our YAML definition. At the time of writing, 3 regions are available for Workflows: us-central1, europe-west4, asia-southeast1:

gcloud workflows deploy load-workflow \
  --location=europe-west4 \
  --description='Sample load workflow' \
  --source=./workflow.yaml \
  --project=YOUR_PROJECT \
  --service-account=sa-workflow@YOUR_PROJECT.iam.gserviceaccount.com

The full workflow.yaml source can be downloaded in the GitHub repo mentioned at the beginning of the article.

Creating the Trigger Function

There is one step that we have not covered yet: we need to trigger the workflow every time a new file is uploaded to the bucket. As Cloud Workflows is not yet compatible with GCS triggers, we will use a boilerplate Cloud Function to propagate the “upload event” received by Cloud Functions to Cloud Workflows:

The Cloud Function will be basic : take the event related to the file upload, and call the workflow with some of the parameters. The example given below in this article shows a Cloud Function implementation written in Python.

Why not do everything in a Cloud Function?

At this point you might ask why we keep using GCP Workflows instead of writing the whole workflow in the Python Cloud Function. Using Workflows provides many advantages:

  • straightforward YAML syntax, clear delimited steps
  • workflow updates are applied almost instantaneously
  • workflows can run for days (Cloud Functions are limited to 9 minutes), useful if you need to wait for a human interaction or implement some pause in your workflow
  • workflows are billed by number of steps and external calls, not by execution duration
  • low startup time, no ‘cold start’ effect
  • monitoring UI for tracking workflows executions result is included
  • neat diagram of steps is generated when editing the workflow in the GCP UI

Let’s write the function :

On Linux / macOS, follow these steps :

  • Create a directory on your local system for the function code:
mkdir WorkflowTrigger

Create a main.py file in the WorkflowTrigger directory with the following contents:

In this code, we make an HTTPS call to the GCP Workflows API, passing as a JSON payload the GCS bucket name and the GCS object path of the file containing the data.

We also use the Cloud Function's own credentials to authenticate against the GCP API.
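
As a rough sketch of what such a main.py could look like (this is not the author's original listing), the function below starts a workflow execution through the Workflow Executions REST API. It assumes the v1 endpoint, the workflow name and region used in the deploy command of this article, and that the workflow reads its input from args.bucket and args.object. The helper build_execution_request is a hypothetical name introduced for illustration.

```python
import json

# Assumed values, matching the names used elsewhere in this article.
WORKFLOW_PROJECT = "YOUR_PROJECT"
WORKFLOW_LOCATION = "europe-west4"
WORKFLOW_NAME = "load-workflow"


def build_execution_request(bucket, object_name):
    """Return the URL and JSON body that start one workflow execution."""
    url = (
        "https://workflowexecutions.googleapis.com/v1/"
        f"projects/{WORKFLOW_PROJECT}/locations/{WORKFLOW_LOCATION}"
        f"/workflows/{WORKFLOW_NAME}/executions"
    )
    # The API expects the workflow argument as a JSON-encoded string.
    body = {"argument": json.dumps({"bucket": bucket, "object": object_name})}
    return url, body


def onNewFile(event, context):
    """Background function triggered by google.storage.object.finalize."""
    # Imported here so the helper above can be tried without these libraries.
    import google.auth
    from google.auth.transport.requests import AuthorizedSession

    # Use the Cloud Function's own service account credentials.
    credentials, _ = google.auth.default(
        scopes=["https://www.googleapis.com/auth/cloud-platform"]
    )
    session = AuthorizedSession(credentials)

    url, body = build_execution_request(event["bucket"], event["name"])
    response = session.post(url, json=body)
    response.raise_for_status()
    print(f"Started execution: {response.json().get('name')}")
```

The function only starts the execution and returns; the workflow itself carries on with the load job, so the function stays short-lived.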

Specify dependencies

As we need to import the Google Auth Python libraries, we will create a requirements.txt metadata file in the WorkflowTrigger directory. We will also need the requests library to perform outgoing HTTPS requests.

The file will contain the names of the two libraries to import:

google-auth
requests

Deploy the function and activate the trigger:

We can now deploy the function to GCP. As we will want to activate the bucket trigger when deploying the function, we will first create the bucket where the source will be pushing its data :

gsutil mb -b on gs://input_data_bucket

We will then create the service account that the Cloud Function will be using. We just need to give this service account the right to invoke the workflow :

gcloud iam service-accounts create sa-cf-trigger --display-name sa-cf-trigger

gcloud projects add-iam-policy-binding YOUR_PROJECT \
  --member serviceAccount:sa-cf-trigger@YOUR_PROJECT.iam.gserviceaccount.com \
  --role roles/workflows.invoker

Note that the data source pushing the data files will likely use another service account, with a write role on the GCS bucket.

Then we can deploy the cloud function and associate it with this service account :

gcloud functions deploy workflow-trigger \
--region=europe-west4 \
--entry-point onNewFile \
--runtime python38 \
--trigger-resource input_data_bucket \
--trigger-event google.storage.object.finalize \
--service-account=sa-cf-trigger@YOUR_PROJECT.iam.gserviceaccount.com

The deployment parameters activate the google.storage.object.finalize trigger on the GCS bucket. The event is sent when a new object is created (or an existing object is overwritten, and a new generation of that object is created) in the bucket.

The ingestion pipeline in action

To test the workflow, we will use an example file (a public CSV file published by Florida State University) containing the number of hurricanes per month for the years 2005–2015:

https://people.sc.fsu.edu/~jburkardt/data/csv/hurricanes.csv

Let’s upload this data file to our Cloud Storage bucket :

curl https://people.sc.fsu.edu/~jburkardt/data/csv/hurricanes.csv | gsutil cp - gs://input_data_bucket/hurricanes.csv

We can now see that the workflow was triggered and did succeed :

The BQ table is created :

Conclusion

Thanks to GCP Workflows, we easily laid the first stone of our data pipeline.

This serverless service is still quite new, but it already provides:

  • serverless execution
  • simple workflow definition in YAML
  • control loop capabilities
  • native compatibility with other GCP services
  • IAM integration
  • interactive Web GUI for workflow definition
  • Workflow executions monitoring interface

As the service looks promising, we can expect to rely on it for more complex workflows.

Special thanks to the article reviewers: Pascal, Guillaume, Vladimir, Florent
