Flink-based Iceberg Real-Time Data Lake in SmartNews (Part 2)

SmartNews · Published in SmartNews, Inc · 7 min read · Jun 12, 2023

Background

In the previous part, we described two candidate solutions and the advantages and disadvantages of each. In this part, we focus on how we solve the problem of small files generated by real-time writes.

Optimization of small files for real-time updates

Let’s first introduce the Iceberg sink’s write modes, to help explain why small files are generated. Since our data can be updated, we chose the Iceberg sink’s upsert mode: each upsert emits two records, a delete and an insert, which wastes storage space and puts CPU pressure on the downstream Flink writers.
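For reference, here is a minimal sketch of how an Iceberg Flink sink can be configured in upsert mode. The table path, column name, and stream variable are illustrative placeholders, not our production setup:

```java
import java.util.Arrays;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.table.data.RowData;
import org.apache.iceberg.flink.TableLoader;
import org.apache.iceberg.flink.sink.FlinkSink;

public class UpsertSinkExample {

  // `input` is a DataStream<RowData> of advertiser events; the table path is a placeholder.
  static void buildSink(DataStream<RowData> input) {
    TableLoader tableLoader = TableLoader.fromHadoopTable("s3://bucket/warehouse/ads_table");

    FlinkSink.forRowData(input)
        .tableLoader(tableLoader)
        .equalityFieldColumns(Arrays.asList("ad_id")) // upsert key (illustrative column name)
        .upsert(true)  // every upsert is written as an equality delete plus an insert
        .append();
  }
}
```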

Here we first solve the problem of one upsert producing multiple records by introducing Flink state, with the input keyed (keyBy) on the advertiser primary key:

  1. If the current primary key does not exist in the Flink state, i.e. it is being written for the first time, we output a RowKind.INSERT record indicating that this is a brand new piece of data.
  2. Conversely, if the primary key already exists in the state, we output two records downstream: an update-before and an update-after. We also run more detailed checks on whether any output is needed at all, such as whether the timestamp has actually advanced, in order to further reduce the duplicate data that the upstream Kafka topic may produce.

By manipulating the RowKind in this way, as sketched below, some of the duplicate writes can be eliminated, but this alone is not enough.
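A minimal sketch of this logic, assuming a KeyedProcessFunction keyed on the advertiser primary key, a timestamp column used for the freshness check, and simplified state handling (the column position, key type, and class name are illustrative):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.table.data.RowData;
import org.apache.flink.types.RowKind;
import org.apache.flink.util.Collector;

/** Turns an upsert stream into INSERT / UPDATE_BEFORE / UPDATE_AFTER records (sketch). */
public class UpsertToChangelog extends KeyedProcessFunction<String, RowData, RowData> {

  private static final int TS_POS = 3; // position of the event-timestamp column (illustrative)

  private transient ValueState<RowData> lastRow;

  @Override
  public void open(Configuration parameters) {
    lastRow = getRuntimeContext().getState(
        new ValueStateDescriptor<>("last-row", RowData.class));
  }

  @Override
  public void processElement(RowData row, Context ctx, Collector<RowData> out) throws Exception {
    RowData previous = lastRow.value();

    if (previous == null) {
      // First time this key is seen: emit a brand-new insert.
      row.setRowKind(RowKind.INSERT);
      out.collect(row);
      lastRow.update(row);
    } else if (row.getLong(TS_POS) > previous.getLong(TS_POS)) {
      // Key already exists and the new record is fresher: emit a retraction pair.
      previous.setRowKind(RowKind.UPDATE_BEFORE);
      out.collect(previous);
      row.setRowKind(RowKind.UPDATE_AFTER);
      out.collect(row);
      lastRow.update(row);
    }
    // Otherwise the record is a duplicate or stale, so nothing is emitted.
  }
}
```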

Using Flink State to Reduce Records Sent

In addition, a large number of small files are generated by the Iceberg Stream Writer. This comes from how the Flink sink’s Writers work, which we briefly describe first to make the subsequent optimizations easier to follow.

The Iceberg sink supports several distribution modes, which determine how data is passed to the downstream Writer operators:

  1. Equality Field Key Selector: hashes the equality fields (primary key) of the row data
  2. Partition Key Selector: hashes the partition fields of the row data

The Equality Field Key Selector can be understood as distributing row data downstream by a hash of the primary key, which spreads the load across the downstream Writer subtasks and makes full use of their compute, maximizing write speed. The Partition Key Selector, in contrast, sends all row data with the same partition to the same Writer, ensuring that each partition’s data is written out by a single Writer. The Stream Writer is then responsible for writing all the data it receives to DFS, such as S3, producing either a single file or multiple files depending on whether the table has partition information.
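Conceptually, the two selectors differ only in which fields are hashed when rows are keyed to the Writer subtasks. A simplified sketch (the actual implementations live in Iceberg’s Flink module; the field positions here are illustrative):

```java
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.table.data.RowData;

/** Simplified shuffle keys; column positions are illustrative. */
public class ShuffleKeys {

  /** Equality-field style: key by primary key, spreading one partition across many Writers. */
  public static final KeySelector<RowData, String> BY_EQUALITY_FIELD =
      row -> row.getString(0).toString(); // e.g. the ad_id column

  /** Partition style: key by the partition value, so one hour's data goes to a single Writer. */
  public static final KeySelector<RowData, String> BY_PARTITION =
      row -> row.getString(1).toString(); // e.g. the event-hour column
}
```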

In our use case, the data lake is physically partitioned by hour: data for the same hour exists only under the same path, and a single data file cannot contain data from more than one partition. After receiving the data, the downstream Writers therefore output files according to the partition information; during the checkpoint phase, every Writer sends statistics about the files it has written to the final Committer operator, and the Committer then commits all of the changes to Iceberg.

Iceberg Stream Write in Flink

Let’s take an example to illustrate the difference between these two distribution modes: assume that the checkpoint interval is 20 minutes and 10 Writers are used to write the files.

When we use the Equality Field Key Selector, each partition generates about 90 files per hour (3 checkpoints × 10 Writers × 3 files per Writer per checkpoint). 90 files per hour is only the theoretical rate: the actual number may be larger if a single data file exceeds the configured size and is rolled over, or smaller than 90 when no delete files are produced. Looking at the number of records per partition, the data follows a typical long-tailed distribution: the closer a partition is to the current hour, the more data it has to process.

EqualityFieldKeySelector Output Example
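The theoretical rate follows directly from the example’s parameters. A quick back-of-the-envelope calculation, assuming each Writer produces a data file plus two delete files per checkpoint in upsert mode:

```java
public class FileCountEstimate {
  public static void main(String[] args) {
    int checkpointsPerHour = 60 / 20;     // 20-minute checkpoint interval -> 3 checkpoints per hour
    int writers = 10;                     // Writer parallelism in the example
    int filesPerWriterPerCheckpoint = 3;  // data file + equality-delete + position-delete (assumed)
    int filesPerHour = checkpointsPerHour * writers * filesPerWriterPerCheckpoint;
    System.out.println(filesPerHour);     // 3 * 10 * 3 = 90
  }
}
```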

When we use the Partition Key Selector, the closer a partition is to the latest hour, the more severe the backpressure becomes, which can delay the whole Flink job.

PartitionKeySelector Output Example

The advantage of the Equality Field Key Selector is high efficiency, but it produces many small files, especially at the end of the long tail, where the files average only tens of kilobytes. The advantage of the Partition Key Selector is that it produces few small files, but for partitions with large data volumes it causes heavy backpressure.

So, is there a better solution that combines the advantages of these two shuffles? Here we introduce a Dynamic Shuffle Operator that selects a different key selector for different partitions: for the most recent partitions with huge data volumes it uses the Equality Field Key Selector, while for the long-tail partitions it uses the Partition Key Selector. This preserves write throughput while reducing the number of small files at the end of the long tail.

Dynamic Shuffle Operator Output Example

In our solution, we introduce the Dynamic Shuffle Operator, which physically partitions (shuffles) the data before feeding it to the Writers.

The shuffle policy is scheduled dynamically based on statistics about the data the Shuffle Operator has processed so far:

  1. First, a Coordinator is introduced to handle message communication between the different Shuffle subtasks.
  2. Second, we must ensure that different subtasks follow the same shuffle strategy when outputting files, because Iceberg requires row data with the same primary key to be written by the same Writer when processing delete files. For example, suppose an insert is followed by an update for the same key: if these two rows are shuffled under different strategies, they will very likely end up on different Writer operators, which leads to duplicate data.
  3. In addition, the Shuffle Operator is responsible for sending its statistics, such as the number of records processed per partition, to the Coordinator. After collecting these statistics, the Coordinator can dynamically decide the strategy for the latest partition. For example, when the latest partition has already received 70% of its data, the Coordinator can ask the Shuffle Operators to switch it to the partition-key strategy, further reducing the number of files generated within that hour (see the sketch below).
ShuffleCoordinator in Dynamic Shuffle Operator
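A simplified sketch of the routing logic inside such a shuffle operator: a per-partition strategy map, kept consistent across subtasks via the Coordinator’s decisions, selects between primary-key routing and partition routing. This illustrates the mechanism only; names like `DynamicShuffleRouter` and the column positions are hypothetical:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.flink.table.data.RowData;

/** Routing logic inside a dynamic shuffle operator (simplified sketch). */
public class DynamicShuffleRouter {

  enum Strategy { EQUALITY_FIELD, PARTITION }

  private static final int KEY_POS = 0;  // primary key column position (illustrative)
  private static final int HOUR_POS = 1; // partition (hour) column position (illustrative)

  // Per-partition strategy, updated from the Coordinator so that every
  // shuffle subtask applies the same decision for the same partition.
  private final Map<String, Strategy> strategyByPartition = new ConcurrentHashMap<>();

  void applyCoordinatorDecision(String partition, Strategy strategy) {
    strategyByPartition.put(partition, strategy);
  }

  /** Returns the key whose hash decides which Writer subtask receives this row. */
  String shuffleKey(RowData row) {
    String partition = row.getString(HOUR_POS).toString();
    Strategy strategy = strategyByPartition.getOrDefault(partition, Strategy.PARTITION);

    if (strategy == Strategy.EQUALITY_FIELD) {
      // Hot (recent) partition: spread its rows across Writers by primary key.
      return partition + "|" + row.getString(KEY_POS).toString();
    }
    // Long-tail partition: route all of its rows to a single Writer.
    return partition;
  }
}
```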

Experiments Comparison

In our experiments, we set the Flink parallelism to 20 and compare the number of files generated per hour and their average size over 24 hours.

First we compare the number of new files per hour for the same Partition:

  • +1 means one hour behind the latest hour
  • No Shuffle means that Iceberg’s default Shuffle is used, i.e. Equality Field Key By
  • Dynamic Shuffle is the new Shuffle Strategy

As you can see, Dynamic Shuffle not only writes fewer files in the latest few partitions, but also gives better results on the long-tail partitions. Generally speaking, after one hour the Dynamic Shuffle Operator switches a partition’s strategy to the Partition Key Selector, so the file growth rate for that hour becomes essentially constant.

The following graph also reflects this long-tail phenomenon: the peak of file generation is generally in the first hour, while the subsequent long-tail hours stay basically flat.

Dynamic Shuffle Operator Result
File Count Improvement

Similarly, for the average file size, the Dynamic Shuffle Operator performs better:

Since the metric here is the average file size, and a single Writer may produce a large data file while its delete files are usually much smaller (they contain only the primary keys or position information), the improvement in average size is most significant for the most recent hour.

File Size Improvement

Summary and Outlook

  • Compared to the Spark + Iceberg v1 solution, Flink’s real-time write solution reduces infrastructure cost by 50%.
  • Repeated computation and duplicate data files are largely avoided.
  • Data freshness has improved from hourly to minute-level.
  • The Dynamic Shuffle Operator can further adapt the shuffle strategy according to the file write rate.
