Apache Hudi: Copy-on-Write Explained

Wojciech Walczak
Plumbers Of Data Science
11 min readDec 23, 2022

You are responsible for handling batch data updates. Your current Apache Spark solution reads in and overwrites the entire table/partition with each update, even for the slightest change. It sucks, and you know it.

If this description matches your current situation, you should get familiar with Apache Hudi’s Copy-on-Write storage type. The diagram below compares these two approaches.

Classic Spark approach to updating data vs. Hudi-based approach

This tutorial will consider a made up example of handling updates to human population counts in various countries. We won’t clutter the data with long UUIDs or timestamps with millisecond precision. Instead, we will try to understand how small changes impact the overall system.

Hudi has an elaborate vocabulary. By following this tutorial, you will become familiar with it. Pay attention to the terms in bold.

Basic Setup

All you need to run this example is Docker. Executing this command will start a spark-shell in a Docker container:

The /etc/inputrc file is mounted from the host file system to make the spark-shell handle command history with up and down arrow keys. Remove this line if there’s no such file on your operating system.

Once the Spark shell is up and running, copy-paste the following code snippet. There, you can find a tableName and basePath variables — these define where Hudi will store the data. Also, two functions, upsert and showHudiTable are defined. We will use these to interact with a Hudi table. All the important pieces will be explained later on.

These functions use global variables, mutable sequences, and side effects, so don’t try to learn Scala from this code. Let’s focus on Hudi instead!

First Upsert to a Hudi Table

It’s 1920, the First World War ended two years ago, and we managed to count the population of newly-formed Poland. Let’s save this information to a Hudi table using the upsert function.

// in spark-shell
upsert(
Array(
"""
{
"year": 1919,
"country": "poland",
"partition": "continent=europe",
"population": 21
}
""",
"""
{
"year": 1920,
"country": "poland",
"partition": "continent=europe",
"population": 24
}
"""
),
mode=Overwrite
)

If you’re observant, you probably noticed that the record for the year 1919 sneaked in somehow. Turns out we weren’t cautious enough, and some of our test data (year=1919) got mixed with the production data (year=1920). We can blame poor environment isolation on sloppy software engineering practices of the 1920s.

Technically, this time we only inserted the data, because we ran the upsert function in Overwrite mode. When the upsert function is executed with the mode=Overwrite parameter, the Hudi table is (re)created from scratch. But what does upsert mean? It’s a combination of update and insert operations. By default, Hudi’s write operation is of upsert type, which means it checks if the record exists in the Hudi table and updates it if it does. Conversely, if it doesn’t exist, the record gets created (i.e., it’s inserted into the Hudi table).

Let’s take a look at the data. Run showHudiTable() in spark-shell. The output should be similar to this:

+-------+---------+----+----------+
|country|partition|year|population|
+-------+---------+----+----------+
| poland| europe|1920| 24|
+-------+---------+----+----------+

At the highest level, it’s that simple. You have a Spark DataFrame and save it to disk in Hudi format.

You can find the mouthful description of what Hudi is on project’s homepage:

Hudi is a rich platform to build streaming data lakes with incremental data pipelines on a self-managing database layer, while being optimized for lake engines and regular batch processing.

For now, let’s simplify by saying that Hudi is a file format for reading/writing files at scale.

Sneak Peek at the Internals

OK, we added some JSON-like data somewhere and then retrieved it. What’s the big deal? And what really happened? Let’s start by answering the latter question first.

Recall that in the Basic setup section, we have defined a path for saving Hudi data to be /tmp/hudi_population. Let’s take a look at this directory:

$ tree /tmp/hudi_population/
/tmp/hudi_population/
└── continent=europe
└── 31347451-2a16-4660-8d08-322bec706967-0_0-28-34_20221204181530798.parquet
1 directory, 1 file

A single Parquet file has been created under continent=europe subdirectory. There are many more hidden files in the hudi_population directory. These are internal Hudi files. To see them all, type in tree -a /tmp/hudi_population.

There’s also some Hudi-specific information saved in the parquet file. To see the full data frame, type in: showHudiTable(includeHudiColumns=true). You will see Hudi columns containing the commit time and some other information. According to Hudi documentation:

A commit denotes an atomic write of a batch of records into a table.

By executing upsert(), we made a commit to a Hudi table.

Again, if you’re observant, you will notice that our batch of records consisted of two entries, for year=1919 and year=1920, but showHudiTable() is only displaying one record for year=1920. Let’s open the Parquet file using Python and see if the year=1919 record exists.

# Interactive Python session. In /tmp/hudi_population/continent=europe/
>>> import pandas as pd
>>> df=pd.read_parquet('31347451-2a16-4660-8d08-322bec706967-0_0-28-34_20221204181530798.parquet')
>>> df['year']
0 1920
Name: year, dtype: int64
>>>

No, clearly only year=1920 record was saved. Why? What happened to our test data (year=1919)? To explain this, let’s take a look at how writing to Hudi table is configured:

// see 'Basic setup' section for a full code snippet
def upsert(jsonStrings: Array[String], mode: SaveMode = Append) =
df.write.format("hudi").
...
option(RECORDKEY_FIELD_OPT_KEY, "country").
option(PARTITIONPATH_FIELD_OPT_KEY, "partition").
option(PRECOMBINE_FIELD_OPT_KEY, "year").
...

The two attributes which identify a record in Hudi are record key (see: RECORDKEY_FIELD_OPT_KEY) and partition path (see: PARTITIONPATH_FIELD_OPT_KEY). In our configuration, the country is defined as a record key, and partition plays a role of a partition path. The combination of the record key and partition path is called a hoodie key.

If the input batch contains two or more records with the same hoodie key, these are considered the same record. That’s precisely our case:

{
"country": "poland", # record key
"partition": "continent=europe", # partition path
"year": 1919
},
{
"country": "poland", # record key
"partition": "continent=europe", # partition path
"year": 1920
}

To fix this issue, Hudi runs the deduplication step called pre-combining. The PRECOMBINE_FIELD_OPT_KEY option defines a column that is used for the deduplication of records prior to writing to a Hudi table. The pre-combining procedure picks the record with a greater value in the defined field. In our case, this field is the year, so year=2020 is picked over year=1919.

Let’s recap what we have learned so far:

  • Upsert is a combination of insert and update operations when writing data.
  • Record key and partition path uniquely identify a record in Hudi.
  • The combination of the record key and partition path is called hoodie key.
  • A commit atomically writes a batch of records to a Hudi table.
  • For identical pairs of record keys and partition paths within a commit, Hudi pre-combines them during the writing phase and only saves the one with the largest value in a field defined as PRECOMBINE_FIELD_OPT_KEY.
  • Hudi stores metadata in hidden files under the directory of a Hudi table.
  • Hudi stores additional metadata in Parquet files containing the user data.

Copy-on-Write in Practice

Five years later, in 1925, our population-counting office managed to count the population of Spain:

// in spark-shell
upsert(
Array(
"""
{
"year": 1925,
"country": "spain",
"partition": "continent=europe",
"population": 22.1
}
"""
)
)
showHudiTable()

The showHudiTable() function will now display the following:

+-------+---------+----+----------+
|country|partition|year|population|
+-------+---------+----+----------+
| poland| europe|1920| 24.0|
| spain| europe|1925| 22.1|
+-------+---------+----+----------+

On the file system, this translates to a creation of a new file:

$ tree /tmp/hudi_population/
/tmp/hudi_population/
└── continent=europe
├── 31347451-2a16-4660-8d08-322bec706967-0_0-28-34_20221204181530798.parquet
└── 31347451-2a16-4660-8d08-322bec706967-0_0-61-70_20221204201944363.parquet
1 directory, 2 files

The Copy-on-Write storage mode boils down to copying the contents of the previous data to a new Parquet file, along with newly written data. We can show it by opening the new Parquet file in Python:

# in /tmp/hudi_population/continent=europe/
>>> import pandas as pd
>>> df=pd.read_parquet('31347451-2a16-4660-8d08-322bec706967-0_0-61-70_20221204201944363.parquet')
>>> df[['country', 'year', 'population']]
country year population
0 poland 1920 24.0
1 spain 1925 22.1

As we can see, Hudi copied the record for Poland from the previous file and added the record for Spain. Internally, this seemingly simple process is optimized using indexing. Our use case is too simple, and the Parquet files are too small to demonstrate this. Imagine that there are millions of European countries, and Hudi stores a complete list of them in many Parquet files. Thanks to indexing, Hudi can better decide which files to rewrite without listing them.

File Layout Explained

For a few times now, we have seen how Hudi lays out the data on the file system. Let’s explain, using a quote from Hudi’s documentation, what we’re seeing (words in bold are essential Hudi terms):

The following describes the general file layout structure for Apache Hudi:

- Hudi organizes data tables into a directory structure under a base path on a distributed file system;

- Tables are broken up into partitions;

- Within each partition, files are organized into file groups, uniquely identified by a file ID;

- Each file group contains several file slices

- Each file slice contains a base file (.parquet) produced at a certain commit […]

These concepts correspond to our directory structure, as presented in the below diagram. The .hoodie directory is hidden from out listings, but you can view it with the following command: tree -a /tmp/hudi_population.

Hudi file layout and essential Hudi terms.

Hudi controls the number of file groups under a single partition according to the hoodie.parquet.max.file.size option. Once a single Parquet file is too large, Hudi creates a second file group.

Make Use of Partitions

Another mechanism that limits the number of reads and writes is partitioning. Let’s imagine that in 1930 we managed to count the population of Brazil:

// in spark-shell
upsert(
Array(
"""
{
"year": 1930,
"country": "brazil",
"partition": "continent=south_america",
"population": 33.6
}
"""
)
)
showHudiTable()

Our Hudi table now looks like this:

+-------+-------------+----+----------+
|country| partition|year|population|
+-------+-------------+----+----------+
| brazil|south_america|1930| 33.6|
| poland| europe|1920| 24.0|
| spain| europe|1925| 22.1|
+-------+-------------+----+----------+

Which translates to the following on disk:

$ tree /tmp/hudi_population/
/tmp/hudi_population/
├── continent=europe
│ ├── 31347451-2a16-4660-8d08-322bec706967-0_0-28-34_20221204181530798.parquet
│ └── 31347451-2a16-4660-8d08-322bec706967-0_0-61-70_20221204201944363.parquet
└── continent=south_america
└── b71af367-2b92-4b66-adc9-8e0bcd88a975-0_0-104-117_20221204204646470.parquet
2 directories, 3 files

Since Brazil’s data is saved to another partition (continent=south_america), the data for Europe is left untouched for this upsert.

What About the “Up” in Upsert?

You’re probably getting impatient at this point because none of our interactions with the Hudi table was a proper update. Until now, we were only inserting new records.

Let’s imagine that in 1935 we managed to count the populations of Poland, Brazil, and India. Spain was too hard due to ongoing civil war.

// in spark-shell
upsert(
Array(
"""
{
"year": 1935,
"country": "brazil",
"partition": "continent=south_america",
"population": 37.2
}
""",
"""
{
"year": 1935,
"country": "india",
"partition": "continent=asia",
"population": 296
}
""",
"""
{
"year": 1935,
"country": "poland",
"partition": "continent=europe",
"population": 30.1
},
"""
)
)
showHudiTable()

The resulting Hudi table looks as follows:

+-------+-------------+----+----------+
|country| partition|year|population|
+-------+-------------+----+----------+
| brazil|south_america|1935| 37.2|
| poland| europe|1935| 30.1|
| spain| europe|1925| 22.1|
| india| asia|1935| 296.0|
+-------+-------------+----+----------+

Now, we can see that:

  • The year and population for Brazil and Poland were updated (updates).
  • Data for Spain wasn’t updated (copy).
  • Data for India was added for the first time (insert).

To put it metaphorically, look at the image below.

Photo by Nana Smirnova on Unsplash

Your old school Spark job takes all the boxes off the shelf just to put something to a few of them and then puts them all back. With Hudi, your Spark job knows which packages to pick up. All the other boxes can stay in their place.

Use the Timeline

Hudi represents each of our commits as a separate Parquet file(s). It may seem wasteful, but together with all the metadata, Hudi builds a timeline. It is possible to time-travel and view our data at various time instants using a timeline.

To quickly access the instant times, we have defined the storeLatestCommitTime() function in the Basic setup section. We are using it under the hood to collect the instant times (i.e., the commit times). That’s why it’s important to execute showHudiTable() function after each call to upsert(). Let’s see the collected commit times:

scala> print(commitTimes)
ListBuffer(20221204201944363, 20221204204646470, 20221204205836423, 20221204210042289)

Let’s see what was the state of our Hudi table at each of the commit times by utilizing the “as.of.instant” option:

// in spark-shell
commitTimes.forEach(
commitTime => spark.
read.
format("hudi").
option("as.of.instant", commitTime).
load(basePath).
select("country", "year", "population")
.show
)

And the output is:

+-------+----+----------+ 
|country|year|population|
+-------+----+----------+
| poland|1920| 24|
+-------+----+----------+
+-------+----+----------+
|country|year|population|
+-------+----+----------+
| poland|1920| 24.0|
| spain|1925| 22.1|
+-------+----+----------+
+-------+----+----------+
|country|year|population|
+-------+----+----------+
| brazil|1930| 33.6|
| poland|1920| 24.0|
| spain|1925| 22.1|
+-------+----+----------+
+-------+----+----------+
|country|year|population|
+-------+----+----------+
| brazil|1935| 37.2|
| poland|1935| 30.1|
| spain|1925| 22.1|
| india|1935| 296.0|
+-------+----+----------+

That’s it. That’s how our data was changing over time!

Let’s recap what we have learned in the second part of this tutorial:

  • In Copy-on-Write storage mode, Hudi copies existing unaffected data from a file slice to a new Parquet file, to which it is upserting new data.
  • The Copy-on-Write procedure uses several optimizations like pre-combining or indexing.
  • The directory structure maps nicely to various Hudi terms like file group or file slice.
  • Another optimization is partitioning, as it limits the number of rewritten files only to those belonging to a partition affected by a particular write operation.
  • Using commit times, it is possible to travel through Hudi’s timeline and inspect how the data was changing.

Summary

In this tutorial, we:

  • Introduced Hudi’s Copy-on-Write storage mode.
  • Showed how Hudi stores the data on disk in a Hudi table.
  • Explained how records are inserted, updated, and copied to form new commits.
  • Mentioned some optimizations like partitioning, pre-combining, and indexing.
  • Presented how Hudi timeline can be used to view past states of the Hudi table.

That’s a lot, but let’s not get the wrong impression here. We’re not Hudi gurus yet. This tutorial didn’t even mention things like:

  • Merge-on-Read storage mode and data streaming / CDC use cases.
  • Deleting the data.
  • Schema evolution.
  • Concurrency control.
  • Indexing techniques.

Let’s not get upset, though. Regardless of the omitted Hudi features, you are now ready to rewrite your cumbersome Spark jobs!

Thanks for reading! Let me know if you would like a similar tutorial covering the Merge-on-Read storage type.

Resources

  • Hudi’s official Spark Guide — my inspiration to write this piece was this tutorial. It’s comprehensive, and you can learn a lot from it (including things not covered here), but it’s hard to build basic intuitions based on the proposed example.
  • Open Table Formats — Delta, Iceberg & Hudi — short comparison of Hudi to other table file formats.
  • Apache Hudi — The Data Lake Platform — to simplify things, I stated that Hudi is just a way to read/write files. This blog post does justice to the more complete view, in which Hudi is a platform for managing data in Data Lakes.
  • Apache HUDI — A beginner’s guide — the ideas implemented in Hudi are transferred straight from the world of databases. This blog post describes the relation between Hudi’s features and relational databases and no-SQL databases.

--

--