Speed up your write latencies using Bucket Index in Apache Hudi

Sivabalan Narayanan
4 min read · Apr 8, 2023


Hudi employs indexing on the write path to distinguish updates from inserts and to route updates to the same file group deterministically. Hudi supports several indexing options out of the box, like BLOOM, SIMPLE, HBASE, BUCKET, GLOBAL_BLOOM, GLOBAL_SIMPLE, etc. In this post we will discuss the bucket index in Apache Hudi and how it differs from the other indexes.

Write Skeleton

This is what happens under the hood for one batch of ingestion into Hudi.

Among all of the above stages, indexing is one of the most critical. The index lookup usually dictates write latency, since every other stage is reasonably bounded or deterministic. Indexing latency, however, depends on many factors: the total data in your table, the size of the batch being ingested, partitioned vs non-partitioned tables, regular vs global index, how the updates are spread across the table, temporal characteristics of the record keys, and so on. So, often we see engineers/developers spend time trying to reduce the index lookup time.

Bucket Index

Bucket index is very different from every other index supported by Hudi. Every other index maintains some indexed metadata, and index lookup involves reading that metadata to deduce record locations. With bucket index, the location of a record is determined purely by hashing the record key (or a column of your choice), so Hudi knows where the record belongs without consulting any metadata. Arguably this could have been named StaticHashIndex instead of BucketIndex. As you can imagine, computing a hash is O(1) with no I/O, so you save all the time usually spent on index lookup during writes.

The main drawback of bucket index is that the number of buckets per partition has to be defined upfront for a given table. For example, when you start a fresh table, you can define 16 buckets per partition, and Hudi will allocate 16 file groups for every partition in the table. Incoming records are hashed, taken mod 16, and routed to the corresponding file group, as sketched below. The write handle for each file group then deduces inserts vs updates and merges the records accordingly.
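To make the routing concrete, here is a minimal Scala sketch of the idea. This is illustrative only: the hash function and the mod-based routing below are simplified assumptions, not Hudi's exact hashing implementation.

val numBuckets = 16

// Map a record key to a bucket (file group) id deterministically.
// Mask with Int.MaxValue to keep the hash non-negative before taking the modulo.
def bucketIdFor(recordKey: String): Int =
  (recordKey.hashCode & Int.MaxValue) % numBuckets

// The same key always maps to the same bucket: no metadata lookup, no I/O.
println(bucketIdFor("order-1234"))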

Performance comparison

I ran a very small-scale experiment just to illustrate the difference between bloom index and bucket index.

Data set characteristics:

~7 GB in total, ~13M records, equally distributed across 10 partitions.

Upsert: sampled 50% of the same batch and upserted it.

Total write latency for the upsert (2nd commit), comparing bloom index and bucket index.

Here are the Spark UI screenshots for both. With bucket index, you can see there are no stages involved for index lookup, whereas with bloom index there are a couple of stages/jobs for index tagging.

Bloom index Spark UI

Bucket index Spark UI

Code snippet:

Bloom index:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_bloom_index"
val basePath = $TARGET_LOCATION
val inputPath = $INPUT_LOCATION // with parquet dataset as input.

val df = spark.read.format("parquet").load(inputPath)
df.cache

df.write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.datasource.write.operation","insert").
mode(Overwrite).
save(basePath)

// upsert 50% of same batch.

df.sample(0.5).write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
mode(Append).
save(basePath)

Note: on EMR, bloom index is the default. Locally, SIMPLE index is the default.
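If you prefer not to rely on the environment's default, you can pin the index type on the upsert explicitly. This is the same upsert as above, just with hoodie.index.type set to BLOOM:

df.sample(0.5).write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.index.type","BLOOM"). // pin the index type instead of relying on defaults
mode(Append).
save(basePath)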

Bucket index:

We set the number of buckets to 12.

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord

val tableName = "hudi_bucket_index"
val basePath = $TARGET_LOCATION
val inputPath = $INPUT_LOCATION // with parquet dataset as input.

val df = spark.read.format("parquet").load(inputPath)
df.cache

df.write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12").
option("hoodie.datasource.write.operation","insert").
mode(Overwrite).
save(basePath)

Upsert 50% of records:

df.sample(0.5).write.format("hudi").
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(RECORDKEY_FIELD_OPT_KEY, "key").
option("hoodie.table.name", tableName).
option("hoodie.metadata.enable","false").
option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12").
mode(Append).
save(basePath)

Configs of interest:

These are the configs to set if you prefer to use bucket index.

option("hoodie.index.type","BUCKET").
option("hoodie.index.bucket.engine","SIMPLE").
option("hoodie.storage.layout.partitioner.class","org.apache.hudi.table.action.commit.SparkBucketIndexPartitioner").
option("hoodie.bucket.index.num.buckets","12")

Note: If you prefer to use the bucket index for a table, you have to start with a fresh table. You can’t switch from bloom index to bucket index halfway through, since the existing file groups were not laid out by the bucket hashing.

Conclusion

You can speed up your writes significantly if your use case fits the bucket index well. You can also choose any column of your choice for the hashing; if you don’t, the record key will be used. Hope this blog was useful in understanding the bucket index with Apache Hudi.
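For example, to hash on a column other than the record key, the hoodie.bucket.index.hash.field config can be set alongside the other bucket index configs (the column name "user_id" below is just an example placeholder):

option("hoodie.index.type","BUCKET").
option("hoodie.bucket.index.num.buckets","12").
option("hoodie.bucket.index.hash.field","user_id") // example column; defaults to the record key if unset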
