Flink-based Iceberg Real-Time Data Lake in SmartNews
At SmartNews, the data lake mainly stores advertising data: events such as clicks and conversions collected from the server, plus other dimensional information. It can be understood as one big table. The main advertising data lives in Kafka, where events collected on the server side are written directly to a Kafka topic in real time. Dimension table information, such as advertiser information and statistics, is mainly stored in MySQL or Hive and is generally updated in real time or hourly. Downstream consumers currently use this data lake for ETL and real-time reporting. Because it is the unified entry point for downstream data, we include as many dimensions as possible.
Given this background, the data lake needs to address several technical challenges:
- The first is deduplication by advertiser key. Upstream data is collected per event for each ad (for example, a click and a conversion on the same ad generate two records), so we need to flatten all these events. In addition, the upstream Kafka data may contain duplicates.
- The second challenge is updating certain fields. For example, the timestamp of an event must reflect the latest occurrence, so the data lake has to support update operations.
- Finally, downstream consumers read in near real time, so the data lake must support simultaneous writes and reads. Hive rewrites data in a way that affects downstream queries that are already running, so we needed a new solution.
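The first two challenges can be illustrated with a minimal Python sketch. It assumes each raw event carries hypothetical `ad_key`, `event_type`, and `ts` fields (these names are illustrative and not taken from the SmartNews schema). Events are flattened into one row per key, exact duplicates are skipped, and the stored timestamp is updated to the latest one seen.

```python
from typing import Dict, Iterable


def flatten_events(events: Iterable[dict]) -> Dict[str, dict]:
    """Flatten per-event records into one row per key.

    The field names (ad_key, event_type, ts) are hypothetical.
    """
    rows: Dict[str, dict] = {}
    seen = set()
    for e in events:
        fingerprint = (e["ad_key"], e["event_type"], e["ts"])
        if fingerprint in seen:
            continue  # duplicate record delivered by the upstream topic
        seen.add(fingerprint)

        row = rows.setdefault(
            e["ad_key"], {"ad_key": e["ad_key"], "latest_ts": e["ts"]}
        )
        # One column per event type, holding that type's most recent timestamp.
        col = f"{e['event_type']}_ts"
        row[col] = max(row.get(col, e["ts"]), e["ts"])
        # The row-level timestamp always reflects the latest event.
        row["latest_ts"] = max(row["latest_ts"], e["ts"])
    return rows
```

In a batch job this logic runs over a whole partition at once. In a streaming job the same merge has to be applied to rows already stored in the table, which is why update support matters.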
Data lake practice based on Iceberg v1 format
After a period of research, we found that Iceberg is a good solution to some of these technical difficulties:
- Iceberg supports concurrent writes and reads.
- Iceberg supports column-level statistics, improving the efficiency of downstream queries.
- Iceberg supports schema evolution: metadata-level updates without the need to update data files.
Iceberg currently supports two formats:
- Iceberg v1 format supports large volumes of analytical data tables.
- v2 supports row-level updates.
Our first solution used the v1 format, mainly because the v2 format was relatively new when we designed it and because few query engines (Hive/Presto) supported v2.
In this solution, we used the Spark compute engine to flatten and deduplicate all ad events by primary key and then join all dimension table information. We switched the upstream data source to S3 files for the following main reasons:
- This solution is an hourly solution and does not need to read stream data in real time.
- We designed the Spark task as a minimal execution unit, limiting its input to one hour of the day, so that it can read directly from the S3 file path partition.
- This design is also more fault tolerant: a failure in one large Spark task would fail the whole job, whereas a failure in the task for a particular hour only causes the scheduler to retry that hour.
To avoid double counting, we also check whether the current hour has received new files since the Spark task was last started, and we control the start and retry of Spark tasks via Airflow. The refresh window is a limitation of this solution: in theory it should be 30 days, but given cost and other factors, the solution only refreshes the last 96 hours. Because Iceberg supports concurrent reads and writes, read operations and write operations can occur simultaneously.
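The new-file check and the 96-hour refresh window can be sketched in Python as follows. This is only an illustration under stated assumptions: the listing of S3 files is replaced by a plain dictionary that maps each hourly partition to the modification time of its newest file, and the function name `hours_to_refresh` is hypothetical. The Airflow scheduling and retry logic are not shown.

```python
from datetime import datetime, timedelta
from typing import Dict, List

REFRESH_WINDOW_HOURS = 96  # from the text; the theoretical window is 30 days


def hours_to_refresh(
    now: datetime,
    last_run: datetime,
    newest_file_time: Dict[datetime, datetime],
) -> List[datetime]:
    """Return hourly partitions that need to be recomputed.

    A partition qualifies when it lies inside the refresh window and has
    received at least one file after the previous run started.
    """
    current_hour = now.replace(minute=0, second=0, microsecond=0)
    oldest = current_hour - timedelta(hours=REFRESH_WINDOW_HOURS)
    return sorted(
        hour
        for hour, modified in newest_file_time.items()
        if oldest < hour <= current_hour and modified > last_run
    )
```

Each returned hour would then map to one Spark task, so a failure only requires retrying that hour.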
The shortcomings of this solution are:
- Higher hardware resource usage.
- Wasted computational resources — only about 1% of the total rows need to be updated.
- Waste of storage resources — all data needs to be rewritten for each Overwrite.
- Locking issues with parallel commits to Iceberg.
Flink-based real-time update of the data lake (Iceberg v2)
Spark + Iceberg v1 isolates reads from writes well, but the shortcomings listed above continued to trouble us. After thorough research, we decided to adopt Flink + Iceberg v2 for real-time updates:
- Iceberg v2 supports row-level updates.
- Flink Real-Time Writes — Merge On Read.
- MySQL CDC streaming solution for dimension join.
In this new solution, we stream the upstream data source to Kafka. At the same time, the MySQL dimension table information is fed into the Flink task via CDC, and Flink broadcasts this dimension information into state. Finally, Flink writes the data to the data lake in real time through IcebergSink’s upsert mode. Offline, we also use Airflow to launch Spark tasks that merge data files at regular intervals, mainly to solve the small-file problem. We describe this optimisation in detail in later sections.
Compared with the previous solution, we see the following differences:
The Flink + Iceberg v2 solution has a significant advantage over the Spark + Iceberg v1 solution in all areas except the number of output files. In terms of hardware costs, the Flink + Iceberg v2 solution also reduces costs by at least 50% while improving latency.
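The data flow described above can be modelled with a small, self-contained Python sketch. It does not use the Flink or Iceberg APIs: `UpsertTable` is a toy stand-in for a merge-on-read table, the `advertisers` dictionary stands in for Flink broadcast state, and all class, method, and field names are hypothetical. It assumes an upsert is recorded as a delete of the key followed by an insert, with readers applying the deletes at scan time.

```python
from typing import Dict, List, Optional, Tuple


class UpsertTable:
    """Toy model of merge-on-read upserts: writes append, reads merge."""

    def __init__(self) -> None:
        # Append-only log of ("delete" | "insert", key, row) entries.
        self.log: List[Tuple[str, str, Optional[dict]]] = []

    def upsert(self, key: str, row: dict) -> None:
        # Nothing is rewritten in place: a delete marker and a new row are appended.
        self.log.append(("delete", key, None))
        self.log.append(("insert", key, row))

    def read(self) -> Dict[str, dict]:
        # Readers apply delete markers while scanning (merge on read).
        merged: Dict[str, dict] = {}
        for op, key, row in self.log:
            if op == "delete":
                merged.pop(key, None)
            else:
                merged[key] = row
        return merged


class EnrichAndUpsert:
    """Joins events with broadcast dimension data, then upserts by key."""

    def __init__(self, table: UpsertTable) -> None:
        self.table = table
        self.advertisers: Dict[str, dict] = {}  # stand-in for broadcast state

    def on_dimension_change(self, advertiser_id: str, info: Optional[dict]) -> None:
        # A CDC change either replaces or removes the dimension record.
        if info is None:
            self.advertisers.pop(advertiser_id, None)
        else:
            self.advertisers[advertiser_id] = info

    def on_event(self, event: dict) -> None:
        # Enrich the event with whatever dimension data is currently known.
        dim = self.advertisers.get(event["advertiser_id"], {})
        self.table.upsert(event["ad_key"], {**event, **dim})
```

Because every upsert only appends, the number of stored entries grows with each update even though the number of visible rows does not. That growth is the small-file problem the periodic Spark merge tasks address.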
In the next article of this series, we will focus on how we have optimized the generation of small files.