BigQuery Cross-Region Replication Gotchas

Keven Pinto
Appsbroker CTS Google Cloud Tech Blog
7 min readJan 26, 2023

2022 was the fifth warmest year on record. The July heatwave pushing temperatures to 40°C in parts of the UK was so brutal, that it was a rare occasion that my camera was switched on during office meetings.

There were times, I felt that if I stayed out too long, I may melt. Thankfully that didn’t happen… to me at least! Sadly, we can’t say the same for certain Google and Oracle services located in a London data centre on the 19th July. The failures caused by the heatwave and subsequent disruption that resulted led to one of our clients facing a service outage that affected them for several hours. After restoring service, Google was quick to commission a project to ensure multi-regional resiliency for this client. So ironically, this extreme climate event has resulted in a technical opportunity to improve the client platform — and inspire this blog

We kicked of this project with a single goal in mind, that if this ever were to happen again, we would be ready(ish). I say that with some reduced conviction as regional failovers and fail-backs do incur some downtime, albeit minutes, it’s not seamless! Especially the database layer — more on that some other time.

In this Blog, I’d like to talk about a small slice of the overall architecture — the Multi Region Setup for BigQuery, our approach, our initial success and why we felt we needed to rethink the approach.

Before we go into details of our architecture, let’s clear some misconceptions and lay out some facts:

  • Multi-region Datasets in Bigquery does not imply that a copy of the data is stored in a different geographical region. Currently, implementation means that data is located in a region within a large geographical area such as the US or EU
  • The term multi-region in BigQuery can be confusing as it is not the same as Cloud Storage’s definition of multi-region
  • Multi-region datasets cannot be used for data resiliency purposes

Architecture

A quick scan of current Google offerings threw up BigQuery’s cross-region dataset replication as a good candidate of achieving resiliency and cross-region durability. The only small challenge was that this feature is still currently in Preview — I say small, as the client was happy to underwrite this risk. Onwards we went!

Database Cross Region View

Our Proposed setup involved the following components:

  • A HA Cloud SQL (PostgreSQL 14) Instance (2 Zones) in Region 1 with a read- replica in Region 2
  • A Datastream in Region 1 to replicate data from PostgreSQL to a Bigquery dataset in Region 1
  • A Primary Bigquery dataset in Region 1
  • A Replica Bigquery dataset in Region 2 being kept in sync using the Bigquery dataset transfer service

In case you wondering, why didn’t we setup a Datastream in our Secondary region?.. Good question! At the moment, Datastream only supports read replicas from MySQL and we are on PostgreSQL 14.

Challenge 1 — Scheduling

Our goal was to have consistency across our 2 datasets, however when setting the Transfer schedule we hit the first constraint. You cannot increase the frequency of the copy job to be less than every 12 hours.

This was not ideal, we wanted consistency across regions and a latency of 12 hours was not going to cut it.

12 hr Constraint

Solution

Low Latency Approach

The flow of the proposed Solution is as follows:

  • Deploy the Data Transfer Service as an On-Demand Service. This simply means that it does not run on a schedule but will need to be called explicitly
  • Configure the Data Transfer Service to write to a Pub/Sub topic upon completion. This is an out of the box feature:
Data Transfer Notification
  • Create an Event Based Trigger on this Pub Sub Q to call a Cloud Function
  • Code the triggered Cloud function to call the Data Transfer Service Job manually, thus creating a continuous loop

Simple! Well, not quite. The job ran for a few hours and then started failing. It looked like we forgot to take some other dataset copy constraints in consideration. Enter Challenges 2 & 3!

Challenge 2 & 3 — Daily Limits

Constraints

Before we look at the impact of these constraints, lets understand a few things:

  • 1 table Copy = 1 Job
  • If there is no new data since the last Job, a job slot is not taken up (nice!), see screenshot below where n tables have been skipped as there was no new data and the summary shows 1 jobs as only 1 table had data added to it. Data set copy does this even with the Overwrite Destination Tables option set as True

High Velocity tables

To highlight challenge 2 let’s take a look at the following scenario:

Consider a group of tables that are high velocity and qualify for a data copy in each run. If each transfer job takes on average 3 minutes to run, we will hit our 100 jobs per destination table limit in the first 300 minutes of the day (continuous loop design). Given, 1440 minutes in a day, we won't be able to refresh that table for the next 19 hrs! At this point the 12 hour constraint seems like a much better solution.

In order to mitigate, Challenge 2, we decided to add some logic to our Cloud function to ensure that the job never runs more than 4 times per hour. We worked this out by dividing the minutes in a day (1440), by the number of job per table constraint, i.e. 1440/100. Ok! So we now do 96 jobs a day, that’s 4 jobs short of the constraint. Good work then?! Not quite! There was still Challenge 3 to consider.

Our Client has a large number of high velocity tables, and given that we wanted to keep latency low, we would have to pick only 20 high velocity tables in order to meet the 2000 jobs per day per project limit, i.e 20 * 100 runs = 2000 runs. It was at this point, we decided that our approach needs a rethink!

For details about our eventual Solution, look here.

Summary

The BQ cross-region dataset transfer service is easy to setup and fast (very!). However, the scheduling limitations and daily limits do not make it an ideal candidate for high velocity and cross region consistency use cases.

Finally, I attach a very basic and early version of the cloud function we wrote in order to invoke the Transfer, enjoy!

# https://cloud.google.com/python/docs/reference/bigquerydatatransfer/latest/google.cloud.bigquery_datatransfer_v1.services.data_transfer_service.DataTransferServiceClient#google_cloud_bigquery_datatransfer_v1_services_data_transfer_service_DataTransferServiceClient_get_transfer_run
# https://cloud.google.com/bigquery/docs/reference/datatransfer/rest/v1/TransferState
from pathlib import Path
import time
import functions_framework
from google.cloud import bigquery_datatransfer_v1
from google.protobuf.timestamp_pb2 import Timestamp
from google.cloud import pubsub_v1


def publish_message(project_id: str, topic_id: str, message: str):
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_id)
data = message.encode("utf-8")
future = publisher.publish(topic_path, data)
print(f"Published messages to {topic_path}.-{future.result()}")


def get_transfer_id(
config_name: str,
parent: str,
client,
):
# Initialize request argument(s)
request = bigquery_datatransfer_v1.ListTransferConfigsRequest(
parent=parent,
)

# Make the request
page_result = client.list_transfer_configs(request=request)
config_id = ""

# Handle the response
for response in page_result:
if response.display_name == config_name:
config_id = Path(response.name).name

return config_id


def run_manual_transfer(project_id: str, config_id: str, parent: str, client):
parent = f"{parent}/transferConfigs/{config_id}"
start_time = Timestamp(seconds=int(time.time()))
request = bigquery_datatransfer_v1.StartManualTransferRunsRequest(
{"parent": parent, "requested_run_time": start_time}
)
response = client.start_manual_transfer_runs(request, timeout=360)
return response


def get_transfer_run(project_id: str, parent: str, config_id: str, run_id: str, client):

transfer_running_states = ["PENDING", "RUNNING"]
name = f"{parent}/transferConfigs/{config_id}/runs/{run_id}"

# Initialize request argument(s)
request = bigquery_datatransfer_v1.GetTransferRunRequest(name=name)

# Inner Recursive Function
def monitor_run(pause_interval: int):
# Make the request
response = client.get_transfer_run(request=request)
for line in str(response).splitlines():
if line.strip().startswith("state"):
state = line.split(":")[1].strip()
if state in transfer_running_states:
# Wait n seconds
print(f"Run State:{state}...Waiting:{pause_interval} Seconds")
time.sleep(pause_interval)
return monitor_run(pause_interval + 1)
else:
return state

# Handle the response
return monitor_run(0)


def get_run_id(response):
run_id = ""
for line in str(response).splitlines():
if line.strip().startswith("name"):
run_id = Path(line.split(":")[1].replace('"', "")).name
return run_id


# Entry Point
# This routine assumes that the tranfer job has already been created
@functions_framework.http
def main(request):
project_id = "PLEASEUPDATE"
source_region = "PLEASEUPDATE"
target_region = "PLEASEUPDATE"
config_name = "PLEASEUPDATE"
topic_id = "PLEASEUPDATE"
transfer_success_states = ["SUCCEEDED"]
transfer_failed_states = ["TRANSFER_STATE_UNSPECIFIED", "FAILED", "CANCELLED"]

# parent
parent = f"projects/{project_id}/locations/{target_region}"

# Create a client
client = bigquery_datatransfer_v1.DataTransferServiceClient()

# Get the Config id of the Transfer Job
config_id = get_transfer_id(
config_name=config_name,
parent=parent,
client=client,
)

# Trigger the Transfer Manually
response = run_manual_transfer(
project_id=project_id, config_id=config_id, parent=parent, client=client
)

# Extract Run Id of the Triggered Job
run_id = get_run_id(response=response)

if run_id != "":
print(f"Cross Region DataSet Copy Initiated - Run Id:{run_id}")
# Monitor the Job to completion
# One can skip this and just parse the Completion message on the Q
run_response = get_transfer_run(
project_id=project_id,
parent=parent,
config_id=config_id,
run_id=run_id,
client=client,
)

if run_response in transfer_success_states:
print("Transfer Run Succeeded... Proceeding to Schedule Next Run")
publish_message(
project_id=project_id, topic_id=topic_id, message=run_response
)
else:
print(f"Transfer Run Completed with Status:{run_response}")
else:
print("Unable to Start Job")



if __name__ == "__main__":
main(None)

Thank you Charlotte McKee and Stuart Barnett for editing this article and providing some great feedback.

--

--

Keven Pinto
Appsbroker CTS Google Cloud Tech Blog

Traveller | Eco warrior | Data Engineer | Curious Fella | Foodie | Father