DIY — CDC Pipeline from MySQL to Snowflake

Zach Beniash
Fundbox Engineering
May 19, 2019

If you’ve ever needed to design a data streaming pipeline, you’ve probably encountered challenges such as durability, scalability, duplications, automation, monitoring, and more.

When I joined Fundbox in 2017, I had 6 years of experience with Big-Data technologies. In my role as a senior data engineer at Fundbox, I soon found myself trying to tackle a new challenge.
This time it was Change Data Capture (CDC).

Prior to 2017, members of the analytics team regularly ran heavy queries on top of MySQL, which caused them huge delays.

So, we decided to create a data warehouse for Fundbox and make it available to analysts, data scientists, the finance team, and others. We chose Snowflake as our cloud-based data warehouse because it separates storage from computation, and the next step was to decide how to stream CDC into it.
Although you can find some off-the-shelf solutions for CDC that can specifically handle MySQL-to-Snowflake streaming, at Fundbox we encountered some additional requirements that needed to be addressed in a dedicated solution:

  • Low Latency — from a change in MySQL to a row in Snowflake
  • Each event needs to contain full row data (not just delta)
  • Fully structured tables in Snowflake
  • Automatic handling of DDL SQL statements (CREATE/ALTER/DROP)
  • Never drop columns or tables in Snowflake, even if dropped in MySQL
  • No duplicates in Snowflake

Implementing CDC ourselves from scratch turned out to be a good decision. The data pipeline we developed has been up for more than a year now and is serving an increasing number of consumers.

In this post, I will share the architecture, the challenges we encountered and how we overcame them, as well as some useful tips to start building your own CDC. We’ll dive deep into some core aspects, but not every part of the solution.

Bird’s Eye View

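There are two main components in the pipeline: the Binlog Streamer, which publishes MySQL changes to Kafka, and the Spark Streaming job, which loads them into Snowflake.

Both use Apache Zookeeper for offset management and for synchronization.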
Let’s take a closer look at the Binlog Streamer.

Binlog Streamer: 3 main responsibilities

1. Catching changes

First, we looked for a way to hook into the MySQL database and continually capture every DML and DDL statement in real time. We found that the MySQL binary log fit the task, so we created a Python program that streams data from the log using python-mysql-replication.

Binlog Streamer — how to initiate a stream on top of the binary log of MySQL server

latest_binlog_state is a tuple containing the last binary log file name and the position where we stopped reading before the last shutdown (maintenance/error). You should keep updating it after each successfully handled log event. We used Zookeeper for that, as it is robust and takes care of replication.
Losing this tuple means losing data or having duplicated data — so don’t lose it!
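
A minimal sketch of opening such a stream with python-mysql-replication might look like the following. The `server_id` value and the helper names `stream_kwargs` and `open_stream` are assumptions for illustration, not the original Fundbox code. The library import sits inside `open_stream` so that the pure helper can be exercised without a MySQL server.

```python
def stream_kwargs(latest_binlog_state, server_id=100):
    """Build BinLogStreamReader keyword arguments from the saved state.

    latest_binlog_state is the (log_file, log_pos) tuple described above.
    server_id=100 is an assumed value; it must be unique among replicas.
    """
    log_file, log_pos = latest_binlog_state
    return {
        "server_id": server_id,
        "blocking": True,        # wait for new events at the end of the log
        "resume_stream": True,   # start reading at log_pos inside log_file
        "log_file": log_file,
        "log_pos": log_pos,
    }


def open_stream(mysql_settings, latest_binlog_state):
    """Open a binary log stream that yields DML row events and query events."""
    from pymysqlreplication import BinLogStreamReader
    from pymysqlreplication.event import QueryEvent
    from pymysqlreplication.row_event import (
        DeleteRowsEvent,
        UpdateRowsEvent,
        WriteRowsEvent,  # the library's class for INSERT row events
    )

    return BinLogStreamReader(
        connection_settings=mysql_settings,  # dict with host, port, user, passwd
        only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent, QueryEvent],
        **stream_kwargs(latest_binlog_state),
    )
```

While iterating over the stream, `(stream.log_file, stream.log_pos)` gives the tuple to persist after each handled event.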

2. Sending the data

There are 3 types of DML statements:

  • INSERT event will result in a dictionary with the columns’ values
  • UPDATE will result in two dictionary objects, for before and after (not just the delta, but a full set of values of the row before and after the update)
  • DELETE will result in a dictionary of the row before it has been deleted

Consume the underlying dictionary, serialize it into JSON, and send it to Kafka (keyed by the table name). We used the simple Kafka Python client, as it was best suited to our database change rate; the decision was based on a benchmark. Make sure to use a random partitioner to distribute the messages evenly.

Enrich the JSON with the time of the change, along with its type (InsertRowsEvent/UpdateRowsEvent/DeleteRowsEvent).

{"change_time": "2018-03-22 17:28:33.000", "change_type": "UpdateRowsEvent", "col_1": "some value", "col_2": 1234, "col_n": true}
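
The enrichment step can be sketched as a small pure function. The name `build_messages`, the choice to send the "after" image for updates, and the use of UTC for `change_time` are assumptions made for this example. The row dictionaries follow the shape that python-mysql-replication produces (`values` for inserts and deletes, `before_values`/`after_values` for updates).

```python
import json
from datetime import datetime, timezone


def build_messages(change_type, timestamp, rows):
    """Return one JSON string per full-row image in a row event.

    change_type: the event class name, e.g. "UpdateRowsEvent".
    timestamp:   the event's Unix timestamp in seconds.
    rows:        the event's list of row dictionaries.
    """
    change_time = datetime.fromtimestamp(timestamp, tz=timezone.utc).strftime(
        "%Y-%m-%d %H:%M:%S.000"
    )
    messages = []
    for row in rows:
        # Assumption: for updates we forward the row as it looks after the change.
        values = row["after_values"] if "after_values" in row else row["values"]
        record = {"change_time": change_time, "change_type": change_type}
        record.update(values)
        # default=str covers dates, decimals and other non-JSON-native values.
        messages.append(json.dumps(record, default=str))
    return messages
```

Each returned string is one Kafka message value, to be sent with the table name as its key.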

3. Handling DDL SQL statements

Each DDL statement should be reflected in the relevant Snowflake schema/table. But before making any changes on Snowflake, suspend the Spark job, because it relies on the old structure of the table. Here, Binlog Streamer plays the role of Spark job orchestrator, restarting the Spark job automatically for every schema change. Once the Spark job is suspended, Binlog Streamer can execute the DDL on Snowflake.

Snowflake's SQL is ANSI-compliant, but it still has some differences and some extra reserved words that need to be escaped. We could not find a converter from MySQL to Snowflake syntax, so we created one, using regex and string manipulation to achieve the desired conversions. Here we also took care of omitting DROP operations for columns/tables/schemas.

Once the converted query has been executed on Snowflake — resume the Spark job and continue streaming the next events from the binary log.
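
To give a feel for the conversion, here is a deliberately small sketch of a regex-based converter. The function name `convert_ddl`, the reserved-word subset, and the list of MySQL-only clauses are all assumptions chosen for illustration; a production converter has to cover far more cases.

```python
import re

# Illustrative subset only; Snowflake's reserved-word list is much longer.
RESERVED = {"ORDER", "GROUP", "TABLE", "SELECT", "FROM"}

# MySQL-specific clauses that this sketch simply strips (assumed list).
MYSQL_ONLY = [
    r"\s+ENGINE\s*=\s*\w+",
    r"\s+AUTO_INCREMENT(\s*=\s*\d+)?",
    r"\s+DEFAULT\s+CHARSET\s*=\s*\w+",
]

DROP_PATTERN = re.compile(r"^(DROP\s|ALTER\s+TABLE\s+\S+\s+DROP\s)", re.IGNORECASE)


def convert_ddl(mysql_ddl):
    """Return a Snowflake-flavoured statement, or None if it must be skipped."""
    statement = mysql_ddl.strip().rstrip(";")
    # Never propagate DROP operations to the warehouse.
    if DROP_PATTERN.match(statement):
        return None
    for pattern in MYSQL_ONLY:
        statement = re.sub(pattern, "", statement, flags=re.IGNORECASE)

    def requote(match):
        name = match.group(1)
        # Backticks become double quotes only where escaping is required.
        return '"%s"' % name.upper() if name.upper() in RESERVED else name

    return re.sub(r"`([^`]+)`", requote, statement)
```

For example, `convert_ddl("DROP TABLE users;")` returns `None`, so the caller has nothing to execute on Snowflake.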

Next, let’s take a quick look at the layout of Binlog Streamer, and then move forward to talk about Spark job.

Binlog Streamer — how to handle CDC records according to their types
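
A sketch of such a layout, under stated assumptions: events are dispatched on their class name, and the callbacks `send_rows`, `apply_ddl`, `save_state`, `suspend_spark`, `run_on_snowflake` and `resume_spark` are hypothetical hooks standing in for the Kafka producer, the Spark orchestration and the Zookeeper update described above.

```python
import re

ROW_EVENTS = {"WriteRowsEvent", "UpdateRowsEvent", "DeleteRowsEvent"}
DDL_PATTERN = re.compile(r"^\s*(CREATE|ALTER|DROP)\b", re.IGNORECASE)


def handle_event(event, position, send_rows, apply_ddl, save_state):
    """Route one binary log event, then persist the stream position.

    position is the (log_file, log_pos) tuple to store once the event
    has been handled successfully.
    """
    kind = type(event).__name__
    if kind in ROW_EVENTS:
        send_rows(event.table, kind, event.rows)
    elif kind == "QueryEvent" and DDL_PATTERN.match(event.query):
        apply_ddl(event.query)
    # Other query events (for example BEGIN) only advance the position.
    save_state(position)


def apply_ddl_with_spark_paused(query, suspend_spark, run_on_snowflake, resume_spark):
    """Apply a schema change while the Spark job is suspended."""
    suspend_spark()
    run_on_snowflake(query)
    resume_spark()
```

The main loop then reduces to iterating over the stream and calling `handle_event(event, (stream.log_file, stream.log_pos), ...)` for each event.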

Spark Streaming Job

Kafka provides a scalable messaging queue that can serve multiple consumers in a distributed manner. Spark Streaming, in turn, is a fault-tolerant, distributed system that can process multiple tasks in parallel. Combining the two is therefore a good idea if you need to consume varied data and rapidly store it in multiple outputs, while being able to scale as your data volume increases.

Each Spark task reads messages from a single Kafka partition so you can scale out your Kafka topic together with your Spark cluster linearly, to increase the pipeline throughput. As your data load increases, just add partitions to the Kafka topic, and workers to your Spark cluster.

Spark Job layout — scales linearly with the number of Kafka partitions

The Spark driver is a program that manages the job flow and schedules tasks. You should initialize SparkConf in the driver. Make sure you set these properties:

  • spark.streaming.kafka.maxRatePerPartition
  • spark.streaming.receiver.maxRate
  • spark.streaming.backpressure.enabled
  • spark.streaming.backpressure.initialRate
  • spark.streaming.stopGracefullyOnShutdown
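
As a sketch, the properties can be kept in one dictionary and applied to a `SparkConf` in the driver. All values below are placeholders to be tuned for your own load, and the names `STREAMING_SETTINGS`, `build_conf` and the application name are assumptions for this example.

```python
# Placeholder values; tune them for your own throughput and batch interval.
STREAMING_SETTINGS = {
    "spark.streaming.kafka.maxRatePerPartition": "1000",
    "spark.streaming.receiver.maxRate": "1000",
    "spark.streaming.backpressure.enabled": "true",
    "spark.streaming.backpressure.initialRate": "500",
    "spark.streaming.stopGracefullyOnShutdown": "true",
}


def build_conf(app_name="cdc-to-snowflake"):
    """Create the driver's SparkConf with the streaming settings applied."""
    from pyspark import SparkConf  # imported here so the dict is usable without Spark

    conf = SparkConf().setAppName(app_name)
    for key, value in STREAMING_SETTINGS.items():
        conf.set(key, value)
    return conf
```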

Then, create an input DStream using spark-streaming-kafka-0-8:

Spark Job — how to initiate direct stream on top of Kafka topic
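
A hedged sketch of that step with the PySpark Kafka 0.8 API (`pyspark.streaming.kafka`, available in Spark 2.x) could look like this. The helper names and the shape of `saved_offsets` (partition number mapped to the next offset to read, as loaded from Zookeeper) are assumptions for illustration.

```python
def starting_offsets(topic, saved_offsets):
    """Map (topic, partition) to the offset the stream should start from.

    saved_offsets is assumed to be a dict of partition number -> offset,
    possibly with string keys and values as read from Zookeeper.
    """
    return {(topic, int(p)): int(o) for p, o in saved_offsets.items()}


def create_stream(ssc, topic, brokers, saved_offsets):
    """Create a direct stream that resumes from the saved offsets."""
    from pyspark.streaming.kafka import KafkaUtils, TopicAndPartition

    from_offsets = {
        TopicAndPartition(t, p): offset
        for (t, p), offset in starting_offsets(topic, saved_offsets).items()
    }
    return KafkaUtils.createDirectStream(
        ssc,
        [topic],
        {"metadata.broker.list": brokers},
        fromOffsets=from_offsets,
        # Keep the whole KafkaMessageAndMetadata so the key (table name),
        # partition and offset stay available downstream.
        messageHandler=lambda message_and_metadata: message_and_metadata,
    )
```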

Why not Snowflake Connector for Spark

The available Snowflake-Spark connector requires a DataFrame that can be saved into a single table. In our case, we get an RDD with mixed data that should be delivered to multiple tables. At first, we did try to utilize this connector and got this:

Spark Job — transformations and actions that you better avoid

This approach creates a long and narrow DAG that gets longer with the number of tables. It is not driven by the actual data received (which involves just a portion of the tables), so it will always iterate over ALL tables, which is a waste of time!

So do it yourself

You can easily distribute the work for each RDD on this stream by using the ‘mapPartitions’ transformation. This allows you to execute a piece of code in parallel on each Kafka partition. Use its return value to aggregate the name of tables that had new changes during this micro-batch.

Spark Job — transformations and actions for each micro batch
  • extract_partition_data_to_s3: In this method, you should transform the list of KafkaMessageAndMetadata objects into different Parquet files. Use the message keys (= table name) to organize them into dedicated Parquet files, then upload them to S3.
  • run_copy_commands_from_s3_to_snowflake: If you don’t care about duplications, you can spread copy-command executions between Spark executors. But if you do care, you should run them all within the driver, in parallel, as part of a single transaction. As of now, Snowflake does not support the notion of a “distributed transaction”.
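
The two steps can be sketched as follows. Writing Parquet and running COPY commands are left as hypothetical callbacks (`write_parquet_to_s3` and `run_copy_commands_from_s3_to_snowflake` passed in as arguments), so the sketch only shows the grouping by message key and how the `mapPartitions` return value collects the changed table names.

```python
from collections import defaultdict


def group_by_table(messages):
    """Group message payloads by their Kafka key, which is the table name."""
    tables = defaultdict(list)
    for msg in messages:
        tables[msg.key].append(msg.message)
    return dict(tables)


def extract_partition_data_to_s3(messages, write_parquet_to_s3):
    """Runs on an executor for one Kafka partition; yields changed table names."""
    for table, rows in group_by_table(messages).items():
        write_parquet_to_s3(table, rows)  # hypothetical: one Parquet file per table
        yield table


def process_micro_batch(rdd, write_parquet_to_s3, run_copy_commands_from_s3_to_snowflake):
    """Runs on the driver once per micro-batch."""
    changed = set(
        rdd.mapPartitions(
            lambda part: extract_partition_data_to_s3(part, write_parquet_to_s3)
        ).collect()
    )
    if changed:
        # Only tables that actually received changes are copied.
        run_copy_commands_from_s3_to_snowflake(sorted(changed))
    return changed
```

Because the set of tables comes from the data itself, a micro-batch that touches three tables issues three copy commands, regardless of how many tables exist.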

Kafka Offsets Management: achieving exactly-once-delivery

In your pipeline, you might want to ensure that each message is delivered exactly once. To track the message-delivery progress of the stream, you can use Zookeeper.
For each partition of a Kafka topic, update the latest offset that was processed at the end of each Spark micro-batch. You can manage the offset ranges of each micro-batch as follows:

Offsets Management in Spark — beware of Spark Streaming queuing system → handle queue of offset-ranges

Adding each offset range into a queue is critical. If your batch interval is 30 seconds, but some batch took 80 seconds, then the following batch is going to be queued. The driver executes enqueue_offset_ranges on time (before queuing the batch), and executes process_micro_batch only when all leading batches in the queue are completed. So you don’t want to overwrite the offset range of the late batch that is still running.
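
A minimal sketch of the queue, assuming the PySpark direct-stream API where `rdd.offsetRanges()` returns objects with `topic`, `partition` and `untilOffset` attributes. The callbacks `handle_batch` and `commit_offsets` are hypothetical stand-ins for the batch processing and the Zookeeper update.

```python
from collections import deque

# One entry per micro-batch, oldest first. Lives in the driver.
pending_offset_ranges = deque()


def enqueue_offset_ranges(rdd):
    """Used with stream.transform(); records the batch's offsets when it is created."""
    pending_offset_ranges.append(rdd.offsetRanges())
    return rdd


def process_micro_batch(rdd, handle_batch, commit_offsets):
    """Used with foreachRDD(); batches run one at a time, in order."""
    offset_ranges = pending_offset_ranges[0]  # offsets of the oldest pending batch
    handle_batch(rdd)
    for r in offset_ranges:
        # Persist the next offset to read for each partition.
        commit_offsets(r.topic, r.partition, r.untilOffset)
    # Remove the entry only after the batch and its offsets are safely stored.
    pending_offset_ranges.popleft()
```

The wiring would be along the lines of `stream.transform(enqueue_offset_ranges).foreachRDD(lambda rdd: process_micro_batch(rdd, handle_batch, commit_offsets))`.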

You should also make sure that Spark is not running more than one batch at a time. Think of a case where batch T+1 completed successfully and updated its offsets in ZK, but batch T is failing to handle its older offsets. This will cause data loss upon Spark job restart!
conf.set("spark.streaming.concurrentJobs", "1")

Wrapping up

The evolution of the pipeline from its first prototype, to what it is now, is quite fascinating. We have many more ideas on how to keep evolving and making further improvements.
We may consider adding a deduping process to replace the exactly-once-delivery semantic, and gain more scalability. Also on our roadmap is to look for a way to spare the Spark job reload upon DDL.
Stay tuned for more to come!
