A low-latency, highly scalable CDC solution for BigQuery

Ayush Jain
Google Cloud - Community
7 min read · Mar 2, 2024

CDC has become the de facto choice for most modern data platforms. As more and more organizations move towards low latency analytical solutions, CDC-based tooling is gaining importance over traditional ETL or ELT. Here are some of the key benefits of a CDC-based solution compared to a traditional ELT -

  • Less impact on the source database. Since CDC fetches the change data by reading the logs of the source database rather than running SQL queries against the database itself, it has far less impact on the operational database, especially for high frequency replication.
  • Tracks deletes. A database undergoes insert, update and delete (IUD) operations. Traditional ETL/ELT processes cannot track deletes and mainly sync updates and inserts. Since CDC fetches the data from the logs, it can track deletes and replicate them to the target database/DW/DL as well.
  • Tracks Schema Evolution. It’s normal for operational databases to have their schemas evolve over time, with new fields getting added, data types changing and sometimes even fields being dropped. Again, since CDC fetches data from the logs, it can track these operations on the database and replicate them to the target environment.

There are numerous solutions for implementing CDC from a variety of data sources. A few common, proven solutions are below -

  • Google’s Datastream
  • Confluent
  • Fivetran

These solutions work great; they handle most of the key aspects mentioned above and go beyond them. However, they can be limited to a few data sources or may be too costly for some organizations.

An alternative low cost option is to build the pipeline on open source technology -

Debezium — Kafka — BQ Connector (from WePay) — BQ

This works well too, and I know of a few implementations running in production environments. There are quite a few blogs about it on Medium as well. However, this approach has its own challenges, as it can be tedious to implement and manage. It requires significant engineering effort, especially around Kafka and connector scalability with varying peak ingestion rates, setting up logging and monitoring, and configuring a number of connectors on both the source and sink side.

What we needed was a middle path that solves not only these challenges of the open source implementation but also the cost concern. With this article, I intend to share an approach for this.

CDC to BigQuery with Pub/Sub

Replicate data sources using Debezium to Pub/Sub to BigQuery for a low-cost, low-maintenance, higher-control pipeline.

CDC to BigQuery with Pub/Sub

Here we leverage the open source Debezium Server, which provides efficient connectivity to a variety of data sources at no licensing cost. This is a Kafka-less implementation of Debezium, so you define Pub/Sub as the Debezium sink. Debezium reads the database log, with options to selectively choose tables from the source database.

Google Cloud Pub/Sub is the event processing platform here. It is serverless and can scale to virtually any ingestion rate within a few seconds without any manual handling. Pub/Sub brings the unique capability of zero maintenance thanks to its partition-less architecture.

Pub/Sub has native integration with BigQuery through the Pub/Sub to BQ subscription, which writes events to BigQuery as soon as they are published to a Pub/Sub topic. This removes the need to set up a connector to sink CDC events into BigQuery.

How does this work?

Debezium tails the source database logs and sends a payload containing the before and after images of the change for each CDC event. It also sends information for schema evolution. All of this is sent to a Pub/Sub topic which has a subscription sinking into BigQuery. The payload is in JSON format, and BigQuery is uniquely efficient at processing JSON data with its JSON data type and built-in JSON functions. Using these, one can derive the changed data and implement a merge into the base historical data. I have built a metadata-driven dynamic merge solution as a starting point to handle thousands of table merges in parallel using an appropriate scheduler like Cloud Composer. A simplified sketch of such a merge follows.
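To make the idea concrete, here is a simplified, hand-written sketch of such a merge for a hypothetical customers table with primary key customer_id. The metadata-driven solution generates the equivalent statement dynamically from the payload; the table and column names here are illustrative, and the JSON paths assume Debezium’s standard envelope under data.payload (drop the payload level if value schemas are disabled in Debezium).

merge into <dataset>.customers t
using (
  -- keep only the latest change event per primary key from the _log table
  select * except(rn)
  from (
    select
      json_value(data.payload.op) as op,  -- c = insert, u = update, d = delete, r = snapshot read
      cast(coalesce(json_value(data.payload.after.customer_id),
                    json_value(data.payload.before.customer_id)) as int64) as customer_id,
      json_value(data.payload.after.name)  as name,
      json_value(data.payload.after.email) as email,
      publish_time,
      row_number() over (
        partition by coalesce(json_value(data.payload.after.customer_id),
                              json_value(data.payload.before.customer_id))
        order by publish_time desc) as rn
    from <dataset>.customers_log
  )
  where rn = 1
) s
on t.customer_id = s.customer_id
when matched and s.op = 'd' then
  delete
when matched then
  update set name = s.name, email = s.email, publish_time = s.publish_time
when not matched and s.op != 'd' then
  insert (customer_id, name, email, publish_time)
  values (s.customer_id, s.name, s.email, s.publish_time);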

Here are the key things it solves for -

  • Pub/Sub and BigQuery being serverless, scale is handled like a charm without needing any reconfiguration.
  • The metadata-driven dynamic merge takes a table name as input, derives the table’s schema and primary key from the payload, and performs the merge between the incremental data and the base/historical data. So you don’t have to write hundreds of merge statements for different tables.
  • The merge frequency can be controlled by the orchestrator’s schedule frequency.
  • It derives schema differences and handles schema evolution.
  • The raw change events ensure reliability. Even if the merge fails for any reason, you do not lose data, since all the change events are still available in the BigQuery _log table to derive it again.
  • It automatically identifies the primary key from the payload. If the source table does not have a primary key, or you want to override it, you can define it as part of the metadata.
  • The BigQuery target table is partitioned on the ingestion timestamp and clustered on the merge key, so you can start querying with low latency and high performance out of the box (see the DDL sketch after this list).
  • You can also implement dynamic data masking for PII fields, so you have security right from the first layer of exposure to business users.
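As an illustration of the partitioning and clustering point, the target table for the hypothetical customers example above could be created as follows; publish_time stands in for the ingestion timestamp here, and the column names are assumptions carried over from the merge sketch.

-- Hypothetical target table for the merge sketch above.
-- Partitioned on the event/ingestion timestamp and clustered on the merge key.
create or replace table <dataset>.customers
(
customer_id int64,        -- merge key from the source table, used for clustering
name string,
email string,
publish_time timestamp    -- ingestion/event timestamp, used for partitioning
) partition by timestamp_trunc(publish_time, DAY)
cluster by customer_id;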

Implementation Steps

Here I have set up replication between MySQL and BigQuery. A similar setup can be done for various other data sources as well.

  1. Install Debezium over your MySQL instance to track CDC -
  • Install Debezium Server on a compute instance which has connectivity to your MySQL instance. The solution has been tested with Debezium 2.2.0.
  • Use application.properties to set up Debezium with MySQL as the source and Pub/Sub as the sink (a sample sketch follows this step).
  • Run Debezium (./run.sh) and make sure it executes successfully without any errors. Here is a screenshot of a successful Debezium execution -
Active Debezium Server
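For reference, here is a minimal application.properties sketch for a MySQL source with a Pub/Sub sink. Treat it as a starting point rather than a definitive configuration: the host, credentials, project id and topic prefix are placeholders, and the property names should be verified against the Debezium Server documentation for your version (2.2.0 here).

# Minimal Debezium Server sketch - MySQL source, Pub/Sub sink. Values in <> are placeholders.
debezium.sink.type=pubsub
debezium.sink.pubsub.project.id=<gcp-project-id>
debezium.source.connector.class=io.debezium.connector.mysql.MySqlConnector
debezium.source.offset.storage.file.filename=data/offsets.dat
debezium.source.offset.flush.interval.ms=0
debezium.source.database.hostname=<mysql-host>
debezium.source.database.port=3306
debezium.source.database.user=<user>
debezium.source.database.password=<password>
debezium.source.database.server.id=184054
debezium.source.topic.prefix=<servername>
debezium.source.database.include.list=<dbname>
debezium.source.table.include.list=<dbname>.<tablename>
debezium.source.schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
debezium.source.schema.history.internal.file.filename=data/schema_history.dat
# With this configuration, change events for <tablename> are published to the
# topic <servername>.<dbname>.<tablename>, which is what we create in step 3.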

2. Create a BigQuery _log table which will receive the data streamed from Pub/Sub. Use the below schema definition for all log tables -

create or replace table <dataset>.<table_name>_log
(
message_id string,
subscription_name string,
publish_time timestamp,
attributes json,
data json,
bq_load_ts timestamp default current_timestamp
) partition by timestamp_trunc(publish_time, DAY);
  • field names should be lower case when creating table definitions
  • table name should end with _log

3. Set up Pub/Sub to receive CDC -

  • Each table will need a topic and a corresponding subscription to a BQ table.
  • Create a Pub/Sub topic named after the Debezium change topic for the table, which by default is <servername>.<dbname>.<tablename>.
  • Select “Add a default subscription” and configure it to sink into BQ with “write metadata”, “message ordering” and “Dead lettering” enabled. Provide the _log BQ table as the sink. A screenshot of the setup is shared below, followed by an equivalent gcloud sketch -
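If you prefer the command line over the Cloud console, the equivalent setup could look roughly like the sketch below. The topic name follows the default Debezium naming used above, the subscription name is arbitrary, and dead lettering (which needs its own topic and IAM bindings) is left as a comment.

# Sketch only - create the per-table topic and a BigQuery subscription for it.
gcloud pubsub topics create <servername>.<dbname>.<tablename>

gcloud pubsub subscriptions create <servername>.<dbname>.<tablename>-bq \
  --topic=<servername>.<dbname>.<tablename> \
  --bigquery-table=<project>:<dataset>.<tablename>_log \
  --write-metadata \
  --enable-message-ordering
# Dead lettering can be added with --dead-letter-topic=<dl-topic> --max-delivery-attempts=5
# once the dead letter topic and its IAM permissions are in place.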

4. You can also test whether the subscription is receiving messages from Debezium successfully -

  • Create another “pull” subscription to the same topic.
  • Click “PULL” under the subscription’s Messages tab. You should see messages showing up like below -

Test Subscription

  • Implement the dynamic merge BQ stored procedure (sp_debezium_log_merge.sql) in the same region as the _log table dataset. If there are tables across regions, implement the stored procedure in a dataset for each region.
  • Set up your orchestration tool to call the ps_bq_log_final_merge.sql script, which calls the stored procedure after some pre-processing.
  • The script takes the below parameters as input and implements the merge for the table -
  • region — The region of the _log table dataset
  • sp_db — The dataset in which the stored procedure is implemented. This dataset’s region should be the same as the _log table dataset’s.
  • dbtbname — The name of the log table without the _log suffix, in the format <project>.<dataset>.<tablename>
  • pk — Primary key override. If the source table has a primary key, the utility detects it and this does not need to be provided; the value can be defaulted to ‘retrieve’. If the source table does not have a primary key, you can specify the primary key field and the merge will be executed based on it.
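For illustration, a single invocation from the orchestrator might look like the call below. The parameter order simply mirrors the list above; the actual signature of the stored procedure is not shown in this article, so treat this as a hypothetical example.

-- Hypothetical call; assumes the procedure takes the four parameters in the order listed above.
call `<project>.<sp_db>.sp_debezium_log_merge`(
  'us-central1',                      -- region of the _log table dataset (example value)
  '<sp_db>',                          -- dataset where the stored procedure lives
  '<project>.<dataset>.<tablename>',  -- log table name without the _log suffix
  'retrieve'                          -- pk: let the utility detect the primary key
);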

For scaled execution, create a list of tables which need to be merged and execute the .sql script for each table with its relevant parameters in parallel, so that the merges are not sequential or dependent on one another.

The merge frequency can be controlled by the orchestrator’s schedule frequency.

Test your data

Log landing table

SELECT * FROM <project>.<dataset>.<table>_log order by bq_load_ts desc;

Log parsed view

SELECT * FROM <project>.<dataset>.<table>_log_v order by bq_load_ts desc;
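The parsed view is not defined in this article; a rough sketch of what it could look like, again assuming the standard Debezium envelope under data.payload, is below.

create or replace view <dataset>.<table_name>_log_v as
select
message_id,
publish_time,
bq_load_ts,
json_value(data.payload.op) as op,   -- c = insert, u = update, d = delete, r = snapshot read
data.payload.before as before_row,
data.payload.after as after_row
from <dataset>.<table_name>_log;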

Final table

SELECT * FROM <project>.<dataset>.<table>
where publish_time = '<date>'
and <pk field> = '<value1>'
order by bq_load_ts desc;

Latency

My tests show that at xxx TPS, data replicates from the source to the BigQuery _log table in about a second!

Enjoy!
