BigQuery CDC with PubSub: Overcoming limitations
Creating and deploying a new cloud application is fairly straightforward and doesn’t impact existing systems.
However, migrating an application that’s already critical for business operations is a different story.
“Big bang” approaches and service disruptions are unacceptable. A dual-run process is necessary to ensure a smooth transition for both users and teams.
A real-world use case
At Carrefour, a retail company, we’re migrating our warehouse software to a newer, cloud-based version. This means shifting analytics and reporting from IBM DB2 Info Center to BigQuery.
The migration will be gradual, spanning over 80 warehouses and two years.
During this period, we can’t disrupt warehouse operations or disable reporting dashboards. Therefore, we need to run the legacy and new systems concurrently.
Data warehouse synchronisation
Real-time data replication between databases is a key strategy for a seamless migration.
Change Data Capture (CDC) is designed for this purpose. Whenever a change occurs in the source system, an event is triggered with the change details.
In BigQuery, this can be achieved with PubSub and a BigQuery subscription that writes messages straight into a table. For this to work effectively, a primary key must be set on the target table so that changes can be deduplicated and applied to the right rows.
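As an illustration, here is a minimal Go sketch of the table-side setup, assuming a simple stock table (project, dataset, and column names are placeholders). The non-enforced primary key is what enables deduplication; max_staleness is not mentioned above but is part of the usual BigQuery CDC setup and bounds how long changes can stay pending before they are applied.

package main

import (
	"context"
	"log"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()

	// "table-project-id", "warehouse" and "stock" are placeholder names.
	client, err := bigquery.NewClient(ctx, "table-project-id")
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A CDC target table needs a (non-enforced) primary key so the background
	// process can deduplicate UPSERT/DELETE events; max_staleness bounds how
	// long pending changes can wait before being applied.
	ddl := `
CREATE TABLE warehouse.stock (
  warehouse_id STRING NOT NULL,
  item_id STRING NOT NULL,
  quantity INT64,
  PRIMARY KEY (warehouse_id, item_id) NOT ENFORCED
)
OPTIONS (max_staleness = INTERVAL 15 MINUTE)`

	job, err := client.Query(ddl).Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if err := status.Err(); err != nil {
		log.Fatal(err)
	}
	log.Println("CDC-ready table created")
}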
BigQuery CDC limitations
During our testing phase, we encountered errors that required resetting BigQuery tables to start over after fixing bugs. This is a normal part of software development.
However, we encountered two unexpected limitations:
- DML Restrictions: When a BigQuery table is fed by a PubSub CDC subscription, DML statements such as DELETE, UPDATE, and MERGE are not allowed (see the sketch after this list). This is documented, but it still presented a challenge.
- Truncate Timeout: If you want to erase all the data with a TRUNCATE statement (something we do in our testing environment), you can’t do so within 90 minutes of the last write to the table; otherwise, the statement fails with an error.
This 90-minute timeout is surprising: it is tied to the legacy streaming API, whereas the BigQuery CDC cost description mentions the Storage Write API.
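To make the first restriction concrete, here is a hedged Go sketch (project, dataset, and table names are placeholders, and the exact error text may vary): the kind of DELETE that works on a regular table is rejected once the table receives CDC writes from the subscription.

package main

import (
	"context"
	"fmt"
	"log"

	"cloud.google.com/go/bigquery"
)

func main() {
	ctx := context.Background()
	client, err := bigquery.NewClient(ctx, "table-project-id") // placeholder project
	if err != nil {
		log.Fatal(err)
	}
	defer client.Close()

	// A plain DML statement; on a table fed by a PubSub CDC subscription,
	// BigQuery rejects it (UPDATE and MERGE fail the same way).
	job, err := client.Query("DELETE FROM warehouse.stock WHERE warehouse_id = '1234'").Run(ctx)
	if err != nil {
		log.Fatal(err)
	}
	status, err := job.Wait(ctx)
	if err != nil {
		log.Fatal(err)
	}
	if err := status.Err(); err != nil {
		fmt.Println("DML rejected:", err) // expected on a CDC-managed table
		return
	}
	fmt.Println("DML succeeded")
}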
I’m currently discussing this timeout issue with Google and will provide an update when I have more information.
Consequences of Constraints
These limitations slowed down our testing process, but we managed.
More concerning is the impact on the actual migration. With over 80 warehouses to migrate over two years, we anticipate potential issues and rollbacks.
The challenge lies in removing incorrect data from a specific warehouse. We can’t truncate the entire table each time, and we can’t run DML queries to remove specific rows.
Furthermore, future schema updates with added fields might require setting default values for existing rows. The DML limitation preventing UPDATE operations complicates this.
Leveraging CDC to achieve DML
With no immediate solution from Google, we needed a way to ensure a secure migration to BigQuery with CDC. I decided to work within the constraints and avoid direct DML operations.
Instead, I opted to publish messages to the PubSub CDC topic to let the CDC background process handle DELETE or UPDATE operations.
This led me to develop a small tool that performs the following (a simplified sketch follows the list):
- Runs a query in BigQuery.
- Converts each row to JSON, adding the appropriate _CHANGE_TYPE (UPSERT or DELETE).
- Publishes the JSON message to the PubSub CDC topic.
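For illustration only (the real tool is on GitHub), here is a minimal Go sketch of that loop. Project IDs, the bq-cdc topic, and the warehouse.stock table are placeholders, and real rows may need extra type conversion (dates, numerics) before JSON encoding.

package main

import (
	"context"
	"encoding/json"
	"log"

	"cloud.google.com/go/bigquery"
	"cloud.google.com/go/pubsub"
	"google.golang.org/api/iterator"
)

func main() {
	ctx := context.Background()

	// Placeholder IDs: the job project runs the query, the topic project hosts the CDC topic.
	bqClient, err := bigquery.NewClient(ctx, "job-project-id")
	if err != nil {
		log.Fatal(err)
	}
	defer bqClient.Close()

	psClient, err := pubsub.NewClient(ctx, "topic-project-id")
	if err != nil {
		log.Fatal(err)
	}
	defer psClient.Close()
	topic := psClient.Topic("bq-cdc")
	defer topic.Stop()

	// Select the rows to act on; the table project can differ from the job project.
	query := bqClient.Query("SELECT * FROM `table-project-id.warehouse.stock` WHERE warehouse_id = '1234'")
	it, err := query.Read(ctx)
	if err != nil {
		log.Fatal(err)
	}

	var results []*pubsub.PublishResult
	for {
		row := map[string]bigquery.Value{}
		err := it.Next(&row)
		if err == iterator.Done {
			break
		}
		if err != nil {
			log.Fatal(err)
		}

		// Tag the row so the BigQuery subscription applies it as a CDC operation.
		row["_CHANGE_TYPE"] = "DELETE" // or "UPSERT"
		payload, err := json.Marshal(row) // dates/numerics may need conversion first
		if err != nil {
			log.Fatal(err)
		}
		results = append(results, topic.Publish(ctx, &pubsub.Message{Data: payload}))
	}

	// Wait for all publish calls to be acknowledged.
	for _, r := range results {
		if _, err := r.Get(ctx); err != nil {
			log.Fatal(err)
		}
	}
}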
This tool can be used in CLI mode or as a webserver (hosted on Cloud Run, for example; you could even add a web page on top of it if you wish).
It’s open source and available on GitHub.
Examples
To delete specific rows, you can use the following syntax (CLI version):
cdc-dml \
  -query='SELECT * FROM `<TableProjectID>.<dataset>.<table>` WHERE warehouse_id="1234"' \
  -topic="projects/<TopicProjectID>/topics/<CDC topic>" \
  -job_project_id="<JobProjectID>" \
  -operation="DELETE"
Note the different project IDs:
- Table Project ID: Where the BigQuery table resides.
- Topic Project ID: Where the CDC topic resides (it can be the same as the table’s project, but it doesn’t have to be).
- Job Project ID: Where you want to run the BigQuery job (the query); again, it can be the same as the table’s project, but it doesn’t have to be.
To update a column with a default value, you can use the following syntax (webservice version):
curl -X POST -H "Content-Type: application/json" \
  -H "Authorization: Bearer $(gcloud auth print-identity-token)" \
  -d '{"jobProjectID":"<JobProjectID>","query":"SELECT * EXCEPT(new_column), \"default value\" AS new_column FROM `<TableProjectID>.<dataset>.<table>`","pubsubTopic":"projects/gdglyon-cloudrun/topics/bq-cdc","operation":"UPSERT"}' \
  https://your-service.XXX.run.app
A helpful tool with tradeoffs
Every solution has tradeoffs. Here, you can notice two major ones:
- This solution costs a query in BigQuery, messages in PubSub (maybe millions!), and CDC background processing. Depending on your data volume, it is far from neutral in terms of cost.
- This solution loses/erases the _CHANGE_SEQUENCE_NUMBER on every updated row. It also won’t prevent duplicate or late-arriving CDC messages from changing those rows again.
This tool can help you in those tricky situations where you have no other options.
In any case, use with caution! But enjoy the CDC updates in BigQuery!