Debezium event flattening with SQL in Snowflake
As a company grows, data engineering teams need to focus on scaling to meet the increasing demands from BI reporting, product analytics, and data science. In the beginning, it’s enough to report directly from the application database to get the data you need. Next, you might add some read-only databases to move the analytics workloads off of the primary database. Eventually, you may arrive at a similar architecture as in the diagram above; you replicate your application’s database to a separate database that is optimized for analytics. This approach can help a company scale for application and analytics requirements independently.
In the case of Vimeo, we want to replicate MySQL database tables to the Snowflake database; Snowflake is a cloud computing-based data warehouse that we use around here. But as we capture changes from MySQL into our data warehouse, we need to do some transformations.
This article explains how we approach transforming the MySQL changes into Snowflake tables, which is sure to be of interest to you if you stream Debezium events into Snowflake using the Snowflake Connector for Kafka and you want to flatten these events into tables. (What’s Debezium? Read on.)
A little background
The change data capture or CDC approach is common for replicating a source system’s state into a data warehouse; changes to data in the source are captured by consumers to take some action. This facilitates analytics and decision-making based on multiple data sources by using a central data warehouse.
We’ve set up Debezium to stream MySQL application backend database changes on certain tables as CDC events to the Snowflake database. Debezium is an open-source distributed platform for CDC deployed to our Kafka Connect cluster. Kafka Connect is a framework for streaming data between Kafka and other data stores. While Debezium streams CDC events to Kafka, the Snowflake Connector streams these events from Kafka into Snowflake.
So we need to transform the raw data from the Snowflake Connector into flat tables. But because the Snowflake Connector doesn’t write to flat tables, we need to transform the unstructured CDC events. In this approach, we use Snowflake Streams, a job scheduler, and custom SQL to flatten the events as shown in the diagram below.
Why custom event flattening?
Debezium does support event flattening for a few databases like PostgreSQL and MySQL. However, at the time of writing, the Snowflake Connector receives raw CDC event records and writes them into a table with two VARIANT columns:
RECORD_CONTENT. This column includes the Kafka message.
RECORD_METADATA. This column includes metadata about the message, such as the topic from which the message was read.
It’s worth briefly discussing how Debezium event flattening works for other databases. The Kafka Connect API provides an interface called Single Message Transforms (SMTs), which allows for transforming every message before it’s sent to its destination. Debezium implements an event-flattening SMT to parse through the complex structure of data change events and output flat field names and values. This would be perfect, since our goal is to have a flat database table. Unfortunately, the Snowflake Connector has limitations that prevent us from using this convenient SMT.
Given that our goal was to ingest database changes into Snowflake, we had some options:
- Attempt to integrate the Debezium event flattening SMT (or write a custom SMT) with the Snowflake Kafka Connector and maintain a fork of https://github.com/snowflakedb/snowflake-kafka-connector.
- Work to make open-source changes to https://github.com/snowflakedb/snowflake-kafka-connector instead of forking.
- Handle the transformation in SQL.
In the interest of time and team familiarity with certain tools, we opted to do this event flattening in SQL.
We’ll take you through our process step by step. But before we get into the details, here’s a look at the finished SQL, just as a preview of where we’re headed.
https://gist.github.com/oolongtea/694bd25b0d0a906a4821270606f1cdf7
Introducing the example
To set the stage with an example, consider a table named video
in MySQL:
We set up Debezium and the Snowflake Connector to write this to a Snowflake table called myschema.video_cdc_log
. Here’s a sample record:
https://gist.github.com/oolongtea/06c1b9d8094cd711e5ba480174bb7725
The goal is to convert these records into a clean Snowflake table: myschema.video
.
Terminology
The table below helps explain common terms we will use throughout this article. For example, when we say “CDC table,” we mean the Snowflake table called myschema.video_cdc_log
.
Keeping track of incoming events
Okay, so we’re receiving a stream of records into myschema.video_cdc_log
. We have a few options here:
A. Recreate the target table every time:
- Truncate the target table.
- Run SQL on the entire
myschema.video_cdc_log
table. - Insert into the target table.
B. Implement and design our own CDC approach to keep track of new records in myschema.video_cdc_log
.
C. Use Snowflake Streams, a proprietary CDC tool.
We opted for using Snowflake Streams, defined as:
A stream object records data manipulation language (DML) changes made to tables, including inserts, updates, and deletes, as well as metadata about each change, so that actions can be taken using the changed data.
This is the DDL SQL we used:
CREATE OR REPLACE STREAM myschema.video_streamON TABLE myschema.video_cdc_logAPPEND_ONLY = true
Event flattening with SQL
The select … from vimeo.video_stream
statement contains recent events streamed into the myschema.video_cdc_log
table. Depending on how often you run your event-flattening statement, this batch should be relatively small. The structure of the subquery contains:
select
op
is a value provided by Debezium that we’ll use to handle what to do when we encounter an existing record:u
for update,c
for insert, andd
for delete- We coalesce the unique keys of our table because, for delete records, the values come only in the “before” part of the event.
- Finally, we can parse out all of our columns and cast to the expected data type.
from vimeo.video_stream
- This is the Snowflake stream with the batch of new events.
where
- We choose to be explicit about the
op
values we care about. The Debezium version we’re using doesn’t pushr
events, so we protect against this for now. - We ensure our primary key isn’t null.
qualify row_number() over (...) = 1
- This is deduplication with a window function.
- We partition on the primary key (considering the “before” and “after” sections of the Debezium payload).
- We order by the Debezium record timestamp plus the the MySQL binlog position, which is the ultimate source of truth for ordering.
Handling merge conflicts
Now that we have our subquery ready with a batch of deduplicated events, we can construct our merge-into statement with a few cases:
When matched and op
equals d
:
- This is a delete record in Debezium, so let’s delete the record from our target table.
When matched and op
equals u
or c
:
- The
u
is a simple update. - The reason to include
c
is to handle the case when a record is deleted and recreated immediately in the same Debezium event batch. In this case, we can treat this as an update.
When not matched and op
doesn’t equal d
:
- This is a simple insert record.
Continuous ingestion
Now that you have:
- Debezium events ingested into a raw CDC Snowflake table
- A Snowflake stream on top of the CDC table
- Full merge-into SQL
You should be able to run your SQL with your scheduler of choice, whether that’s a tool like Apache Airflow or a bash script run on a cron schedule.
As you regularly run your SQL, events will be ingested and cleared from your Snowflake stream. From here, it depends on your requirements for how often you want to run this to keep your target table up to date. We choose to run our SQL on 15-minute intervals using Airflow.
What’s next?
Are we done here? No! There’s always room for more. We perceive scaling, data schema evolution, and data reconciliation on the horizon.
Scaling
As we continue to use this method, there’s the question: what if we want fifty more MySQL tables replicated to Snowflake? You can imagine that writing each SQL statement for each table would take time.
In the short term, we’ve developed a script to semi-automate this process. The input is your source MySQL database and source table name. The outputs are DDL SQL to create the Stream, DDL to create the target table in Snowflake, and the merge-into statement. From here, we’re able to integrate this into an Airflow DAG quickly, along with Great Expectations data validation suites.
In the future, we are considering using transformation tools such as dbt to take this automation further. We can imagine leveraging dbt’s Jinja templating to generate and deploy the event-flattening SQL automatically, along with basic schema and data tests.
Data schema evolution
The other limitation with our current approach is that source table modifications aren’t propagated. If a new column is added to a source table, our target table in Snowflake won’t pick up on this until we add it manually. Currently, our use cases don’t require such end-to-end automation because the source tables don’t change often.
However, as we replicate newer tables with more recent development, we don’t want to handle these changes manually or potentially miss out on a new critical column. Fortunately, Debezium has a schema change topic where each event has DDL statements. One short-term goal is to set up automated alerting when we receive a schema change event so that we can decide whether we need to integrate this change into our event-flattening process. Long term, these schema change events can be used to make the necessary changes to the target table and event-flattening SQL automatically, so that we reduce missing out on data or risk breaking our pipeline.
Data reconciliation
Now you have your target table, but do you trust it? At some point you need to compare your source to your target.
As we went through a few manual iterations, we developed scripts to perform basic daily aggregations to validate each new target table against its source table. Depending on the size of your tables, two useful comparisons could be:
- Grouping by a date and counting records
- For a given date (or smaller time range), ensure that all primary keys in the source are in the target system
While we have other data observability tools, an enhancement to this pipeline would be to automate this data reconciliation process at an interval that makes for the criticality of the data set.
Conclusion
To recap, we came up with a full SQL statement that we run on incremental batches of Debezium CDC logs and which results in flattened Snowflake tables.
Replicating source data in one database to another database is difficult. Fortunately, CDC tools like Debezium and Snowflake Streams help keep track of small changes and incrementally replicate a target table. The last missing piece for us was flattening these events in Snowflake, so this approach helped us quickly achieve a near real-time table replication.
Interested in flexing your engineering chops at Vimeo? Join our team!