DIY — CDC Pipeline from MySQL to Snowflake
If you've ever needed to design a data streaming pipeline, you've probably encountered challenges such as durability, scalability, duplicate handling, automation, monitoring, and more.
When I joined Fundbox in 2017, I had 6 years of experience with Big-Data technologies. In my role as a senior data engineer at Fundbox, I soon found myself trying to tackle a new challenge.
This time it was Change Data Capture (CDC).
Prior to 2017, members of the analytics team regularly ran heavy queries on top of MySQL, which caused them huge delays.
So, we decided to create a data warehouse for Fundbox and make it available to analysts, data scientists, the finance team, and others. We chose Snowflake as our cloud-based data warehouse, thanks to its separation of storage and compute, and the next step was to decide how to stream CDC into it.
Although you can find some off-the-shelf solutions for CDC that can specifically handle MySQL-to-Snowflake streaming, at Fundbox we encountered some additional requirements that needed to be addressed in a dedicated solution:
- Low Latency — from a change in MySQL to a row in Snowflake
- Each event needs to contain full row data (not just the delta)
- Fully structured tables in Snowflake
- Automatic handling of DDL SQL statements (CREATE/ALTER/DROP)
- Never drop columns or tables in Snowflake, even if dropped in MySQL
- No duplicate rows in Snowflake
Implementing CDC ourselves from scratch turned out to be a good decision. The data pipeline we developed has been up for more than a year now and is serving an increasing number of consumers.
In this post, I will share the architecture, the challenges we encountered and how we overcame them, as well as some useful tips to start building your own CDC. We’ll dive deep into some core aspects, but not every part of the solution.
Bird’s Eye View
There are two main components in the pipeline:
- Binlog Streamer reads changes from MySQL Binary Log files and sends them to Kafka
- Spark Streaming job consumes data from Kafka and stores Parquet files in S3. It further executes Snowflake COPY commands from those files into target tables
Both use Apache Zookeeper for offset management and synchronization.
Let’s take a closer look at the Binlog Streamer.
Binlog Streamer: 3 main responsibilities
1. Catching changes
First, we looked for a way to hook into the MySQL database and continuously capture every DML and DDL statement in real time. We found that the MySQL binary log fit the task, so we created a Python program that streams data from the log using python-mysql-replication.
latest_binlog_state is a tuple containing the last binary log file name and the position where we stopped reading before the last shutdown (maintenance/error). Keep updating it after each successfully-handled log event; we used Zookeeper for that, as it is robust and takes care of replication.
Losing this tuple means losing data or having duplicated data — so don’t lose it!
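For illustration, here is a minimal sketch of how the streamer can resume from latest_binlog_state kept in Zookeeper, using kazoo and python-mysql-replication. The ZNode path, connection settings, and server_id are placeholders, not our production values.

```python
from kazoo.client import KazooClient
from pymysqlreplication import BinLogStreamReader

ZK_PATH = "/cdc/binlog_state"  # hypothetical ZNode holding "file_name:position"

zk = KazooClient(hosts="zk1:2181,zk2:2181,zk3:2181")
zk.start()
zk.ensure_path(ZK_PATH)

def load_latest_binlog_state():
    """Read (log_file, log_pos) persisted before the last shutdown."""
    data, _ = zk.get(ZK_PATH)
    if not data:
        return None, None
    log_file, log_pos = data.decode("utf-8").split(":")
    return log_file, int(log_pos)

def save_latest_binlog_state(log_file, log_pos):
    """Persist the state after every successfully-handled log event."""
    zk.set(ZK_PATH, "{}:{}".format(log_file, log_pos).encode("utf-8"))

log_file, log_pos = load_latest_binlog_state()
stream = BinLogStreamReader(
    connection_settings={"host": "mysql-host", "port": 3306,
                         "user": "repl", "passwd": "secret"},
    server_id=100,       # must be unique among the replicas of this MySQL server
    blocking=True,       # wait for new events instead of exiting at the end of the log
    resume_stream=True,
    log_file=log_file,   # may be None on the very first run
    log_pos=log_pos,
)
```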
2. Sending the data
There are 3 types of DML statements (a short extraction sketch follows the list):
- INSERT event will result in a dictionary with the columns’ values
- UPDATE will result in two dictionary objects, one for before and one for after (not just the delta, but the full set of row values before and after the update)
- DELETE will result in a dictionary of the row before it has been deleted
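With python-mysql-replication, the mapping from event types to row dictionaries looks roughly like this (the helper name is ours):

```python
from pymysqlreplication.row_event import (
    WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent,
)

def rows_to_dicts(event):
    """Turn a binlog row event into a list of full-row dictionaries."""
    if isinstance(event, WriteRowsEvent):
        # INSERT: one dictionary with the inserted columns' values
        return [row["values"] for row in event.rows]
    if isinstance(event, UpdateRowsEvent):
        # UPDATE: the full 'before' and 'after' images of each row
        return [{"before": row["before_values"], "after": row["after_values"]}
                for row in event.rows]
    if isinstance(event, DeleteRowsEvent):
        # DELETE: the row as it was before deletion
        return [row["values"] for row in event.rows]
    return []
```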
Consume the underlying dictionary, serialize it into JSON, and send it to Kafka (keyed by the table name). We used the simple Kafka Python client, as it was best suited for our database change rate; the decision was based on this benchmark. Make sure to use a random partitioner, to evenly distribute the messages.
Enrich the JSON with the time of the change, along with its type (InsertRowsEvent/UpdateRowsEvent/DeleteRowsEvent).
{"change_time": "2018-03-22 17:28:33.000","change_type": "UpdateRowsEvent","col_1": "some value","col_2": 1234,"col_n": True}
3. Handling DDL SQL statements
Each DDL statement should be reflected in the relevant Snowflake schema/table. But before making any changes on Snowflake, suspend the Spark job, because it relies on the old structure of the table. Here, the Binlog Streamer plays the role of a Spark-job orchestrator by restarting the Spark job automatically for every schema change. Once the Spark job is suspended, the Binlog Streamer can execute the DDL on Snowflake.
Snowflake's SQL is ANSI compliant, but it still has some differences and some extra reserved words that need to be escaped. The lack of a converter from MySQL to Snowflake syntax made us create one, using some regex and string manipulations to achieve the desired conversions. Here we also took care of omitting DROP operations for columns/tables/schemas.
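Our converter is not reproduced here, but a toy version of the idea looks roughly like this; the rewrite rules shown are only a few examples, not the full rule set:

```python
import re

# A handful of example rewrite rules; the real converter handles many more cases.
MYSQL_TO_SNOWFLAKE_RULES = [
    (re.compile(r"`"), '"'),                                  # backticks -> double quotes
    (re.compile(r"\s+UNSIGNED", re.IGNORECASE), ""),          # Snowflake has no UNSIGNED
    (re.compile(r"\s+AUTO_INCREMENT", re.IGNORECASE), " AUTOINCREMENT"),
    (re.compile(r"\s*ENGINE\s*=\s*\w+", re.IGNORECASE), ""),  # MySQL-only table options
    (re.compile(r"\s*DEFAULT CHARSET\s*=\s*\w+", re.IGNORECASE), ""),
]

# Never propagate drops of columns/tables/schemas to Snowflake
DROP_RE = re.compile(r"^\s*DROP|\bDROP\s+(COLUMN|TABLE|SCHEMA)\b", re.IGNORECASE)

def convert_mysql_ddl_to_snowflake(ddl):
    """Best-effort conversion of a MySQL DDL statement into Snowflake syntax."""
    if DROP_RE.search(ddl):
        return None
    statement = ddl
    for pattern, replacement in MYSQL_TO_SNOWFLAKE_RULES:
        statement = pattern.sub(replacement, statement)
    return statement
```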
Once the converted query has been executed on Snowflake — resume the Spark job and continue streaming the next events from the binary log.
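Putting that flow together, a sketch might look like this; suspend_spark_job and resume_spark_job are hypothetical helpers, and the Snowflake connection parameters are placeholders:

```python
import snowflake.connector

def handle_ddl(ddl_statement):
    """Reflect a MySQL DDL change in Snowflake while the Spark job is paused."""
    snowflake_ddl = convert_mysql_ddl_to_snowflake(ddl_statement)
    if snowflake_ddl is None:   # a DROP (or other statement) we chose to skip
        return

    suspend_spark_job()         # hypothetical: stop the streaming job gracefully

    conn = snowflake.connector.connect(
        account="my_account", user="cdc_user", password="secret",
        warehouse="CDC_WH", database="CDC_DB",
    )
    try:
        conn.cursor().execute(snowflake_ddl)
    finally:
        conn.close()

    resume_spark_job()          # hypothetical: restart the job against the new schema
```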
Next, let's take a quick look at the layout of the Binlog Streamer, and then move on to the Spark job.
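The original layout isn't reproduced here, but a condensed main loop, stitched together from the sketches above, would look something like this:

```python
from pymysqlreplication.event import QueryEvent
from pymysqlreplication.row_event import (
    WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent,
)

# map library classes to the change_type names used in the JSON payload
EVENT_TYPE_NAMES = {
    WriteRowsEvent: "InsertRowsEvent",
    UpdateRowsEvent: "UpdateRowsEvent",
    DeleteRowsEvent: "DeleteRowsEvent",
}

for event in stream:                              # the BinLogStreamReader from above
    if type(event) in EVENT_TYPE_NAMES:
        for row in rows_to_dicts(event):
            send_change(event.table, EVENT_TYPE_NAMES[type(event)], row)
    elif isinstance(event, QueryEvent):
        # DDL statements arrive as QueryEvents (which also carry BEGINs and the like,
        # so handle_ddl should filter out anything that is not a real DDL)
        handle_ddl(event.query)

    # persist progress only after the event has been fully handled
    save_latest_binlog_state(stream.log_file, stream.log_pos)
```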
Spark Streaming Job
Kafka provides a scalable messaging queue that can serve multiple consumers in a distributed manner, while Spark Streaming is a fault-tolerant, distributed system that can process multiple tasks in parallel. Combining the two is a good idea if you need to consume heterogeneous data and rapidly store it into multiple outputs, while keeping the ability to scale as your data volume increases.
Each Spark task reads messages from a single Kafka partition, so you can scale out your Kafka topic and your Spark cluster together, linearly, to increase the pipeline throughput. As your data load increases, just add partitions to the Kafka topic and workers to your Spark cluster.
The Spark driver is a program that manages the job flow and schedules tasks. You should initialize the SparkConf in the driver. Make sure you set these properties (a setup sketch follows the list):
- spark.streaming.kafka.maxRatePerPartition
- spark.streaming.receiver.maxRate
- spark.streaming.backpressure.enabled
- spark.streaming.backpressure.initialRate
- spark.streaming.stopGracefullyOnShutdown
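One possible way to set them in the driver; the rate values below are illustrative and should be tuned to your own throughput:

```python
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext

conf = (SparkConf()
        .setAppName("mysql-cdc-to-snowflake")
        .set("spark.streaming.kafka.maxRatePerPartition", "1000")  # msgs/sec per partition
        .set("spark.streaming.receiver.maxRate", "1000")
        .set("spark.streaming.backpressure.enabled", "true")
        .set("spark.streaming.backpressure.initialRate", "500")
        .set("spark.streaming.stopGracefullyOnShutdown", "true")
        .set("spark.streaming.concurrentJobs", "1"))               # see the offsets section below

sc = SparkContext(conf=conf)
ssc = StreamingContext(sc, 30)   # 30-second micro-batches
```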
Then, create an input DStream using spark-streaming-kafka-0-8:
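A minimal sketch of the direct approach; the topic name, broker list, and the Zookeeper-restored offsets (load_offsets_from_zookeeper is a hypothetical helper) are placeholders:

```python
from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

topic = "mysql-cdc"
kafka_params = {"metadata.broker.list": "kafka1:9092,kafka2:9092"}

# Resume from the offsets previously committed to Zookeeper (see the offsets section);
# with no stored offsets, fromOffsets=None falls back to auto.offset.reset.
stored = load_offsets_from_zookeeper(topic)   # hypothetical: {partition: offset}
from_offsets = {TopicAndPartition(topic, p): o for p, o in stored.items()} or None

kafka_stream = KafkaUtils.createDirectStream(
    ssc, [topic], kafka_params, fromOffsets=from_offsets,
)
# each message arrives as a (key, value) tuple: (table_name, json_payload)
```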
Why not the Snowflake Connector for Spark?
The available Snowflake-Spark connector requires a DataFrame that can be saved into a single table. In our case, we get an RDD with mixed data that should be delivered to multiple tables. At first, we did try to utilize this connector, iterating over the target tables and saving a DataFrame per table.
This approach creates a long and narrow DAG that gets longer as the number of tables grows. It is not driven by the actual data received (which involves just a portion of the tables), so it will always iterate over ALL tables: a waste of time!
So do it yourself
You can easily distribute the work for each RDD on this stream by using the mapPartitions transformation. This allows you to execute a piece of code in parallel on each Kafka partition. Use its return value to aggregate the names of the tables that had new changes during the micro-batch.
extract_partition_data_to_s3 — In this method, you should transform the list of KafkaMessageAndMetadata objects into different Parquet files. Use the message keys (= table names) to organize them into dedicated Parquet files, then upload them to S3.
run_copy_commands_from_s3_to_snowflake — If you don't care about duplications, you can spread the copy-command executions between the Spark executors. But if you do care, you should run them all within the driver, in parallel, as part of a single transaction. As of now, Snowflake does not support the notion of a "distributed transaction".
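A rough sketch of these two helpers; the S3 bucket, stage name, and snowflake_connection are placeholders, the Parquet writing itself is elided, and the COPY commands run sequentially here for simplicity:

```python
import json

def extract_partition_data_to_s3(messages):
    """Runs on an executor, once per Kafka partition: group the partition's messages
    by table, write one Parquet file per table, upload it to S3, and return the
    names of the tables that received changes."""
    rows_by_table = {}
    for table_name, payload in messages:     # (key, value) pairs from the direct stream
        rows_by_table.setdefault(table_name, []).append(json.loads(payload))
    for table_name, rows in rows_by_table.items():
        pass  # write `rows` as a Parquet file and upload to s3://cdc-bucket/<table_name>/ (elided)
    return list(rows_by_table.keys())

def run_copy_commands_from_s3_to_snowflake(changed_tables):
    """Runs in the driver: COPY every changed table within a single transaction,
    so a failed micro-batch leaves nothing half-loaded."""
    cursor = snowflake_connection.cursor()   # hypothetical long-lived connection
    cursor.execute("BEGIN")
    try:
        for table_name in changed_tables:
            # stage name and file-format options are placeholders
            cursor.execute("COPY INTO {0} FROM @cdc_stage/{0}/".format(table_name))
        cursor.execute("COMMIT")
    except Exception:
        cursor.execute("ROLLBACK")
        raise
```

Both are wired into the micro-batch handler together with the offsets bookkeeping shown in the next section.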
Kafka Offsets Management: achieving exactly-once-delivery
In your pipeline, you might want to ensure that each message is delivered exactly once. To track the streaming delivery progress, you can use Zookeeper.
For each partition of the Kafka topic, update the latest processed offset at the end of each Spark micro-batch. You can manage the offset ranges of each micro-batch like this:
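A sketch of that bookkeeping, based on the transform/offsetRanges pattern from the Spark + Kafka integration guide; the queue and the Zookeeper paths are our own additions, and zk is a KazooClient started in the driver:

```python
import queue

pending_offset_ranges = queue.Queue()   # offset ranges of batches waiting to run

def enqueue_offset_ranges(rdd):
    """Called at job-generation time ("on time"), before the batch is queued for execution."""
    pending_offset_ranges.put(rdd.offsetRanges())
    return rdd

def commit_offsets_to_zookeeper(offset_ranges):
    """Persist the untilOffset of every partition once the batch has fully completed."""
    for o in offset_ranges:
        path = "/cdc/offsets/{}/{}".format(o.topic, o.partition)
        zk.ensure_path(path)
        zk.set(path, str(o.untilOffset).encode("utf-8"))

def process_micro_batch(rdd):
    # executors: dump partition data to S3 and collect the names of the changed tables
    changed_tables = set(rdd.mapPartitions(extract_partition_data_to_s3).collect())
    # driver: load everything into Snowflake within one transaction
    run_copy_commands_from_s3_to_snowflake(changed_tables)
    # batches run one at a time and in order, so the oldest pending range is this batch's
    commit_offsets_to_zookeeper(pending_offset_ranges.get())

kafka_stream.transform(enqueue_offset_ranges).foreachRDD(process_micro_batch)
```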
Adding each offset range to a queue is critical. If your batch interval is 30 seconds but some batch took 80 seconds, the following batch is going to be queued. The driver executes enqueue_offset_ranges on time (before the batch is queued), and executes process_micro_batch only when all preceding batches in the queue have completed. This way you don't override the offset range of a late batch that is still running.
You should also make sure that Spark is not running more than one batch at a time. Think of a case where batch T+1 completed successfully and updated its offsets in ZK, but batch T fails to handle its older offsets. This will cause data loss upon a Spark job restart!
conf.set("spark.streaming.concurrentJobs", "1")
Wrapping up
The evolution of the pipeline from its first prototype to what it is now is quite fascinating. We have many more ideas on how to keep evolving it and making further improvements.
We may consider adding a deduping process to replace the exactly-once-delivery semantics and gain more scalability. Also on our roadmap is finding a way to avoid reloading the Spark job upon DDL changes.
Stay tuned for more to come!