Different Query types with Apache Hudi

Sivabalan Narayanan
6 min read · May 29, 2023


Introduction

It is well known that the write-to-read ratio in an analytical system is generally 1:100 or even higher, and those reads come from diverse consumers, ranging from real-time dashboards to weekly reports to downstream ETL pipelines and much more. To cater to such diverse requirements, Apache Hudi offers different query types. This blog goes over the query types that Hudi offers.

Snapshot or Realtime query

This is the most commonly and widely used query type; it gives you the latest snapshot of the entire dataset. More than 90% of users generally use this query type. It is also the default in Hudi, so no additional query parameters are required.

Let’s walk through a simple illustration.

Setup:

I am re-using the quick start guide that is shared here. I am adding a new column named “batchId” to every batch of data being ingested and setting its value to “batch_1”, “batch_2”, etc. to distinguish the different batches.

Commit1: Insert 100 records with batchID “batch_1”

Commit2: Update all 100 records with batchId “batch_2”

Commit3: Update 75 records with batchId “batch_3”

Commit4: Update 50 records with batchId “batch_4”

Commit5: Update 25 records with batchId “batch_5”

Note: I chose a different set of records in each commit so that we can articulate some of the examples clearly. A minimal sketch of this ingest loop is shown below.
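Here is a rough sketch of that setup, assuming the quick start's QuickstartUtils data generator in a spark-shell session. The helper picks records to update with limit(), which is arbitrary, so the exact per-batch counts you get will differ from the ones shown in this post:

import org.apache.hudi.QuickstartUtils._
import org.apache.hudi.DataSourceWriteOptions._
import org.apache.hudi.config.HoodieWriteConfig._
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.functions.lit
import scala.collection.JavaConverters._

val tableName = "hudi_trips_mor"
val basePath = "file:///tmp/hudi_trips_mor"
val dataGen = new DataGenerator

// Commit1: insert 100 records tagged batch_1.
val inserts = convertToStringList(dataGen.generateInserts(100))
val insertDf = spark.read.json(spark.sparkContext.parallelize(inserts, 2)).
  withColumn("batchId", lit("batch_1"))
insertDf.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL).
  option(TABLE_NAME, tableName).
  mode(Overwrite).
  save(basePath)

// Commits 2-5: read the current snapshot back, re-tag a subset and upsert it.
def updateBatch(numRecords: Int, batchId: String): Unit = {
  val subset = spark.read.format("hudi").load(basePath).
    drop("_hoodie_commit_time", "_hoodie_commit_seqno", "_hoodie_record_key",
      "_hoodie_partition_path", "_hoodie_file_name").
    limit(numRecords).
    withColumn("batchId", lit(batchId))
  subset.write.format("hudi").
    options(getQuickstartWriteConfigs).
    option(PRECOMBINE_FIELD_OPT_KEY, "ts").
    option(RECORDKEY_FIELD_OPT_KEY, "uuid").
    option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
    option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL).
    option(TABLE_NAME, tableName).
    mode(Append).
    save(basePath)
}

updateBatch(100, "batch_2")
updateBatch(75, "batch_3")
updateBatch(50, "batch_4")
updateBatch(25, "batch_5")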

A snapshot query returns the state of the table after all of these ingests (i.e., after commit5).
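Registering the snapshot view needs no extra options; it is a plain read, assuming basePath from the setup above:

scala> spark.read.format("hudi").load(basePath).createOrReplaceTempView("tbl")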

scala> spark.sql("select batchId, count(*) from tbl group by 1 order by 2 desc").show()
+-------+--------+
|batchId|count(1)|
+-------+--------+
|batch_2|      49|
|batch_3|      27|
|batch_4|      15|
|batch_5|       9|
+-------+--------+

Time travel query or Timestamp as of query

You can query the table snapshot as of an older point in time. This comes in very handy when you are building downstream ETLs or serving consumers that need to join multiple tables, since you can query every table as of the same time “tN”. Without this support, when consuming data from N tables, some tables could have more recent updates while others are yet to catch up. So this can be very critical in such cases.
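For instance, here is a sketch of the multi-table case (tablePathA, tablePathB and the uuid join key are hypothetical stand-ins); both views reflect the same point in time tN, so a join across them is consistent:

scala> val tN = "20230527155833243" // a common point in time for both tables
scala> spark.read.format("hudi").option("as.of.instant", tN).load(tablePathA).createOrReplaceTempView("a_as_of")
scala> spark.read.format("hudi").option("as.of.instant", tN).load(tablePathB).createOrReplaceTempView("b_as_of")
scala> spark.sql("select a.uuid from a_as_of a join b_as_of b on a.uuid = b.uuid").show()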

Here are the commit times for reference:

ls /tmp/hudi_trips_mor/.hoodie/ | grep deltacommit | grep -v inflight | grep -v requested
20230527155829098.deltacommit // commit1
20230527155833243.deltacommit // commit2
20230527162835090.deltacommit // commit3
20230527162925939.deltacommit // commit4
20230527162941496.deltacommit // commit5

So, let's try out the time travel query for commit2 and commit4.

For commit2:

scala> spark.read.format("hudi").option("as.of.instant","20230527155833243").load(basePath).createOrReplaceTempView("tbl")
scala> spark.sql("select batchId, count(*) from tbl group by 1 order by 2 desc").show()
+-------+--------+
|batchId|count(1)|
+-------+--------+
|batch_2|     100|
+-------+--------+

Since commit2 updated all 100 records that were inserted in commit1, every record shows batch_2.

For commit4:

scala> spark.read.format("hudi").option("as.of.instant","20230527162925939").load(basePath).createOrReplaceTempView("tbl")
scala> spark.sql("select batchId, count(*) from tbl group by 1 order by 2 desc").show()
+-------+--------+
|batchId|count(1)|
+-------+--------+
|batch_2|      55|
|batch_3|      28|
|batch_4|      17|
+-------+--------+

Since commit3 and commit4 each updated only a subset of records, we see records from all commits that happened up to commit4.
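As a side note, recent Hudi releases (0.12 onwards, on Spark 3.2+) also support time travel directly in SQL; a sketch, assuming the table is registered in the catalog as hudi_trips_mor:

scala> spark.sql("select batchId, count(*) from hudi_trips_mor timestamp as of '20230527162925939' group by 1 order by 2 desc").show()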

Incremental Query

This is one of the defining features of Apache Hudi. You can query a Hudi table to fetch only the changes made after a given commit time. So, if you were to build an ETL job that writes to a silver Hudi table by consuming from a bronze Hudi table, this incremental query will come in very handy. It is much more performant than triggering a snapshot query and then filtering. Also, a snapshot query may not give you all the change records between two points in time. For example, if a record was updated in commit3, commit4 and commit5, a snapshot query will only give you the record's value after commit5; it cannot return the value as of commit4. So, when the requirement calls for that, you can't rely on a snapshot query.
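As an illustration, here is a minimal sketch of such an incremental bronze-to-silver consumer; bronzePath and the checkpoint handling are hypothetical stand-ins, and the transform/write to the silver table is elided:

import org.apache.hudi.DataSourceReadOptions._
import org.apache.spark.sql.functions.max

val bronzePath = "file:///tmp/bronze_tbl" // hypothetical bronze table path
var lastConsumedInstant = "000"           // "000" consumes from the start of the timeline

val increments = spark.read.format("hudi").
  option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
  option(BEGIN_INSTANTTIME_OPT_KEY, lastConsumedInstant).
  load(bronzePath)

if (!increments.isEmpty) {
  // transform `increments` and upsert the result into the silver table here
  lastConsumedInstant = increments.agg(max("_hoodie_commit_time")).first.getString(0)
  // persist lastConsumedInstant durably so the next run resumes after it
}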

You need to set the query type config and the begin instant time config; the end instant time config is optional.

Let’s take a look at a few examples.

Trying out an incremental query with begin time as commit2 (so changes from commit3 onwards are returned):

scala> spark.read.format("hudi").
option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
option(BEGIN_INSTANTTIME_OPT_KEY, "20230527155833243").
load(basePath).createOrReplaceTempView("tbl1")
scala> spark.sql("select batchId, count(*) from tbl1 group by 1 order by 2 desc").show()
+-------+--------+
|batchId|count(1)|
+-------+--------+
|batch_3|      50|
|batch_4|      23|
|batch_5|      16|
+-------+--------+

Since we only gave a begin instant time and no end instant time, Hudi served all changes after the begin instant time up until the latest commit, with each record reflecting its latest value, which is why the results resemble a snapshot query over the changed records. Btw, please do remember that the begin instant time is exclusive here. In other words, if you trigger an incremental query with begin instant time tN, Hudi serves changes whose commit time is strictly greater than tN; commits matching the begin instant time itself are not served.
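One quick way to see the exclusivity: an incremental query whose begin instant time equals the latest commit (commit5 here) should return no rows, since nothing newer exists:

scala> spark.read.format("hudi").
     |   option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
     |   option(BEGIN_INSTANTTIME_OPT_KEY, "20230527162941496"). // commit5
     |   load(basePath).count() // expect 0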

Let's try to limit the incremental query to just one commit. For that, you need to set the end instant time config.

scala> spark.read.format("hudi").
| option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
| option(BEGIN_INSTANTTIME_OPT_KEY, "20230527155833243"). // commit2
| option(END_INSTANTTIME_OPT_KEY, "20230527162835090"). // commit3
| load(basePath).createOrReplaceTempView("tbl1")
scala>
scala> spark.sql("select batchId, count(*) from tbl1 group by 1 order by 2 desc").show()
+-------+--------+
|batchId|count(1)|
+-------+--------+
|batch_3|      75|
+-------+--------+

As you can see, we triggered an incremental query with begin time as commit2 (exclusive) and end time as commit3 (inclusive), so the output shows all records changed between them, all carrying batchId “batch_3”.

Let's try the same with begin as commit2 and end as commit4.

scala> spark.read.format("hudi").
| option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_INCREMENTAL_OPT_VAL).
| option(BEGIN_INSTANTTIME_OPT_KEY, "20230527155833243"). // commit2
| option(END_INSTANTTIME_OPT_KEY, "20230527162925939"). // commit4
| load(basePath).createOrReplaceTempView("tbl1")
scala>
scala> spark.sql("select batchId, count(*) from tbl1 group by 1 order by 2 desc").show()
+-------+--------+
|batchId|count(1)|
+-------+--------+
|batch_3|      58|
|batch_4|      28|
+-------+--------+

Since a subset of the records updated in commit3 was updated again in commit4, we see records from both batches.

Read Optimized query

This is a special type of query that's applicable only to the MOR (Merge-On-Read) table type. Please check out this blog on the two table types in Hudi.

Hudi always strives to give end users more flexibility to cater to their different needs, and this query type is yet another example. As the well-known CAP theorem reminds us, not everyone prefers consistency over availability. Users who prefer consistency (here, data freshness) can go with the snapshot query, while those who prefer faster query times can go with the read optimized query. As the name suggests, it is heavily optimized for reads: a snapshot query on a MOR table merges base files and log files at read time, incurring additional latency, whereas the read optimized query ignores any log files and serves data only from the base files, trading freshness for speed.

Let's check out what the read optimized query returns for our table.

scala> spark.read.format("hudi").
| option(QUERY_TYPE_OPT_KEY, QUERY_TYPE_READ_OPTIMIZED_OPT_VAL).
| load(basePath).createOrReplaceTempView("tbl1")
scala>
scala> spark.sql("select batchId, count(*) from tbl1 group by 1 order by 2 desc").show()
+-------+--------+
|batchId|count(1)|
+-------+--------+
|batch_1|     100|
+-------+--------+

So, in the above scenario, even though we have made 5 commits, a read optimized query returns commit1's data, since all subsequent commits only added new log files for the existing base files.

Once compaction kicks in, the read optimized query will match the snapshot query immediately afterwards. But once new log files are added again, the read optimized query may diverge from the snapshot query, as expected.
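As an aside, if you want compaction to run eagerly so the read optimized view catches up sooner, one knob is inline compaction on the writer. Here is a sketch reusing the writer shape from the setup above; the delta-commit threshold of 4 is just an illustrative choice:

subset.write.format("hudi").
  options(getQuickstartWriteConfigs).
  option(PRECOMBINE_FIELD_OPT_KEY, "ts").
  option(RECORDKEY_FIELD_OPT_KEY, "uuid").
  option(PARTITIONPATH_FIELD_OPT_KEY, "partitionpath").
  option(TABLE_TYPE_OPT_KEY, MOR_TABLE_TYPE_OPT_VAL).
  option(TABLE_NAME, tableName).
  option("hoodie.compact.inline", "true"). // run compaction synchronously as part of the write
  option("hoodie.compact.inline.max.delta.commits", "4"). // trigger once every 4 delta commits
  mode(Append).
  save(basePath)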

Dashboards reporting week-over-week stats or other historical data may not care about the last hour or two of data. For such use-cases, this query type offers improved read latency. But if consistency and freshness matter, it is recommended to go with the snapshot query.

Conclusion

Hudi offers diverse query types to cater to different needs and use-cases. I hope this blog helped you gain a good understanding of the different offerings.
