Apache Hudi MOR table with Spark datasource Quick Start

Sivabalan Narayanan
4 min read · Feb 26, 2023


It dawned on me recently that Hudi’s website does not have a quick start example for the MOR table type. So I thought I would put one up for beginners. We will look at how to get started with a Hudi MOR table using the spark datasource.

We will follow a layout similar to the official quick start.

Launching spark-shell

./bin/spark-shell \
--packages org.apache.hudi:hudi-spark2.4-bundle_2.11:0.13.0 \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

Table Creation / Insertion

With the spark datasource, there is no specific command to create a Hudi table. The first write to a new location will automatically create the Hudi table.

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.hudi.common.model.HoodieRecord
import org.apache.spark.sql.functions._

val tableName = "hudi_trips_mor"
val basePath = "file:///tmp/hudi_trips_mor"
val dataGen = new DataGenerator

val inserts = convertToStringList(dataGen.generateInserts(100))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.withColumn("marker_col",lit("val1")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Overwrite).
save(basePath)

Note: I am adding a column named “marker_col” to assist with validation when querying.

Querying hudi table

There is no special handling needed for an MOR table; you can read it just as you would read a COW table.

spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select fare, begin_lon, begin_lat, ts from hudi_trips_snapshot where fare > 20.0").show()

spark.sql("select marker_col, count(*) from hudi_trips_snapshot group by 1 order by 1").show(10, false)

Updates

Let’s add updates to a subset of records.

val updates = convertToStringList(dataGen.generateUpdates(75))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.withColumn("marker_col",lit("val2")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

Let’s query to validate the updated record count.

spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select marker_col, count(*) from hudi_trips_snapshot group by 1 order by 1").show(10, false)

The reason we don’t see as many records with “val2” for marker_col as the number of updates generated is that our data generator does not guarantee updates for unique record keys; several of the 75 updates may target the same record.
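
As a quick, optional sanity check (assuming the df holding the update batch above is still in scope in the shell), we can count the distinct record keys in that batch:

// Optional sanity check: the 75 generated updates can repeat record keys (uuid),
// so the number of distinct keys may be less than 75.
df.select("uuid").distinct.count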

Let’s take a look at the “.hoodie” directory for the timeline.
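
One simple way to inspect the timeline from the same spark-shell session (assuming the local /tmp base path used above) is a plain directory listing:

import scala.sys.process._

// List the timeline under the table's .hoodie directory, sorted by modification time.
// MOR writes through the datasource show up as deltacommit instants
// (.deltacommit.requested, .deltacommit.inflight, .deltacommit).
"ls -lrt /tmp/hudi_trips_mor/.hoodie".!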

Let’s also take a look at the data directory for one of the partitions.
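
For example, we can list one of the partition paths produced by the quickstart DataGenerator (americas/united_states/san_francisco is one of its defaults):

// List the data files under one partition (scala.sys.process._ was imported above).
"ls -lrt /tmp/hudi_trips_mor/americas/united_states/san_francisco".!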

The first commit created a base parquet file, and the second commit added a log file.

Compaction

Compaction is an optimization technique where a base file and its associated log files are merged to form a new version of the base file. You can read more details here.

Let’s add another update and enable inline compaction.

val updates = convertToStringList(dataGen.generateUpdates(50))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.withColumn("marker_col",lit("val3")).write.format("hudi").
options(getQuickstartWriteConfigs).
option(PRECOMBINE_FIELD_OPT_KEY, "ts").
option(RECORDKEY_FIELD_OPT_KEY, "uuid").
option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
option("hoodie.datasource.write.table.type","MERGE_ON_READ").
option("hoodie.compact.inline","true").
option("hoodie.compact.inline.max.delta.commits","2").
option(TABLE_NAME, tableName).
mode(Append).
save(basePath)

Note the two additional options we added to enable inline compaction.

"hoodie.compact.inline"="true"
"hoodie.compact.inline.max.delta.commits"="2"

I have left out other compaction configs (like the compaction strategy) so that defaults are picked, to keep the quick start simple. You can refer to this blog for all compaction configs.

spark.
read.
format("hudi").
load(basePath).createOrReplaceTempView("hudi_trips_snapshot")

spark.sql("select marker_col, count(*) from hudi_trips_snapshot group by 1 order by 1").show(10, false)

Let’s take a look at the timeline files.
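
Listing the “.hoodie” directory again, the same way as before:

// Compaction shows up as a new instant on the timeline:
// <instant>.compaction.requested and <instant>.compaction.inflight while it is pending/running,
// and <instant>.commit once it completes.
"ls -lrt /tmp/hudi_trips_mor/.hoodie".!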

We can see from the timeline that compaction has kicked in and has completed.

We can also take a look at the data directory.
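
And listing the same partition again:

// A new base parquet file written by compaction should appear alongside
// the earlier base file and log file(s).
"ls -lrt /tmp/hudi_trips_mor/americas/united_states/san_francisco".!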

Files are ordered by last modification time, so we can see that a new base file has been created, with its file name carrying the compaction instant time.

Conclusion

Hope this blog gave you a simple way to get started with the MOR table type in Apache Hudi.
