Quickly start using Apache Hudi on AWS EMR

Ritik Kaushik
2 min read · Jul 9, 2023


This article is going to be short; I will not explain what Hudi is or how it works internally. The goal is to set it up on your EMR cluster. Using Hudi on EMR is easy because it comes pre-installed.

Versions Used

EMR: emr-5.30.1
Spark: 2.4.5
Hudi: 0.5.2

You just need to pass the jars and a few confs to the spark-submit command. These jars come pre-installed on EMR.

spark-submit --name "hudi_setup" \
--jars /usr/lib/hudi/hudi-spark-bundle.jar,/usr/lib/spark/external/lib/spark-avro.jar \
--conf "spark.serializer=org.apache.spark.serializer.KryoSerializer" \
--conf "spark.sql.hive.convertMetastoreParquet=false" \
your_file.py

I used EMR version 5.30.1 at my company. If you are working with another version, these steps might not work as-is, because the Hudi version changes with every EMR release; in my case EMR 5.30.1 ships Hudi 0.5.2. On the latest EMR versions, you might not need to pass spark-avro.jar in the jar list.
You can use the comments section to let me know whether the steps work for you.
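
If you create the SparkSession yourself (for example, in a notebook), you can set the same confs in code. This is only a sketch: the jar paths are the EMR defaults from the command above, and if you launch through spark-submit the --jars flag still has to stay on the command line, since jars added after the JVM starts are ignored.

from pyspark.sql import SparkSession

# A sketch: the same Hudi-related settings applied when building the session.
# Jar paths are the EMR defaults from the spark-submit command above; verify
# them under /usr/lib/hudi on your cluster.
spark = (
    SparkSession.builder
    .appName("hudi_setup")
    .config("spark.jars", "/usr/lib/hudi/hudi-spark-bundle.jar,"
                          "/usr/lib/spark/external/lib/spark-avro.jar")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.hive.convertMetastoreParquet", "false")
    .getOrCreate()
)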

Create a COPY ON WRITE table in Hudi

from pyspark.sql.functions import *
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

inputDF = spark.createDataFrame(
    [
        ("100", "Ritik", "23", "India", "UP"),
        ("101", "Lokesk", "23", "India", "KA"),
        ("102", "Romil", "25", "France", "CA"),
        ("103", "Nishant", "25", "Australia", "SE")
    ],
    ["id", "name", "age", "country", "state"]
)

hudiOptions = {
    'hoodie.table.name': 'test_fact',
    'hoodie.datasource.write.recordkey.field': 'id',
    # Required: the primary key column of your Hudi table. In our case we can
    # use id as it has unique values.
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.ComplexKeyGenerator',
    # Required: I am going to partition my table by country and state, which is
    # why I used ComplexKeyGenerator. In most cases you will either partition
    # by a single column or not partition at all, in which case you would use
    # SimpleKeyGenerator or NonpartitionedKeyGenerator (a sketch follows below).
    'hoodie.datasource.write.partitionpath.field': 'country,state',
    # While writing, the table will be partitioned by country and state.
    'hoodie.datasource.write.precombine.field': 'age',
    # If two records with the same id come into the table, a select query will
    # return the record with the greater age (the higher precombine value).
}
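
For reference, here is a sketch of what those simpler cases could look like. The option names mirror the hudiOptions above; treat the exact values as assumptions to check against your Hudi version's docs.

# Single partition column: SimpleKeyGenerator (a sketch).
simpleOptions = {
    'hoodie.table.name': 'test_fact',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.SimpleKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': 'country',
    'hoodie.datasource.write.precombine.field': 'age',
}

# No partitioning: NonpartitionedKeyGenerator with an empty partition path.
nonPartitionedOptions = {
    'hoodie.table.name': 'test_fact',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.keygenerator.class': 'org.apache.hudi.keygen.NonpartitionedKeyGenerator',
    'hoodie.datasource.write.partitionpath.field': '',
    'hoodie.datasource.write.precombine.field': 'age',
}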

inputDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'insert') \
    .options(**hudiOptions) \
    .mode('overwrite') \
    .save('your_s3_path or hdfs path')

Since I am using Hudi version 0.5.2, I have to pass most of these settings in the options dictionary. In the latest versions you don't need to pass most of them, because Hudi picks default values: for the precombine field option, Hudi will take a timestamp value by default, and for the key generator class it will pick a value based on the partitions you have given.
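
For example, on a recent Hudi version the same write could get by with a much smaller options dictionary. This is a sketch; verify which defaults apply against the docs for your version.

# A sketch of a minimal options dict for a recent Hudi version; the key
# generator is inferred from the partition path fields.
minimalHudiOptions = {
    'hoodie.table.name': 'test_fact',
    'hoodie.datasource.write.recordkey.field': 'id',
    'hoodie.datasource.write.partitionpath.field': 'country,state',
    'hoodie.datasource.write.precombine.field': 'age',
}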

Upsert operation on this table

Now suppose you want to change the age of a person.

updateDF = inputDF.where("id in ('100')").withColumn('age', lit('24'))
updateDF.write \
    .format('org.apache.hudi') \
    .option('hoodie.datasource.write.operation', 'upsert') \
    .options(**hudiOptions) \
    .mode('append') \
    .save('your table s3 path or hdfs path')

That’s it. I haven’t gone into much depth, but if you’d like me to cover more about Hudi, let me know in the comments section.
