ClickHouse + Kafka = ❤

Make distributed queries, not war.

Let me tell you why you’re here. Chances are you need to build a tool that collects data from different sources and produces some analytics on top of it. And even if I’m not 100% right, read on — you’ll definitely pick up some ideas for enriching your current project with a rich analytics tool.

In fact, this is a basic tutorial on how you can leverage the ClickHouse and Kafka combination. Let’s begin.

ClickHouse at your disposal

You might have heard about ClickHouse and Kafka separately. Let’s go a bit deeper and find out what they can do together.

What is ClickHouse? ClickHouse is a column-oriented database management system designed for analytical workloads.

Building analytical reports requires lots of data and aggregation over it. ClickHouse is tailored to high insert throughput because it belongs to the OLAP family (which stands for online analytical processing). It’s not about many single-row inserts and constant updates and deletes. ClickHouse allows and even expects big-payload inserts, like millions of rows at a time, but it handles updates and deletions poorly.

ClickHouse: Too Good To Be True is a great example of how ClickHouse helps to build effective solutions.

When your team sets out to implement an analytics service, you’ll probably want a common interface for different data sources — both to make your life easier and to set a standard for other teams. There are lots of solutions on the market. You could even build your own API or a custom data bus. But remember that requirement for high throughput? Kafka can easily manage that.

Meet Kafka

Kafka has become one of the best tools in the industry: it offers high throughput and message rewinding, it’s fast, it preserves record order, and it provides high accuracy. More than that, it’s fault-tolerant and it scales well.

Kafka allows you to decouple different subsystems (microservices), reducing all-to-all connections. In effect, it acts as a common data bus.

Kafka basically consists of a well-synced combination of Producers, Consumers, and a Broker (the middleman).

As you might have guessed, producers publish messages to the broker’s topics, the broker stores those messages, and it feeds them to the consumers.

Consumers, in turn, subscribe to topics and start to consume data. Kafka can retain messages as long as you wish — as long as your storage capacity allows and regulatory rules permit.

A broker hosts many topics (think message queues or tables) that store different types of domain objects or messages — money transfers, payments, operations, user profile changes, and anything else you can imagine.

Besides producers and consumers, there is one more element called Streams, which we won’t cover in this article. In short, it’s functionality that takes data from topics, analyses it and applies functions (aggregations, etc.) in real time, and feeds the results onward to consumers.

But what’s the main argument for Kafka here? Of course, ClickHouse support. ClickHouse has a Kafka table engine that makes it easy to plug Kafka into the analytics ecosystem.

Building a minimalistic payment analytics system

Domain

So let’s make up a contrived example. Suppose we have a payment model that looks like this:

Payment
(
id # => primary key
cents # => number of cents that payment holds
status # => string - can be 'cancelled' or 'completed'
created_at # => creation timestamp
payment_method # => some string holding values like Paypal, Braintree, etc.
version # => timestamp holding a datetime when a record was pushed to Kafka
)
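To make the model concrete: serialized for Kafka in the JSONEachRow format we’ll use below, a single payment message might look like this (all values are made up for illustration; version here is a Unix timestamp):

```json
{"id": 1, "status": "completed", "cents": 4999, "created_at": "2021-05-21 10:00:00", "payment_method": "Paypal", "version": 1621591200}
```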

Kafka Engine

The journey starts with defining a table in ClickHouse using the ClickHouse Kafka engine. Assuming you have a Kafka cluster set up (which is a whole different article; we’re using a single-node setup here), you specify all the information needed to connect a specific Kafka topic to your table. This table is not the final destination for your data: it behaves like a data stream and lets you read the data only once. In addition, it does not allow you to run background aggregations (more about that later).

Here’s how we create such a data stream, which we’ll name after the domain model with a _queue suffix:

CREATE TABLE IF NOT EXISTS payments_queue
(
id UInt64,
status String,
cents Int64,
created_at DateTime,
payment_method String,
version UInt64
)
ENGINE=Kafka('localhost:9092', 'payments_topic', 'payments_group1', 'JSONEachRow');

where the 1st argument of the Kafka engine is the broker address, the 2nd is the Kafka topic, the 3rd is the consumer group (used to avoid duplicates, ’cause the offset is tracked per consumer group), and the last argument is the message format. E.g. JSONEachRow means that each row is a valid JSON value separated by a newline, although the data chunk as a whole is not valid JSON.
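By the way, ClickHouse also accepts a named-settings form of the Kafka engine declaration, which many find easier to read (same broker, topic, and group names as above):

```sql
CREATE TABLE IF NOT EXISTS payments_queue
(
id UInt64,
status String,
cents Int64,
created_at DateTime,
payment_method String,
version UInt64
)
ENGINE = Kafka
SETTINGS kafka_broker_list = 'localhost:9092',
         kafka_topic_list = 'payments_topic',
         kafka_group_name = 'payments_group1',
         kafka_format = 'JSONEachRow';
```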

So, given that you have a single chance to read data from this adapter (the Kafka table in ClickHouse), you need a mechanism to route that data to places where it can be stored permanently (to some extent).

Permanent storage and consumers

Usually there are multiple storages (or tables) where we ultimately want to keep some of the data.

The first type of storage is a mirror of the incoming data that doesn’t apply any aggregations or functions and just stores the data as is. We could skip some columns or trim some values, but in general such tables can be considered raw data. You may wonder why we need to keep this kind of data in ClickHouse. Well, we want a solid collection of unprocessed data which we can use to, for instance, check whether there are errors in the way the data is aggregated, or to safely compare incoming and stored data by count to verify we’ve got everything that was sent.
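For instance, once the mirroring payments table from the next section is in place, that count comparison might be as simple as this (the one-day window is an arbitrary choice for illustration):

```sql
-- rows that landed in ClickHouse today; compare this number against
-- the count of messages the producer side reported sending today
SELECT count() AS stored_rows
FROM your_db.payments
WHERE toDate(created_at) = today();
```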

And the second type of data is aggregations. But the truth is, you’d hardly be happy with a single aggregation table, since there are usually multiple analytical reports over the same domain model, and those reports differ — some need more data, some need the data transformed a bit, and so on.

So, this is where ClickHouse materialized views come in.

Materialized views in ClickHouse are not what a materialized view usually means in other database systems. Simply put, a ClickHouse materialized view can be explained as an insert trigger.

Data flow schema

Why can’t we just write directly to the destination table? Technically we can. But remember the previous section, where we discussed that we need multiple tables for the same domain model (a mirroring table, plus tables for different reports or aggregations).

Let’s build our mirror and aggregation tables first, and then we’ll connect the Kafka table to each final-destination table via a materialized view.

Building destination tables

We’re gonna start with the mirroring table, which we can simply name payments.

We can create it with the following SQL:

CREATE TABLE your_db.payments
(
id UInt64,
status String,
cents Int64,
created_at DateTime,
payment_method String,
version UInt64
)
ENGINE = ReplacingMergeTree(version)
ORDER BY (id, payment_method, status)
PARTITION BY (toStartOfDay(toDate(created_at)), status);

Let’s quickly discuss the ENGINE, ORDER BY and PARTITION BY parameters. Unlike traditional DBMSs, ClickHouse has extended functionality that lets it do some data manipulation in the background. For instance, with the ReplacingMergeTree table engine we can keep only the relevant records by replacing old records with new ones. ClickHouse compares records by the columns listed in ORDER BY, and when it finds duplicates it keeps the record with the greater version value. So version is a number that essentially encodes the creation (or update) time — newer records have a greater version value.
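One caveat: replacement happens during background merges at an unpredictable time, so duplicates can linger for a while. If you need deduplicated results right away, ClickHouse lets you apply the merge logic at query time with the FINAL modifier — correct, but slower (the id value here is just an example):

```sql
-- forces deduplication by the ORDER BY key at read time
SELECT *
FROM your_db.payments FINAL
WHERE id = 42;
```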

As for PARTITION BY, it’s the parameter that controls how the data is partitioned. In our case we partition by day and status. With partitions in place we can safely detach or remove an entire partition, filter data efficiently, and do lots of other things, but that’s a whole different topic. In addition, ClickHouse recommends keeping the number of partitions in the hundreds, not more, for good performance.
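As a sketch of those partition operations (the partition expression must match the tuple partition key defined above; the date and status values here are hypothetical, and the exact literal syntax can vary between ClickHouse versions):

```sql
-- detach a partition: files stay on disk but become invisible to queries
ALTER TABLE your_db.payments
    DETACH PARTITION ('2021-05-21 00:00:00', 'cancelled');

-- or drop a partition entirely
ALTER TABLE your_db.payments
    DROP PARTITION ('2021-05-21 00:00:00', 'cancelled');
```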

Heading to the next step, we’re gonna create an aggregation table, which we’ll name completed_payments_sum. In this table we’ll hold payment aggregations covering only completed payments. That will be reflected in the materialized view later.

We create it with the following SQL:

CREATE TABLE your_db.completed_payments_sum
(
cents Int64,
payment_method String,
created_at Date
)
ENGINE = SummingMergeTree()
ORDER BY (payment_method, created_at)
PARTITION BY (toStartOfMonth(created_at));

Here we see a different engine. SummingMergeTree is an aggregating engine that sums up rows sharing the same ORDER BY key. So suppose we have 10 records with the same date and the same payment_method: eventually we’ll have a single record holding the sum of cents for that created_at and payment_method.
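You can watch this collapsing happen by inserting a couple of rows directly and forcing a merge (in production, merges run on their own; OPTIMIZE … FINAL just triggers one manually — the values below are made up):

```sql
INSERT INTO your_db.completed_payments_sum VALUES (100, 'Paypal', '2021-05-21');
INSERT INTO your_db.completed_payments_sum VALUES (250, 'Paypal', '2021-05-21');

-- force a background merge to run now
OPTIMIZE TABLE your_db.completed_payments_sum FINAL;

-- a single row with cents = 350 should remain for this key
SELECT * FROM your_db.completed_payments_sum;
```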

Building consumers

So it’s time to connect our Kafka engine table to the destination tables. As I said, we’re gonna do that with materialized views.

The easiest example is creating a materialized view for the mirroring payments table.

CREATE MATERIALIZED VIEW your_db.payments_consumer
TO your_db.payments
AS SELECT *
FROM your_db.payments_queue;

As straightforward as it gets. We basically take all data from payments_queue as is and send it to the payments table.

Let’s consider a more sophisticated example — attaching a consumer to the completed_payments_sum table. A foreword: we select only completed payments, so we don’t need to keep status in our aggregating table, ’cause it holds completed payments only.

We create it with the following SQL:

CREATE MATERIALIZED VIEW your_db.completed_payments_consumer
TO your_db.completed_payments_sum
AS SELECT cents, payment_method, toDate(created_at) AS created_at
FROM your_db.payments_queue
WHERE status = 'completed';

Voilà. After some time we’ll have values aggregated per day for each payment_method. So, for example, we can instantly find the sum of cents for 2021-05-21 and the Paypal payment method:

SELECT sum(cents)
FROM your_db.completed_payments_sum
WHERE created_at = '2021-05-21' AND payment_method = 'Paypal';

Note that we still wrap cents in sum() even though SummingMergeTree pre-aggregates it: merges happen in the background at an unspecified time, so a key may temporarily be represented by several partially summed rows. That’s why ClickHouse recommends always using sum() and GROUP BY (here the key columns are fully fixed by WHERE, so GROUP BY can be omitted).

Let’s gather all the SQL code here for the complete picture:

-- creating the queue to connect to Kafka (data stream)
CREATE TABLE IF NOT EXISTS payments_queue
(
id UInt64,
status String,
cents Int64,
created_at DateTime,
payment_method String,
version UInt64
)
ENGINE=Kafka('localhost:9092', 'payments_topic', 'payments_group1', 'JSONEachRow');

-- creating the mirroring payments table that holds raw data
CREATE TABLE your_db.payments
(
id UInt64,
status String,
cents Int64,
created_at DateTime,
payment_method String,
version UInt64
)
ENGINE = ReplacingMergeTree(version)
ORDER BY (id, payment_method, status)
PARTITION BY (toStartOfDay(toDate(created_at)), status);

-- creating the aggregating table for the completed payments
CREATE TABLE your_db.completed_payments_sum
(
cents Int64,
payment_method String,
created_at Date
)
ENGINE = SummingMergeTree()
ORDER BY (payment_method, created_at)
PARTITION BY (toStartOfMonth(created_at));

-- creating the consumer for the mirroring payments table
CREATE MATERIALIZED VIEW your_db.payments_consumer
TO your_db.payments
AS SELECT *
FROM your_db.payments_queue;

-- creating the consumer for the aggregating payments table
CREATE MATERIALIZED VIEW your_db.completed_payments_consumer
TO your_db.completed_payments_sum
AS SELECT cents, payment_method, toDate(created_at) AS created_at
FROM your_db.payments_queue
WHERE status = 'completed';

That’s basically it. Now you can query your destination tables (payments and completed_payments_sum) and build some analytics on top of them. You can create all kinds of tables, collect the data you need, and build reports from it. Some things we haven’t covered here — multi-node (clustered) setups, replication, sharding — deserve separate articles. Leave a comment about what you’d like to read next!

Thanks for reading this article! See you!