Understanding Materialized Views — Part 3: Stream-Table Joins with CDC
Join a stream with a lookup table to enrich its events and produce a materialized view. Then use Debezium to keep the lookup table synchronized with the source data.
In the previous post of this series, we learned how to convert an immutable event stream into a mutable table, which we can call a materialized view. Then we saw how we could leverage stream processing to maintain the materialized views across multiple machines in a reliable and scalable manner.
We often need to join a stream with a lookup table for enrichment, extracting additional fields from the table and adding them to each event. The enriched stream can then be materialized to feed queries or passed to a downstream system for further processing.
This post explores the options for joining a stream with a database table and then materializing the enriched result. We will also discuss how to use Debezium to keep the lookup table synchronized with the source data.
To understand the concepts in this post, you need a fair knowledge of ksqlDB and Debezium. If you are new to them, I suggest you visit the references section to learn more about them.
Example — finding the total sales by country
As usual, I will take a realistic example to explain the concept better.
Assume that you are reading a stream of e-commerce orders from a Kafka topic, each order having the following format:
{
  "id": 4218,
  "customer_id": 90,
  "total": 56.03,
  "order_date": 1632142103
}
The objective is to analyze the orders in real-time and produce a view that shows total sales by country. The management will use this view to tune their sales process to tap into under-utilized markets.
As we saw above, an order event doesn’t carry the buyer’s location. But it has the customer_id, which we can use as a lookup key to fetch the customer’s country from a lookup table.
But, where’s that table? It sits inside a MySQL database, external to our stream processor, and has the following structure.
CREATE TABLE customers (
  id INT PRIMARY KEY,
  first_name VARCHAR(50),
  last_name VARCHAR(50),
  gender VARCHAR(50),
  country VARCHAR(50)
);
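For illustration, assume the MySQL table is seeded with the same handful of rows we will use later in ksqlDB (the names and countries are made up for this example):

-- Seed data for the MySQL customers table (illustrative values)
INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (1, 'John', 'Doe', 'Male', 'US');
INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (2, 'John', 'Doe', 'Male', 'US');
INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (3, 'Simone', 'Bordeux', 'Female', 'FR');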
The obvious solution is to join each order event with the customers table on the customer_id field. That results in a denormalized, enriched stream in which each order also carries the customer's details, such as the country.
This stream will carry sufficient information to build a materialized view that can answer our question by grouping the events by country, then computing the total.
Will things be that simple?
Not really. You'll come across several problems. Let's see what they are and how to solve them in the coming sections.
Why is joining with an external table always a bad idea?
The “obvious solution” discussed above has several critical problems.
Latency — We will make a lookup query against the database for every order event in the stream and enrich the event if there's a match. That adds significant latency to the processing of each event, usually 5–15 milliseconds per lookup.
Load on the database — The lookups will also impose a significant load on the database, which it may not be able to handle. A stream processor can often handle 100K–500K events per second, while the database may only sustain about 10K queries per second at reasonable performance. Ultimately, the database becomes the bottleneck.
Therefore, we need a solution that scales well and offers a higher processing throughput.
Rather than reaching out to the database for lookup data, what if we could move that data inside our stream processing application, co-located with locally cached state?
Then, when an order event arrives, you can look up the customer_id in your local cache and enrich the event. Because the lookup is local, this scales much better and doesn't affect the database or the other applications using it.
That sounds interesting. But how do we move data from a database into the stream processor?
We can use a table for that.
Bringing lookup data into the stream processor
You can simply create a table called customers and then load it with the lookup table records. The following shows how you do that with ksqlDB, a Kafka-native stream processing framework.
CREATE TABLE customers (
  id INT PRIMARY KEY,
  first_name VARCHAR,
  last_name VARCHAR,
  gender VARCHAR,
  country VARCHAR
) WITH (
  kafka_topic='customers',
  partitions=1,
  value_format='avro'
);

INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (1, 'John', 'Doe', 'Male', 'US');
INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (2, 'John', 'Doe', 'Male', 'US');
INSERT INTO customers (id, first_name, last_name, gender, country) VALUES (3, 'Simone', 'Bordeux', 'Female', 'FR');
ksqlDB offers you multiple options to move data from a database into a table, including the JDBC connector, which you can use along with Kafka Connect. Here, I just used the INSERT statements because I wanted to keep things simple.
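To sanity-check the table's contents, you can run a quick push query from the ksqlDB CLI. This step is optional and only for verification:

-- Optional: verify the manually inserted rows (push query, terminates after 3 rows)
SELECT * FROM customers EMIT CHANGES LIMIT 3;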
Let’s define a stream to represent the orders as well.
CREATE STREAM orders (
  id INT,
  customer_id INT,
  total DOUBLE,
  order_date BIGINT
) WITH (
  kafka_topic='orders',
  value_format='json'
);
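If you don't have a real order producer wired up yet, you can push a few test events into the orders stream directly. The values below are made up for illustration and reference the customer ids inserted earlier:

-- Illustrative test orders; customer_id values match the lookup rows above
INSERT INTO orders (id, customer_id, total, order_date) VALUES (4218, 1, 56.03, 1632142103);
INSERT INTO orders (id, customer_id, total, order_date) VALUES (4219, 3, 120.50, 1632142110);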
Then you can perform a stream-table join between the orders stream and the customers table to derive a materialized view called sales_by_country.
CREATE TABLE sales_by_country AS
  SELECT
    country,
    count(*) AS total_orders,
    sum(total) AS total_revenue
  FROM orders o
    INNER JOIN customers c ON o.customer_id = c.id
  GROUP BY country
  EMIT CHANGES;
When new orders arrive, sales_by_country will update itself to reflect the changes in the total sales. You can issue a pull query to check the aggregated results from time to time.
SELECT * FROM sales_by_country WHERE country='US';
That returns a single row for the country, with the number of orders and the total revenue accumulated so far.
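If you'd rather watch the view update continuously instead of polling it, an optional push query over the same table works as well:

-- Optional: stream every change to the aggregate as it happens
SELECT * FROM sales_by_country EMIT CHANGES;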
How does the stream-table join work?
When ksqlDB joins a stream with a table, it keeps an internal buffer (a local state store) for the table; there's no point in buffering the stream because it is unbounded. As rows from the table's changelog arrive, they update this buffer. Each event from the continuously flowing stream is then joined against the current contents of the buffer.
Our materialized view uses an INNER join: when a stream event finds a matching table row, the join emits an enriched record; when there is no match, the event is dropped. Note that updates to the table don't cause the join to fire; only new events on the stream do.
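If dropping unmatched orders is not what you want, a LEFT join keeps them with a NULL country instead. Here is a minimal sketch; the stream name orders_enriched is just an example:

-- Keep orders even when the customer is not (yet) in the lookup table
CREATE STREAM orders_enriched AS
  SELECT o.id AS order_id, o.customer_id, o.total, c.country
  FROM orders o
    LEFT JOIN customers c ON o.customer_id = c.id
  EMIT CHANGES;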
Automatic repartitioning of the enriched table
One thing to notice in our materialized view is that it has been keyed by country, which was not a key in either orders or customers.
What really happened there?
Since the query groups by country, ksqlDB has automatically reshuffled the join results by country before aggregating them. In simpler terms, the resulting table is partitioned, and keyed, by country.
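You can confirm this by describing the table; the key column should show up as country (the exact output depends on your ksqlDB version):

-- Inspect the schema and primary key of the derived table
DESCRIBE sales_by_country;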
Using CDC to deal with fast-changing lookup data
A manually populated lookup table like the one above is fine if the dataset is static, changes infrequently, and is small in size.
What if the contents of the lookup table grow fast and change frequently?
To keep the materialized view consistent and prevent stale data, we need to capture changes made to the source table and apply them to the locally cached table in the stream processor. Doing that manually is impractical.
Hence, we need to capture the changes made to the source data and propagate them to the stream processor in real time. Does that ring a bell?
Of course, we can use a change data capture (CDC) tool like Debezium to do that in a reliable and scalable manner.
When every component comes together, the final solution looks like this: Debezium captures changes from the MySQL customers table into a Kafka topic, ksqlDB materializes that change stream into a lookup table, and the orders stream is joined against it to keep the sales_by_country view up to date.
Capturing customers table changes with Debezium
We will use Debezium to create a new lookup table in ksqlDB that stays synchronized with the source data, and then use that table for the join instead of the manually populated customers table.
Debezium is an open-source Kafka Connect component that listens for changes to a database (INSERTs, UPDATEs, DELETEs), translates them into change data capture (CDC) events, and pushes them to Kafka.
Creating the MySQL source connector
curl -i -X POST -H "Accept:application/json" -H "Content-Type:application/json" http://localhost:8083/connectors/ -d '
{
  "name": "customer-connector",
  "config": {
    "connector.class": "io.debezium.connector.mysql.MySqlConnector",
    "tasks.max": "1",
    "database.hostname": "localhost",
    "database.port": "3306",
    "database.user": "root",
    "database.password": "mysqlpwd",
    "database.server.id": "184054",
    "database.server.name": "mysql",
    "database.whitelist": "customers_db",
    "database.history.kafka.bootstrap.servers": "localhost:9092",
    "database.history.kafka.topic": "schema-changes.customers",
    "transforms": "unwrap",
    "transforms.unwrap.type": "io.debezium.transforms.ExtractNewRecordState",
    "transforms.unwrap.drop.tombstones": "false",
    "transforms.unwrap.delete.handling.mode": "rewrite"
  }
}
'
The above command deploys a MySQL source connector configuration into Kafka Connect. The connector first takes a snapshot of the customers table and writes it to a Kafka topic named mysql.customers_db.customers. Debezium then ensures that subsequent changes made to the table are captured as change events and written to the same topic.
Notice that the statement specifies an unwrap transform. By default, Debezium wraps every event in an envelope that carries a lot of metadata about the captured change. I'm only interested in the new row state here, so the transform tells Kafka Connect to keep that part and discard the rest of the envelope.
Materializing the change stream into a table
If you check the content of this topic, you'll notice that it is a changelog: it can contain multiple values for the same key, so it's not directly usable as a lookup table.
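You can peek at the topic from the ksqlDB CLI to see that changelog for yourself; the LIMIT just keeps the output short:

-- Inspect the raw CDC topic produced by Debezium
PRINT 'mysql.customers_db.customers' FROM BEGINNING LIMIT 5;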
In ksqlDB, if a table is created directly on top of a Kafka topic, it's not materialized. Non-materialized tables can't be queried because doing so would be highly inefficient. So, let's first create a stream to represent the topic inside ksqlDB.
Note: Drop the previously created customers table to avoid confusion. You can do that with DROP TABLE customers;
CREATE STREAM customers WITH (
  kafka_topic = 'mysql.customers_db.customers',
  value_format = 'avro'
);
Then materialize the changelog stream into a table.
CREATE TABLE customers_meta AS
  SELECT
    id,
    latest_by_offset(first_name) AS first_name,
    latest_by_offset(last_name) AS last_name,
    latest_by_offset(gender) AS gender,
    latest_by_offset(country) AS country
  FROM customers
  GROUP BY id
  EMIT CHANGES;
The LATEST_BY_OFFSET aggregation in the above statement lets you select any column and retain only the last value it receives, where “last” is in terms of offsets.
That ultimately gives us a queryable, materialized table that can be joined with the orders stream just like before. The only difference is that customers_meta keeps updating itself to reflect the changes in the source MySQL table. Remember to drop the earlier sales_by_country table before recreating it.
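Since the table is materialized, a quick pull query confirms that CDC updates are arriving; the id below is illustrative.

-- Fetch the latest known state of customer 3
SELECT * FROM customers_meta WHERE id = 3;

Then recreate the join, this time against customers_meta: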
CREATE TABLE sales_by_country AS
  SELECT
    country,
    count(*) AS total_orders,
    sum(total) AS total_revenue
  FROM orders o
    INNER JOIN customers_meta c ON o.customer_id = c.id
  GROUP BY country
  EMIT CHANGES;
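To see the end-to-end flow in action, change a customer's country in MySQL and watch how subsequent orders are attributed. Remember that the join only fires on new stream events, so orders that were already aggregated stay where they were; the values below are illustrative:

-- In MySQL: the change is captured by Debezium and flows into customers_meta
UPDATE customers SET country = 'CA' WHERE id = 3;

-- Back in ksqlDB: new orders from customer 3 now count towards 'CA'
INSERT INTO orders (id, customer_id, total, order_date) VALUES (4220, 3, 75.00, 1632142500);
SELECT * FROM sales_by_country WHERE country = 'CA';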
Takeaways
- If we attempt to join a stream with an external database table for every event in the stream, it adds severe latency and imposes a significant load on the database.
- To avoid that, we can bring in the lookup data inside the stream processor and then join the stream with locally cached lookup data.
- A materialized table can be used to hold the lookup data. That works for lookup datasets that are static, change infrequently, and are small in size.
- If the lookup dataset changes frequently, we can use a change data capture (CDC) tool to stream the change events into Kafka. That change stream can be used as a lookup table after materializing it as a table.
Previous posts in this series
Understanding Materialized Views — Part 1
Understanding Materialized Views — Part 2