Data Mesh Self Service — Schema Evolution in BigQuery for any change in Google Cloud Storage

Shashank Tripathi
Google Cloud - Community
Jan 6, 2023

In the previous blog of this series, ‘Data Mesh Self Service — Ingestion Pattern from GCS(Google Cloud Storage) to BigQuery (Data Store for Mesh)’, we discussed various approaches to load files from Google Cloud Storage (GCS) to BigQuery (schedule-based and event-based) and the GCP service offerings available for loading files. In this blog, we will focus on how to handle automatic schema evolution in the File to BigQuery ingestion pattern without any manual intervention.

One of the most common ingestion patterns in Google Cloud is Google Cloud Storage (GCS) to BigQuery. After the initial migration load, the challenge in this ingestion pattern is to handle schema changes from the source file and sync them with the target BigQuery table. In this article, we will focus on a possible way to handle schema evolution in the GCS to BigQuery ingestion pattern via the BigQuery API Client Library.

About the BigQuery API Client Library:

Client libraries make it easier to access Google Cloud APIs from a supported language. While you can use Google Cloud APIs directly by making raw requests to the server, client libraries provide simplifications that significantly reduce the amount of code you need to write. They provide natural code in each language to make Cloud APIs simple and easy to use, and they automatically handle the low-level details of communicating with the server, including authenticating with Google. Cloud Client Libraries can be installed using familiar package management tools such as npm and pip. The command below installs all the required resources needed to interact with BigQuery:

pip3 install google-cloud-bigquery

BigQuery API:

This is the central API used for core interactions. Within it, you will find resources for interacting with datasets, tables, jobs, and routines.

For more information about this library and usage, see the Python Client for Google BigQuery.
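For example, once the package is installed, a few lines are enough to create a client and list the datasets in a project (a minimal sketch; the project and credentials are resolved from Application Default Credentials):

from google.cloud import bigquery

# Create a client; the project can also be passed explicitly,
# e.g. bigquery.Client(project="my-project-id").
bq_client = bigquery.Client()

for dataset in bq_client.list_datasets():
    print(dataset.dataset_id)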

Objective:

Currently, if there is a schema change (a column addition) in the files landing in the GCS bucket, the updated schema and data are not automatically synced with the BigQuery table. The objective of this blog is to achieve automatic schema evolution in BigQuery whenever there is a column addition, by running the pipeline using the BigQuery API Client Library in Python.

Pipeline Setup for the File to BigQuery Ingestion Pattern:

One of the approaches to handling schema evolution on the BigQuery side is to track the change every time the source bucket/prefix is updated with a new schema and its corresponding data; this automatically triggers the pipeline to load the new data with the updated schema into the existing BigQuery table via the BigQuery API Client Library. If the necessary BigQuery tables don’t exist, the pipeline creates them. Otherwise, the existing BigQuery tables are used.
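As a rough sketch of this "create the table if it doesn't exist" behaviour (the helper name ensure_table and its arguments are illustrative assumptions, not the pipeline's actual code), the client library's get_table/create_table calls can be combined like this:

from google.api_core.exceptions import NotFound
from google.cloud import bigquery

bq_client = bigquery.Client()

def ensure_table(table_id: str, schema: list) -> bigquery.Table:
    # Return the existing table, or create it with the given schema if missing.
    try:
        return bq_client.get_table(table_id)  # e.g. "project.dataset.table"
    except NotFound:
        return bq_client.create_table(bigquery.Table(table_id, schema=schema))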

The two approaches to run the pipeline are described below:

1. Cloud Function

2. Cloud Run

Both approaches can run the schema evolution Python code automatically without any manual intervention. This setup is event-based, since the arrival time of the files is not fixed and the files need to be loaded as and when they arrive.

Eventarc makes it easy to connect various services (Cloud Run, Cloud Functions, Workflows) with events from a variety of sources. It allows you to build event-driven architectures in which microservices are loosely coupled and distributed. The trigger set up on Google Cloud Storage uses Eventarc for the event-based approach. A Cloud Storage trigger enables a Cloud Function or Cloud Run service to be called in response to changes in Cloud Storage. When you specify a Cloud Storage trigger for a Cloud Function or Cloud Run service, you choose an event type and specify a Cloud Storage bucket; the function or service will then be called whenever a change occurs on an object (file) within that bucket. The event type used in the trigger is ‘object finalized’, i.e. the pipeline gets triggered when a new object is created, or an existing object is overwritten and a new generation of that object is created.
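As a minimal sketch of the event-based entry point (the function name and the downstream run_schema_evolution call are illustrative assumptions), a 2nd gen Cloud Function can receive the Cloud Storage event via Eventarc and hand the file path to the pipeline:

import functions_framework

@functions_framework.cloud_event
def gcs_object_finalized(cloud_event):
    # Payload of a google.cloud.storage.object.v1.finalized event.
    data = cloud_event.data
    bucket = data["bucket"]       # bucket that received the object
    object_name = data["name"]    # path of the new or overwritten object
    gcs_file_path = f"gs://{bucket}/{object_name}"
    # Hand off to the schema-evolution pipeline (assumed to be defined elsewhere).
    run_schema_evolution(gcs_file_path)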

The diagram below shows the architecture to handle schema evolution in the File to BigQuery ingestion pattern. The pipeline code executes on either Cloud Run or Cloud Functions, depending upon the business requirement. Using the BigQuery API Client Library, we can handle the schema change for different source file formats such as CSV, Avro, and Parquet.

GCS-BQ Ingestion Template
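Since the architecture above supports CSV, Avro, and Parquet sources, the pipeline needs to know which format an incoming object uses. As a minimal illustrative sketch (the helper name and the extension-to-format mapping are assumptions, not part of the original pipeline), the object name from the trigger can be mapped to the source_format value used later in the code:

# Hypothetical helper: map the incoming object's file extension to the
# lowercase source_format string used by the pipeline code further below.
SOURCE_FORMATS = {".csv": "csv", ".avro": "avro", ".parquet": "parquet"}

def detect_source_format(object_name: str) -> str:
    for extension, bq_format in SOURCE_FORMATS.items():
        if object_name.lower().endswith(extension):
            return bq_format
    raise ValueError(f"Unsupported file format for object: {object_name}")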

Underlying Pipeline Code Approach for Schema Evolution:

To perform schema evolution, we have used the BigQuery API Client Library as the underlying code.

Approaches to handle Schema Evolution in the GCS to BQ Ingestion Pattern:

Approach 1: Create a BigQuery external table to store the updated-schema data and finally write it to the BigQuery managed table.

Step 1. Create an external table, separate from the native table present in BQ, using bigquery.ExternalConfig(source_format) with the updated schema present in the GCS bucket.

# Assumes bq_client, dataset_id, table_id_stg, bq_schema, source_format and
# gcs_file_path are defined earlier in the pipeline.
dataset_ref = bq_client.dataset(dataset_id)
table_ref = bigquery.TableReference(dataset_ref, table_id_stg)
table = bigquery.Table(table_ref, schema=bq_schema)

# Configure the external data source with the format of the incoming file.
external_config = bigquery.ExternalConfig(source_format.upper())
if source_format == "csv":
    external_config.options.skip_leading_rows = 1  # skip the header row
elif source_format == "avro":
    external_config.options.use_avro_logical_types = True
external_config.source_uris = [gcs_file_path]
table.external_data_configuration = external_config
bq_client.create_table(table)

Step 2. Create a query to select all the columns from the external table created in Step 1 and load the data into the BQ target table using bigquery.QueryJobConfig() with the settings described below.

sql = f""" SELECT src.*
FROM {external_table_id} src;"""
job_config = bigquery.QueryJobConfig(
destination=target_id,
write_disposition="WRITE_APPEND",
schema_update_options=[
bigquery.SchemaUpdateOption.ALLOW_FIELD_ADDITION,
bigquery.SchemaUpdateOption.ALLOW_FIELD_RELAXATION,
],
)
query_job = bq_client.query(sql, job_config=job_config)
query_job.result()

The above code allows the extra column to be added and the required columns to become nullable by setting the field addition and field relaxation options in the QueryJobConfig. Once the schema has been updated and the data is present in the target table, delete the external table created in Step 1.
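The cleanup itself is a single call (a sketch, assuming the table_ref from Step 1 is still in scope):

# Drop the staging external table once the data has landed in the target table.
bq_client.delete_table(table_ref, not_found_ok=True)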

Approach 2: Export Table Data to Cloud Storage and then Load with the Modified Schema definition.

One can export the table data to Cloud Storage and then load the data into a new table with the modified schema definition. BigQuery load and export jobs are free, but you incur costs for storing the exported data in Cloud Storage.
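A minimal sketch of this approach with the same client library (the export URI, table IDs, and the fields in new_schema below are illustrative assumptions):

from google.cloud import bigquery

bq_client = bigquery.Client()
gcs_uri = "gs://your-bucket/exports/table-*.json"  # hypothetical export location

# 1. Export the existing table data to Cloud Storage as newline-delimited JSON.
extract_config = bigquery.ExtractJobConfig(destination_format="NEWLINE_DELIMITED_JSON")
bq_client.extract_table("project.dataset.old_table", gcs_uri,
                        job_config=extract_config).result()

# 2. Load the exported data into a new table with the modified schema definition.
new_schema = [
    bigquery.SchemaField("id", "INTEGER"),
    bigquery.SchemaField("name", "STRING"),
    bigquery.SchemaField("new_column", "STRING", mode="NULLABLE"),  # newly added column
]
load_config = bigquery.LoadJobConfig(
    schema=new_schema,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
)
bq_client.load_table_from_uri(gcs_uri, "project.dataset.new_table",
                              job_config=load_config).result()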

Both of the approaches discussed above can be used, depending on the use case. Approach 1 provides more flexibility: you can add any required audit columns apart from the changed schema columns, and you can also cast columns according to the use case. Approach 2 is preferable when you want to use the schema auto-detect option and don’t have strict schema requirements, using bigquery.LoadJobConfig as shown below.

job_config = bigquery.LoadJobConfig(
    autodetect=True, source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
)

Apart from this, there are some other ways to achieve schema evolution in the GCS File to BQ Ingestion Pattern, as described here.

Summary:

In the previous part of the series, Data Mesh Self Service — Ingestion Pattern from GCS(Google Cloud Storage) to BigQuery (Data Store for Mesh), we discussed various ways to load files from GCS into BigQuery.

In this part, we discussed how to handle automatic schema evolution in the Google Cloud Storage to BigQuery ingestion pattern using the BigQuery API Client Library. One approach we discussed is creating an external table to hold the updated-schema data with bigquery.ExternalConfig and then loading that data into the target BQ table with bigquery.QueryJobConfig. The other approach is to export the data to Cloud Storage and then load it into a new table with the modified schema.

In the coming parts of the series, we will cover other ingestion patterns such as Pub/Sub to BigQuery.
