Building a zero-latency data lake using Change Data Capture

Ofir Ventura
Yotpo Engineering
5 min read · Jun 25, 2019

Introduction

As systems grow more complex, we need better ways to maintain large amounts of data in a centralized place, monitor it, and query it without interfering with the operational databases. At Yotpo we have many microservices and multiple databases, so the ability to transfer data and expose it in a centralized data lake is crucial. We were looking for an easy-to-use, configuration-only infrastructure that would save engineers time.

The concept of Change Data Capture (CDC) architecture is to track the data that has changed so that actions can be taken using the changed data (Wiki).

The challenge is to track database changes and maintain materialized views for different purposes: analytics (e.g. Apache Spark jobs), monitoring changes in data, keeping a search index up to date, measuring data quality, and triggering event-based actions in your ecosystem.

Tracking database changes with CDC

In this article, I'll explain step-by-step how we implemented a Change Data Capture architecture within our ecosystem at Yotpo.

Before we started using CDC, we maintained daily processes to load database tables into our data lake: each process selected the full tables and overwrote an S3 folder with Parquet files. This method is not scalable, overloads the databases, and is time-consuming. We wanted to be able to query the latest dataset and expose the data in our data lake (e.g. in Amazon S3 and the Hive metastore), to make sure it ends up in all the right places. With this architecture in place, we have an up-to-date, fully monitored copy of our production databases in our data lake.

The basic concept is that whenever a change occurs in the database (create/update/delete), the database logs are extracted and pushed to Apache Kafka. A materialized view streaming job consumes those events and keeps the view up to date, so we always have the latest view of the database in S3 and Hive. Internal engineers can also consume those changes independently.

CDC with Kafka and Metorikku

Figure 1: Change Data Capture with Metorikku

Debezium (Kafka Connect)

The first part is a database plugin based on Kafka Connect; in our case Debezium, and specifically its MySQL connector. You'll need to make sure that the binlog is enabled in ROW mode for this to work. Debezium connects to the database using JDBC and takes a snapshot of its entire content; after that, an event is emitted for each data change in real time.
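
For reference, here is a rough sketch of the two pieces of configuration involved. The hostnames, credentials and table names are placeholders, and property names vary between Debezium versions (for example, table.whitelist later became table.include.list), so treat this as an illustration rather than a copy-paste recipe. First, the MySQL settings that enable a row-based binlog:

[mysqld]
server-id        = 223344
log_bin          = mysql-bin
binlog_format    = ROW
binlog_row_image = FULL

And a connector registration payload, POSTed to the Kafka Connect REST API:

{
  "name": "example-mysql-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql-host",
    "database.port": "3306",
    "database.user": "debezium",
    "database.password": "********",
    "database.server.id": "184054",
    "database.server.name": "dbserver1",
    "table.whitelist": "inventory.customers",
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "schema-changes.inventory"
  }
}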

These events are encoded using Avro and are sent directly into Kafka. You can learn more about Kafka Connect and Schema Registry in this Debezium tutorial.
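
To make the rest of the pipeline easier to follow, here is a simplified, illustrative view of the envelope a Debezium change event carries (shown as JSON for readability; on the wire it is Avro, and the table and values are made up):

{
  "before": null,
  "after": { "id": 1004, "email": "user@example.com", "created_at": 1561464000 },
  "source": { "db": "inventory", "table": "customers" },
  "op": "c",
  "ts_ms": 1561464000123
}

The op field is 'c', 'u' or 'd' for create, update and delete; for deletes only before is populated, which is why the materialized view metric below falls back to it.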

Why Avro is Awesome

Avro has a statically typed schema that can evolve: adding a column to the database evolves the schema while maintaining backward compatibility. We prefer Avro encoding for data transfer objects because it is very compact and supports a richer set of data types, such as multiple numeric types and bytes, that JSON lacks.
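
As an illustration (not one of our actual schemas), adding a nullable field with a default keeps an Avro schema backward compatible:

{
  "type": "record",
  "name": "Customer",
  "fields": [
    { "name": "id", "type": "long" },
    { "name": "email", "type": "string" },
    { "name": "phone", "type": ["null", "string"], "default": null }
  ]
}

Consumers that only know about id and email simply ignore phone, while consumers using the new schema get the default value when reading older records.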

Schema Registry

One of the cool parts here is how schema changes are handled in this process. When a new database connector is registered, its schema is derived from the database, automatically translated to Avro, and registered in the Schema Registry.

Whenever the schema changes, a new version is added to the Schema Registry for that specific table, which later allows us to browse the different schema versions.
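
The versions can be browsed through the Schema Registry REST API. The subject name below is illustrative; Debezium names topics as <server>.<database>.<table>, and the value schema of a topic is registered under <topic>-value:

GET /subjects
GET /subjects/dbserver1.inventory.customers-value/versions
GET /subjects/dbserver1.inventory.customers-value/versions/2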

Apache Hudi Storage Format

The next part is handling the materialized view. One of the biggest challenges when working with data lakes is updating data in an existing dataset. In a classic file-based data lake architecture, updating a row means reading the entire latest dataset and overwriting it.

Apache Hudi is an open-source storage format that brings ACID transactions to Apache Spark. We chose Hudi over plain formats like Parquet because it allows incremental updates over a key expression, in our case the primary key of the table.

For Hudi to work, we need to define three things: a key column that uniquely identifies each row in the input, a time column that Hudi uses to decide which value is newer when updating a row, and a partition expression that determines how the rows are partitioned. These map to the keyColumn, timeColumn, and partitionBy options in the Metorikku output configuration below.
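
If you were wiring Hudi up directly through its Spark datasource, these three would correspond roughly to the following write options (option names from the Hudi documentation; the values match the Metorikku configuration further below):

hoodie.datasource.write.recordkey.field     = hoodie_key
hoodie.datasource.write.precombine.field    = ts_ms
hoodie.datasource.write.partitionpath.field = hoodie_partition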

Metorikku

To combine all the above components, we used our open source Metorikku library. Metorikku simplifies writing and executing ETLs on top of Apache Spark and supports multiple output formats.

Metorikku consumes the Avro events from Kafka, deserializes them using the Schema Registry, and writes them out in Hudi format.

We can configure our Metorikku materialized view job to sync with the Hive metastore, which gives our jobs immediate access to the data. This comes out of the box with Hudi and requires only a simple Hive URL configuration.
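
Under the hood this uses Hudi's Hive sync, which Metorikku exposes through its output configuration; the Hudi options involved look roughly like the following (option names from the Hudi documentation, values illustrative):

hoodie.datasource.hive_sync.enable           = true
hoodie.datasource.hive_sync.jdbcurl          = jdbc:hive2://hive-metastore:10000
hoodie.datasource.hive_sync.database         = default
hoodie.datasource.hive_sync.table            = hoodie_test
hoodie.datasource.hive_sync.partition_fields = year,month,day

The materialized view metric itself looks like this: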

steps:
  - dataFrameName: cdc_filtered
    sql:
      SELECT ts_ms,
             op,
             before,
             after
      FROM cdc_events
      WHERE op IN ('d', 'u', 'c')
  - dataFrameName: cdc_by_event
    sql:
      SELECT ts_ms,
             CASE WHEN op = 'd' THEN before ELSE after END AS cdc_fields,
             CASE WHEN op = 'd' THEN true ELSE false END AS _hoodie_delete
      FROM cdc_filtered
  - dataFrameName: cdc_with_fields
    sql:
      SELECT ts_ms,
             cdc_fields.*,
             _hoodie_delete
      FROM cdc_by_event
  - dataFrameName: cdc_table
    sql:
      SELECT *,
             id AS hoodie_key,
             from_unixtime(created_at, 'yyyy/MM/dd') AS hoodie_partition
      FROM cdc_with_fields
output:
  - dataFrameName: cdc_table
    outputType: Hudi
    outputOptions:
      path: table_view.parquet
      keyColumn: hoodie_key
      timeColumn: ts_ms
      partitionBy: hoodie_partition
      hivePartitions: year,month,day
      tableName: hoodie_test
      saveMode: Append

The configuration above is the materialized view metric that reads the events and creates the materialized view. You can find a full dockerized (Docker Compose) example in our end-to-end CDC test. You can use the Docker Compose file as a reference for how to set things up in production when running in a dockerized environment (Yotpo runs on HashiCorp's Nomad on AWS).

Check out the full Metorikku job and metric files.

Monitoring

Kafka Connect comes with out-of-the-box monitoring, which gives us a deep understanding of what's going on in each of our database connectors.

Figure 2: Monitoring Kafka Connect

Using Metorikku, we also monitor the actual data, for example by counting the number of events per type (create/update/delete) for each of our CDC tables. A single Metorikku job can consume all the CDC topics using Kafka's topic pattern subscription.
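
A sketch of what such a monitoring step could look like as an extra Metorikku metric step (the input dataframe name matches the one used in the metric above; shipping the resulting counts to a metrics backend is omitted here):

steps:
  - dataFrameName: cdc_event_counts
    sql:
      SELECT op,
             COUNT(*) AS events
      FROM cdc_events
      GROUP BY op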

Looking Forward

There are many solutions out there for the challenges discussed above. We integrated some of the best of them to deploy our Change Data Capture infrastructure, which lets us manage and monitor our data lake much better, and we can only improve from here. Looking forward, the infrastructure will expand to support more databases (e.g. MongoDB, Cassandra, PostgreSQL). All the tools already exist; the challenge is to integrate them well and to advocate and educate the R&D organization and the community. The more we rely on this infrastructure, the more we gain in accessibility, synergy between services, monitoring, and data quality checks.
