Optimizing star-schema queries with IN clauses and denormalization in Clickhouse
Most data environments have two main groups of data objects. First — event-based objects, usually organized into timeseries tables. Second — instances (with properties/features), usually organized into ID-based tables (and stored in row-based storages). This is what is called a star (or normalized) schema. This structure is storage efficient, but can result in slow read performance. In some cases denormalization can be used to make things faster; let’s see how.
Star schema
Let’s see how Clickhouse performs joins on sample star-schema data. We have two tables — log (timeseries events) and users (instances with features):
The log events table refers to the users table via the user_id column. The log table has 250m rows and the following structure:
CREATE TABLE log (dt DateTime, user_id UInt64, session_id UInt64, platform String, label String, url String)
ENGINE = MergeTree ORDER BY (dt, user_id, session_id)
It’s also worth mentioning that feature tables usually mutate a lot (data gets added, removed and updated) while events tables are historically static (data is only appended, never changed). To keep things simple for now, we have the users table in Clickhouse as well:
CREATE TABLE users
(`id` UInt64, `name` String, `category` String, `age` UInt8 )
ENGINE = MergeTree ORDER BY (category, id)
Star-schema JOIN
Let’s try a real-world example — counting events from users of a certain category and age within a specific period of time:
SELECT date(dt) AS day, count(*) FROM log l
JOIN users u ON u.id = l.user_id
WHERE
u.category = 'aa' AND u.age > 50 AND dt > NOW() - INTERVAL 1 YEAR
GROUP BY day ORDER BY day DESC LIMIT 5
While our log table has 250 million rows, our users table has around 25 million rows. The previous query gives the following:
Doesn’t seem very fast; let’s check execution details using EXPLAIN:
EXPLAIN indexes = 1, json = 1 SELECT date(dt) ...
This returns a large response, but the most important part is the following:
This is because JOINs in Clickhouse don’t work exactly the way we’re used to from other databases (a separate article on that is coming soon).
Using IN instead of Joins
In most cases we can easily switch from JOIN to IN. We just have to ensure that the nested query doesn’t return too much data, so Clickhouse doesn’t exceed available memory. In our case, filtering the users table returns only around 1k rows, which is fine:
SELECT count(*) FROM users WHERE (category = 'aa') AND (age > 50)

┌─count()─┐
│    1122 │
└─────────┘
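If the filter is ever relaxed and the subquery grows, it is safer to cap the size of the in-memory set explicitly rather than discover the problem at runtime. A minimal sketch using Clickhouse’s set-size limit settings — the threshold value here is illustrative, not a recommendation:

```sql
-- Abort the query instead of exhausting memory if the IN set gets too big.
-- The limit value is illustrative; pick one that fits your memory budget.
SET max_rows_in_set = 1000000;
SET set_overflow_mode = 'throw';  -- 'break' would silently truncate the set instead
```

With 'throw', a query whose IN subquery exceeds the limit fails loudly, which is usually preferable to the silently incomplete results that 'break' would produce.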
Using IN is easy in our case and will be much more efficient:
SELECT date(dt) AS day, count(*) FROM log
WHERE user_id IN (
SELECT id FROM users WHERE (category = 'aa') AND (age > 50)
) AND (dt > (NOW() - toIntervalYear(1)))
GROUP BY day ORDER BY day DESC LIMIT 5
Which brings dramatically better results:
Although this works in most cases, we still have to make sure the IN subquery returns a reasonable amount of data, and sometimes IN doesn’t bring a sufficient speed improvement. Denormalizing data can be considered in those cases.
Denormalizing data
Denormalizing data basically means creating a single table from multiple joined tables. Let’s create and populate a denormalized table for our case:
CREATE TABLE log_users (
`dt` DateTime, `user_id` UInt64, `session_id` UInt64,
`platform` String, `label` String, `url` String,
`user_name` String, `user_category` String, `user_age` UInt8
)
ENGINE = MergeTree
ORDER BY (dt, user_id, session_id, user_category, platform, label)
As you can see, we’re now able to use a more sophisticated sorting key, so more querying cases are covered. Populating takes some time:
INSERT INTO log_users
SELECT l.*, u.name, u.category, u.age
FROM log l JOIN users u ON (l.user_id = u.id)
But now we can see our initial query performs much better:
Obviously, having everything in a single table gives us powerful optimization tools, including a better-tuned sorting key and projections. But don’t forget that you pay for this with additional disk space.
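For example, if we often aggregate daily counts per user category, we could add an aggregate projection so Clickhouse maintains pre-grouped data inside the same table. A sketch under that assumption — the projection name p_daily_by_category is made up:

```sql
-- Hypothetical projection: pre-aggregated daily event counts per user category.
ALTER TABLE log_users ADD PROJECTION p_daily_by_category
(
    SELECT user_category, toDate(dt), count()
    GROUP BY user_category, toDate(dt)
);
-- Build the projection for data already present in the table.
ALTER TABLE log_users MATERIALIZE PROJECTION p_daily_by_category;
```

After this, matching GROUP BY queries can read the much smaller pre-aggregated data instead of scanning raw rows.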
Real-time denormalization
The previous example is fine if you can afford to wait while the denormalized table is populated. But we can actually get real-time denormalized data by using a MATERIALIZED VIEW:
CREATE MATERIALIZED VIEW log_users_rt (
`dt` DateTime, `user_id` UInt64, `session_id` UInt64,
`platform` String, `label` String, `url` String,
`user_name` String, `user_category` String, `user_age` UInt8
)
ENGINE = MergeTree ORDER BY (dt, user_id, session_id) AS
SELECT l.*, u.name AS user_name, u.category AS user_category,
u.age AS user_age
FROM default.log AS l INNER JOIN users AS u ON l.user_id = u.id
Here, Clickhouse will automatically join each freshly inserted portion of log table data with the users table and store the enriched (denormalized) rows in the log_users_rt materialized view.
Note that Clickhouse will now execute a JOIN on the users table for every INSERT query, so we might need to tune the frequency and batch size of insertions. Insert time in our case is 4 seconds, so we should execute inserts in larger batches at longer intervals (e.g. every 10 seconds or so).
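One way to get this batching without changing the client is asynchronous inserts, which let Clickhouse buffer small INSERTs server-side and flush them together. A sketch, assuming a Clickhouse version recent enough to support these settings; the timeout value is illustrative:

```sql
-- Buffer incoming inserts on the server and flush them in batches.
SET async_insert = 1;
SET wait_for_async_insert = 1;             -- acknowledge only after the batch is flushed
SET async_insert_busy_timeout_ms = 10000;  -- flush roughly every 10 seconds
```

This trades a little insert latency for far fewer JOIN executions on the users table.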
Using data from MySQL for feature tables
The cool thing is that queries with IN can use MySQL engine tables, so you can actually have MySQL manage the features data (which it does pretty well, since feature tables involve updates/deletes by primary key).
We just have to create a MySQL engine table:
CREATE TABLE users_mysql
(`id` UInt64, `name` String, `category` String, `age` UInt8 )
ENGINE = MySQL('127.0.0.1:3306', 'db', 'users', 'usr', 'pwd')
We assume our MySQL server has a users table of the same structure in the db database. Now we can use it in our query:
SELECT date(dt) AS day, count(*) FROM log
WHERE (user_id IN (
SELECT id FROM users_mysql WHERE category = 'aa' AND age > 50
)) AND (dt > (NOW() - toIntervalYear(1)))
GROUP BY day ORDER BY day DESC LIMIT 5
Note that the nested query is executed on the MySQL side, so creating the following index on the MySQL users table will help in our case:
CREATE INDEX c_a ON users(category, age)
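If we don’t want a permanent users_mysql table, the same nested query can presumably be written with the mysql() table function, which takes the same connection parameters inline (credentials here mirror the placeholder ones above):

```sql
SELECT date(dt) AS day, count(*) FROM log
WHERE user_id IN (
    SELECT id FROM mysql('127.0.0.1:3306', 'db', 'users', 'usr', 'pwd')
    WHERE category = 'aa' AND age > 50
) AND dt > NOW() - INTERVAL 1 YEAR
GROUP BY day ORDER BY day DESC LIMIT 5
```

This is handy for ad-hoc queries; for anything recurring, the engine table keeps the connection details in one place.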
We’d better not use JOINs here at all, because Clickhouse will first load the data from MySQL and only then filter it (again, a separate article on JOINs is already planned).
Summary
Using IN for star-schema queries performs quite well in most cases in Clickhouse. Still, data denormalization can be considered to improve speed even further. Materialized views can be used to denormalize data at the ingestion stage, so denormalized data is accessible in real time. But note that this brings additional disk space usage and degrades insert performance.