Apache Iceberg in SmartNews

SmartNews
SmartNews, Inc
Published in
7 min readOct 11, 2022

Project Background

SmartNews puts great emphasis on data security, privacy, governance and compliance, in addition to access rights measures to ensure data is only granted on a strictly need-to-know basis. SmartNews follows all data privacy laws and regulations such as GDPR, CCPA. One of the key challenges for the data platform was to meet its To Be Forgotten requirement, which requires the company to delete all user information, both online and offline, within a specified period of time after a user requests deletion. The original Hive table-based offline data required a costly rewrite of the entire partition if any data needed to be deleted. As more and more data is accumulated, faster and faster, this part of the cost becomes more and more prominent.

One of the tables involved was the user behavior table, one of the most basic and widely used tables across the company. The table is partitioned on a daily basis and no longer meets user requirements for data timeliness. Of course it is not technically difficult to increase the granularity of the table to the hourly level, but because it is partitioned by day and action (dt/action), and there are hundreds of actions per day, i.e. hundreds of partitions are submitted to the Hive Metastore every day. If the granularity of the partition is increased to the hourly level, the number of partitions submitted per day will increase by 24 times, and the number of partitions submitted to hive reaches tens of thousands, putting a lot of pressure on the Hive Metastore, so this was not done before.

To solve the problem of high cost of data deletion and low data timeliness, the team redesigned and re-implemented the corresponding pipeline for this table.

How We Choose

Data deletion is one of the highlights of the data lake technology, with file-level (rather than Hive’s directory-level) tracking and statistics that allow it to skip over irrelevant files and significantly reduce IO. In our data deletion scenario, the percentage of data being deleted each time is very small, so file-level pruning can be a huge efficiency boost.

The data lake does not require partition information to be stored in the Hive Metastore, but rather in the metadata files, so data lake technology can also be a good solution to the HMS bloat problem.

The key selection is therefore not whether to go on a data lake or not, but to focus on which data lake solution to choose.

The main data lake technologies are Apache Iceberg, Apache Hudi and Delta Lake, and after evaluation we finally chose Iceberg for the following reasons.

  • Delta Lake was not examined in depth as it was not completely open source.
  • The company’s tables do not have Primary Keys, whereas Hudi requires tables to have PKs, while Iceberg does not.
  • Iceberg has a unique id associated with each column, not by name, whereas Hudi identifies different columns by name, so re-enabling a column with the same name could lead to data anomalies.

How We Implement

The whole implementation is mainly done by 3 Flink jobs, each with the following functions:

Flink Job1 is responsible for partitioning the raw log files into three levels by dt/hour/action and outputting the intermediate files in avro format.

This job uses S3’s MPU (Multi Part Upload) mechanism to achieve high frequency checkpoint while avoid small file problems. The output avro is a customized (remove header, block related information, only keep the data itself) row format, and supports simple merging of file parts. The job also improves StreamingFileWriter to emit partition open and close events, and produces a partition completion signal file (SUCCESS file) based on the idle timeout threshold for use by Flink Job2.

Flink Job2 is responsible for converting the avro output of the previous job into a standard parquet format and submitting it to iceberg, generating a final SUCCESS file based on the SUCCESS file of the upstream job, indicating the completion of a daily or hourly partition to trigger the processing of the batch system (Airflow, etc.). As the events received by this job are bursty, the traffic is very unstable and the computation is relatively heavy, so the stateless file format transformation is shifted to Knative, which has better scalability, to avoid the Flink job taking up a lot of resources and idling, thus significantly improving the computation efficiency (~5 times). Flink completes the Iceberg submission.

Flink Job3 is responsible for synchronizing the state of the Iceberg tables to another Hive Metastore (HMS2) in real time. There are 2 Hive Metastore in the company, both need to access the iceberg table, so both sides register the Iceberg table, Flink Job2 only updates HMS1, while the job listens to the iceberg table new metadata file generation event, and is responsible for updating the metadata of the table in HMS2 (e.g. metadata_location, numFiles, numRows, totalSize) in HMS2, enabling the synchronous update of two HMSs in one Iceberg table.

All three Flink Jobs are connected by Kafka. When the upstream S3 file is generated, it is synchronized to Kafka through the s3 event notification mechanism, which in turn connects the three Jobs.

Data is mainly accessed externally via Trino, Spark and Hive, while table maintenance is done in Spark; maintenance includes expire_snapshots, remove_orphan_files, and occasional data repair requests.

Challenges

During the implementation of the project, the following key challenges were encountered.

1. No support for multiple Hive metastore accesses for a single table

Iceberg does not support registering the same table in two HMSs and keeping them in sync. We managed to update the metadata_location in the HMS table properties to keep them in sync, which was a tricky solution.

2. Ecological Maturity

The company widely uses presto/hive/spark to access offline tables, but these engines have different levels of completion of support for iceberg.

The company used presto 343 which does not support iceberg v2 format, even then the latest trino 370, the official documentation shows that the v2 was not supported, but the actual test shown that v2 Copy On Write table can meet the project needs, so upgrade to Trino 370 and use iceberg v2 CoW tables.

Hive 2.x’s Tez engine threw an ArrayIndexOutOfBoundsException when querying Iceberg tables, requiring the introduction of TEZ-4248 and a recompile of Tez 0.9.2 to resolve the issue. For Hive 3.x, you need to use Tez 0.10.1.

3. Query Performance

Overall query efficiency has improved, but query planning has become heavier.

1) For Hive, the AM memory configuration needs to be upgraded in order to complete common query planning.

2) If the partition key is type-transformed before filtering, Trino’s predicate push down does not work, resulting in very slow query planning. For example, in Tableau queries, especially when incremental refresh is enabled, users often convert date(dt) into date type for filtering, while the type of dt is string, if the query has where date(dt) > xxx, then the planning stage needs to scan all the manifest file, which is very time consuming. Users need to be educated to avoid this usage by changing to dt > cast(xxx as varchar) to avoid this. This issue has been brought to the attention of the community, trinodb-12925.

4. Integration with offline systems

Using Flink to write Iceberg, the data itself can be updated up to the minute level. But offline systems usually rely on explicit completion signals for processing, e.g. Airflow usually waits for a hive partition, or success file, to trigger a downstream task. Iceberg tables, on the other hand, currently have no native mechanism for outputting partition completion signals to trigger downstream jobs.

In our implementation, the Flink job in the business layer determines the completion of the partition based on the length of the data break, and outputs the SUCCESS file, which makes the job logic relatively complex.

Benefits

After converting the original Hive tables into Iceberg format, the benefits are also very obvious.

1. 85% improvement in data deletion efficiency

We measured the infrastructure costs associated with the Hive solution compared to the Iceberg solution for deletions of the same amount of data and found that the cost reduction of 85% for Iceberg table deletion operations was significant.

2. The timeliness of the data was optimized from daily to hourly

Partition granularity was improved and no additional pressure was put on the Hive Metastore. On the contrary, because the partition information is not stored in the HMS, it actually reduces the pressure on the HMS significantly.

3. More efficient data revision

Sometimes the generated data is partially wrong due to bugs in the business logic, the previous Hive approach requires rewriting the entire partition, which is time-consuming and costly; whereas with Iceberg tables, the changes can be made through Spark’s MERGE INTO syntax, which is much faster.

Summary

In this article, we have introduced the first large-scale implementation of Apache Iceberg in SmartNews through a concrete example, and introduced our thinking, implementation solutions, challenges encountered, and benefits achieved. In fact, Iceberg is currently used in a number of other projects, for example, in the advertising scenario, the exposure, click and conversion events are separated by a long time, so it is necessary to revise the previous partition, SmartNews uses Iceberg’s transaction mechanism to avoid the problem of unreadability during the revision of the Hive table; and plans to use Iceberg’s MERGE INTO function to achieve the problem of data joins under large windows. We are also evaluating the CDC into the lake option, and so on.

Finally, many thanks to the Apache Iceberg community for enabling us to solve some long-standing fundamental technical issues.

--

--