ClickHouse — fast, deduplicated reads

Using ClickHouse partitions for supercharged query performance

Jesse Grodman
7 min read · Nov 18, 2024

Contents

  1. Who is this article for
  2. Background
  3. ClickHouse is fast, but deduplication is eventual
  4. Partitions to the rescue
  5. Preventing part proliferation
  6. Reducing CPU usage of parallelized ‘final’
  7. Bonus: Realtime data
  8. How we partition
  9. Summary

Who is this article for

This article is for anyone using ClickHouse who wants performant queries with immediate deduplication, rather than the eventual deduplication that ClickHouse provides by default. We accomplish this without creating many parts, and while keeping CPU utilization low.

Background

I am a software engineer at Triple Whale. We use ClickHouse as our production database: we ingest data from a large number of channels, and make it available via a web application. We use ClickHouse because it is extremely performant and open source.

Our goal is simple: fast, deduplicated reads.

ClickHouse is fast, but deduplication is eventual

From the ClickHouse docs (emphasis mine):

Deduplication refers to the process of removing duplicate rows of a dataset. In an OLTP database, this is done easily because each row has a unique primary key - but at the cost of slower inserts. Every inserted row needs to first be searched for and, if found, needs to be replaced.

ClickHouse is built for speed when it comes to data insertion. The storage files are immutable and ClickHouse does not check for an existing primary key before inserting a row - so deduplication involves a bit more effort. This also means that deduplication is not immediate - it is eventual, which has a few side effects:

- At any moment in time your table can still have duplicates (rows with the same sorting key)

- The actual removal of duplicate rows occurs during the merging of parts

- Your queries need to allow for the possibility of duplicates

Three rows in a ReplacingMergeTree with the same row_id being deduplicated by an offline merge operation. Source: https://altinity.com/blog/clickhouse-replacingmergetree-explained-the-good-the-bad-and-the-ugly

ClickHouse uses eventual deduplication semantics: at any point in time, the table likely contains duplicates. This keeps inserts performant, as explained in the docs quoted above, but it leaves queries with the task of handling or tolerating duplication. An initial solution is to use a table engine that supports deduplication (ReplacingMergeTree, CollapsingMergeTree, VersionedCollapsingMergeTree) in conjunction with the query modifier final, which deduplicates at query time. Some queries can avoid final with special query logic, but this doesn’t cover all cases. Using final does accomplish deduplication, but it degrades query performance when operating on a lot of data.
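As a minimal sketch of that starting point, here is what a ReplacingMergeTree table queried with and without final might look like. The events table and its columns are hypothetical, chosen only for illustration, and are not our actual schema.

```sql
-- Hypothetical table: rows sharing the same sorting key (customer_id, event_id)
-- are duplicates; when parts merge, the row with the largest updated_at is kept.
CREATE TABLE events
(
    customer_id UInt64,
    event_id    UInt64,
    event_date  Date,
    amount      Float64,
    updated_at  DateTime
)
ENGINE = ReplacingMergeTree(updated_at)
ORDER BY (customer_id, event_id);

-- Two versions of the same row, inserted separately so they land in different parts.
INSERT INTO events VALUES (1, 100, '2024-11-18', 10.0, '2024-11-18 10:00:00');
INSERT INTO events VALUES (1, 100, '2024-11-18', 12.5, '2024-11-18 11:00:00');

-- Without FINAL, both versions may be returned until a background merge happens.
SELECT * FROM events WHERE customer_id = 1;

-- With FINAL, duplicates are collapsed at query time and only the newest version remains.
SELECT * FROM events FINAL WHERE customer_id = 1;
```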

Partitions to the rescue

These articles by ClickHouse and Altinity demonstrate the performance cost associated with using final — and how partitions can solve that. By using partitions with do_not_merge_across_partitions_select_final = 1, we parallelize the deduplication. Partitions can also help with group by performance.
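To make the setting concrete, here is a hedged sketch of a partitioned ReplacingMergeTree table queried with final and do_not_merge_across_partitions_select_final. The events_partitioned table and its columns are hypothetical.

```sql
-- Hypothetical table, partitioned by day.
CREATE TABLE events_partitioned
(
    customer_id UInt64,
    event_id    UInt64,
    event_date  Date,
    amount      Float64,
    updated_at  DateTime
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY event_date
ORDER BY (customer_id, event_id);

-- With this setting, FINAL deduplicates each partition independently,
-- so the work for different partitions can proceed in parallel.
SELECT customer_id, sum(amount) AS total
FROM events_partitioned FINAL
WHERE event_date >= '2024-11-01'
GROUP BY customer_id
SETTINGS do_not_merge_across_partitions_select_final = 1;
```

One caveat with this sketch: because each partition is deduplicated on its own, two versions of a row that land in different partitions (for example, if event_date changed between versions) would not be collapsed. The setting is only safe when all versions of a row always share a partition.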

By using a high-cardinality partition key, we mitigate the performance cost of final, because the deduplication is performed in parallel across all partitions. We have now achieved our goal: fast, deduplicated queries. However, we have introduced two new issues:

A) Using a high-cardinality partition key results in the creation of many parts. This is because inserted records which would previously have been inserted into the same part are now inserted into separate parts, since they are now in different partitions.

B) Parallelizing final across partitions increases CPU usage. The speedup comes from running the per-partition deduplication in parallel, so the same query now occupies more CPU cores at once.

Preventing part proliferation

Having too many parts is the #1 deadly sin listed by ClickHouse in their Getting Started blog post. It makes queries slower, increases startup time, and increases merge pressure. To address this, we use two strategies:

A) Async inserts. We use the async_insert ClickHouse setting, and also batch on the client side.

B) Ingest tables. For each partitioned table, we have an associated table with the same schema, but partitioned by ingest time in 15-minute buckets. This is similar to the buffer table solution suggested in the Getting Started blog post mentioned above. The partitioning of the ingest tables looks like this:

partition by (toYYYYMMDD(updated_at), toHour(updated_at), intDiv(toMinute(updated_at), 15))
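Putting the two strategies together, an ingest table and an async insert into it could be sketched as follows. The column list, the plain MergeTree engine, and the sorting key are assumptions made for illustration; only the partition expression comes from our setup.

```sql
-- Hypothetical ingest table: same columns as the actual table,
-- but bucketed by 15-minute ingest window.
CREATE TABLE my_ingest_table
(
    customer_id UInt64,
    event_id    UInt64,
    event_date  Date,
    amount      Float64,
    updated_at  DateTime
)
ENGINE = MergeTree  -- assumption: the ingest table itself does not need to deduplicate
PARTITION BY (toYYYYMMDD(updated_at), toHour(updated_at), intDiv(toMinute(updated_at), 15))
ORDER BY (customer_id, event_id);

-- Async insert: the server buffers small inserts and flushes them together,
-- creating fewer parts. wait_for_async_insert = 1 makes the client wait
-- until the buffered data has actually been written.
INSERT INTO my_ingest_table
SETTINGS async_insert = 1, wait_for_async_insert = 1
VALUES (1, 100, '2024-11-18', 10.0, '2024-11-18 10:07:00');
```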

All inserts go to the ingest table. Every 15 minutes, we insert one partition from the ingest table into the actual table, ordered by the actual table's partition key. Because the insert is ordered by that partition key, records that arrived in the last 15 minutes and belong to the same partition are written into the same part. We are trading some additional ingest latency for fewer parts. We orchestrate this with Temporal.

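-- Copy one 15-minute ingest partition into the actual table,
-- ordered by the actual table's partition key
insert into my_table
select * from my_ingest_table
where _partition_id = '<partition_id>'
order by <partition_key>;

-- Once the copy has succeeded, drop that ingest partition
alter table my_ingest_table drop partition id '<partition_id>';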
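The scheduled job needs to know which ingest partitions are ready to be moved. One way to find them, sketched here as an assumption rather than a description of our exact workflow, is to list ingest partitions that have not been written to for at least 15 minutes. The database name my_db and the 15-minute quiet period are illustrative.

```sql
-- List ingest partitions whose newest active part is older than 15 minutes,
-- oldest first. Each returned partition_id can be plugged into the
-- insert/drop pair above.
SELECT
    partition_id,
    sum(rows)              AS row_count,
    max(modification_time) AS last_write
FROM system.parts
WHERE active
  AND database = 'my_db'
  AND table = 'my_ingest_table'
GROUP BY partition_id
HAVING last_write < now() - INTERVAL 15 MINUTE
ORDER BY last_write;
```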

We have accomplished our goal (fast, deduplicated reads), and solved issue A: too many parts — in practice, most of our partitions have a single active part. We are left with issue B: high CPU usage, caused by parallelizing final across partitions.

Reducing CPU usage of parallelized ‘final’

To reduce the CPU usage of our queries which are using final with do_not_merge_across_partitions_select_final across a potentially large number of partitions, we utilize an aspect of parts called levels. Each part has a level; 0 means that the part has never been merged, and the level is incremented each time the part is merged.

Our technique: for each partition, we read only the active part with the highest level. This means that if data has been inserted to the table, but has not been “bubbled up” to the highest part in that partition, it will not be “visible” to our app.
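To see what this looks like in practice, the levels of the active parts can be inspected through system.parts. This is a small sketch using the placeholder names my_db and my_table.

```sql
-- Show the active parts of each partition, highest level first.
-- The last component of a part name is its level,
-- e.g. a part named 20241118_1_5_2 has level 2.
SELECT
    partition_id,
    name,
    level,
    rows
FROM system.parts
WHERE active
  AND database = 'my_db'
  AND table = 'my_table'
ORDER BY partition_id, level DESC;
```

Under our technique, only the first row listed for each partition_id would be read by queries; rows in the remaining parts become visible once they are merged into it.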

We are making the same tradeoff we made before: introducing a bit more ingest latency, this time in exchange for lower CPU usage (see below for the actual query).

This is not a traditional usage of parts or levels, but it works! (The default maximum part size is 150 GB, which we have not reached. If we do, we could increase this limit or use finer-grained partitions.)

Additionally, we want to guarantee that ingested data will be visible (i.e. in the highest-level part in its partition) within a certain amount of time. ClickHouse provides no guarantee as to when merges will happen.

We experimented with min_age_to_force_merge_seconds, but found it caused more merges than we expected, including many with only one part as input to the merge.

Instead, at a fixed interval, we query for a list of partitions in each table with more than one active part, and run optimize table ${table} partition id '${partition_id}' final for each of them. We orchestrate this with Temporal.
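The lookup step can be sketched with a query against system.parts. The database name my_db and the partition ID in the optimize statement are placeholders.

```sql
-- Find partitions that currently have more than one active part.
SELECT
    database,
    table,
    partition_id,
    count() AS active_parts
FROM system.parts
WHERE active
  AND database = 'my_db'
GROUP BY database, table, partition_id
HAVING active_parts > 1
ORDER BY table, partition_id;

-- For each returned row, the orchestrator then issues a statement like this
-- ('20241118' is an example partition ID, not a real value from our tables):
OPTIMIZE TABLE my_db.my_table PARTITION ID '20241118' FINAL;
```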

We now reap an additional benefit of partitions: we only need to merge data up to the highest-level part in its partition, rather than the entire table. There is less merge pressure, and our data becomes visible (i.e. in the highest-level part in its partition) sooner.

We have now achieved our goal (fast, deduplicated queries) while addressing issues A (too many parts) and B (high CPU usage).

Bonus: Realtime data

Earlier we introduced ingest latency to solve our two problems:

  • At insert time we introduced ingest latency using ingest tables, to reduce the number of created parts
  • At query time we introduced ingest latency using part levels, to avoid the expensive compute of final

For the time series data we ingest, we want the data to be available to users in realtime. To achieve this, we bypass these two steps for any record we ingest which is in the most recent 3 days of data, i.e. where event_date >= today() - 2:

1. These records bypass the ingest table, and go straight to the actual table. We pay for this reduction in ingest latency with some extra parts (but not many, as only 3 extra partitions are getting inserts)

2. At query time, we use the final modifier on these 3 partitions, rather than the level filter.

Logically, select * from my_db.my_table becomes the following (date_field stands for the table's date column):

-- Recent partitions: deduplicate at query time with FINAL
SELECT *
FROM my_db.my_table FINAL
WHERE date_field >= today() - 2
UNION ALL
-- Older partitions: read only the highest-level active part of each partition
SELECT *
FROM my_db.my_table
WHERE date_field < today() - 2
  AND _part IN (
    SELECT name
    FROM (
        SELECT
            partition,
            name,
            level,
            row_number() OVER (
                PARTITION BY partition
                ORDER BY level DESC
            ) AS rn
        FROM system.parts
        WHERE active
          AND table = 'my_table'
          AND database = 'my_db'
    )
    WHERE rn = 1
  )

How we partition

For small tables, we do not partition. We use ReplacingMergeTree with the final query modifier.

For large tables:

  • For time series tables, we partition by event_date
  • For non time series tables, we partition by a hash of the customer id, mod 1000: sipHash64(customer_id) % 1000
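The two partitioning schemes for large tables could be declared roughly as follows. Table names, columns, and sorting keys are hypothetical; only the partition expressions are taken from our setup.

```sql
-- Time series table: one partition per day.
CREATE TABLE time_series_example
(
    customer_id UInt64,
    event_id    UInt64,
    event_date  Date,
    amount      Float64,
    updated_at  DateTime
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY event_date
ORDER BY (customer_id, event_id);

-- Non time series table: 1000 hash buckets of customer_id.
-- customer_id is part of the sorting key, so all versions of a row
-- always fall into the same partition.
CREATE TABLE customer_example
(
    customer_id UInt64,
    entity_id   UInt64,
    name        String,
    updated_at  DateTime
)
ENGINE = ReplacingMergeTree(updated_at)
PARTITION BY sipHash64(customer_id) % 1000
ORDER BY (customer_id, entity_id);

-- Sanity check: how many partitions and active parts does each table have?
SELECT
    table,
    uniqExact(partition_id) AS partitions,
    count()                 AS active_parts
FROM system.parts
WHERE active
  AND database = 'my_db'
GROUP BY table
ORDER BY active_parts DESC;
```

With this many partitions, a single insert block can touch more partitions than the max_partitions_per_insert_block setting allows (100 by default), in which case ClickHouse rejects the insert. That limit may need to be raised for inserts that span many hash buckets.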

We choose the partition based on three factors:

  1. The insert pattern. If records of similar dates are generally inserted together, we use event_date. If records belonging to a specific customer are generally inserted together, we use the hash of customer_id.
  2. The query pattern. Whichever we generally filter by (event_date or customer_id) is the better choice of partition key; if we filter by both, either is okay.
  3. Realtime needs. Partitioning by event_date has the added benefit that we can serve realtime data by bypassing the ingest latency we added, as discussed above.

Summary

This article describes how we at Triple Whale maintained ClickHouse’s query speed while achieving fully deduplicated query semantics, all while limiting both the number of parts created and CPU usage.

Thank you to Matt Elkherj and Cheskel Twersky for feedback on this article.
