CDC-based Upserts with Debezium, Apache Kafka, and Apache Pinot

How to build a streaming data pipeline to capture MySQL database changes and stream them to Apache Pinot via Debezium and Kafka

Photo by T K on Unsplash

Why do we need upserts?

Events consist of a key and value
Each change made to the orders table fires a new change event. The events share the same key but carry different payloads.
Change log of events vs. the latest version

Upserts with Apache Pinot

Upserts MVP with MySQL, Debezium, Kafka, and Pinot

Use case

-- Orders table for the Fakeshop demo. Debezium captures row-level changes
-- from this table and streams them into Kafka for ingestion by Pinot.
CREATE TABLE IF NOT EXISTS fakeshop.orders
(
    -- SERIAL is MySQL shorthand for BIGINT UNSIGNED NOT NULL AUTO_INCREMENT UNIQUE
    id SERIAL PRIMARY KEY,
    user_id BIGINT UNSIGNED,
    product_id BIGINT UNSIGNED,
    status VARCHAR(50) DEFAULT 'OPEN',
    quantity INT UNSIGNED DEFAULT 1,
    -- NOTE(review): FLOAT is kept for parity with the downstream Debezium
    -- payload ("type":"double") and Pinot schema in this article; for real
    -- monetary values prefer DECIMAL(p, s).
    total FLOAT,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    -- updated_at is refreshed on every change; Pinot uses it as the time
    -- column for upsert ordering later in the article.
    updated_at DATETIME DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    -- MySQL parses but silently IGNORES inline column-level REFERENCES
    -- clauses, so the foreign keys must be declared explicitly (and are
    -- named so constraint errors are greppable).
    CONSTRAINT orders_user_id_fk FOREIGN KEY (user_id) REFERENCES users (id),
    CONSTRAINT orders_product_id_fk FOREIGN KEY (product_id) REFERENCES products (id)
);

The goal is to capture the changes made to the orders table in MySQL and ship them into Apache Pinot so that we can run real-time analytics on orders.

Solutions architecture

Setting up the project

# Clone the sample project (SSH) and enter the upserts example directory
git clone git@github.com:dunithd/edu-samples.git
cd pinot-upserts

Fakeshop — MySQL database structure

Configuring Debezium

{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"database.hostname":"mysql",
"database.port":3306,
"database.user":"debezium",
"database.password":"dbz",
"database.server.name":"mysql",
"database.server.id":"223344",
"database.allowPublicKeyRetrieval":true,
"database.history.kafka.bootstrap.servers":"kafka:9092",
"database.history.kafka.topic":"mysql-history",
"database.include.list":"fakeshop",
"time.precision.mode":"connect",
"include.schema.changes":false,
"transforms":"unwrap",
"transforms.unwrap.type":"io.debezium.transforms.ExtractNewRecordState",
"transforms.unwrap.drop.tombstones":false,
"transforms.unwrap.delete.handling.mode":"rewrite"
}

Simulating the orders stream

Running the project

docker compose up -d
{
"schema":{
"type":"struct",
"fields":[
{
"type":"int64",
"optional":false,
"field":"id"
},
{
"type":"int64",
"optional":true,
"field":"user_id"
},
{
"type":"int64",
"optional":true,
"field":"product_id"
},
{
"type":"string",
"optional":true,
"default":"OPEN",
"field":"status"
},
{
"type":"int64",
"optional":true,
"default":1,
"field":"quantity"
},
{
"type":"double",
"optional":true,
"field":"total"
},
{
"type":"string",
"optional":true,
"name":"io.debezium.time.ZonedTimestamp",
"version":1,
"field":"created_at"
},
{
"type":"int64",
"optional":true,
"name":"org.apache.kafka.connect.data.Timestamp",
"version":1,
"default":0,
"field":"updated_at"
},
{
"type":"string",
"optional":true,
"field":"__deleted"
}
],
"optional":false,
"name":"mysql.fakeshop.orders.Value"
},
"payload":{
"id":617,
"user_id":38,
"product_id":52,
"status":"OPEN",
"quantity":5,
"total":460.3500061035156,
"created_at":"2022-07-15T13:15:44Z",
"updated_at":1657890944000,
"__deleted":"false"
}
}
kcat -b localhost:9092 -t mysql.fakeshop.orders -C

Create the Pinot schema and table

{
"schemaName":"orders",
"primaryKeyColumns":[
"payload.id"
],

"dimensionFieldSpecs":[
{
"name":"payload.id",
"dataType":"INT"
},
{
"name":"payload.user_id",
"dataType":"INT"
},
{
"name":"payload.product_id",
"dataType":"INT"
},
{
"name":"payload.status",
"dataType":"STRING"
}
],
"metricFieldSpecs":[
{
"name":"payload.quantity",
"dataType":"INT"
},
{
"name":"payload.total",
"dataType":"FLOAT"
}
],
"dateTimeFieldSpecs":[
{
"name":"payload.updated_at",
"dataType":"LONG",
"format":"1:MILLISECONDS:EPOCH",
"granularity":"1:MILLISECONDS"
}
]
}
{
"tableName":"orders",
"tableType":"REALTIME",
"segmentsConfig":{
"timeColumnName":"payload.updated_at",
"timeType":"MILLISECONDS",
"retentionTimeUnit":"DAYS",
"retentionTimeValue":"1",
"segmentPushType":"APPEND",
"segmentAssignmentStrategy":"BalanceNumSegmentAssignmentStrategy",
"schemaName":"orders",
"replicasPerPartition":"1"
},
"tenants":{

},
"tableIndexConfig":{
"loadMode":"MMAP",
"streamConfigs":{
"streamType":"kafka",
"stream.kafka.consumer.type":"lowLevel",
"stream.kafka.topic.name":"mysql.fakeshop.orders",
"stream.kafka.decoder.class.name":"org.apache.pinot.plugin.stream.kafka.KafkaJSONMessageDecoder",
"stream.kafka.hlc.zk.connect.string":"zookeeper:2181/kafka",
"stream.kafka.consumer.factory.class.name":"org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.zk.broker.url":"zookeeper:2181/kafka",
"stream.kafka.broker.list":"kafka:9092",
"realtime.segment.flush.threshold.size":30,
"realtime.segment.flush.threshold.rows":30
}
},
"ingestionConfig":{
"complexTypeConfig":{
"delimiter":"."
}
},
"metadata":{
"customConfigs":{

}
},
"upsertConfig":{
"mode":"FULL"
},
"routing":{
"instanceSelectorType":"strictReplicaGroup"
}

}
# Register the Pinot schema and REALTIME table using the pinot-admin CLI
# inside the controller container (-exec applies the change immediately)
docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
-tableConfigFile /config/orders_table.json \
-schemaFile /config/orders_schema.json -exec
You can see the orders table getting populated with new data in the Pinot Query Console.

Testing upserts

Change the order status in MySQL

# Open a shell inside the MySQL container, log in as mysqluser (the password
# "mysqlpw" is typed at the prompt), then select the fakeshop database.
docker exec -it mysql /bin/bash
mysql -u mysqluser -p
mysqlpw
use fakeshop;
-- Count the orders in each status; before any updates every order is OPEN.
-- COUNT(status) (not COUNT(*)) is deliberate: it skips NULL statuses.
SELECT
    status,
    COUNT(status) AS orders
FROM orders
GROUP BY status;
+--------+--------+
| status | orders |
+--------+--------+
| OPEN | 2686 |
+--------+--------+
1 row in set (0.01 sec)
-- Move the first 100 orders to PROCESSING; each updated row fires a change
-- event that Debezium captures and Pinot upserts.
UPDATE orders
SET status = 'PROCESSING'
WHERE id <= 100;

Verify the status change in Pinot

-- Run in the Pinot Query Console: with upserts enabled, the 100 updated
-- orders should now be reported under PROCESSING instead of OPEN.
SELECT
    payload.status AS status,
    COUNT(payload.status) AS orders
FROM orders
GROUP BY payload.status

Summary

--

--

Get the Medium app

A button that says 'Download on the App Store', and if clicked it will lead you to the iOS App store
A button that says 'Get it on, Google Play', and if clicked it will lead you to the Google Play store
Dunith Dhanushka

Editor of Event-driven Utopia(eventdrivenutopia.com). Technologist, Writer, Senior Developer Advocate at Redpanda. Event-driven Architecture, DataInMotion