Using Datastream to replicate PostgreSQL tables to BigQuery partitioned tables in GCP
Datastream is a serverless, user-friendly, Google-managed service on Google Cloud Platform (GCP) for Change Data Capture (CDC), designed to enable reliable data replication with minimal latency.
In particular, Datastream lets you seamlessly replicate data from operational databases to BigQuery. In this tutorial we will look at the use case of replicating PostgreSQL tables to BigQuery partitioned tables. We will also zoom in on the downsides of this approach and how to mitigate them.
Before attempting the steps described in this blog, please ensure that you have the following:
- A Google Cloud Platform project.
- A user with, or the ability to grant, Cloud SQL and BigQuery (admin) roles.
For simplicity, grant yourself the following IAM roles on the project (you might be able to get by with lesser roles, but I didn’t check):
- Cloud SQL Admin
- BigQuery Admin
PostgreSQL set-up
The first step is to set up a minimal Cloud SQL for PostgreSQL instance. Create a Cloud SQL instance by following the steps described here. When creating the Cloud SQL instance, please ensure the following:
- Name your instance: postgres-datastream-tutorial.
- Select PostgreSQL version 15.
- To reduce costs for this tutorial select the “Sandbox” preset.
- Use Public IP for the instance IP assignment. You can also use this guide with a Private IP instance, but you will need to set up a reverse proxy.
- Save the password you used to create the instance. This is the password of the “postgres” user. Note that this user is not a superuser in native PostgreSQL terms, but a cloudsqlsuperuser.
- Ensure that you pick a single region to host your instance. The selected region will be important in later steps.
When you have created the instance, follow these steps:
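First, Datastream’s change data capture relies on PostgreSQL logical decoding. On Cloud SQL this is enabled through the cloudsql.logical_decoding database flag, which you can set from Cloud Shell, for example (note that --database-flags overwrites any flags already set on the instance, and patching it restarts the instance):
# Enable logical decoding on the instance (required for Datastream CDC)
gcloud sql instances patch postgres-datastream-tutorial --database-flags=cloudsql.logical_decoding=on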
Next, connect to your PostgreSQL instance from Cloud Shell by running the following (use the saved password to log in):
gcloud sql connect postgres-datastream-tutorial --user=postgres
Next up you should create a database by running the following command:
-- Create a "partitioned_tables" database
CREATE DATABASE partitioned_tables;
Now connect to the database:
\c partitioned_tables
Now create a table by running:
-- Create the "event" table
CREATE TABLE event (
event_id SERIAL PRIMARY KEY,
creation_timestamp TIMESTAMPTZ NOT NULL,
event_data JSONB
);
We will insert some dummy data into this table:
-- Insert five rows with dummy data
INSERT INTO event (creation_timestamp, event_data)
VALUES
(NOW(), '{"event_name": "Event 1", "event_description": "Description 1"}'),
(NOW(), '{"event_name": "Event 2", "event_description": "Description 2"}'),
(NOW(), '{"event_name": "Event 3", "event_description": "Description 3"}'),
(NOW(), '{"event_name": "Event 4", "event_description": "Description 4"}'),
(NOW(), '{"event_name": "Event 5", "event_description": "Description 5"}');
As the last step, you will need to create a publication and a replication slot by running the following:
-- Create a publication for the event table
CREATE PUBLICATION bigquerypublicationevent FOR TABLE public.event;
-- Create a replication slot
SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('bigqueryreplication', 'pgoutput');
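If creating the replication slot fails with a permission error, the user may first need the REPLICATION attribute (the user that Datastream connects with needs it as well). Assuming you stay with the postgres user, it can be granted as follows:
-- Grant the replication attribute to the user used by Datastream
ALTER USER postgres WITH REPLICATION;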
Note that you can also replicate PostgreSQL partitioned tables by adding WITH (publish_via_partition_root = true) to the CREATE PUBLICATION statement. This ensures that changes to the partitions are published as coming from the root table, so only the root table is replicated.
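For illustration, assuming a hypothetical partitioned source table called measurement, such a publication could look like this:
-- Publish changes to all partitions as if they came from the root table
CREATE PUBLICATION bigquerypublicationmeasurement FOR TABLE public.measurement
WITH (publish_via_partition_root = true);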
BigQuery set-up
By default, Datastream does not support streaming into BigQuery partitioned tables. However, by pre-defining the tables in BigQuery you can work around this.
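In BigQuery, first create a dataset with the same name as the database. One way to do this is with a DDL statement in the BigQuery console (the location below is only an example; pick the region you will also use for the stream):
-- Create the dataset that will hold the replicated table
CREATE SCHEMA IF NOT EXISTS partitioned_tables
OPTIONS (location = 'europe-west1');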
Next up, we create the table which we will use to replicate the event table:
-- Create the event table
CREATE OR REPLACE TABLE partitioned_tables.event (
event_id STRING NOT NULL,
creation_timestamp TIMESTAMP,
event_data STRING
)
PARTITION BY DATE(creation_timestamp);
We also need to set a primary key, otherwise Datastream will not be able to replicate into the table. Note that BigQuery does not enforce primary key constraints.
ALTER TABLE partitioned_tables.event
ADD PRIMARY KEY (event_id) NOT ENFORCED;
As the last step for this table, we set a partition expiration of 7 days (more on this later):
ALTER TABLE partitioned_tables.event
SET OPTIONS (partition_expiration_days=7);
Datastream set-up
Next up, Datastream needs to be set up. In order to use Datastream you will need to create two connection profiles:
- For setting up the PostgreSQL connection profile, follow the steps here. For simplicity you can use the postgres user to connect to your instance. However, in a production setting it is better to set up a separate replication user with minimum permissions; a sketch of such a user is shown after this list.
- For setting up the BigQuery connection profile, follow the steps here.
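As a rough sketch, such a replication user could be created on the PostgreSQL side along the following lines (the user name and password are placeholders; the exact grants depend on your schema and the Datastream documentation):
-- Create a dedicated user for Datastream with the replication attribute
CREATE USER datastream_user WITH REPLICATION LOGIN PASSWORD 'change-me';
-- Allow the user to read the tables it needs to replicate
GRANT USAGE ON SCHEMA public TO datastream_user;
GRANT SELECT ON ALL TABLES IN SCHEMA public TO datastream_user;
-- Also cover tables created in the future
ALTER DEFAULT PRIVILEGES IN SCHEMA public GRANT SELECT ON TABLES TO datastream_user;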
As the last step you will need to create and start the stream itself. Here it is important that you select the same region as you used when setting up your PostgreSQL instance.
All the details to set up the stream between your PostgreSQL instance and BigQuery can be found here.
Make sure that you choose “Automatic” as the backfill mode, as this will automatically backfill the existing rows of the event table to BigQuery. Set the staleness limit to 15 minutes.
In action
To see this in action, you can insert a few new rows into the PostgreSQL event table.
-- Insert five rows with dummy data
INSERT INTO event (creation_timestamp, event_data)
VALUES
(NOW(), '{"event_name": "Event 6", "event_description": "Description 6"}'),
(NOW(), '{"event_name": "Event 7", "event_description": "Description 7"}'),
(NOW(), '{"event_name": "Event 8", "event_description": "Description 8"}'),
(NOW(), '{"event_name": "Event 9", "event_description": "Description 9"}'),
(NOW(), '{"event_name": "Event 10", "event_description": "Description 10"}');
Note that it might take up to the staleness limit (e.g. 15 minutes) for the new rows to show up in the BigQuery table. BigQuery first inserts these streamed values into its delta log, after which an apply job is run to update the table. To learn more about this, refer to the documentation here.
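Once the changes have been applied, you can verify the result in BigQuery. Filtering on the partitioning column keeps the amount of scanned data small, for example:
-- Check the most recently replicated rows (only scans the latest partitions)
SELECT event_id, creation_timestamp, event_data
FROM partitioned_tables.event
WHERE creation_timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
ORDER BY creation_timestamp DESC;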
Conclusion
You can now set up a Datastream stream that replicates data from a PostgreSQL table to a BigQuery partitioned table. Replicating to a BigQuery partitioned table should allow you to query less data and enables you to, for example, copy partitions to another table.
There are a few things to keep in mind though:
- BigQuery might scan the entire table when applying replicated events: it takes the primary key into account (for deduplication) and, for updates, needs to find the matching row. This can cause excessive BigQuery costs. In this example we set an expiration on the partitions, meaning that only 7 days of data is kept in the replicated table, but this only works when there are no updates on older data. If you want to keep the entire history, it is recommended to copy it to another table; see the sketch after this list.
- Datastream for PostgreSQL still has some limitations. For example, array types and dropping entire columns are not supported.
- The staleness limit ensures that the BigQuery data is never more stale than the configured limit. However, it also causes operations such as creating materialized views and writing dependent tables to fail when the staleness limit is exceeded. In my experience this does not happen often, but it can cause some friction.
- It is mandatory to pre-define the partitioned table in BigQuery, otherwise Datastream will define the table itself (non-partitioned by default).
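As an illustration of keeping history (see the first point above), assuming you have created an event_archive table with the same schema (the table name is hypothetical), a scheduled query could copy a day of data to it before the partition expires:
-- Copy one day of data into an archive table before its partition expires
INSERT INTO partitioned_tables.event_archive
SELECT event_id, creation_timestamp, event_data
FROM partitioned_tables.event
WHERE creation_timestamp >= TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL 6 DAY))
AND creation_timestamp < TIMESTAMP(DATE_SUB(CURRENT_DATE(), INTERVAL 5 DAY));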
I hope this helps you set up Datastream for your specific use case. In case of questions or suggestions, feel free to reach out.