Building CQRS Views with Debezium, Kafka, Materialize, and Apache Pinot — Part 2

How do you build an incrementally updated materialized view that serves queries quickly and at scale?

[Figure: The solution architecture we discussed in Part 1]

Step 1: Event-enabling Microservices data

Streaming orders from OrderService with Debezium

curl -H 'Content-Type: application/json' localhost:8083/connectors --data '
{
"name":"orders-connector",
"config":{
"connector.class":"io.debezium.connector.mysql.MySqlConnector",
"tasks.max":"1",
"database.hostname":"mysql",
"database.port":"3306",
"database.user":"debezium",
"database.password":"dbz",
"database.server.id":"184054",
"database.server.name":"mysql",
"database.include.list":"pizzashop",
"database.history.kafka.bootstrap.servers":"kafka:9092",
"database.history.kafka.topic":"mysql-history"
}
}'
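
After registering the connector, it helps to confirm that it actually started. The sketch below is one possible way to do that from Python, assuming the Kafka Connect REST API is reachable at the same localhost:8083 address used by the curl command. The helper names (`fetch_status`, `connector_is_healthy`) are hypothetical and not part of the original setup.

```python
import json
import urllib.request

# Assumption: Kafka Connect listens on the same host/port as the curl call above.
CONNECT_URL = "http://localhost:8083"


def connector_is_healthy(status: dict) -> bool:
    """Return True when the connector and every one of its tasks report RUNNING."""
    connector_ok = status.get("connector", {}).get("state") == "RUNNING"
    tasks = status.get("tasks", [])
    return connector_ok and bool(tasks) and all(
        task.get("state") == "RUNNING" for task in tasks
    )


def fetch_status(name: str, base_url: str = CONNECT_URL) -> dict:
    """Call GET /connectors/<name>/status on the Kafka Connect REST API."""
    with urllib.request.urlopen(f"{base_url}/connectors/{name}/status") as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Requires the Docker stack from this article to be running.
    print(connector_is_healthy(fetch_status("orders-connector")))
```

A connector whose task is in the FAILED state usually points to a configuration or connectivity problem with the MySQL instance.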

Capturing order status changes from KitchenService

{
"id":"1",
"order_id":1,
"status":"CREATED",
"updated_at":1453535342
}
kcat -b localhost:29092 -t order_updates -T -P -l data/updates.txt
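
The kcat command above reads one JSON event per line from data/updates.txt. As a rough sketch, a file in that shape could be generated with a few lines of Python. The `make_update` helper and the output file name are assumptions for illustration; the field names and types follow the sample event shown above.

```python
import json


def make_update(event_id: int, order_id: int, status: str, updated_at: int) -> str:
    """Serialize one order status update as a single compact JSON line.

    The "id" field is emitted as a string to match the sample event above.
    """
    event = {
        "id": str(event_id),
        "order_id": order_id,
        "status": status,
        "updated_at": updated_at,
    }
    return json.dumps(event, separators=(",", ":"))


if __name__ == "__main__":
    # Hypothetical output path; kcat -l expects one message per line.
    updates = [
        make_update(1, 1, "CREATED", 1453535342),
        make_update(2, 1, "PROCESSING", 1453535345),
    ]
    with open("updates.txt", "w", encoding="utf-8") as handle:
        handle.write("\n".join(updates) + "\n")
```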

Step 2: Building the order_summary materialized view

Define sources and views

psql -U materialize -h localhost -p 6875 materialize
CREATE SOURCE orders
FROM KAFKA BROKER 'kafka:9092' TOPIC 'mysql.pizzashop.orders'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081' ENVELOPE DEBEZIUM;
CREATE SOURCE items
FROM KAFKA BROKER 'kafka:9092' TOPIC 'mysql.pizzashop.order_items'
FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081' ENVELOPE DEBEZIUM;
CREATE SOURCE updates_source
FROM KAFKA BROKER 'kafka:9092' TOPIC 'order_updates'
FORMAT BYTES;
CREATE MATERIALIZED VIEW updates AS
SELECT
(data->>'id')::int AS id,
(data->>'order_id')::int AS order_id,
data->>'status' AS status,
data->>'updated_at' AS updated_at
FROM (SELECT CONVERT_FROM(data, 'utf8')::jsonb AS data FROM updates_source);
materialize=> show sources;
name
----------------
items
orders
updates_source
(3 rows)
materialize=> show views;
name
---------
updates
(1 row)

Define the order_summary materialized view

CREATE MATERIALIZED VIEW order_summary AS
SELECT
orders.order_id AS order_id,
orders.total AS total,
orders.created_at AS created_at,
array_agg(DISTINCT concat(items.name, '|', items.quantity)) AS items,
array_agg(DISTINCT concat(updates.status, '|', updates.updated_at)) AS status
FROM orders
JOIN items ON orders.order_id=items.order_id
JOIN updates ON orders.order_id=updates.order_id
GROUP BY orders.order_id, orders.created_at, orders.total;
materialize=> select * from order_summary;
[Figure: The order_summary materialized view is populated with existing data]
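
To make the semantics of the view easier to follow, here is a small in-memory sketch of what the join and the two array_agg(DISTINCT concat(...)) expressions compute. It is only an illustration in plain Python, not how Materialize evaluates the view, and it assumes order_id is unique in orders. The aggregated arrays are sorted here for determinism; SQL does not guarantee an order without an ORDER BY.

```python
from collections import defaultdict


def order_summary(orders, items, updates):
    """Emulate the order_summary view over lists of row dictionaries."""
    items_by_order = defaultdict(set)
    for item in items:
        # concat(items.name, '|', items.quantity), de-duplicated via the set
        items_by_order[item["order_id"]].add(f'{item["name"]}|{item["quantity"]}')

    status_by_order = defaultdict(set)
    for update in updates:
        # concat(updates.status, '|', updates.updated_at)
        status_by_order[update["order_id"]].add(
            f'{update["status"]}|{update["updated_at"]}'
        )

    summary = []
    for order in orders:
        oid = order["order_id"]
        # Inner joins: an order without items or without updates yields no row.
        if oid not in items_by_order or oid not in status_by_order:
            continue
        summary.append(
            {
                "order_id": oid,
                "total": order["total"],
                "created_at": order["created_at"],
                "items": sorted(items_by_order[oid]),
                "status": sorted(status_by_order[oid]),
            }
        )
    return summary
```

Because both joins are inner joins, an order shows up in the view only after at least one item and one status update exist for it.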

Define the Kafka sink

CREATE SINK results
FROM order_summary
INTO KAFKA BROKER 'kafka:9092' TOPIC 'orders_enriched'
CONSISTENCY TOPIC 'orders_enriched-consistency'
CONSISTENCY FORMAT AVRO USING CONFLUENT SCHEMA REGISTRY 'http://schema-registry:8081'
WITH (reuse_topic=true)
FORMAT JSON;
SELECT sink_id, name, topic
FROM mz_sinks
JOIN mz_kafka_sinks ON mz_sinks.id = mz_kafka_sinks.sink_id;
[Figure: Remember the topic name]

Step 3: Serving the enriched materialized view

Why Apache Pinot?

Ingesting nested JSON objects and multi-value fields

{
"before":null,
"after":{
"row":{
"order_id":1,
"total":50.0,
"created_at":"1660194991000",
"items":[
"Chicken BBQ|1",
"Sri Lankan Spicy Chicken Pizza|1"
],
"status":[
"CREATED|1453535342",
"PROCESSING|1453535345"
]
}
},
"transaction":{
"id":"1660196614999"
}
}
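
The fields of interest sit two levels deep, under after.row, and the items and status arrays pack two values into each string. The following sketch shows the mapping from this envelope to a flat record in plain Python. It is an illustration of the shape change only; in the actual pipeline this would presumably be handled by ingestion-time transformations in the Pinot table config, which is not reproduced here. The function names are hypothetical.

```python
from typing import List, Optional, Tuple


def flatten_envelope(event: dict) -> Optional[dict]:
    """Pull the row out of the Debezium-style envelope into a flat record.

    Returns None when "after" is null, which in this envelope means a delete.
    """
    after = event.get("after")
    if after is None:
        return None
    row = after["row"]
    return {
        "order_id": row["order_id"],
        "total": row["total"],
        # created_at arrives as a string of epoch milliseconds in the sample.
        "created_at": int(row["created_at"]),
        "items": list(row["items"]),    # multi-value field
        "status": list(row["status"]),  # multi-value field
    }


def split_pairs(values: List[str]) -> List[Tuple[str, str]]:
    """Split 'left|right' strings on the last '|' into (left, right) tuples."""
    return [tuple(value.rsplit("|", 1)) for value in values]
```

Applied to the sample event above, `split_pairs` turns the status array into (status, timestamp) pairs that a UI can render as a timeline.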

Enabling upserts on order_id
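
Each change to an order produces a new version of its row, so several records with the same order_id reach Pinot over time. With upserts enabled on order_id, queries see only the most recent record per key. The sketch below mimics that behavior in memory as a conceptual aid; it is not Pinot's implementation, and the `ts` comparison field is a hypothetical stand-in for whichever column the table uses to decide which record is newer.

```python
def apply_upserts(records, key="order_id", order_by="ts"):
    """Keep, for every key, the record with the greatest order_by value.

    On equal order_by values the later-arriving record replaces the earlier one
    (an assumption of this sketch).
    """
    latest = {}
    for record in records:
        current = latest.get(record[key])
        if current is None or record[order_by] >= current[order_by]:
            latest[record[key]] = record
    return latest
```

Late-arriving records with an older `ts` are ignored, so out-of-order delivery does not roll an order back to a stale state.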

Define the orders schema and table

docker exec -it pinot-controller /opt/pinot/bin/pinot-admin.sh AddTable \
-tableConfigFile /config/orders_table.json \
-schemaFile /config/orders_schema.json -exec

Testing the end-to-end solution

SELECT 
order_id,
total,
items,
status
FROM orders
[Figure: items and status are multi-value columns, storing values as string arrays]
{"id":"5","order_id":1,"status":"READY","updated_at":1453535345}
SELECT 
order_id,
total,
items,
status
FROM orders
WHERE order_id=1
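
The same queries can also be issued programmatically, which is closer to how a UI backend would read the view. The sketch below posts SQL to the Pinot broker's /query/sql endpoint and converts the result table into dictionaries. The broker address (localhost:8099) is an assumption about this Docker setup, and the helper names are hypothetical.

```python
import json
import urllib.request

# Assumption: the Pinot broker is exposed on its default port on localhost.
BROKER_URL = "http://localhost:8099"


def build_query(order_id=None) -> str:
    """Build the order summary query, optionally filtered to one order."""
    sql = "SELECT order_id, total, items, status FROM orders"
    if order_id is not None:
        sql += f" WHERE order_id={int(order_id)}"
    return sql


def rows_as_dicts(response: dict) -> list:
    """Convert a broker response's resultTable into a list of row dictionaries."""
    table = response["resultTable"]
    columns = table["dataSchema"]["columnNames"]
    return [dict(zip(columns, row)) for row in table["rows"]]


def run_query(sql: str, broker_url: str = BROKER_URL) -> dict:
    """POST the SQL statement to the broker and return the decoded JSON response."""
    request = urllib.request.Request(
        f"{broker_url}/query/sql",
        data=json.dumps({"sql": sql}).encode("utf-8"),
        headers={"Content-Type": "application/json"},
    )
    with urllib.request.urlopen(request) as resp:
        return json.load(resp)


if __name__ == "__main__":
    # Requires the Pinot cluster from this article to be running.
    for row in rows_as_dicts(run_query(build_query(order_id=1))):
        print(row)
```

After publishing the READY update shown above, rerunning the filtered query should return the same order with the new status appended to its status array.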

Takeaways — what more can we do?

Because the Pinot orders table already holds everything needed to populate the UI, it is read-optimized: queries need no additional on-demand joins or filtering. The order summary UI therefore renders faster, which improves the overall user experience.

How to expand the solution?

Dunith Dhanushka

Editor of Event-driven Utopia (eventdrivenutopia.com). Technologist, Writer, Senior Developer Advocate at Redpanda. Event-driven Architecture, DataInMotion