A solution for exactly-once insertion from Flink to ClickHouse

SmartNews, Inc
Nov 4, 2022

Background

SmartNews is a leading global information and news discovery company. Unlike ordinary news apps, SmartNews uses its solid data platform and a range of tooling to help users broaden their horizons in real time.

In this blog, we will go over the high-level architecture of the SmartNews real-time processing pipeline and introduce how we handle one of the most difficult problems in the real-time streaming world: exactly-once delivery.

Introduction to SmartNews backend

There are three major pillars of the SmartNews data platform:

Storage

The SmartNews data platform hosts hot pools to achieve extremely low end-to-end latency for time-sensitive functions. In addition, a cold pool hosts data that has a very low access rate and is not time sensitive. Both pools have replicas for data durability.

Query

We mainly use dedicated query engines for data reads, which helps isolate the impact caused by different data processing components.

Transformation

Different tools are used for different use cases: ClickHouse for analytics, Spark for batch processing, and Flink for real-time data processing.

Technical background and solutions

One of our use cases is delivering reports to users in real time. To achieve low latency and data consistency, we use Flink for complicated data manipulation and ClickHouse as Flink’s destination. ClickHouse has decent performance on “wide” tables and can scan billions of records within a few seconds or even less.

Flink cluster

As shown in the figure, the cluster sits in a specific availability zone (AZ) and has three corresponding ZooKeeper nodes to help achieve high availability. Our Scheduler and Scaler, together with YARN’s scheduling systems, help us dynamically schedule Flink jobs as needed. On the right is a cross-AZ section, which contains only an RDS instance to store metadata and an object store to hold checkpoints and savepoints.

Since cross-AZ traffic is very expensive, all job containers are scheduled in the same AZ for cost efficiency. Because the metadata is hosted across AZs, if a single AZ goes down we can use it to respawn jobs in a different AZ. This way we spend the least while still maintaining high availability.

Flink jobs are deployed in a per-job cluster fashion, so a single job’s memory or CPU leak will not impact other jobs. Each job deployment also starts with clean containers, which clears temporary files and prevents disk leaks as well.

ClickHouse cluster

The ClickHouse architecture is relatively similar. In the Kubernetes (K8s) cluster, we installed the ClickHouse operator, which is responsible for maintaining ClickHouse clusters. The figure shows two clusters to illustrate the operator’s job; both are 2 (shards) × 2 (replicas). In front of each machine we put a K8s Service so that advanced users can access specific hosts/pods when necessary.

For users who don’t want to access these hosts directly, we also provide a LoadBalancer that takes advantage of ClickHouse’s Distributed table engine. With the operator’s help, cluster updates and deletions are as easy as a single click.

ClickHouse insertion principle

ClickHouse provides various table engines for different purposes. MergeTree and Distributed engines are the most commonly used in our case.

MergeTree engine
MergeTree is the table engine that stores data on disk. It automatically merges small data blocks together according to specific rules. Merged data saves disk space and also helps speed up queries. After merging, the part-based indexes are reduced as well, which is another reason why ClickHouse is so fast.

MergeTree supports atomic insertion natively (meaning ClickHouse will not introduce duplication as long as the upstream is unique), but the conditions for atomic insertion are harsh: the data you write must form a single, unique data part. A data part is nothing special, just a group of data records, ordered and stored together in one file on disk. As mentioned above, ClickHouse’s indexes are built on data parts, and native deduplication is also based on these data parts.

When the data belongs to multiple partitions, it will obviously end up in different data parts. When a block is larger than the part’s maximum size limit, it will also be divided into multiple parts. You can increase this limit at the cost of performance.
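As an illustration, here is a minimal sketch, using the ClickHouse JDBC driver, of how a single INSERT can span multiple data parts. The connection URL, the table name events_local, and its schema are all hypothetical.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MergeTreePartsDemo {
    public static void main(String[] args) throws Exception {
        // Hypothetical connection URL; adjust host/port/database for your cluster.
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
             Statement stmt = conn.createStatement()) {

            // A MergeTree table partitioned by day; each partition keeps its own data parts.
            stmt.execute(
                "CREATE TABLE IF NOT EXISTS events_local (" +
                "  event_date Date, user_id UInt64, clicks UInt32" +
                ") ENGINE = MergeTree() PARTITION BY event_date ORDER BY user_id");

            // This single INSERT touches two partitions (two dates), so ClickHouse
            // writes at least two data parts -- the insert is no longer atomic.
            stmt.execute(
                "INSERT INTO events_local VALUES " +
                "('2022-11-01', 1, 10), ('2022-11-02', 2, 20)");
        }
    }
}
```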

However, if a materialized view (MV) is attached to the source table, there is no guarantee of atomicity at all. To simplify, you can think of a materialized view as a simple data processing engine: it contains nothing particularly complicated, just a SELECT statement that writes data to a destination table. The most important thing about an MV is that it is triggered by insertion.

As shown in the figure below, the MV and the local (MergeTree) table insertion run in parallel, which means performance is decent while atomicity can’t be maintained. If the client disconnects when the MV has processed row 8 but the local table has only processed row 4, ClickHouse doesn’t know how to recover or what to roll back, especially since in most cases the MV’s SELECT clause contains aggregation functions.

Besides, multiple MVs can be attached to a local table, or MVs can depend on each other, which can make rollback very expensive. And we haven’t even talked about data replication yet, which makes atomicity even harder to achieve in this case.
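For reference, a minimal MV definition might look like the sketch below (again over JDBC; events_local, events_daily, and events_daily_mv are hypothetical names carried over from the previous sketch). The view is nothing more than an aggregating SELECT that fires on every insert.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class MaterializedViewDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
             Statement stmt = conn.createStatement()) {

            // Destination table for the MV's aggregated rows.
            stmt.execute(
                "CREATE TABLE IF NOT EXISTS events_daily (" +
                "  event_date Date, total_clicks UInt64" +
                ") ENGINE = SummingMergeTree() ORDER BY event_date");

            // The MV is just a SELECT that is triggered by every INSERT into
            // events_local and writes its aggregated result into events_daily.
            stmt.execute(
                "CREATE MATERIALIZED VIEW IF NOT EXISTS events_daily_mv TO events_daily AS " +
                "SELECT event_date, sum(clicks) AS total_clicks " +
                "FROM events_local GROUP BY event_date");
        }
    }
}
```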

Distributed engine

As mentioned above, a MergeTree local table can only accept inserts on a specific host. ClickHouse therefore provides the Distributed table engine, which contains metadata about the underlying MergeTree tables, such as which hosts they live on and how data is distributed to a specific host (the sharding key). However, it doesn’t know how to recover if data is written only partially, even if we somehow managed to make local-table insertion atomic. The major advantage of the Distributed engine is that you don’t have to deal with data distribution on the client side, and you don’t need to aggregate data from different hosts of the cluster. But it also means there is no easy way to achieve atomicity: while using the Distributed engine, you can only rely on ClickHouse’s server-side mechanisms.
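A minimal sketch of a Distributed table on top of the hypothetical events_local tables could look like this; the cluster name my_cluster and the rand() sharding key are assumptions.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

public class DistributedTableDemo {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default");
             Statement stmt = conn.createStatement()) {

            // The Distributed engine stores no data itself; it only knows which
            // cluster/database/local table to route to and how to shard rows.
            stmt.execute(
                "CREATE TABLE IF NOT EXISTS events_all AS events_local " +
                "ENGINE = Distributed(my_cluster, default, events_local, rand())");

            // An INSERT through the Distributed table is fanned out to the shards;
            // if one shard fails mid-write, ClickHouse cannot roll the others back.
            stmt.execute("INSERT INTO events_all VALUES ('2022-11-01', 3, 5)");
        }
    }
}
```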

The principle of Flink exactly once sink

Flink claims to support end-to-end exactly-once delivery. To understand the principle, let’s assume the job contains only sinks. Flink first pre-commits data to the external system. After all four sink operators (in this example) confirm that their data is in the external system, Flink triggers the “real” commit to tell the external system that the transaction is safe to close. The external system accepts this request and tells the sinks the transaction is closed, and the sinks start writing the next “batch” of data.
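For reference, the sketch below shows roughly what this contract looks like when extending Flink’s TwoPhaseCommitSinkFunction. The five callbacks belong to Flink’s DataStream API; the String transaction handle and the empty method bodies are placeholders, not our production sink.

```java
import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.api.common.typeutils.base.VoidSerializer;
import org.apache.flink.streaming.api.functions.sink.TwoPhaseCommitSinkFunction;

// Sketch of a two-phase-commit sink: records are written against an open
// transaction, pre-committed when a checkpoint starts, and committed only
// once the checkpoint completes on every parallel sink instance.
public class SketchTwoPhaseSink extends TwoPhaseCommitSinkFunction<String, String, Void> {

    public SketchTwoPhaseSink() {
        super(StringSerializer.INSTANCE, VoidSerializer.INSTANCE);
    }

    @Override
    protected String beginTransaction() {
        // Open a new transaction handle, e.g. a unique staging/batch id.
        return java.util.UUID.randomUUID().toString();
    }

    @Override
    protected void invoke(String transaction, String value, Context context) {
        // Write 'value' under the open transaction (placeholder).
    }

    @Override
    protected void preCommit(String transaction) {
        // Flush everything written so far; this is the pre-commit step.
    }

    @Override
    protected void commit(String transaction) {
        // Make the pre-committed data visible in the external system.
    }

    @Override
    protected void abort(String transaction) {
        // Discard the transaction, e.g. drop the staging data.
    }
}
```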

Exactly once insertion from Flink to ClickHouse

The inspiration we take from Flink’s exactly-once sink is to maintain the state of data insertion in the external system. Naturally, we can create a temporary table in ClickHouse to hold pre-committed data, as shown in the image below:

  1. Check whether the current data has already been inserted into the ClickHouse target table.
    a. If it has been inserted, just exit, since there is no need to insert it again;
    b. If it has not been inserted, go to the second step.
  2. Check whether a corresponding temporary table has been created.
    a. If not, create the corresponding temporary table and then write the data into it. This is somewhat similar to the pre-commit stage we just mentioned.
    b. If it has already been created, drop the temporary table and recreate it to get a clean start and prevent data duplication. Yes, this means we might waste some resources, but we prefer to trade these resources for data accuracy.
  3. “Commit” the data from the temporary table to the target table. In this step, the commit happens only on the ClickHouse side: it simply means inserting into the local target table from the local temporary table. Since this only happens within a single host, it eliminates the impact of network jitter.
  4. Clean up the temporary table.

Each of these four steps can fail, but since the workflow starts by checking data existence in the target table, it is safe to retry. The workflow is a derivation of two-phase commit and helps us achieve an end-to-end, Flink-to-ClickHouse exactly-once solution.
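To make the four steps concrete, here is a minimal sketch of the retry-safe sequence against a single ClickHouse host over JDBC. The target table target_local, its batch_id column, the temporary-table naming, and the connection details are all hypothetical; a production sink would of course add proper error handling around each statement.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class TempTableCommitDemo {

    public static void commitBatch(Connection conn, String batchId) throws Exception {
        String tempTable = "target_local_tmp_" + batchId.replace('-', '_');
        try (Statement stmt = conn.createStatement()) {

            // 1. Skip the whole batch if it already reached the target table.
            try (ResultSet rs = stmt.executeQuery(
                    "SELECT count() FROM target_local WHERE batch_id = '" + batchId + "'")) {
                if (rs.next() && rs.getLong(1) > 0) {
                    return; // already committed, so a retry is a no-op
                }
            }

            // 2. (Re)create the temporary table for a clean start, then pre-commit
            //    the batch into it (the real sink writes the Flink records here).
            stmt.execute("DROP TABLE IF EXISTS " + tempTable);
            stmt.execute("CREATE TABLE " + tempTable + " AS target_local");
            stmt.execute("INSERT INTO " + tempTable +
                    " VALUES ('2022-11-01', 1, 10, '" + batchId + "')");

            // 3. "Commit": a local INSERT ... SELECT from the temporary table into
            //    the target table on the same host, so no network hop is involved.
            stmt.execute("INSERT INTO target_local SELECT * FROM " + tempTable);

            // 4. Clean up the temporary table.
            stmt.execute("DROP TABLE IF EXISTS " + tempTable);
        }
    }

    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection("jdbc:clickhouse://localhost:8123/default")) {
            commitBatch(conn, "batch-2022-11-01-0001");
        }
    }
}
```

Because step 1 short-circuits when the batch is already in the target table, rerunning commitBatch after a failure at any step is safe.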

Summary and outlook

Let’s briefly review and summarize the above solution. We talked about the principle of Flink’s exactly-once insertion, focusing on Flink’s two-phase commit, and we also went in depth into several ClickHouse concepts, such as what the Distributed engine is, what the MergeTree engine is, why atomic insertion is hard to achieve, and the problems caused by multi-host insertion and failures.

Most important is how we solved exactly-once insertion from Flink to ClickHouse: in essence, we use a temporary table to achieve transaction management, trading a little performance and storage overhead for atomicity. This also leads us to think about whether exactly-once semantics is really necessary in real time. The ultimate goal is to provide precise results, yet data delay also confuses users; to some extent, delay itself is a data quality issue. So users who want data in real time need to accept some data quality trade-offs, and they can always retrieve fully accurate data from a batch pipeline. As people always say, there is no perfect solution, only the most suitable one. Programming is never magic; once you understand the principles, a suitable solution will come up.
