Query and Update Hudi Dynamic Dataset in AWS S3 Data Lake With Athena/EMR

Shantanu.G
Published in The Startup
5 min read · Jul 25, 2020

Background

Apache Hudi is in use at organizations such as Alibaba Group, EMIS Health, Linknovate, Tathastu.AI, Tencent, and Uber, and is supported as part of Amazon EMR by Amazon Web Services and Google Cloud Platform. Amazon Athena recently added support for querying Apache Hudi datasets in Amazon S3-based data lakes. In this blog, I am going to test it and see whether Athena can read a Hudi-format dataset stored in S3.

Preparation — Spark Environment, S3 Bucket

We need Spark to write Hudi data. Log in to Amazon EMR and launch spark-shell:

$ export SCALA_VERSION=2.12
$ export SPARK_VERSION=2.4.4
$ spark-shell \
--packages org.apache.hudi:hudi-spark-bundle_${SCALA_VERSION}:0.5.3,org.apache.spark:spark-avro_${SCALA_VERSION}:${SPARK_VERSION} \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer'
...
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 2.4.4
      /_/

Using Scala version 2.12.10 (OpenJDK 64-Bit Server VM, Java 1.8.0_242)
Type in expressions to have them evaluated.
Type :help for more information.
scala>

Now enter the following Scala code to set up the table name, the base path, and a data generator that produces records for this article. The base path is set to the folder s3://hudi_athena_test/hudi_trips in an Amazon S3 bucket, so we can query it later:

import org.apache.hudi.QuickstartUtils._
import scala.collection.JavaConversions._
import org.apache.spark.sql.SaveMode._
import org.apache.hudi.DataSourceReadOptions._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
val tableName = "hudi_trips"
val basePath = "s3://hudi_athena_test/hudi_trips"
val dataGen = new DataGenerator

Insert data

Generate some new trips, load them into a DataFrame and write the DataFrame into the Hudi table as below.

val inserts = convertToStringList(dataGen.generateInserts(10))
val df = spark.read.json(spark.sparkContext.parallelize(inserts, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

Create Athena Database/Table

Hudi has built-in support for table partitioning. Partitioning is enforced in its schema design, so we need to add the partitions after creating the table. I found a neat command-line tool, athenareader, which can run multiple queries against Athena and has other useful features. This tool is open-sourced by Henry Fuheng Wu, Raymond Won, Nick Cobb, Mingjie Lai, and Matt Ranney from Uber. To install it, just run:

go get -u github.com/uber/athenadriver/athenareader

Then create a file hudi_athena_test.sql with these SQL statements:

DROP DATABASE IF EXISTS hudi_athena_test CASCADE;
CREATE DATABASE hudi_athena_test;
CREATE EXTERNAL TABLE `trips`(
  `begin_lat` double,
  `begin_lon` double,
  `driver` string,
  `end_lat` double,
  `end_lon` double,
  `fare` double,
  `rider` string,
  `ts` double,
  `uuid` string
) PARTITIONED BY (`partitionpath` string)
ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe'
STORED AS INPUTFORMAT 'org.apache.hudi.hadoop.HoodieParquetInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat'
LOCATION 's3://hudi_athena_test/hudi_trips';
ALTER TABLE trips ADD
PARTITION (partitionpath = 'americas/united_states/san_francisco') LOCATION 's3://hudi_athena_test/hudi_trips/americas/united_states/san_francisco'
PARTITION (partitionpath = 'americas/brazil/sao_paulo') LOCATION 's3://hudi_athena_test/hudi_trips/americas/brazil/sao_paulo'
PARTITION (partitionpath = 'asia/india/chennai') LOCATION 's3://hudi_athena_test/hudi_trips/asia/india/chennai';

Then run the following command to execute these SQL statements:

$ athenareader -q hudi_athena_test.sql -a
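The ALTER TABLE statement in hudi_athena_test.sql lists each partition by hand, which gets tedious as partitions grow. Here is a minimal sketch of generating that statement from a list of partition paths in plain Scala. `PartitionDdl` and `addPartitions` are hypothetical names I made up for illustration; they are not part of Hudi or athenareader.

```scala
// Hypothetical helper (not part of Hudi or athenareader): builds one
// ALTER TABLE ... ADD PARTITION statement for a list of partition paths.
object PartitionDdl {
  def addPartitions(table: String, basePath: String, paths: Seq[String]): String = {
    // Avoid a double slash if the base path already ends with "/".
    val base = basePath.stripSuffix("/")
    // One PARTITION clause per path, pointing at the matching S3 folder.
    val clauses = paths.map { p =>
      s"PARTITION (partitionpath = '$p') LOCATION '$base/$p'"
    }
    (s"ALTER TABLE $table ADD" +: clauses).mkString("\n") + ";"
  }
}

val ddl = PartitionDdl.addPartitions(
  "trips",
  "s3://hudi_athena_test/hudi_trips",
  Seq(
    "americas/united_states/san_francisco",
    "americas/brazil/sao_paulo",
    "asia/india/chennai"))

println(ddl)
```

In spark-shell, the path list could instead be collected from the DataFrame that was written, for example with `df.select("partitionpath").distinct.collect.map(_.getString(0))`, and the printed statement saved to a file for athenareader.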

Query Hudi with Athena

If there are no errors, the database and table have been created in Athena, and you can query the Hudi trips data from Athena. This is the query I ran with athenareader:

athenareader -q "select * from trips" -o markdown

We can also filter with a condition:

athenareader -q "select fare,rider from trips where fare>20" -o markdown

Update Hudi Data in Spark

Hudi can update S3 data in place. Now go back to the spark-shell and update some data with the following commands:

val updates = convertToStringList(dataGen.generateUpdates(10))
val df = spark.read.json(spark.sparkContext.parallelize(updates, 2))
df.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_NAME, tableName).
  mode(Append).
  save(basePath)

The command takes about 5 seconds in my test. Once it is done, we can query the table with athenareader again.

athenareader -q "select * from trips" -o markdown

We can see that the data has changed compared with the result we got above.
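To make the role of the record key ("uuid") and the precombine field ("ts") in the write options more concrete, here is a toy model in plain Scala. It is only a sketch of how I understand the default upsert behaviour, not Hudi's implementation: within one incoming batch the record with the largest ts wins per uuid, and the surviving record then replaces any stored record with the same uuid. `Trip` and `UpsertModel` are hypothetical names, and the sample values are made up.

```scala
// Toy model only: NOT Hudi's implementation, just the assumed semantics.
final case class Trip(uuid: String, ts: Double, fare: Double)

object UpsertModel {
  // Within one incoming batch, keep the record with the largest ts per uuid.
  def preCombine(batch: Seq[Trip]): Map[String, Trip] =
    batch.groupBy(_.uuid).map { case (key, records) => key -> records.maxBy(_.ts) }

  // Incoming records replace stored records with the same uuid;
  // records with unseen keys are inserted.
  def upsert(stored: Map[String, Trip], batch: Seq[Trip]): Map[String, Trip] =
    stored ++ preCombine(batch)
}

// Made-up sample data: "a" is updated twice in the batch, "c" is new.
val stored = Map("a" -> Trip("a", 1.0, 10.0), "b" -> Trip("b", 1.0, 20.0))
val batch  = Seq(Trip("a", 2.0, 11.0), Trip("a", 3.0, 12.0), Trip("c", 1.0, 30.0))
val merged = UpsertModel.upsert(stored, batch)

merged.values.toSeq.sortBy(_.uuid).foreach(println)
```

Record "b" is untouched, "a" takes the values from the batch entry with the largest ts, and "c" is added, which is the pattern to look for when comparing the two Athena results.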

Update Hudi Data in Hive/Athena

Like Hive, Presto, and other big data OLAP query engines, Athena doesn’t support updating data, snapshot queries, or incremental queries the way Spark does. To verify this, you can launch the Hive shell and run:

select * from hudi_athena_test.trips;

You will get the same result as in Athena, but if you run an update, you get an error like this:

hive> update hudi_athena_test.trips set fare=1.2345 where rider='rider-243' and partitionpath='asia/india/chennai';
FAILED: SemanticException [Error 10294]: Attempt to do update or delete using transaction manager that does not support these operations.

Then let’s create a snapshot with spark-shell:

spark.
  read.
  format("hudi").
  load(basePath + "/*/*/*/*").
  createOrReplaceTempView("hudi_trips_snapshot")

We can query it with Scala:

val commits = spark.sql("select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime").map(k => k.getString(0)).take(50)
val beginTime = commits(commits.length - 2)
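Indexing with `commits(commits.length - 2)` assumes the table already has at least two commits; with a single commit it would throw. Here is a minimal sketch of a guarded version in plain Scala. `CommitWindow` and `beginTimeFor` are hypothetical names, and the sample commit times are made up. The sketch assumes Hudi commit times are fixed-width timestamp strings, so sorting them as strings matches chronological order.

```scala
// Hypothetical helper: picks the second-to-last commit time, if there is one.
object CommitWindow {
  def beginTimeFor(commits: Seq[String]): Option[String] = {
    // Assumption: commit times are fixed-width timestamp strings, so
    // lexicographic order equals chronological order.
    val sorted = commits.distinct.sorted
    if (sorted.length >= 2) Some(sorted(sorted.length - 2)) else None
  }
}

// Made-up commit times, one for the insert and one for the update.
val sampleCommits = Seq("20200725101500", "20200725102000")

CommitWindow.beginTimeFor(sampleCommits) match {
  case Some(t) => println(s"begin time: $t")
  case None    => println("fewer than two commits, nothing to compare yet")
}
```

In spark-shell, `CommitWindow.beginTimeFor(commits)` could replace the direct indexing. The resulting value is what an incremental read in Spark would take as its begin instant (as far as I recall, `BEGIN_INSTANTTIME_OPT_KEY` from `DataSourceReadOptions` in this Hudi version; check it against the version you run).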

Querying it with Athena fails, because hudi_trips_snapshot is a temporary view that exists only in the Spark session and is not materialized as a table:

$ athenareader -q "select distinct(_hoodie_commit_time) as commitTime from hudi_trips_snapshot order by commitTime"
SYNTAX_ERROR: line 1:57: Table awsdatacatalog.hudi_athena_test.hudi_trips_snapshot does not exist

According to the official documentation on querying Hudi datasets in Athena, Athena supports reading the compacted view of Hudi data.

However, we can still create a view in Athena and query it. Create a view on the Hudi table in Athena:

$ athenareader -q "create view fare_greater_than_40 as select * from trips where fare>40" -a

Query the view:

$ athenareader -q "select fare,rider from fare_greater_than_40"
FARE RIDER
43.4923811219014 rider-213
63.72504913279929 rider-284
90.25710109008239 rider-284
93.56018115236618 rider-213
49.527694252432056 rider-284
90.9053809533154 rider-284
98.3428192817987 rider-284

At the time of this writing, the AWS Glue crawler doesn’t support Hudi, but you can check Add Newly Created Partitions Programmatically into AWS Athena schema.
