Near-real time CDC in Snowflake

How I migrated from Redshift and simplified my pipelines

Continuously moving data from RDS (AWS's managed relational database service) to Snowflake can be challenging, especially in a Big Data scenario. I would like to share the architecture that I designed and successfully implemented to achieve CDC (change data capture) with very low latency, unlocking near-real time data refresh. Let's start by reviewing the challenge.

Legacy architecture

As usual, my company had a transactional MySQL database in AWS recording data from the application, and Redshift as the OLAP solution. Before the Snowflake project, we had two ingestion flows. For small and medium tables, we used DMS (AWS Database Migration Service) to pull data from a MySQL read replica and ingest it into Redshift, which served Power BI dashboards and analytics queries (with complex joins and transformations). In addition, we had EMR (AWS Elastic MapReduce, for Big Data transformations) workloads to migrate the large tables, implementing incremental loads with watermarks in PySpark. However, many tables did not have a reliable updated_at column, and the JDBC reads between Spark and RDS took many hours to complete (Spark pulls all the data into memory before processing it, because it cannot push parallel computation down to the source database layer).

Legacy data flow using AWS products: EMR, DMS and Redshift.

The promise: Database Migration Service CDC

Therefore, we decided to try DMS for CDC. We first thought of connecting DMS directly to Redshift, but that didn't work very well. We faced a lot of latency issues on the Redshift side and were not able to solve them just by looking at the DMS logs. So we changed the approach again: maybe using S3 as an intermediary could solve the lag problems, by avoiding having to structure the data before ingesting it into a data warehouse. That actually worked pretty well: we used DMS to read the binlogs from RDS and just dump the raw CDC files into a bucket. Still, we would also need a job to continuously ingest this new data into Redshift (using Glue, for example).

DMS flow to insert CDC data from RDS into S3.

In parallel, the business required a lot of views with many heavy joins, and Redshift was not performing as expected (because we had to manage all the distribution keys and sort keys). Managing how data is distributed across Redshift nodes is a huge headache: you have to guess in advance what all of your future queries will need, and analyze the most frequent joins and filters. Even if your analysis is right, you still need to recreate the table in order to change the distribution strategy (a lot of I/O cost and development effort).

Redshift data distribution strategies. (source: Medium)
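
For illustration, this is roughly what that tuning looks like on the Redshift side (the table and column names below are made up, not our actual schema):

-- Hypothetical Redshift table: distribution and sort keys have to be chosen up front.
CREATE TABLE orders (
    order_id    BIGINT,
    customer_id BIGINT,
    status      VARCHAR(20),
    created_at  TIMESTAMP
)
DISTSTYLE KEY
DISTKEY (customer_id)  -- co-locate rows that join on customer_id
SORTKEY (created_at);  -- speed up range filters on created_at

-- Changing the strategy later meant recreating the table and copying the data over,
-- e.g. CREATE TABLE orders_new (...) DISTKEY (order_id); INSERT INTO orders_new SELECT * FROM orders;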

The solution: Snowflake

After some market research and benchmarking, we decided to move to Snowflake to leverage the easy storage integration with S3 and to avoid having to manage data distribution ourselves. Snowflake breaks all the data into micro-partitions without asking you to specify any cluster key. Even if you find that clustering some tables (differently from their natural ingestion order) is necessary (usually only for tables in the terabyte range), you can do that without recreating the table: just set a clustering key and you are good to go! Snowflake also handles reclustering when needed through automatic clustering, avoiding data skew as your data grows.

Snowflake's micro-partition example. (source: Snowflake documentation)
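
If clustering does become necessary, it is a single statement on the existing table. A minimal sketch, with an illustrative table name:

-- Define a clustering key on an existing table; no table rebuild required.
ALTER TABLE orders CLUSTER BY (created_at);

-- Automatic clustering then maintains the layout in the background.
-- If the key turns out to be unnecessary, it can be dropped just as easily:
ALTER TABLE orders DROP CLUSTERING KEY;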

Another Snowflake feature that helps a lot is streams. A stream tracks the version of a table, storing an offset that lets it return the unconsumed changes from that table (the delta between the current state of the table and the saved offset). Streams do not actually store column data, and they advance the offset only after the returned rows are consumed in a successful DML transaction (e.g. an INSERT or MERGE statement). It is much better and easier than juggling temporary staging tables.

Table versioning in Snowflake stream. (source: Snowflake documentation)
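
A minimal sketch of the mechanics, with illustrative object names:

-- Track changes on a table with a stream.
CREATE OR REPLACE STREAM orders_stream ON TABLE orders;

-- Reading the stream returns only the rows changed since the last consumed offset,
-- including the METADATA$ACTION, METADATA$ISUPDATE and METADATA$ROW_ID columns.
SELECT * FROM orders_stream;

-- The offset only advances when the stream is consumed inside a successful DML
-- statement, for example:
-- INSERT INTO orders_changes SELECT * FROM orders_stream;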

Snowflake CDC Architecture

We decided to go with external tables and create streams on top of them. This had some implications. The first is that we were not duplicating the raw CDC data inside Snowflake (external tables only store metadata about files that actually live in an external cloud storage service, such as S3 or ADLS). The drawback is that this setup does not allow Snowflake to add hidden change-tracking columns to the source object, forcing our streams to be insert-only.

Architecture to reflect S3 CDC data in Snowflake’s external tables.
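
Here is a rough sketch of that setup, with illustrative names (the stage URL, storage integration and file format will of course differ per environment):

-- External stage pointing at the bucket where DMS drops the CDC files.
CREATE OR REPLACE STAGE cdc_stage
  URL = 's3://my-cdc-bucket/orders/'
  STORAGE_INTEGRATION = s3_int
  FILE_FORMAT = (TYPE = PARQUET);

-- External table: only metadata lives in Snowflake, the data stays in S3.
CREATE OR REPLACE EXTERNAL TABLE orders_ext
  LOCATION = @cdc_stage
  FILE_FORMAT = (TYPE = PARQUET)
  AUTO_REFRESH = TRUE;  -- requires S3 event notifications; otherwise refresh manually:
                        -- ALTER EXTERNAL TABLE orders_ext REFRESH;

-- Streams on external tables must be insert-only.
CREATE OR REPLACE STREAM orders_ext_stream
  ON EXTERNAL TABLE orders_ext
  INSERT_ONLY = TRUE;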

The main advantage of a standard stream is that it returns only the final state of the data between offsets. For example, if a row is inserted and then updated within the same offset, a standard stream returns only the updated row (with the METADATA$ISUPDATE column set to FALSE and METADATA$ACTION set to INSERT). With my insert-only stream, I had to use window functions to deduplicate the data and fetch only the latest state of each primary key.

Data flow illustration for Snowflake stream. (source: Snowflake documentation)
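
A sketch of that deduplication, assuming the DMS CDC files carry an operation flag and a timestamp per record (the column names are illustrative):

-- Keep only the latest CDC record per primary key from the insert-only stream.
SELECT
    value:id::NUMBER             AS id,
    value:op::STRING             AS op,          -- e.g. 'I', 'U' or 'D'
    value:updated_at::TIMESTAMP  AS updated_at,
    value                        AS payload
FROM orders_ext_stream
QUALIFY ROW_NUMBER() OVER (
    PARTITION BY value:id
    ORDER BY value:updated_at::TIMESTAMP DESC
) = 1;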

Setting up tasks

We then created tasks to consume from the streams and store the new or changed data in the target tables. We used a MERGE statement to handle INSERT, UPDATE and DELETE in a single statement: if the primary key matched a row in the target, we updated it (if the row was an update) or deleted it (if it was marked for deletion); if there was no match, we inserted it. Instead of manually creating external tables, streams and tasks for each table, I wrote a stored procedure (in Python, via Snowpark) to spin up everything in a single run.

-- Create a task that inserts new name records
-- from the 'rawstream1' stream into the 'names' table.
-- Execute the task every 4 hours when the stream contains records.
CREATE OR REPLACE TASK raw_to_names
  WAREHOUSE = mywh
  SCHEDULE = 'USING CRON 0 */4 * * * America/Los_Angeles'
  WHEN
    SYSTEM$STREAM_HAS_DATA('rawstream1')
  AS
    MERGE INTO names n
      USING (
        -- expose the stream's action metadata alongside the payload columns
        SELECT var:id id, var:fname fname, var:lname lname, metadata$action action
        FROM rawstream1
      ) r1
      ON n.id = TO_NUMBER(r1.id)
      WHEN MATCHED AND r1.action = 'DELETE'
        THEN DELETE
      WHEN MATCHED AND r1.action = 'INSERT'
        THEN UPDATE SET n.first_name = r1.fname, n.last_name = r1.lname
      WHEN NOT MATCHED AND r1.action = 'INSERT'
        THEN INSERT (id, first_name, last_name) VALUES (r1.id, r1.fname, r1.lname);

Another advantage is that we can check whether the stream has any data and skip the task run if it is empty. We also have total control over the data refresh frequency, which is important to reduce the compute cost of Snowflake warehouses, especially for heavily updated tables. This is also why we went with streams and tasks instead of Snowpipe (a service that automatically ingests data as soon as a new file arrives in the external storage). Since we only needed the data with a few hours (or even days) of latency, that was enough for us, while still giving us the flexibility to shrink the latency as much as needed, because the design also supports near-real time refresh.

Architecture to refresh Snowflake tables from stream’s CDC data.
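
For example, changing the refresh frequency is just an ALTER on the task (reusing the task and stream names from the example above):

-- Tasks are created suspended; resume them to start the schedule.
ALTER TASK raw_to_names RESUME;

-- Change the schedule at any time, e.g. from every 4 hours to once a day at 6 AM.
ALTER TASK raw_to_names SUSPEND;
ALTER TASK raw_to_names SET SCHEDULE = 'USING CRON 0 6 * * * America/Los_Angeles';
ALTER TASK raw_to_names RESUME;

-- The same check used in the task's WHEN clause can also be run ad hoc:
SELECT SYSTEM$STREAM_HAS_DATA('rawstream1');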

Historical tables: SCD Type 2

Our design also allowed us to create an SCD Type 2 table in Snowflake (to keep track of deleted and updated fields). The implementation was as simple as creating another stream and consuming from it into the SCD Type 2 table (inserting every row, instead of applying INSERT, UPDATE and DELETE). The Snowflake documentation recommends creating a separate stream for each consumer, to avoid issues with the offset management inside a single stream.

SCD Type 2 pipeline in Snowflake using stream and task.
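
A rough sketch of that second pipeline, reusing the illustrative names from earlier and assuming an already-created orders_history table:

-- Second stream on the same source, dedicated to the history pipeline
-- (each consumer gets its own stream so offsets do not interfere).
CREATE OR REPLACE STREAM orders_ext_stream_scd2
  ON EXTERNAL TABLE orders_ext
  INSERT_ONLY = TRUE;

-- Task that appends every change to the history table instead of merging it.
CREATE OR REPLACE TASK orders_to_scd2
  WAREHOUSE = mywh
  SCHEDULE = 'USING CRON 0 */4 * * * America/Los_Angeles'
  WHEN SYSTEM$STREAM_HAS_DATA('orders_ext_stream_scd2')
  AS
    INSERT INTO orders_history (id, op, updated_at, payload, loaded_at)
    SELECT
        value:id::NUMBER,
        value:op::STRING,
        value:updated_at::TIMESTAMP,
        value,
        CURRENT_TIMESTAMP()
    FROM orders_ext_stream_scd2;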

Finally, we had a job that deleted older data from the SCD Type 2 table to keep Snowflake storage from getting too expensive (historical tables can grow big pretty quickly!). This was not a problem for data durability because we still keep the CDC data in S3. To stay cost-effective, we also move old S3 data to a cheaper storage class (such as Glacier or Infrequent Access). That allowed us to build ad-hoc reports on top of this data (using Athena or Glue jobs) and store the results in Snowflake if needed.

Below is the complete solution workflow, in case you find it interesting. Thank you for reading!

Architecture design for near-real time data ingestion into Snowflake.
