How We Established High Volume Log System Statistics

Mert Kanber
Insider Engineering
Sep 15, 2020

The high volume of data available today means that businesses can better understand the past and present behavior of their customers, and perhaps even anticipate their future behavior. How and where businesses store this continuously expanding data pool has therefore become more important than ever. And while the data pool grows, the infrastructure should still provide a consistent level of performance.

We practitioners of the technological arts have a tendency to demonstrate the effect of our work with a proof of concept. At Insider, we deal with gigabytes of data daily and put it to use in multiple ways. One of them is to demonstrate the usage of our products and the return on investment they bring.

While storing high volumes of data, we also had to think about performance. We hold more than two years of data across 800+ partners. That translates to roughly 800M rows to process on a single node, so we needed to build a system that stores the data without using up too many resources, communicates with our APIs, and handles several hundred queries per second. We therefore chose ClickHouse as our data warehouse: it can process hundreds of millions to more than a billion rows, and tens of gigabytes of data, per single server per second.

Query process time of millions of rows

ClickHouse is a column-oriented database management system. It scales well both horizontally and vertically. Since the columnar storage format fits more data into RAM, it significantly reduces latency for most kinds of queries. In practice, large queries are computed two to three times faster than on row-oriented databases with the same I/O throughput and CPU capacity.

We used Amazon M5 series instances to create a four-node ClickHouse cluster that stores the data and distributes it among the nodes, coordinated by a multi-node Apache ZooKeeper ensemble. We also placed our database instances in a VPC to make the setup more secure and cost-optimized, and to reduce latency.

ClickHouse nodes and ZooKeeper nodes inside of a VPC

Since we are dealing with stream-like data from multiple sources and we aim to demonstrate real-time statistics, Amazon Kinesis Data Firehose delivery streams, which can continuously capture gigabytes of data per second, were the best choice for us. For that purpose, we created two Kinesis Data Firehose delivery streams to store our logs in Amazon S3, to be processed by a Lambda function. We set buffering limits on the delivery streams so that S3 objects are only created when those conditions are met; that way we could control the load of the write operations with ease.
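As a rough sketch of those buffering limits (the bucket name, stream name, and thresholds below are illustrative, not our production values), this is the shape of the configuration that Firehose accepts:

```python
# Sketch of the Firehose buffering conditions we rely on: an S3 object is
# only created once either threshold is hit, which caps the write load.
# All names and numbers here are hypothetical.
buffering_hints = {
    "SizeInMBs": 64,           # flush when the buffer reaches 64 MB...
    "IntervalInSeconds": 300,  # ...or every 5 minutes, whichever comes first
}

s3_destination = {
    "BucketARN": "arn:aws:s3:::example-log-bucket",  # hypothetical bucket
    "BufferingHints": buffering_hints,
    "CompressionFormat": "GZIP",
}

# With boto3, this dict would be part of the destination configuration:
# boto3.client("firehose").create_delivery_stream(
#     DeliveryStreamName="example-log-stream",
#     ExtendedS3DestinationConfiguration={...},  # includes s3_destination keys
# )
```

The size/interval pair is what lets the stream trade S3 object count against freshness: larger buffers mean fewer, bigger objects for the Lambda to process.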

General flow of our new data ingestion system.

The Lambda function was designed to consume the S3 objects in order to validate and transform the logs and assign each one to its corresponding log group, since we do not know in advance which log belongs to which group. After assigning the logs to their groups, we built JSON objects and attached them one after another for stream-like bulk insertion into our ClickHouse database. At first, we thought that streaming directly into a single ClickHouse node would be enough while the APIs read the data from the remaining nodes. We saw that this increased CPU usage drastically due to the lack of resources. Instead of changing the instance type to add resources, we started using a table engine called Buffer, which buffers writes in RAM and periodically flushes them to another table.
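A minimal sketch of that grouping step (the field names and the log-group rule are hypothetical; the real Lambda also validates and transforms each record):

```python
import json

def group_and_serialize(records):
    """Assign each raw log to its log group, then build one
    newline-delimited JSON payload per group for bulk insertion.
    ClickHouse's JSONEachRow input format accepts exactly this shape:
    JSON objects attached one after another, separated by newlines."""
    groups = {}
    for rec in records:
        # Hypothetical rule: the log group is carried inside the record.
        group = rec.get("log_group", "unknown")
        groups.setdefault(group, []).append(rec)
    return {g: "\n".join(json.dumps(r) for r in rows)
            for g, rows in groups.items()}

payloads = group_and_serialize([
    {"log_group": "impressions", "partner": "p1"},
    {"log_group": "clicks", "partner": "p1"},
    {"log_group": "impressions", "partner": "p2"},
])
```

Each payload can then be sent as one bulk insert per log group rather than one insert per record, which is what keeps the write amplification down.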

CPU Utilization before Buffer
CPU Utilization after Buffer

Buffer allowed us to optimize the load on the node by writing the data to RAM, and to flush that data out of RAM based on conditions. When the conditions are met, it writes the data to its destination tables, which are our main storage tables. Since data replication across the cluster is handled by the ReplicatedMergeTree engine at the table level, without depending on sharding, a single destination table per Buffer was enough, with the replicas' metadata stored in Apache ZooKeeper. Apache ZooKeeper is essentially a coordination service for distributed systems, offering hierarchical key-value storage. ReplicatedMergeTree is a special table engine that stores metadata about the inserted data in Apache ZooKeeper and removes duplicate entries by their key-value mapping.
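The table layout can be sketched as ClickHouse DDL, held here as plain strings (the table names, columns, ZooKeeper path, and Buffer thresholds are illustrative, not our exact schema):

```python
# Illustrative ClickHouse DDL. Everything named here is hypothetical.

# Destination table: replicated, with the replica's metadata kept in
# ZooKeeper under the given path so duplicates can be detected.
destination_ddl = """
CREATE TABLE logs (
    event_date Date,
    partner_id UInt32,
    payload    String
) ENGINE = ReplicatedMergeTree('/clickhouse/tables/logs', '{replica}')
ORDER BY (partner_id, event_date)
"""

# Buffer table: inserts land here in RAM and are flushed to `logs` once
# the time/row/byte conditions are met. The Buffer engine's arguments are
# (database, table, num_layers, min_time, max_time, min_rows, max_rows,
#  min_bytes, max_bytes); these thresholds are example values.
buffer_ddl = """
CREATE TABLE logs_buffer AS logs
ENGINE = Buffer(default, logs, 16, 10, 100, 10000, 1000000,
                10000000, 100000000)
"""
```

With this split, the Lambda inserts into `logs_buffer`, and ClickHouse moves the data into the replicated `logs` table on its own schedule.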

To serve the collected data, we built an internal API using Akka Actors. This API was designed to fetch data from ClickHouse, perform statistical computations, and serve the computed data to our statistical dashboards. The API is also placed in the same VPC to reduce cost and latency and to make it more secure. While the Lambda function continuously handled insertion into ClickHouse, the API was also doing insertions to work out which campaign corresponds to which goal and builder type for a given payload. This computation on the database consumed too many resources and increased the response time of the API itself. We realized that we could gather that information at query run-time instead, reducing both the load on the database and the latency: instead of sending three requests for one write and two read operations, we now send one request for everything, using the groupArray and arrayJoin run-time query functions.
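To illustrate why one round-trip is enough, here is a Python stand-in for what groupArray does for us in that single query (the field names are hypothetical; in ClickHouse, groupArray collapses each campaign's goals into one array per row, and arrayJoin expands such arrays back at run-time):

```python
from collections import defaultdict

# Hypothetical rows as the database would return them un-grouped.
rows = [
    {"campaign": "summer", "goal": "signup"},
    {"campaign": "summer", "goal": "purchase"},
    {"campaign": "winter", "goal": "signup"},
]

def group_array(rows, key, value):
    """Emulate `SELECT key, groupArray(value) ... GROUP BY key`:
    one output row per key, carrying all of its values as an array."""
    grouped = defaultdict(list)
    for r in rows:
        grouped[r[key]].append(r[value])
    return dict(grouped)

goals_by_campaign = group_array(rows, "campaign", "goal")
```

Since the campaign-to-goal mapping now arrives nested inside the main read, the extra write and the second read the API used to issue are no longer needed.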

Sample query format to format the information.

This helped us to:

  • Reduce the API response time.
  • Reduce the CPU usage.
  • Write cleaner code.

While building a system, providing a stable service is essential; however, unexpected issues can still occur. In our case, they happened on the data ingestion side, since API instances automatically scale up when there are no healthy instances left. For that reason, we built an additional system to retry the insertion of data that failed to be inserted. We log the outcome of each insertion into ClickHouse, failures and successes alike. For instance, if an insertion fails due to a connection error or data validation, we log it to a separate MySQL database that holds the S3 object path, to be retried by a scheduled Lambda. This way we prevent data from being lost. The retry system does have a limit, though: the Lambda function will attempt an insertion at most 5 times if it keeps hitting an error.
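The retry pass can be sketched like this (a plain list stands in for the MySQL failure table, and the function and field names are ours for illustration):

```python
MAX_RETRIES = 5  # the scheduled Lambda gives up on an object after 5 attempts

def retry_pending(pending, insert):
    """One scheduled pass over the failure log: re-attempt each S3 object
    path, drop it on success, keep it with a bumped counter otherwise.
    `pending` stands in for the rows of the separate MySQL table."""
    still_failing = []
    for entry in pending:
        try:
            insert(entry["s3_path"])  # re-run the ClickHouse insertion
        except Exception:
            entry["retries"] += 1
            if entry["retries"] < MAX_RETRIES:  # past the limit: stop retrying
                still_failing.append(entry)
        # on success the entry is simply not carried over
    return still_failing

# Usage with a fake insert that fails for one object:
def flaky_insert(path):
    if path == "s3://bucket/bad-object":
        raise RuntimeError("connection failure")

pending = [
    {"s3_path": "s3://bucket/good-object", "retries": 0},
    {"s3_path": "s3://bucket/bad-object", "retries": 0},
]
pending = retry_pending(pending, flaky_insert)
```

Because only the S3 object path is stored, a retry simply re-reads the original object, so no payload needs to be duplicated into the failure log.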

General flow of retry mechanism

Our focus was zero data loss. At first we tried to build our real-time system on relational databases, where queries processed in the traditional way simply take too long. We realized that the traditional approaches could not provide the desired output for a real-time statistical system. By analysing users' needs and choosing the right technology, we built a system flexible enough to serve any need without any downtime. We have now been using ClickHouse for a couple of months, and we have learned to think twice before building a system and to choose the right technology for our needs.

Want to be a part of the dream team? Check out our career page!
