CDC-based Upserts with Debezium, Apache Kafka, and Apache Pinot

How to build a streaming data pipeline to capture MySQL database changes and stream them to Apache Pinot via Debezium and Kafka


Upserting means inserting a record into a database if it does not already exist, or updating it if it does. An analytics database at the end of a streaming data pipeline can benefit from upserts to maintain data consistency with the source database.

This article explores a minimal viable setup for a streaming data pipeline that captures changes from MySQL and streams them to Apache Pinot via Debezium and Apache Kafka. You can find several videos on the same topic, but this article gives you a solid blueprint for building your own CDC pipeline at scale.

Why do we need upserts?

A real-time analytics system consists of several sub-systems working together to derive insights from events flowing through them.

Change data capture (CDC) tools such as Debezium capture changes in transactional databases, transform them into events, and stream them into an event streaming platform like Kafka or Pulsar. These events optionally go through a streaming ETL pipeline for further massaging and end up in the serving layer, a read-optimized data store that serves analytics at scale.

Events flowing through a real-time analytics system typically have a key-value structure. Usually, the key is derived from an event attribute, while the value carries the event payload.

Events consist of a key and value

In practice, events arriving in a stream can share the same key while carrying updated values over time. For example, a changelog stream from the ORDERS table can have the same key (the order id) while having different values in the payload over time.

Each change made to the orders table fires a new change event. The events have the same key but different payloads.
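For illustration, two consecutive change events for the same order might look like this (the values are made up, but match the order schema used later in this article):

key:   617
value: { "id": 617, "status": "OPEN",       "quantity": 5, "total": 460.35 }

key:   617
value: { "id": 617, "status": "PROCESSING", "quantity": 5, "total": 460.35 }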

When the serving layer receives these change events, it gets to decide what to do with them. Assuming that the events are written to a table in the serving database, there are two alternatives:

  1. Append — Change events with the same key get appended as a new row in the table, capturing each change made to the source table.
  2. Upsert — Change events with the same key get merged to reflect the latest version of the event. There can be only one event in the destination table with the same key.
Change log of events vs. the latest version

Upserts help maintain strong data consistency between source databases and derived data systems, because from an analytics perspective you are usually interested only in the most up-to-date version of each event.

Upserts with Apache Pinot

Apache Pinot is a real-time OLAP database that can ingest data from a streaming data source like Kafka and run high-throughput, low-latency OLAP queries on the ingested data. Due to its speed, throughput, and ability to maintain data freshness, Apache Pinot is ideal for the serving layer.

Although the data inside Pinot is immutable, Pinot has supported upserts since the 0.6.0 release, allowing you to query only the latest version of the events streamed into it. Remember that the data stays immutable even when you enable upserts on a Pinot table.

Explaining how upserts work in Pinot deserves an entire blog post of its own, so I will skip the internals here and focus on practical usage. You can learn more about Pinot upserts from the following resources.

Introduction to Upserts in Apache Pinot by Kenny Bastani

Upserts and JSON indexing in Apache Pinot by Yupeng Fu and Jackie Jiang

Streaming ingestion with upserts on Apache Pinot docs

Full upserts in Pinot on StarTree Developer Portal

Partial upserts in Pinot on StarTree Developer Portal

Upserts MVP with MySQL, Debezium, Kafka, and Pinot

For those who want to quickly set up and experiment with how upserts work in Apache Pinot, I put together a Docker Compose project that includes only the minimal but essential components.

Impatient readers can go ahead and clone the Git repo linked below.

I will walk you through each project component in the coming sections.

Use case

The project mimics an online e-commerce store with MySQL as the transactional database. E-commerce orders are captured in the orders MySQL table with the following schema.

CREATE TABLE IF NOT EXISTS fakeshop.orders
(
    id         SERIAL PRIMARY KEY,
    user_id    BIGINT UNSIGNED REFERENCES users(id),
    product_id BIGINT UNSIGNED REFERENCES products(id),
    status     VARCHAR(50) DEFAULT 'OPEN',
    quantity   INT UNSIGNED DEFAULT 1,
    total      FLOAT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);

When an order goes through its lifecycle stages, the status field should transition from OPEN to PROCESSING to SHIPPED.

The goal is to capture the changes made to the orders table in MySQL and ship them into Apache Pinot so that we can run real-time analytics on orders.

We will use Debezium and Apache Kafka to build this real-time data pipeline. Debezium captures the changes made to the orders table and streams them into Kafka, allowing Pinot to ingest them in real-time.

Solutions architecture

Setting up the project

The project is packaged as a Docker Compose stack and is ready to run out of the box.

First, ensure you have Docker Compose installed on your machine and allocate at least 6 CPU cores and 8 GB of memory for the Docker engine.

Clone the following Git repo and change into the pinot-upserts folder.

git clone git@github.com:dunithd/edu-samples.git
cd edu-samples/pinot-upserts

You will find the docker-compose.yml file inside the project root directory that bundles the following containers.

  1. MySQL database
  2. Zookeeper
  3. Kafka
  4. Debezium
  5. Pinot Controller
  6. Pinot Broker
  7. Pinot Server
  8. Order simulator

Fakeshop — MySQL database structure

The bootstrap script inside the <project_root>/mysql directory creates the fakeshop schema in MySQL, along with the users, products, and orders tables. This happens automatically when the MySQL container starts up.
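If you want to double-check that the bootstrap ran, you can list the tables once the MySQL container is up. The credentials below are the ones used later in this article; adjust them if your setup differs.

# List the tables created by the bootstrap script.
docker exec -it mysql mysql -u mysqluser -pmysqlpw -e "SHOW TABLES IN fakeshop;"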

Configuring Debezium

We will use the debezium/connect image here, a Kafka Connect image that ships with all Debezium connectors and is part of the Debezium platform.

The debezium_deploy container registers a MySQL Debezium connector for the fakeshop schema with the following configurations.

{
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "database.hostname": "mysql",
    "database.port": 3306,
    "database.user": "debezium",
    "database.password": "dbz",
    "database.server.name": "mysql",
    "database.server.id": "223344",
    "database.allowPublicKeyRetrieval": true,
    "database.history.kafka.bootstrap.servers": "kafka:9092",
    "database.history.kafka.topic": "mysql-history",
    "database.include.list": "fakeshop",
    "time.precision.mode": "connect",
    "include.schema.changes": false,
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": false,
    "transforms.unwrap.delete.handling.mode": "rewrite"
}
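The debezium_deploy container submits this configuration for you, typically by calling Kafka Connect's REST API. Once the stack is running, you can query that API directly to confirm the connector was registered; port 8083 is the Kafka Connect default and is an assumption here.

# List the connectors registered with Kafka Connect.
curl http://localhost:8083/connectors

# Check the status of a specific connector (use a name returned by the previous command).
curl http://localhost:8083/connectors/<connector-name>/status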

Simulating the orders stream

You can find a Python script inside the <project_root>/simulator folder. This script rapidly inserts random records into the orders table to simulate an environment of many customers concurrently buying items from the site.

The script uses the Python Faker library to generate mock values for users, products, and orders.
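To give you an idea of what the simulator does, here is a minimal sketch. It assumes the mysql-connector-python driver and illustrative connection settings and id ranges; the actual script in the repo may differ in structure and naming.

# Minimal sketch of an order simulator (not the actual project script).
import time

import mysql.connector
from faker import Faker

fake = Faker()

conn = mysql.connector.connect(
    host="localhost",   # inside the Compose network this would be "mysql"
    user="mysqluser",
    password="mysqlpw",
    database="fakeshop",
)
cursor = conn.cursor()

while True:
    # Generate a random order; the id ranges assume users and products are pre-seeded.
    user_id = fake.random_int(min=1, max=100)
    product_id = fake.random_int(min=1, max=100)
    quantity = fake.random_int(min=1, max=5)
    total = fake.pyfloat(min_value=10, max_value=500, right_digits=2)

    cursor.execute(
        "INSERT INTO orders (user_id, product_id, status, quantity, total) "
        "VALUES (%s, %s, 'OPEN', %s, %s)",
        (user_id, product_id, quantity, total),
    )
    conn.commit()
    time.sleep(0.1)  # throttle the insert rate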

The remaining containers in the project represent Zookeeper, Kafka, and Pinot. Here we will create a Pinot cluster of three nodes: a controller, a broker, and a server.

Running the project

Now that we’ve seen the critical configurations of the project, it’s time to see everything in action.

Execute the following to bring the Docker stack up.

docker compose up -d

That will first build the simulator container image from the local project, start the MySQL container, create the schema, and start the remaining services. The simulator starts last and keeps inserting new orders into the database.
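Before moving on, you can optionally confirm that all the services came up; these are standard Docker Compose commands.

# List the containers in the stack and their status.
docker compose ps

# Follow the logs of an individual service if something looks off
# (replace <service> with a service name from docker-compose.yml).
docker compose logs -f <service>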

When new orders are inserted into the orders table, Debezium starts streaming the change events into the mysql.fakeshop.orders topic.

A sample change event written to that topic looks like the following.

{
    "schema":{
        "type":"struct",
        "fields":[
            {
                "type":"int64",
                "optional":false,
                "field":"id"
            },
            {
                "type":"int64",
                "optional":true,
                "field":"user_id"
            },
            {
                "type":"int64",
                "optional":true,
                "field":"product_id"
            },
            {
                "type":"string",
                "optional":true,
                "default":"OPEN",
                "field":"status"
            },
            {
                "type":"int64",
                "optional":true,
                "default":1,
                "field":"quantity"
            },
            {
                "type":"double",
                "optional":true,
                "field":"total"
            },
            {
                "type":"string",
                "optional":true,
                "name":"io.debezium.time.ZonedTimestamp",
                "version":1,
                "field":"created_at"
            },
            {
                "type":"int64",
                "optional":true,
                "name":"org.apache.kafka.connect.data.Timestamp",
                "version":1,
                "default":0,
                "field":"updated_at"
            },
            {
                "type":"string",
                "optional":true,
                "field":"__deleted"
            }
        ],
        "optional":false,
        "name":"mysql.fakeshop.orders.Value"
    },
    "payload":{
        "id":617,
        "user_id":38,
        "product_id":52,
        "status":"OPEN",
        "quantity":5,
        "total":460.3500061035156,
        "created_at":"2022-07-15T13:15:44Z",
        "updated_at":1657890944000,
        "__deleted":"false"
    }
}

You can verify the CDC stream by running a command like this:

kcat -b localhost:9092 -t mysql.fakeshop.orders -C

Create the Pinot schema and table

Now that we have the orders change stream coming into Kafka, let's configure Pinot to ingest events from there and convert them into segments. We can do that by creating a Pinot schema and a real-time table.

You will find the orders_schema.json inside the <project_root>/pinot/config folder. Notice the primaryKeyColumns definition, which is mandatory for upserts.

{
    "schemaName":"orders",
    "primaryKeyColumns":[
        "payload.id"
    ],
    "dimensionFieldSpecs":[
        {
            "name":"payload.id",
            "dataType":"INT"
        },
        {
            "name":"payload.user_id",
            "dataType":"INT"
        },
        {
            "name":"payload.product_id",
            "dataType":"INT"
        },
        {
            "name":"payload.status",
            "dataType":"STRING"
        }
    ],
    "metricFieldSpecs":[
        {
            "name":"payload.quantity",
            "dataType":"INT"
        },
        {
            "name":"payload.total",
            "dataType":"FLOAT"
        }
    ],
    "dateTimeFieldSpecs":[
        {
            "name":"payload.updated_at",
            "dataType":"LONG",
            "format":"1:MILLISECONDS:EPOCH",
            "granularity":"1:MILLISECONDS"
        }
    ]
}

The table definition can be found in the same folder, under the name orders_table.json. Notice the configuration blocks under streamConfigs and upsertConfig.

{
    "tableName":"orders",
    "tableType":"REALTIME",
    "segmentsConfig":{
        "timeColumnName":"payload.updated_at",
        "timeType":"MILLISECONDS",
        "retentionTimeUnit":"DAYS",
        "retentionTimeValue":"1",
        "segmentPushType":"APPEND",
        "segmentAssignmentStrategy":"BalanceNumSegmentAssignmentStrategy",
        "schemaName":"orders",
        "replicasPerPartition":"1"
    },
    "tenants":{},
    "tableIndexConfig":{
        "loadMode":"MMAP",
        "streamConfigs":{
            "streamType":"kafka",
            "stream.kafka.consumer.type":"lowLevel",
            "stream.kafka.topic.name":"mysql.fakeshop.orders",
            "stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
            "stream.kafka.hlc.zk.connect.string":"zookeeper:2181/kafka",
            "stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
            "stream.kafka.zk.broker.url":"zookeeper:2181/kafka",
            "stream.kafka.broker.list":"kafka:9092",
            "realtime.segment.flush.threshold.size":30,
            "realtime.segment.flush.threshold.rows":30
        }
    },
    "ingestionConfig":{
        "complexTypeConfig":{
            "delimiter":"."
        }
    },
    "metadata":{
        "customConfigs":{}
    },
    "upsertConfig":{
        "mode":"FULL"
    },
    "routing":{
        "instanceSelectorType":"strictReplicaGroup"
    }
}

Execute the following command to create the schema and table for orders.

docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
-tableConfigFile /config/orders_table.json \
-schemaFile /config/orders_schema.json -exec

If the command ran successfully, you should see orders arriving in the orders table inside the Pinot Query Console.

You can see the orders table getting populated with new data in the Pinot Query Console
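For a quick sanity check, you can run queries like these in the console. Because upserts are enabled, each order id is reflected only once in the results, no matter how many change events it has produced so far.

SELECT * FROM orders LIMIT 10

SELECT COUNT(*) FROM orders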

Testing upserts

Now we have the end-to-end streaming data pipeline in operation, and new orders written to MySQL are showing up in the Pinot table.

Let’s go ahead and verify the upserts functionality.

Change the order status in MySQL

Execute the following commands in a terminal to log into the MySQL container and switch to the fakeshop database. The mysql client will prompt for a password; enter mysqlpw.

docker exec -it mysql /bin/bash
mysql -u mysqluser -p
mysqlpw
use fakeshop;

Run the following query to see the status of orders.

SELECT status AS status,
       COUNT(status) AS orders
FROM orders
GROUP BY status;

You should only see the orders with OPEN status.

+--------+--------+
| status | orders |
+--------+--------+
| OPEN   |   2686 |
+--------+--------+
1 row in set (0.01 sec)

Let’s update the first 100 orders to have the PROCESSING status. Note that the order id is set to increase sequentially from 1.

UPDATE orders
SET status = 'PROCESSING'
WHERE id <= 100;

Verify the status change in Pinot

After a few seconds, run the following query in the Pinot query console. You should see the 100 orders with the PROCESSING status.

SELECT payload.status AS status,
       COUNT(payload.status) AS orders
FROM orders
GROUP BY payload.status

You can also update the first 50 orders to have the SHIPPED status and run the same query to witness that 50 orders show up in the result with the SHIPPED status.
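That second update mirrors the first one:

UPDATE orders
SET status = 'SHIPPED'
WHERE id <= 50;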

Notice that it took only a few seconds to propagate the MySQL table changes into Pinot through this data pipeline.

Summary

Maintaining strong data consistency between source and derived databases is crucial for a CDC-based streaming data pipeline. The destination database must reflect the latest changes made to the source database as the data changes rapidly.

A real-time OLAP database like Apache Pinot leverages its upsert feature to deliver robust end-to-end data consistency. Upserts ensure that the ingested data set stays accurate and up to date, reflecting the upstream changes.
