Configure Datastream for only certain DML operations with PostgreSQL as source

Deepak Mahto
Google Cloud - Community
5 min read · Oct 18, 2022

This is part 1 of a series on solving real-world scenarios with Datastream for PostgreSQL and BigQuery; see the rest of the series for all the curated scenarios.

PostgreSQL is one of the most widely adopted databases, and many customers need to stream transactional changes directly to BigQuery. On Google Cloud, continuous streaming from CloudSQL for PostgreSQL has been a common ask from customers who want consolidated analytics on a downstream platform like BigQuery.
With its recent releases, Datastream for BigQuery provides initial backfill and continuous replication from different PostgreSQL-compatible sources using logical replication. Datastream eases integration between CloudSQL and BigQuery with a simple, serverless, and scalable approach. At the time of writing it is in public preview, so it can be tested for different use cases that stream changes from transactional workloads to BigQuery.

Insert, update, and delete are the common changes on transactional tables that need to be included in streams. But we might have a functional requirement to stream only inserts, or only inserts and updates, for a specific list of tables. Currently, Datastream itself does not provide a feature to select which data manipulation events are captured and published to BigQuery.

In this blog we walk through how to configure a stream for specific events, such as insert only or insert and update only, on a specific list of tables using the PostgreSQL logical replication feature.

Let’s configure CloudSQL for PostgreSQL as the source with a set of sample tables.

drop table if exists sample_insert_only;
create table sample_insert_only
as
select col1 , col1+1 as col2 , col1+2 as col3 , md5(random()::text) as col4 from generate_series(1,1000) as col1;
alter table sample_insert_only add constraint pk_sample_insert_only primary key(col1);
drop table if exists sample_insert_update_only;
create table sample_insert_update_only
as
select col1 , col1+1 as col2 , col1+2 as col3 , md5(random()::text) as col4 from generate_series(1,1000) as col1;
alter table sample_insert_update_only add constraint pk_sample_insert_update_only primary key(col1);
drop table if exists sample_all_dml;
create table sample_all_dml
as
select col1 , col1+1 as col2 , col1+2 as col3 , md5(random()::text) as col4 from generate_series(1,1000) as col1;
alter table sample_all_dml add constraint pk_sample_all_dml primary key(col1);

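The tables and the DML events to publish for each are:

sample_insert_only: insert
sample_insert_update_only: insert, update
sample_all_dml: insert, update, delete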

The Datastream documentation explains in detail how to configure sources and enable logical decoding for managed instances.
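Once logical decoding has been enabled, it can be worth confirming the relevant settings from a SQL session before creating publications. The query below is a minimal sketch that reads standard PostgreSQL settings; the appropriate values for the slot and sender limits depend on your instance and are not prescribed by this article.

```sql
-- wal_level must report 'logical' for logical replication to work.
-- max_replication_slots and max_wal_senders must leave room for the
-- three slots created later in this walkthrough.
SELECT name, setting
FROM pg_settings
WHERE name IN ('wal_level', 'max_replication_slots', 'max_wal_senders')
ORDER BY name;
```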

To enable selective publishing, we create three different publications in the source PostgreSQL database and add tables to each according to the publish pattern required, i.e. insert only, insert and update only, or all DML.

Create Publication and configure publish properties

CREATE PUBLICATION pub_bq_pg_insert FOR TABLE sample_insert_only 
WITH (publish = 'insert');
CREATE PUBLICATION pub_bq_pg_insert_update FOR TABLE sample_insert_update_only
WITH (publish = 'insert,update');
CREATE PUBLICATION pub_bq_pg_all_dml FOR TABLE sample_all_dml
WITH (publish = 'insert,update,delete') ;

The publications created and the tables added to them can be summarized from the psql console. We can add multiple tables to the same publication, and their changes will be published based on the publication’s properties.
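One way to produce such a summary is to query the PostgreSQL catalogs directly. The sketch below uses the standard pg_publication catalog and pg_publication_tables view with the publication names created above; in psql, the \dRp+ meta-command gives a similar overview.

```sql
-- Which DML events each publication publishes.
SELECT pubname, pubinsert, pubupdate, pubdelete
FROM pg_publication
WHERE pubname IN ('pub_bq_pg_insert', 'pub_bq_pg_insert_update', 'pub_bq_pg_all_dml')
ORDER BY pubname;

-- Which tables belong to each publication.
SELECT pubname, schemaname, tablename
FROM pg_publication_tables
WHERE pubname IN ('pub_bq_pg_insert', 'pub_bq_pg_insert_update', 'pub_bq_pg_all_dml')
ORDER BY pubname, tablename;
```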

Each stream created by Datastream reads from an active replication slot using the pgoutput plugin. Next, we create three different replication slots, one per publication.

Create Replication slot for each publication

SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('pg_rep_bq_insertonly', 'pgoutput');
SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('pg_rep_bq_insertupdonly', 'pgoutput');
SELECT PG_CREATE_LOGICAL_REPLICATION_SLOT('pg_rep_bq_all_dml', 'pgoutput');
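
To confirm the slots exist and use the expected plugin, the standard pg_replication_slots view can be queried. This is a small verification sketch; the active column is expected to stay false until a Datastream stream connects to the slot.

```sql
-- List the three logical slots created above.
SELECT slot_name, plugin, slot_type, database, active
FROM pg_replication_slots
WHERE slot_name IN ('pg_rep_bq_insertonly', 'pg_rep_bq_insertupdonly', 'pg_rep_bq_all_dml')
ORDER BY slot_name;
```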

We have already configured the necessary connection profiles and enabled the Cloud SQL proxy on GCE (Google Compute Engine) for connectivity to CloudSQL as the source.

Next, we create three different streams, one per publish pattern. For this walkthrough, all the necessary steps to create the streams have been completed and the initial backfill of data has finished.

Create Streams in Datastream for PostgreSQL as source and BigQuery as target.

Let’s check how the different streams push changes to BigQuery based on the publish events configured for each table.
Datastream stream 1 (insertonly-stream) is subscribed to inserts only. Let’s apply all DML on the source side and check how it is consumed on the target side.

insert into sample_insert_only
(select col1 , col1+1 as col2 , col1+2 as col3 , md5(random()::text) as col4
from generate_series(1001,2000) as col1);
update sample_insert_only
set col4 = 'updated'
where col1 < 11;
delete from sample_insert_only
where col1 between 11 and 20;

Although we applied all DML, only the events related to inserts were pushed to Datastream and the BigQuery target for stream 1.
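
A quick way to check this on the target is to count rows in BigQuery. The query below is a sketch in BigQuery Standard SQL; the project, dataset, and table path are hypothetical and depend on how the stream's destination was configured.

```sql
-- Hypothetical destination table path; replace with your own.
SELECT
  COUNT(*) AS total_rows,
  COUNTIF(col4 = 'updated') AS updated_rows,
  COUNTIF(col1 BETWEEN 11 AND 20) AS rows_deleted_at_source
FROM `my_project.datastream_ds.public_sample_insert_only`;
```

If only inserts reached the target, the backfilled and newly inserted rows should all still be present, no row should carry the 'updated' value, and the rows deleted at the source should still be counted.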

Datastream stream 2 (insertupdateonly-stream) is subscribed to inserts and updates only. Let’s apply all DML on the source side and check how it is consumed on the target side.

insert into sample_insert_update_only(select col1 , col1+1 as col2 , 
col1+2 as col3 , md5(random()::text) as col4
from generate_series(1001,2000) as col1);
update sample_insert_update_only
set col4 = 'updated-one'
where col1 < 11;
delete from sample_insert_update_only
where col1 between 11 and 20;

Although we applied all DML, only the events related to inserts and updates were pushed to Datastream and the BigQuery target for stream 2.

The last stream, stream 3, is subscribed to all events, so it captures deletes in addition to inserts and updates.

insert into sample_all_dml
(select col1 , col1+1 as col2 , col1+2 as col3 , md5(random()::text) as col4
from generate_series(1001,2000) as col1);
update sample_all_dml
set col4 = 'updated-one'
where col1 < 11;
delete from sample_all_dml
where col1 between 11 and 20;

Learning

With PostgreSQL publications, we can control which events are published to BigQuery through Datastream, based on functional requirements. A single publication can include multiple tables, all publishing the specific DML events it is configured for.
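
As an illustration of adjusting an existing publication, the sketch below uses standard ALTER PUBLICATION syntax. The table name sample_another_table is hypothetical and does not appear earlier in this article; how a running stream reacts to such a change should be tested before relying on it.

```sql
-- Change which DML events an existing publication publishes.
ALTER PUBLICATION pub_bq_pg_insert SET (publish = 'insert,update');

-- Add another table (hypothetical name) to the same publication;
-- it inherits the publication's publish setting.
ALTER PUBLICATION pub_bq_pg_insert ADD TABLE sample_another_table;
```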
In the process we created multiple replication slots, one per publish pattern. We should test this setup to validate any overhead or impact on the source, with the worker-related settings for the logical replication publisher tuned as needed.
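
One source-side signal worth watching is how much WAL each slot forces the server to retain, since a slot that is not being consumed keeps WAL from being recycled. The query below is a monitoring sketch using standard PostgreSQL functions (PostgreSQL 10 or later); any alerting threshold is left to the reader.

```sql
-- WAL retained on behalf of each logical replication slot.
SELECT
  slot_name,
  active,
  pg_size_pretty(pg_wal_lsn_diff(pg_current_wal_lsn(), restart_lsn)) AS retained_wal
FROM pg_replication_slots
WHERE slot_type = 'logical'
ORDER BY slot_name;

-- Drop a slot that is no longer used by any stream so it stops retaining WAL.
-- SELECT pg_drop_replication_slot('pg_rep_bq_insertonly');
```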

Further Reading

Check out part 2 of Solving real world scenarios on Datastream for PostgreSQL and BigQuery, which covers “Configure Streams from Partition table in PostgreSQL to non partition in BigQuery using Datastream.”
