Scaling ClickHouse: Achieve Faster Queries using Distributed Tables

Suffyan Asad
12 min read · Sep 29, 2024


Introduction

ClickHouse is an open-source Online Analytical Processing (OLAP) database system. It is renowned for its blazing speed, horizontal scaling, and ability to serve big data for time-series and other types of analytics. ClickHouse can scale horizontally by adding more nodes to the cluster. This approach distributes data and compute jobs across multiple nodes, enhancing overall performance.


If you are not familiar with ClickHouse, or looking for a beginners’ guide, I recommend reading my previous article that covers the basics of ClickHouse, goes through its core concepts, and provides hands-on demonstration of its capabilities.

This article covers using Distributed tables in ClickHouse to enhance filtering and aggregation performance.

The Distributed engine and distributed tables

The Distributed engine is used to create distributed tables. A table created with the Distributed engine is a virtual entity and doesn't hold any data. It is created on top of actual tables, which can use any of the engines provided by ClickHouse. The Distributed engine enables distributed queries on the underlying actual tables, called shards.

The mechanism to create distributed tables is:

  • Create tables on cluster nodes
  • Create the distributed table on top of those tables created on each node

The underlying tables are standard tables that contain the actual data. The distributed table is a virtual entity on top of them that enables distributed queries and data insertions.

To read more about the Distributed engine, please refer to the documentation: https://clickhouse.com/docs/en/engines/table-engines/special/distributed

The following is the syntax for creating a distributed table:

Distributed table create statement syntax
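In general form, it looks like this (a sketch following the ClickHouse documentation; the column definitions are placeholders):

    CREATE TABLE [IF NOT EXISTS] [db.]table_name [ON CLUSTER cluster]
    (
        name1 [type1],
        name2 [type2],
        ...
    )
    ENGINE = Distributed(cluster, database, table[, sharding_key[, policy_name]])
    [SETTINGS name=value, ...]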

Alternatively, if one of the underlying tables exists on the same host on which the distributed table is being created, it is possible to reuse that table's schema instead of defining it, by using the AS clause:

Distributed table create statement alternate syntax
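A sketch of this alternate form, again following the documentation; the distributed table takes its columns from the existing underlying table:

    CREATE TABLE [db.]distributed_table [ON CLUSTER cluster]
    AS [db.]underlying_table
    ENGINE = Distributed(cluster, database, underlying_table[, sharding_key])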

In the syntax, the following parameters are important:

  • table_name: The name of the distributed table
  • cluster: The name of the cluster whose nodes hold the underlying tables
  • database: The name of the database that contains the underlying tables
  • table: The name of the underlying table on each node
  • sharding_key: An optional parameter that is required only when data is inserted through the distributed table. When data is inserted into the distributed table, the sharding_key value is used to route each row to the correct underlying table. Alternatively, it is possible to insert data into the underlying tables directly. The sharding key needs to evaluate to a number: because it is an expression, it can be any expression over table columns that returns an integer.

Inserting data into a Distributed table and its population in the shards

There are two ways to insert data into the distributed table:

  • Insert data into the shards, i.e. the underlying tables, directly. This approach is the most flexible because any sharding mechanism, however complex, can be implemented outside ClickHouse, and according to the documentation it is also the most efficient. It is recommended for large volumes of data.
  • Insert data directly into the distributed table, and let it route the rows to the appropriate shards. Using this mechanism requires specifying the sharding_key parameter on the Distributed table. Each row is inserted into the appropriate shard based on the sharding key.

The two mechanisms can be depicted as the following diagram:

Data insertion into distributed tables
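In SQL terms, the two mechanisms look roughly like this (a minimal sketch; trips_local, trips_distributed, and some_staging_table are hypothetical names, and trips_distributed is assumed to be defined with a sharding key):

    -- Option 1: insert into the underlying (local) table on a specific node
    INSERT INTO trips_local
    SELECT * FROM some_staging_table;

    -- Option 2: insert through the distributed table; the sharding key routes each row to a shard
    INSERT INTO trips_distributed
    SELECT * FROM some_staging_table;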

Each shard can have a weight, and the default weight is 1. The weight determines the proportion of rows sent to that shard when data is inserted through the distributed table rather than directly into the underlying tables. The mechanism works by evaluating the sharding expression for each new row and taking the remainder of dividing that value by the total weight of all shards. The remainder then determines which shard the row is inserted into.

If there are 3 shards, two with weight 2 and the third with weight 5, the total weight is 2 + 2 + 5 = 9. The remainder is computed by dividing the output of the sharding expression for the row by 9. Rows with remainders 0 and 1 (0 inclusive to 2 non-inclusive) go to the first shard, rows with remainders 2 and 3 (2 inclusive to 4 non-inclusive) go to the second shard, and rows with remainders 4 through 8 (4 inclusive to 9 non-inclusive) go to the third shard.

If all shards have a weight of 1, rows with remainder 0 go to the first shard, remainder 1 to the second shard, and so on, and the data is distributed evenly. When a new shard is added, giving it a higher weight widens its "acceptance range" of remainders so that it fills up more quickly.

Note that if the shards are replicated and data is written through the distributed table, by default the distributed table writes each row to all replicas of the target shard itself. According to the documentation, this is inefficient and can lead to inconsistencies among the replicas. Therefore, it is recommended to set the internal_replication parameter to true for shards whose underlying tables belong to the ReplicatedMergeTree family. When internal_replication is set to true, data is written to the first healthy replica of the shard, and the replicated table then takes care of copying it to the remaining replicas.

For more details, refer to the documentation: https://clickhouse.com/docs/en/engines/table-engines/special/distributed#distributed-writing-data

Querying data from distributed table

The main advantage of using a distributed table is that the data is spread across nodes, which improves query performance. SELECT queries are sent to all shards, and filters and aggregations are processed in parallel on multiple nodes before the partial results are combined into the final result.

Hands-on — Insertion and Querying data in distributed table

Now, let's go through an example of inserting data into a ClickHouse distributed table and querying it. Additionally, we'll compare its query performance against a table that has only one shard and exists on only one node. For that, we need a ClickHouse cluster. Let's create one, a 3-node, 1-replica cluster, using Docker Compose:

Create a test ClickHouse cluster

The ClickHouse cluster created here consists of 3 Docker container nodes, each with 4GB of RAM and 2 CPU cores. The laptop I am using has an Apple M1 processor and 16GB of RAM. If you are running on a different machine, you can adjust accordingly. First, clone the code repository that I have created, which contains the Dockerfile and the docker-compose file:

Make sure you have Docker and Docker compose installed. I prefer using Docker Desktop locally, as it has a nice UI as well. If you do not have Docker installed, install it first by visiting the following link:

Next, with Docker running, open a terminal, cd into the repository, and run docker compose up:

Command to start a 3 node ClickHouse cluster using Docker compose

The cluster should start shortly:

ClickHouse cluster started

Next, connect to the cluster. The URLs of the nodes are:

Node 1: localhost:8123

Node 2: localhost:8124

Node 3: localhost:8125

You can connect to all 3 nodes using the default user and no password.

I prefer using DBeaver Community Edition. It can connect to ClickHouse, and also to older versions using the ClickHouse legacy driver:

Connecting to ClickHouse using DBeaver

DBeaver may download the ClickHouse driver when creating connection for the first time.

Connecting to ClickHouse
Connected to all three nodes successfully.

In addition to that, the ClickHouse client, which is a command line tool, can also be used. Link:

This set-up has been explained in more detail in my article:

Downloading example data — TLC Trip Records data

I like using the TLC Trip Records data, which is a dataset of taxi trips in New York City. It can be downloaded from:

For this demonstration, I am using Yellow Trip Record files for year 2023. Download the Yellow Taxi Trip Record Parquet files for all months of 2023:

Yellow taxi trip records files

In the cloned repository, there is a folder called data. This folder has been mounted to all 3 nodes at the path /var/lib/clickhouse/user_files. In the data folder, there is a folder rides_2023. You can place all the files there, and they become available to all ClickHouse nodes.

Loading data into ClickHouse — table on a single node

First, create a database, and a table to hold raw data using the following commands. Execute them on node 1:

Creating the rides database, and the trips_raw table
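The exact statements are in the repository; a minimal sketch, assuming the cluster is named my_cluster in the cluster configuration and using a subset of the TLC Yellow Taxi columns, would look like this:

    CREATE DATABASE IF NOT EXISTS rides ON CLUSTER my_cluster;

    CREATE TABLE rides.trips_raw
    (
        VendorID Int64,
        tpep_pickup_datetime DateTime64(6),
        tpep_dropoff_datetime DateTime64(6),
        passenger_count Nullable(Float64),
        trip_distance Float64,
        PULocationID Int64,
        DOLocationID Int64,
        fare_amount Float64,
        tip_amount Float64,
        total_amount Float64
    )
    ENGINE = MergeTree
    ORDER BY tpep_pickup_datetime;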

The commands above do the following:

  • Creates a database called rides on all 3 nodes. Note that any command executed with the ON CLUSTER clause is executed on all nodes of the cluster.
  • Creates a table called trips_raw on the node this command was run on, i.e. node 1.
Newly created database and table

The table uses the MergeTree engine, which is one of the table engines available in ClickHouse. You can refer to the documentation of MergeTree engine family, or my previous article to learn more about table engines available in ClickHouse.

Next, load the data into the table. This can be done using the following command:

Command to load data into trips_raw table
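A sketch of such a load, using the file() table function over the mounted Parquet files (the rides_2023/*.parquet path follows from the data folder described earlier; the column list matches the trips_raw sketch above):

    INSERT INTO rides.trips_raw
    SELECT
        VendorID, tpep_pickup_datetime, tpep_dropoff_datetime,
        passenger_count, trip_distance, PULocationID, DOLocationID,
        fare_amount, tip_amount, total_amount
    FROM file('rides_2023/*.parquet', Parquet);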

This command needs to be executed on the instance itself, so connect to node 1. Open a new terminal and run:

Commands to connect to node 1 and start clickhouse client
ClickHouse client

ClickHouse client is already installed on all nodes, and connects to the locally running ClickHouse node. Now, run the command to load the data:

Data loaded into the table

We can run a count query on the table to verify that the data has been loaded:
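A simple count is enough:

    SELECT count() FROM rides.trips_raw;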

38.31 million records loaded into trips_raw table.

Next, let's create a trips table and load data into it. Run:

Create the trips table on Node 1, and load data into it
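The exact schema and batch boundaries are in the repository; a minimal sketch, assuming trip_id is generated at load time (the generation expression below is only an illustration), might look like this:

    CREATE TABLE rides.trips
    (
        trip_id UInt64,
        tpep_pickup_datetime DateTime64(6),
        tpep_dropoff_datetime DateTime64(6),
        trip_distance Float64,
        PULocationID Int64,
        DOLocationID Int64,
        fare_amount Float64,
        tip_amount Float64,
        total_amount Float64
    )
    ENGINE = MergeTree
    ORDER BY trip_id;

    -- First of the batched inserts; only 2023 rows are kept
    INSERT INTO rides.trips
    SELECT
        rowNumberInAllBlocks() AS trip_id,
        tpep_pickup_datetime, tpep_dropoff_datetime, trip_distance,
        PULocationID, DOLocationID, fare_amount, tip_amount, total_amount
    FROM rides.trips_raw
    WHERE toYear(tpep_pickup_datetime) = 2023
      AND toMonth(tpep_pickup_datetime) <= 3;

    -- Subsequent batches cover the remaining months and offset trip_id using
    -- the current maximum, as described below.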

The commands to load data into the trips table have been split into 4 parts because, due to the limited resources of this test cluster, trying to load all the data at once results in out-of-memory errors. You can split the data further if these commands still give an error at your end, and use SELECT max(trip_id) FROM rides.trips t; to get the maximum trip_id value to put in the next insert query.

After loading the data, let's run a row count:

38.31 million rows loaded into trips table.

Note that the final number of rows is a little different compared to the raw data because there are some rows that have dates in years other than 2023, and they get filtered out.

Creating and loading data into the distributed table

Next, let's create a distributed table, called rides.trips_distributed_id. The trip identifier column, trip_id, is the sharding key in this table, and in this case it is a numeric column, as required. Note that the sharding key is an expression, so if the column itself is not numeric, the sharding key expression can still return a numeric result.

Also, as explained earlier, a Distributed table in ClickHouse is a virtual entity over real tables on all nodes, and the distributed table itself doesn't hold any data. Therefore, an underlying table called rides.trips_part_dist_by_id also needs to be created on all three nodes.

To do both, run the following commands:

Creating the distributed table
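A sketch of the two statements, assuming the cluster is named my_cluster and mirroring the column list from the rides.trips sketch above (the actual definitions are in the repository):

    CREATE TABLE rides.trips_part_dist_by_id ON CLUSTER my_cluster
    (
        trip_id UInt64,
        tpep_pickup_datetime DateTime64(6),
        tpep_dropoff_datetime DateTime64(6),
        trip_distance Float64,
        PULocationID Int64,
        DOLocationID Int64,
        fare_amount Float64,
        tip_amount Float64,
        total_amount Float64
    )
    ENGINE = MergeTree
    ORDER BY trip_id;

    CREATE TABLE rides.trips_distributed_id ON CLUSTER my_cluster
    AS rides.trips_part_dist_by_id
    ENGINE = Distributed(my_cluster, rides, trips_part_dist_by_id, trip_id);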

Because the CREATE TABLE statement contains the ON CLUSTER clause, the table rides.trips_part_dist_by_id gets created on all three nodes. The statement that creates the distributed table also uses the ON CLUSTER clause, so it too gets created on all nodes. If this clause is omitted when creating the distributed table, it gets created only on the node the command runs on.

Next, load data into the distributed table:

Insert into distributed table
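A single-statement sketch (the repository may split this into batches, as with the earlier load):

    -- ClickHouse routes each row to a shard based on trip_id
    INSERT INTO rides.trips_distributed_id
    SELECT * FROM rides.trips;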

In the insert statements, the data gets inserted directly into the distributed table and gets routed to the appropriate shard based on the trip_id column value.

After execution of the commands, the following tables are present on each node:

Final set of tables on all nodes

Query performance comparison

Let’s run a query that computes the average trip distance, fare amount, and tip amount for each pickup location ID, and orders the results by average tip amount in descending order. The query is:

Test query
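Along these lines (column names follow the earlier sketches; the exact statement is in the repository):

    SELECT
        PULocationID,
        avg(trip_distance) AS avg_trip_distance,
        avg(fare_amount)   AS avg_fare_amount,
        avg(tip_amount)    AS avg_tip_amount
    FROM rides.trips
    GROUP BY PULocationID
    ORDER BY avg_tip_amount DESC;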

If the query is run on ClickHouse client, I get the following results on my system:

Query results.

This query can be executed using a client such as DBeaver, but the ClickHouse client provides more information, including a more precise execution time and the number of rows read. On my computer, it took 0.98 seconds.

Let's run it 10 times. The execution times are:

Query execution times on single table.

Most of the time the query took approximately 0.75 seconds, and the average time is 0.8086 seconds overall. Next, let's run the same query on the distributed table:

Test query on distributed table rides.trips_distributed_id
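The same query, with only the FROM clause changed to point at the distributed table:

    SELECT
        PULocationID,
        avg(trip_distance) AS avg_trip_distance,
        avg(fare_amount)   AS avg_fare_amount,
        avg(tip_amount)    AS avg_tip_amount
    FROM rides.trips_distributed_id
    GROUP BY PULocationID
    ORDER BY avg_tip_amount DESC;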
Execution time on the distributed table

On the distributed table, the query took 0.676 seconds on the first attempt. Running it 10 times, the average time for the same query on the distributed table is 0.4839 seconds.

Here is the comparison:

Execution time comparison

The advantage of using a distributed table can be clearly seen here: because the data is distributed, the query is also distributed, and the results are generated faster. In this example, on my computer, the query on the distributed table runs 40.16% faster.

This is a significant difference. In the real world, additional time would be spent sending data over the network to combine the overall result, but the tables, and possibly the ClickHouse cluster, would be bigger as well, so the difference could still be substantial.

Let's compare the query plans of both queries:

SQL to get query plan
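Prefixing a query with EXPLAIN returns its plan; for example, for the distributed variant (same assumed column names as above):

    EXPLAIN
    SELECT
        PULocationID,
        avg(trip_distance) AS avg_trip_distance,
        avg(fare_amount)   AS avg_fare_amount,
        avg(tip_amount)    AS avg_tip_amount
    FROM rides.trips_distributed_id
    GROUP BY PULocationID
    ORDER BY avg_tip_amount DESC;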
Query plan of both queries

The difference here is the ReadFromRemote step, which indicates that parts of the query were executed on other nodes. Overall, this division of work leads to faster execution.

Note: A nice article and video about query plans in ClickHouse can be found here:

Next, let's examine another example, which involves a filter in addition to grouping and aggregates. The query is:

Query to get data about trips originating from PULocationID = 237
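A sketch of such a query, reusing the earlier aggregation with an added filter (the exact statement, including which columns it groups by, is in the repository):

    SELECT
        PULocationID,
        avg(trip_distance) AS avg_trip_distance,
        avg(fare_amount)   AS avg_fare_amount,
        avg(tip_amount)    AS avg_tip_amount
    FROM rides.trips
    WHERE PULocationID = 237
    GROUP BY PULocationID
    ORDER BY avg_tip_amount DESC;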

The execution time comparison is:

Query execution time comparison

This query takes an average of 0.7527 seconds on the single-node table, whereas the same query on the distributed table takes 0.4713 seconds on average. The query on the distributed table is 37.39% faster. This again demonstrates the advantage of using a distributed table.

Although distributed tables add complexity, they significantly improve query processing by spreading both data and execution across multiple nodes. This horizontal scaling approach is a valuable strategy for ClickHouse, enabling it to handle large datasets and process queries more efficiently. The performance gains achieved through this method make it a worthwhile investment for organizations dealing with substantial data volumes.

Conclusion and Key Takeaways

In conclusion, distributed tables in ClickHouse offer significant performance gains for large-scale data processing. By spreading data and query execution across multiple nodes, ClickHouse distributes the workload and achieves faster query times. Designed to handle massive datasets, ClickHouse offers tools like distributed tables to enhance query performance and improve scalability.


Suffyan Asad

Data Engineer | Passionate about data processing at scale | Fulbright and George Washington University alum | https://pk.linkedin.com/in/suffyan-asad-421711126