Understanding Materialized Views — 3: Stream-Table Joins with CDC

Join a stream with a lookup table to enrich its content and produce a materialized view. Then use Debezium to synchronise the lookup table with the source database.


Example — finding the total sales by country

An incoming order event looks like this:

{
  "id": 4218,
  "customer_id": 90,
  "total": 56.03,
  "order_date": 1632142103
}
The customers lookup table lives in MySQL:

create table customers (
  id INT PRIMARY KEY,
  first_name VARCHAR(50),
  last_name VARCHAR(50),
  gender VARCHAR(50),
  country VARCHAR(50)
);
This is what the enriched stream would look like.
Total sales are keyed by country.

Why is joining with an external table a bad idea?

Rather than reaching out to the database for every lookup, what if we could move the lookup data inside our stream processing application, co-located with locally cached state?
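The difference can be sketched in plain Python (not the article's ksqlDB code). The "database", the customer rows, and the latency figure below are all made up for illustration; the point is that a per-event round trip scales with the event rate, while a co-located cache turns enrichment into a local hash lookup.

```python
import time

# Stands in for the remote MySQL customers table.
CUSTOMERS_DB = {90: {"country": "US"}}

def enrich_with_remote_lookup(order):
    # One round trip to the database per event: network latency dominates,
    # and every single event adds load to the database.
    time.sleep(0.001)  # pretend this is a ~1 ms network round trip
    customer = CUSTOMERS_DB[order["customer_id"]]
    return {**order, "country": customer["country"]}

# Co-located copy: the lookup data lives next to the stream processor,
# so enrichment needs no network hop at all.
local_cache = dict(CUSTOMERS_DB)

def enrich_with_local_cache(order):
    return {**order, "country": local_cache[order["customer_id"]]["country"]}

order = {"id": 4218, "customer_id": 90, "total": 56.03}
print(enrich_with_local_cache(order))
# → {'id': 4218, 'customer_id': 90, 'total': 56.03, 'country': 'US'}
```

Both functions produce the same enriched event; only where the lookup data lives changes.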

Bringing lookup data into the stream processor

CREATE TABLE customers (
  id INT PRIMARY KEY,
  first_name VARCHAR,
  last_name VARCHAR,
  gender VARCHAR,
  country VARCHAR
) WITH (
  kafka_topic='customers',
  partitions=1,
  value_format='avro'
);
INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (1, 'John', 'Doe', 'Male', 'US');
INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (2, 'John', 'Doe', 'Male', 'US');
INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (3, 'Simone', 'Bordeux', 'Female', 'FR');
CREATE STREAM orders (
  id INT,
  customer_id INT,
  total DOUBLE,
  order_date BIGINT
) WITH (
  kafka_topic='orders',
  value_format='json'
);
CREATE TABLE sales_by_country AS
SELECT
  c.country,
  count(*) AS total_orders,
  sum(o.total) AS total_revenue
FROM orders o
INNER JOIN customers c
ON o.customer_id = c.id
GROUP BY c.country
EMIT CHANGES;
select * from sales_by_country where country='US';
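What the CTAS statement above computes can be sketched in plain Python. The customer rows mirror the INSERTs earlier; the order rows are made up for illustration.

```python
# Lookup table, keyed by customer id (mirrors the customers INSERTs above).
customers = {
    1: {"first_name": "John", "last_name": "Doe", "country": "US"},
    2: {"first_name": "John", "last_name": "Doe", "country": "US"},
    3: {"first_name": "Simone", "last_name": "Bordeux", "country": "FR"},
}

# Hypothetical order events for illustration.
orders = [
    {"id": 4218, "customer_id": 1, "total": 56.03},
    {"id": 4219, "customer_id": 3, "total": 12.50},
    {"id": 4220, "customer_id": 2, "total": 31.47},
]

# INNER JOIN orders o ON o.customer_id = c.id, then GROUP BY c.country.
sales_by_country = {}
for o in orders:
    c = customers.get(o["customer_id"])
    if c is None:
        continue  # inner join: drop orders with no matching customer
    agg = sales_by_country.setdefault(
        c["country"], {"total_orders": 0, "total_revenue": 0.0}
    )
    agg["total_orders"] += 1
    agg["total_revenue"] += o["total"]

print(sales_by_country)
# US: 2 orders, ~87.50 revenue; FR: 1 order, 12.50 revenue
```

The real ksqlDB table maintains this aggregate incrementally as each order arrives, rather than recomputing it over the whole history.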

How does the stream-table join work?

Automatic repartitioning of the enriched table
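The idea behind repartitioning can be sketched as follows: for the join to happen locally, each order must land on the same partition as the matching customers row, so ksqlDB re-keys the stream by the join key behind the scenes. This is a simplified model; Kafka's default partitioner actually uses a murmur2 hash of the serialized key, which a plain `hash(key) % partitions` stands in for here.

```python
NUM_PARTITIONS = 3

def partition_for(key: int) -> int:
    # Illustrative stand-in for Kafka's murmur2-based default partitioner.
    return hash(key) % NUM_PARTITIONS

# Orders arrive keyed by order id ...
orders = [{"id": 4218, "customer_id": 90}, {"id": 4219, "customer_id": 7}]

# ... so before the join they are written to an internal repartition topic
# keyed by customer_id, guaranteeing co-partitioning with the customers table.
repartitioned = [(o["customer_id"], o) for o in orders]

for key, order in repartitioned:
    print("customer", key, "-> partition", partition_for(key))
```

Because the re-keying is deterministic, every order for customer 90 maps to the same partition as the customers row with id 90, so each ksqlDB node can join using only its local state.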

Using CDC to deal with fast-changing lookup data

What if the contents of the lookup table grow fast and change frequently?

Final solution architecture

Capturing customers table changes with Debezium

Creating the MySQL source connector

curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" \
  http://localhost:8083/connectors/ -d '
{
  "name": "customer-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "mysqlpwd",
    "database.server.id": "184054",
    "database.server.name": "mysql",
    "database.whitelist": "customers_db",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.customers",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": false,
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}
'

Materializing the change stream into a table

CREATE STREAM customers WITH (
  kafka_topic = 'mysql.customers_db.customers',
  value_format = 'avro'
);
CREATE TABLE customers_meta AS
SELECT
  id,
  latest_by_offset(first_name) AS first_name,
  latest_by_offset(last_name) AS last_name,
  latest_by_offset(gender) AS gender,
  latest_by_offset(country) AS country
FROM customers
GROUP BY id
EMIT CHANGES;
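What the `latest_by_offset` aggregation does can be sketched as folding the change stream into a per-key "latest value" table. The change events below are made up for illustration, and only the country column is shown for brevity.

```python
# Hypothetical Debezium change events, in offset order.
change_stream = [
    {"id": 3, "country": "FR"},
    {"id": 3, "country": "DE"},   # customer 3's country was later updated
    {"id": 1, "country": "US"},
]

# GROUP BY id with latest_by_offset(...): for each key, the value at the
# highest offset wins, so later changes overwrite earlier ones.
customers_meta = {}
for event in change_stream:
    customers_meta[event["id"]] = {"country": event["country"]}

print(customers_meta)
# → {3: {'country': 'DE'}, 1: {'country': 'US'}}
```

The resulting table always reflects the current state of each row in MySQL, which is exactly what a lookup table needs.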
CREATE TABLE sales_by_country AS
SELECT
  c.country,
  count(*) AS total_orders,
  sum(o.total) AS total_revenue
FROM orders o
INNER JOIN customers_meta c
ON o.customer_id = c.id
GROUP BY c.country
EMIT CHANGES;

Takeaways

  • If we attempt to join a stream with an external database table for every event in the stream, it will incur severe latency and impose a significant load on the database.
  • To avoid that, we can bring the lookup data inside the stream processor and join the stream with the locally cached lookup data.
  • A materialized table can hold the lookup data. That works well for lookup datasets that are small, static, or change infrequently.
  • If the lookup dataset changes frequently, we can use a change data capture (CDC) tool to stream the change events into Kafka. Once materialized as a table, that change stream can serve as the lookup table.



EdU is a place where you can find quality content on event streaming, real-time analytics, and modern data architectures

Dunith Dhanushka

Editor of Event-driven Utopia(eventdrivenutopia.com). Technologist, Writer, Senior Developer Advocate at Redpanda. Event-driven Architecture, DataInMotion