Paimon vs. Hudi write efficiency

Published in Apache Hudi by apache-hudi-blogs · 11 min read · Mar 25, 2024

Translated from the original post (in Chinese) by Anryg (安瑞哥)

The reason I wanted to make this comparison is that last week I saw an article posted on the official Flink WeChat account that included an eye-catching comparison of the write efficiency of two data lake products, Paimon and Hudi.

Chart quoted from the official Flink WeChat account

It immediately caught my attention. Judging from the comparison chart, Hudi was beaten in every respect, looking like an "A-Dou who cannot be propped up" (a Chinese idiom for a hopeless case).

Curious about what kind of scenario this conclusion was based on, I read through the full article and found that it gave no specific test details; it simply announced: "Well... this is our conclusion, and it is just that impressive."

But you know how it is: with no specifics about the scenario (what is the data source? what is the table schema? how are resources configured during writing? how is the parallelism set? and so on) and test results with no description of the process, I simply don't believe it.

So, on impulse, I posted my doubts about the conclusion on WeChat, which soon drew the attention of Alibaba Cloud technical experts. They asked me why I doubted their results, and I walked them through my views and concerns.

But suspicion alone is useless. To confirm my doubts, and out of respect for other people's test results, I decided to run the test myself and lay out the whole process for you.

(This test is based on Flink 1.15, Hadoop 3.1, Paimon 0.6, and Hudi 0.13.)

0. Preparation before the test

Let’s talk about the data source first:

Since this is a write test, we first need to settle where the data comes from, and in my opinion the choice of data source matters a lot.

For example, its read efficiency must not become the bottleneck of the whole pipeline. In other words, the upstream source must deliver data faster than the downstream sink can write it; only then does the test truly reflect write efficiency. Otherwise, the test is nonsense.

To ensure this, I poured all the data from a large file into a Kafka topic in advance, and then had Flink consume that topic (at its default speed). This way, reading the data source cannot become the bottleneck.

Why? Because sequential disk reads (consuming Kafka) are far faster than writes that have to maintain indexes on specific fields (writing to Paimon or Hudi tables).
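To give a concrete picture of that pre-loading step, here is a minimal, hypothetical loader sketch (not the original tooling; the class name and package are made up). It wraps each log line in the same {"message": ...} JSON envelope that the Flink jobs below expect and pours it into the test topic:

package com.anryg.bigdata.kafka;

import com.alibaba.fastjson.JSONObject;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.io.BufferedReader;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

/** Hypothetical sketch: pre-load the log file into the Kafka topic before the write test starts. */
public class LoadFile2Kafka {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.211.107:6667"); // same broker the Flink jobs consume from
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props);
             BufferedReader reader = Files.newBufferedReader(Paths.get(args[0]), StandardCharsets.UTF_8)) {
            String line;
            while ((line = reader.readLine()) != null) {
                JSONObject envelope = new JSONObject();
                envelope.put("message", line); // the Flink jobs below extract this "message" field
                producer.send(new ProducerRecord<>("test", envelope.toJSONString())); // async, batched send
            }
            producer.flush(); // make sure everything is on the broker before the test starts
        }
    }
}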

As for the content, it is the same online-behavior log I have used before: 9 fields, a file size of 3.1 GB, and 24,633,164 lines.

1. Flink writes Paimon: code preparation

For the Paimon table, its table schema is as follows:

CREATE TABLE IF NOT EXISTS data_from_flink2paimon01 (
    `client_ip`       STRING,
    domain            STRING,
    `time`            STRING,
    target_ip         STRING,
    rcode             STRING,
    query_type        STRING,
    authority_record  STRING,
    add_msg           STRING,
    dns_ip            STRING,
    PRIMARY KEY (client_ip, domain, `time`, target_ip) NOT ENFORCED
)

To keep the test simple, I use the STRING type for every field (don't copy this habit).

The code for Flink to read from Kafka and write to the Paimon table is as follows:

package com.anryg.bigdata.paimon;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

/**
 * @DESC: Read Kafka with a mix of the Flink DataStream and SQL APIs and write into the Paimon data lake
 * @Author: Anryg
 * @Date: 2023/12/19 20:36
 */
public class FlinkFromKafka2Paimon {

    public static void main(String[] args) {
        // Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                .enableCheckpointing(10000, CheckpointingMode.EXACTLY_ONCE); // enable checkpointing every 10 s
        env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Paimon01"); // HDFS directory for checkpoints
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep checkpoints after cancellation

        /** Create the Flink table environment */
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

        /** Add the Kafka data source */
        KafkaSourceBuilder<String> kafkaSourceBuilder = KafkaSource.<String>builder() // the <String> type here is required, otherwise new SimpleStringSchema() below will not compile
                .setBootstrapServers("192.168.211.107:6667")
                .setTopics("test")
                .setGroupId("FlinkFromKafka2Paimon01")
                .setStartingOffsets(OffsetsInitializer.timestamp(1703474520000L)) // start consuming from a specific timestamp
                .setValueOnlyDeserializer(new SimpleStringSchema());

        /** Turn the source into a DataStream */
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSourceBuilder.build(), WatermarkStrategy.noWatermarks(), "kafka-data");

        /** Process the raw stream, then attach a schema */
        SingleOutputStreamOperator<Row> targetDS = kafkaDS.map((MapFunction<String, String[]>) line -> {
            JSONObject rawJson = JSON.parseObject(line);
            String message = rawJson.getString("message"); // extract the business payload
            String[] msgArray = message.split(",");        // split into fields on the delimiter
            return msgArray;
        }).filter((FilterFunction<String[]>) array -> array.length == 9) // keep only well-formed records
          .map((MapFunction<String[], Row>) array -> Row.of(array[0], array[1], array[2], array[3], array[4], array[5], array[6], array[7], array[8]))
          .returns(
                Types.ROW_NAMED(
                        new String[]{"client_ip", "domain", "time", "target_ip", "rcode", "query_type", "authority_record", "add_msg", "dns_ip"},
                        Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING, Types.STRING)
          );

        /** Convert the target DataStream into a Table */
        Table table = tableEnv.fromDataStream(targetDS);

        /** Create a Paimon catalog */
        tableEnv.executeSql("CREATE CATALOG hdfs_catalog WITH ('type' = 'paimon', 'warehouse' = 'hdfs://192.168.211.106:8020/tmp/paimon')");
        /** Switch to the catalog just created */
        tableEnv.executeSql("USE CATALOG hdfs_catalog");
        /** Create the Paimon table */
        tableEnv.executeSql("CREATE TABLE if not exists data_from_flink2paimon01 (`client_ip` STRING ,domain STRING,`time` STRING,target_ip STRING,rcode STRING,query_type STRING,authority_record STRING,add_msg STRING,dns_ip STRING,PRIMARY KEY(client_ip,domain,`time`,target_ip) NOT ENFORCED)");
        /** Register the Kafka-backed table as a temporary view in the current catalog */
        tableEnv.createTemporaryView("kafka_table", table);
        /** Write the Kafka data into the Paimon table */
        tableEnv.executeSql("INSERT INTO data_from_flink2paimon01 SELECT * FROM kafka_table");
    }
}

Note that in the code, to consume exactly the Kafka data I had loaded in advance, I specified the starting offsets by timestamp.
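For reference, these are the starting-offset choices the Flink Kafka connector offers on the same builder; only the timestamp variant guarantees each run replays exactly the pre-loaded batch (the group id and the commented alternatives below are illustrative, not from the original job):

// Starting-offset choices on the KafkaSource builder (Flink Kafka connector).
KafkaSourceBuilder<String> sourceBuilder = KafkaSource.<String>builder()
        .setBootstrapServers("192.168.211.107:6667")
        .setTopics("test")
        .setGroupId("offset-demo") // hypothetical group id for this illustration
        // used in this post: start from the first record at or after the given epoch-millis
        .setStartingOffsets(OffsetsInitializer.timestamp(1703474520000L))
        // alternatives provided by the connector:
        //   OffsetsInitializer.earliest()  -> replay the whole topic
        //   OffsetsInitializer.latest()    -> only records arriving after startup
        .setValueOnlyDeserializer(new SimpleStringSchema());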

2. Flink writes Hudi: code preparation

For the Hudi table, the schema matches the Paimon table above: the same nine STRING columns and the same primary key, declared column by column in the code via HoodiePipeline.

The code for Flink to read from Kafka and write to the Hudi table is as follows:

package com.anryg.bigdata.hudi;

import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONObject;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.KafkaSourceBuilder;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.data.GenericRowData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.hudi.configuration.FlinkOptions;
import org.apache.hudi.util.HoodiePipeline;

import java.util.HashMap;
import java.util.Map;

/**
 * @DESC: Read Kafka with Flink and write into a Hudi table
 * @Author: Anryg
 * @Date: 2023/9/3 14:54
 */
public class FlinkDSFromKafka2Hudi {

    public static void main(String[] args) throws Exception {
        // Get the streaming execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment()
                .enableCheckpointing(Long.parseLong(args[0]), CheckpointingMode.EXACTLY_ONCE); // enable checkpointing; the interval comes from the first program argument
        //env.setRuntimeMode(RuntimeExecutionMode.BATCH)
        env.getCheckpointConfig().setCheckpointStorage("hdfs://192.168.211.106:8020/tmp/flink_checkpoint/FlinkDSFromKafka2Hudi01"); // HDFS directory for checkpoints
        env.getCheckpointConfig().setExternalizedCheckpointCleanup(CheckpointConfig.ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION); // keep checkpoints after cancellation

        KafkaSourceBuilder<String> kafkaSource = KafkaSource.<String>builder() // build the Kafka source
                .setBootstrapServers("192.168.211.107:6667")
                .setTopics("test")
                .setGroupId("FlinkDSFromKafka2Hudi01")
                .setStartingOffsets(OffsetsInitializer.timestamp(1703474520000L)) // start consuming from a specific timestamp
                .setValueOnlyDeserializer(new SimpleStringSchema());

        /** Turn the source into a DataStream */
        DataStreamSource<String> kafkaDS = env.fromSource(kafkaSource.build(), WatermarkStrategy.noWatermarks(), "kafka-data");
        DataStream<RowData> targetDS = kafkaDS.map((MapFunction<String, String[]>) line -> {
            JSONObject rawJson = JSON.parseObject(line);
            String message = rawJson.getString("message"); // extract the business payload
            String[] msgArray = message.split(",");        // split into fields on the delimiter
            return msgArray;
        }).filter((FilterFunction<String[]>) array -> array.length == 9) // keep only well-formed records
          .map((MapFunction<String[], RowData>) array -> {
            GenericRowData rowData = new GenericRowData(9);
            rowData.setField(0, StringData.fromString(array[0]));
            rowData.setField(1, StringData.fromString(array[1]));
            rowData.setField(2, StringData.fromString(array[2]));
            rowData.setField(3, StringData.fromString(array[3]));
            rowData.setField(4, StringData.fromString(array[4]));
            rowData.setField(5, StringData.fromString(array[5]));
            rowData.setField(6, StringData.fromString(array[6]));
            rowData.setField(7, StringData.fromString(array[7]));
            rowData.setField(8, StringData.fromString(array[8]));
            return rowData;
        });

        Map<String, String> options = new HashMap<>();
        options.put(FlinkOptions.PATH.key(), "hdfs://192.168.211.106:8020/tmp/hudi/data_from_flink2hudi01");
        //options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.COPY_ON_WRITE.name());
        options.put("table.type", args[1]); // the table type (COPY_ON_WRITE or MERGE_ON_READ) comes from the second program argument
        options.put("file.format", "ORC");  // default is Parquet
        //options.put(FlinkOptions.PRECOMBINE_FIELD.key(), "client_ip,domain,time,target_ip");

        HoodiePipeline.Builder builder = HoodiePipeline.builder("data_from_flink2hudi01")
                .column("client_ip string")
                .column("domain string")
                .column("`time` string")
                .column("target_ip string")
                .column("rcode string")
                .column("query_type string")
                .column("authority_record string")
                .column("add_msg string")
                .column("dns_ip string")
                .pk("client_ip", "domain", "`time`", "target_ip") // primary-key fields
                .options(options);

        builder.sink(targetDS, false); // false: the input stream is unbounded
        env.execute();
    }
}

I originally planned to write this in Scala, but, as described in a previous article, the Scala and Java APIs still cannot interoperate (be converted into each other) in certain places (I do miss the Spark API).

3. First test comparison

The test conditions are as follows:

Originally, I planned to let the taskmanager use its default memory (1.7 GB) and a parallelism of 1, since the text data I had poured into Kafka was only 3.1 GB. But after testing, that simply did not work: it just gave me an OOM.

So I went back and re-tuned the parameters (several attempts, always trying to use as few resources as possible).
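As a sketch of the kind of re-tuning involved (my assumption, not necessarily how the jobs were actually submitted), the two knobs that change most between attempts, parallelism and checkpoint interval, can be taken from program arguments so each retry does not require recompiling:

// Hypothetical tuning sketch: read parallelism and checkpoint interval from the
// program arguments so each retry can be re-tuned without a code change.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(Integer.parseInt(args[0]));                                     // e.g. 8, 6, 1 ...
env.enableCheckpointing(Long.parseLong(args[1]), CheckpointingMode.EXACTLY_ONCE);  // e.g. 10000 ms or 120000 ms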

3.1 Flink writes Hudi table

Since the data was already sitting in Kafka, as soon as the program started the source was under backpressure and the write side was very busy.

Since this is a streaming job, it will not exit even after it has consumed all the data I poured into Kafka. To pin down the exact moment all the data is consumed, I could only stare at the job's metrics page until the known record count appeared, and then note the elapsed time.

So, I waited and waited and waited!

After more than an hour of waiting, the number I was looking for, 24,633,164, finally appeared (all the data in Kafka had been consumed).

One characteristic of the write pattern: data was written very quickly in the early stage, but later on, because of compaction, the write speed slowed down noticeably.

Then, the status of the task instantly changed from “busy” to “idle”.

The entire process took approximately 82 minutes.

A total of 260 − 2 = 258 data files were generated (excluding the 2 metadata directories).

Note that I used the COPY_ON_WRITE table type here.
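The post does not show how the file count was taken; one way such a count could be reproduced (a sketch using the Hadoop FileSystem API against the Hudi table path from the job above; note it counts every file, while the numbers above additionally exclude the metadata directories) is:

// Hypothetical sketch: recursively count the files under the Hudi table path.
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
org.apache.hadoop.fs.FileSystem fs = org.apache.hadoop.fs.FileSystem
        .get(java.net.URI.create("hdfs://192.168.211.106:8020"), conf);
org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> files =
        fs.listFiles(new org.apache.hadoop.fs.Path("/tmp/hudi/data_from_flink2hudi01"), true);
long count = 0;
while (files.hasNext()) { files.next(); count++; }
System.out.println("files under the table path: " + count);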

3.2 Flink writes Paimon table

You may not believe it, but when I used the same data source and the same parallelism to write the Paimon table, the job looked like this after submission:

The status just kept spinning, and when I came back from a 10-minute break, it was still spinning.

I was puzzled: where was the problem? I retried many times and it still would not start (cluster resources were sufficient).

So I then tried parallelism of 7, 6, 5, 4, 3, 2, and 1 in turn. It only ran normally at a parallelism of 6 or lower.

In the end I settled on a parallelism of 6 and got it running:

Even though the parallelism was only 6, Flink clearly wrote to Paimon faster than it wrote to Hudi.

Likewise, I watched the whole run without looking away; it took about half an hour to write all the data.

The total time taken was about 29 minutes.

The total number of files generated was 99.

Summary:

Measured under these conditions, Paimon wins on both write efficiency and write footprint (number of files): its write speed is more than 2× Hudi's, and it produces less than half as many files.

However, the conclusion from the official Flink test, that writes are 12 times faster than a Hudi COW table, is not reflected here at all (granted, the amount of test data differs).

Comparison conclusion given by the official Flink WeChat account

But then something strange happened: when I used the read API to read back the Paimon and Hudi tables, only the Hudi data could be read successfully.

The Paimon table could not be read no matter what I tried (reads only worked at small data volumes, on the order of a few million rows).
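For context, here is a minimal sketch (my reconstruction, not the exact read job used above) of how such a verification read of the Paimon table could look, reusing the same catalog in batch mode:

package com.anryg.bigdata.paimon;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/** Hypothetical verification read: batch-count the rows written to the Paimon table. */
public class PaimonReadCheck {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded read, the job finishes after the scan
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        tableEnv.executeSql("CREATE CATALOG hdfs_catalog WITH ('type' = 'paimon', 'warehouse' = 'hdfs://192.168.211.106:8020/tmp/paimon')");
        tableEnv.executeSql("USE CATALOG hdfs_catalog");
        tableEnv.executeSql("SELECT COUNT(*) FROM data_from_flink2paimon01").print(); // blocks until the count is produced
    }
}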

4. Second test comparison

The test conditions are as follows:

4.1 Flink writes Hudi table

This time, in addition to extending the checkpoint interval to 2 minutes, I also changed the Hudi table type to MERGE_ON_READ.
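Relative to the first run, those two changes amount to something like the following (values taken from the description above; the FlinkOptions/HoodieTableType form mirrors the commented-out line in the job code):

// The two changes for the second Hudi run, relative to the code shown earlier.
// (HoodieTableType is org.apache.hudi.common.model.HoodieTableType.)
env.enableCheckpointing(120_000L, CheckpointingMode.EXACTLY_ONCE);                 // checkpoint every 2 minutes (passed as args[0] in the job)
options.put(FlinkOptions.TABLE_TYPE.key(), HoodieTableType.MERGE_ON_READ.name());  // switch from COPY_ON_WRITE to MERGE_ON_READ (passed as args[1])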

As for why the taskmanager memory was raised to 6 GB: at 4 GB, the job ran into OOM.

Let’s take a look at the results after running:

Likewise, I watched the whole run with my own eyes, so the time recorded here is accurate.

It is clear that after switching to the MERGE_ON_READ table type, writes became much faster: from more than an hour before, down to 9 minutes and 36 seconds.

The number of files generated also dropped from the previous 258 to 41 (43 minus the 2 metadata directories).

4.2 Flink writes Paimon table

This time it was finally possible to use a parallelism of 8. The other conditions are as shown in the table above. After writing all 24,633,164 Kafka records, the result was as follows:

The total time taken was about 13 minutes and 8 seconds.

The number of files generated was 58.

Did you notice? Hudi came out ahead this time.

Summary:

After changing the table type to MERGE_ON_READ, Hudi beats Paimon on both write speed and the number of files generated.

This time, Hudi finished writing more than 3 minutes faster than Paimon and produced 17 fewer files.

That is a big discrepancy from Flink's officially published conclusion (again, the amount of test data differs):

5. Third test comparison

The test conditions are as follows:

5.1 Flink writes Hudi table

I first tested with the MERGE_ON_READ table type; here is the result directly:

Compared with the previous run, there is not much difference: the time taken was 11 minutes and 20 seconds.

The number of files generated also remained unchanged: still 41.

Then I tested with the COPY_ON_WRITE table type:

This time, writing all the data took 24 minutes and 30 seconds.

The number of generated files was 90 − 2 = 88.

5.2 Flink writes Paimon table

Let’s go straight to the conclusion:

Writing all the data took 16 minutes and 27 seconds (I watched the entire process).

The number of data files written was 54.

Summary:

Judging from these measurements, Hudi's MOR table beats Paimon on both write speed and the number of files generated.

Hudi's COW table is the opposite: its write speed and file count are worse than Paimon's, although the gap seems to narrow as the checkpoint interval grows.

Finally

Judging from the results of this hands-on comparison, the coverage is not complete, but it still makes the point and once again confirms the old saying:

Any performance-testing conclusion is meaningful only in the context of a specific scenario; otherwise it is nonsense and misleading.

The conclusions for this particular test scenario are as follows:

Although the results are not the complete reversal I had suggested in my WeChat Moments post, they are still very different from the official ones.

Of course, this may be related to Hudi's closer integration with Spark. Next time I get the chance, I will run the test again with Spark (hoping for a real reversal).

So what do you think of the conclusion of this test?
