Managing Duplicates with Apache Hudi
Most users have a lot of dashboards and reports built out from their Lakehouse tables and happens to be a critical source for taking key business decisions in many cases. Ensuring data is reliable is the minimum requirement that any management will expect for such reports and dashboards.
Data reliability generally means, no data loss and no duplicates. Users could have different requirements wrt handling duplicates with their Lakehouse tables. Let’s go over each requirement and discuss how they can fullfil their needs with Apache Hudi. As seen in previous blogs, Apache Hudi has been designed to be a database with all bells and whistles so you can cater to all different needs of the user. And hence you may see quite a few config knobs. So, this blog series is an attempt to explain some of the configs and in what ways it assists users in meeting their needs.
PreCombine/Ordering field:
Hudi has a requirement to have primary key (record key) defined for any table. We are looking to relax this requirement in upcoming release, but as of 0.12.0, any Hudi table needs to have a record key set. You can read more about diff Key generator options here. For a partitioned dataset, a pair of PartitionPath and RecordKey is unique across entire dataset and for a non-partitioned dataset, record key is unique across entire dataset. To support updates, users have to set preCombine field or ordering field which will be used to deduce the latest version of a given record. For eg,
PartitionPath1, RecordKey1, Val1(…., OrderingVal1, …)
PartitionPath1, RecordKey1, Val2(…., OrderingVal2, …)
If you ingest these two records into hudi, val2 will win over val1 since its ordering value is higher. And hence when you query hudi, you will just see PartitionPath1, RecordKey1, Val2.
There are different ways in which you can set the record keys. For spark-data source, spark-structured streaming and Deltastreamer writes, you can set using https://hudi.apache.org/docs/configurations#hoodiedatasourcewriterecordkeyfield-2 config. For spark-sql writes, you can set while creating the table.
create table hudi_cow_tbl (
id int,
.
.
) using hudi
tblproperties (
type = ‘cow’,
primaryKey = 'id',
preCombineField = 'ts'
);
Similarly, to set preCombine, you need to set https://hudi.apache.org/docs/configurations#hoodiedatasourcewriteprecombinefield for spark-datasource, spark-structured streaming writes. For spark-sql writes, you can set table properties while creating the table
create table hudi_cow_tbl (
id int,
.
.
) using hudi
tblproperties (
type = ‘cow’,
primaryKey = 'id',
preCombineField = 'ts'
);
For Deltastreamer, you need to set it as a top level config.
` — source-ordering-field [ORDERING_FIELD]`
Coming back to the topic of this blog, by default with “upsert” operation, Hudi de-dups incoming batch of records (based on the partition path and reocrd key) to be ingested before it sends the records further to core write layer.
For eg, if you ingest below batch.
PartitionPath1, RecordKey1, Val1(…., OrderingVal1, …)
**PartitionPath1, RecordKey2, Val2(…., OrderingVal2, …)**
PartitionPath1, RecordKey3, Val3(…., OrderingVal1, …)
**PartitionPath1, RecordKey2, Val4(…., OrderingVal5, …)**
Hudi will de-dup this batch, before further processing as below.
PartitionPath1, RecordKey1, Val1(…., OrderingVal1, …)
PartitionPath1, RecordKey3, Val3(…., OrderingVal1, …)
PartitionPath1, RecordKey2, Val4(…., OrderingVal5, …)
Config that drives this is https://hudi.apache.org/docs/configurations#hoodiecombinebeforeupsert
this config is enabled by default.
With “insert” operation, by default such de-dup is disabled since the intent for using “insert” is for immutable datasets. But users can still enable de-dup among the incoming batch by using https://hudi.apache.org/docs/configurations#hoodiecombinebeforeinsert
As the operation type signifies, with “upsert”, Hudi ensures you will always have only one version of the record in the final snapshot. But with “insert”, Hudi avoids any index loop up to honor the intent of the user and if you ingest two versions of the same record across two different commits using “insert” operation, there are chances that you might see duplicate records with final snapshot (since Hudi could route the two different versions of the record to two different file groups).
Let’s see with an illustration of how this might pan out if you ingest two different batches of records in to hudi.
Using “Upsert” operation type:
Batch1:
PartitionPath1, RecordKey1, Val1(…., OrderingVal1, …)
PartitionPath1, RecordKey2, Val2(…., OrderingVal1, …)
PartitionPath1, RecordKey3, Val3(…., OrderingVal1, …)
Batch2:
PartitionPath1, RecordKey1, Val4(…., OrderingVal2, …)
PartitionPath1, RecordKey4, Val5(…., OrderingVal1, …)
PartitionPath1, RecordKey5, Val6(…., OrderingVal2, …)
Final snapshot:
PartitionPath1, RecordKey1, Val4(…., OrderingVal2, …)
PartitionPath1, RecordKey2, Val2(…., OrderingVal1, …)
PartitionPath1, RecordKey3, Val3(…., OrderingVal1, …)
PartitionPath1, RecordKey4, Val5(…., OrderingVal1, …)
PartitionPath1, RecordKey5, Val6(…., OrderingVal2, …)
Using “Insert” operation type:
Batch1:
PartitionPath1, RecordKey1, Val1(…., OrderingVal1, …)
PartitionPath1, RecordKey2, Val2(…., OrderingVal1, …)
PartitionPath1, RecordKey3, Val3(…., OrderingVal1, …)
Batch2:
PartitionPath1, RecordKey1, Val4(…., OrderingVal2, …)
PartitionPath1, RecordKey4, Val5(…., OrderingVal1, …)
PartitionPath1, RecordKey5, Val6(…., OrderingVal2, …)
Final snapshot:
PartitionPath1, RecordKey1, Val1(…., OrderingVal1, …) // may or may not see this depending on small file handling.
PartitionPath1, RecordKey1, Val4(…., OrderingVal2, …)
PartitionPath1, RecordKey2, Val2(…., OrderingVal1, …)
PartitionPath1, RecordKey3, Val3(…., OrderingVal1, …)
PartitionPath1, RecordKey4, Val5(…., OrderingVal1, …)
PartitionPath1, RecordKey5, Val6(…., OrderingVal2, …)
If you are using “bulk_insert” as your write operation, you are definitely bound to see duplicates if you ingest duplicates either in same batch or across different batches/commits.
Dropping duplicates:
For event streaming type of workloads, sometimes users prefers to retain only the first version of the record and drop any newer versions. You can enable https://hudi.apache.org/docs/configurations#hoodiedatasourcewriteinsertdropduplicates if you have this requirement. Just be wary that this might add some additional overhead to your write latency since this does an additional index lookup.
Let’s go over same example and see what happens if you use “insert” operation and set https://hudi.apache.org/docs/configurations#hoodiedatasourcewriteinsertdropduplicates.
Batch1:
PartitionPath1, RecordKey1, Val1(…., OrderingVal1, …)
PartitionPath1, RecordKey2, Val2(…., OrderingVal1, …)
PartitionPath1, RecordKey3, Val3(…., OrderingVal1, …)
Batch2:
PartitionPath1, RecordKey1, Val4(…., OrderingVal2, …)
PartitionPath1, RecordKey4, Val5(…., OrderingVal1, …)
PartitionPath1, RecordKey5, Val6(…., OrderingVal2, …)
Final snapshot:
PartitionPath1, RecordKey1, Val1(…., OrderingVal1, …)
PartitionPath1, RecordKey2, Val2(…., OrderingVal1, …)
PartitionPath1, RecordKey3, Val3(…., OrderingVal1, …)
PartitionPath1, RecordKey4, Val5(…., OrderingVal1, …)
PartitionPath1, RecordKey5, Val6(…., OrderingVal2, …)
Note: RecordKey1 with Val4 is dropped from final snapshot.
Fixing duplicates:
Just incase you happened to not set the right configs and ended up w/ duplicates in your Hudi table, you can use hudi-cli to fix it.
repair deduplicate - duplicatedPartitionPath [PARTITION_PATH_TO_DEDUP] – - repairedOutputPath [OUTPUT_DIR_PATH] - sparkProperties [PATH_TO_SPARK_PROPS_FILE] - sparkMaster [SPARK_MASTER] - sparkMemory [SPARK_MEMORY]
This command will read records from [PARTITION_PATH_TO_DEDUP] and write to [OUTPUT_DIR_PATH] by de-duping the records. And then users can perform `INSERT_OVERWRITE` operation to write to hudi for data from [OUTPUT_DIR_PATH]. Hudi will overwrite existing data in the matching partition with newer data.
Conclusion:
Ensuring no duplicates is very critical for your downstream consumers. If the data is not reliable, you can’t really do much or take any critical business decisions. And cleansing your Lakehouse tables for duplicates might be a nightmare in some cases and you may need to stop your ingestion if you were to fix your tables. So, set the configs appropriately so as to meet your demands upfront with Apache Hudi.