Open Table Formats — Delta, Iceberg & Hudi

Amit Singh Rathore · Published in Geek Culture · Jun 2, 2022 · 7 min read

Introductory post on new table formats

What is a Table Format?

Table formats are a way to organize data files. They aim to bring database-like features to the data lake.

Apache Hive is one of the earliest and most used table formats.

Hive Table format

The Hive table format suffers from the following issues:

Stale table statistics (partition lifecycle)
Users are forced to know the physical data layout
Lack of lineage/history
Lack of schema evolution

Hive was written in the pre-cloud era and was not designed for object storage (Hive's data organization is something of an anti-pattern for object storage). As a result, its performance suffered as new usage patterns emerged in the cloud. Hive metastore tables also grow rapidly, slowing it down further.

As the data domain grew, new use cases emerged that demanded a new set of capabilities from the data lake. Mainly, the new systems demanded the following features:

Transactions (ACID)
Unified Batch & streaming
Data mutation (Merge/correction option for late data arrival)
Schema enforcement, evolution & versioning
Metadata scaling

Along with these, the following use cases also need to be supported:

Time Travel
Concurrent read & write
Independent consumption from storage
Data quality
Pluggable storage

To address these, communities started creating new open table formats. In this blog we will discuss the following three:

Delta table — Databricks
Iceberg — Netflix
Hudi — Uber

Delta Table

Delta tables are Parquet tables with a transaction log.

Delta table format representation

Delta Log — It is a changelog of all the actions performed on the delta table. These actions are: 1. Add file 2. Remove file 3. Update metadata 4. Set transaction 5. Change protocol 6. Commit info. The delta log consists of JSON files that reflect the state of the delta table at any given time.

Data Files — Parquet files that hold the actual data. These are immutable in nature. If a commit impacts rows in a file, all rows of that file are loaded into memory, the updates are applied, and the result is written out as a new file. The commit is then recorded in the delta log: the older Parquet file is removed from the table state and the newer Parquet file is added.

This table format is a great choice if you are invested in Databricks' offerings.
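Because the delta log is just a directory of newline-delimited JSON commit files, you can peek at it directly. Below is a minimal sketch, assuming a managed table named sales_delta_managed stored under the spark-warehouse directory as in the snippets that follow (the exact path depends on your setup):

import glob
import json

# Each numbered .json file under _delta_log is one commit; each line is one action.
for commit_file in sorted(glob.glob("spark-warehouse/sales_delta_managed/_delta_log/*.json")):
    print(commit_file)
    with open(commit_file) as f:
        for line in f:
            action = json.loads(line)
            # The top-level key names the action: commitInfo, add, remove, metaData, protocol, ...
            print("   ", list(action.keys())[0])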

from pyspark.sql import SparkSession
from delta import DeltaTable

spark = SparkSession.builder \
    .appName("Delta with PySpark") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()

# Write the sales dataset out as a managed Delta table
df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")
df_sales.write \
    .format("delta") \
    .mode("overwrite") \
    .option("mergeSchema", True) \
    .saveAsTable("sales_delta_managed")


dt = DeltaTable.forName(spark, "sales_delta_managed")
dt.history().select("version", "timestamp").show(truncate=False)

%%sparksql
select * from default.sales_delta_managed limit 5;
update default.sales_delta_managed set amount = 428 where trx_id = 123

dt.history().select("version", "timestamp").show(truncate=False)
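Data mutation (the merge/correction path for late-arriving data mentioned above) goes through the MERGE API. A hedged sketch using the DeltaTable Python API; the df_updates DataFrame below is a hypothetical corrections set keyed on trx_id:

# Hypothetical corrections DataFrame keyed by trx_id
df_updates = spark.createDataFrame([(123, 428.0)], ["trx_id", "amount"])

# MERGE: update matching rows in place; the delta log records the file rewrite
dt.alias("t").merge(
    df_updates.alias("u"),
    "t.trx_id = u.trx_id"
).whenMatchedUpdate(set={"amount": "u.amount"}) \
 .execute()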

Iceberg

In Iceberg, metadata is tracked in three kinds of files: 1. Metadata files 2. Manifest lists 3. Manifest files.

Metadata.json

This file contains the table's metadata. It has the following subsections:

  • Snapshots — Each snapshot is a complete list of all data files in the table at that point in time. It also records the table schema, partition spec & the location of the manifest list.
  • Schemas — Tracks the table schema. All schema changes are tracked in the schemas array.
  • partition-specs — Tracks information about the table's partitioning.
  • sort-orders — Tracks the sort order(s) defined on the table.

Manifest list — A file that lists all the manifests of a snapshot along with their metrics (the range of values each manifest spans for the partition columns). It acts as the link between a snapshot & its manifests.

Manifest — A list of data files with their format, location, and metrics (per-column bounds for each file, or file-level stats like row count).

Data File — Actual physical file.
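These layers can be queried directly through Iceberg's metadata tables. A minimal sketch, assuming the demo catalog and the sales_delta_managed_iceberg table created in the snippet below (catalog, namespace, and table names are taken from that snippet and may differ in your setup):

# Snapshots: one row per table snapshot, pointing at its manifest list
spark.sql("SELECT snapshot_id, manifest_list FROM demo.default.sales_delta_managed_iceberg.snapshots").show(truncate=False)

# Manifests: one row per manifest file tracked by the current snapshot
spark.sql("SELECT path, added_data_files_count FROM demo.default.sales_delta_managed_iceberg.manifests").show(truncate=False)

# Files: one row per data file, with per-file metrics
spark.sql("SELECT file_path, record_count FROM demo.default.sales_delta_managed_iceberg.files").show(truncate=False)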

Iceberg helps with the performance issues caused by S3 object listing and Hive Metastore partition enumeration.

Apache Iceberg attempts to create a portable table format layer, with a spec and a catalog that replace Hive's.

Note: There is a plugin called Hiveberg that allows Iceberg tables to be read from Hive.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Iceberg with PySpark") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.0.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.demo.type", "hadoop") \
    .config("spark.sql.catalog.demo.warehouse", "path/to/warehouse") \
    .config("spark.sql.defaultCatalog", "demo") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()

df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")
df_sales.write \
    .format("iceberg") \
    .mode("overwrite") \
    .saveAsTable("sales_delta_managed_iceberg")

Configurations to work with Nessie

org.projectnessie:nessie-spark-extensions-3.3_2.12:0.44.0 # Library for working with Nessie-based catalogs like Dremio Arctic
org.projectnessie.spark.extensions.NessieSparkSessionExtensions
spark.sql.extensions="org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions" \
spark.sql.catalog.nessie.uri=$ARCTIC_URI \
spark.sql.catalog.nessie.ref=main \
spark.sql.catalog.nessie.authentication.type=BEARER \
spark.sql.catalog.nessie.authentication.token=$TOKEN \
spark.sql.catalog.nessie.catalog-impl=org.apache.iceberg.nessie.NessieCatalog \
spark.sql.catalog.nessie.warehouse=$WAREHOUSE \
spark.sql.catalog.nessie=org.apache.iceberg.spark.SparkCatalog \
spark.sql.catalog.nessie.io-impl=org.apache.iceberg.aws.s3.S3FileIO

File formats in Iceberg

Data — Parquet
Metadata — JSON
Manifest list — Avro
Manifest — Avro

Iceberg ACID support:

In Iceberg, the writer initiates the update, creates the metadata files, and tries to commit the update by swapping the metadata file pointer from the current version to the new version. However, if the writer finds that the snapshot on which the update is based is no longer current, the writer must retry the update based on the new version.

An atomic swap of one table metadata file for another provides the basis for serializable isolation.

Reads are isolated from concurrent writes and always use a committed snapshot of a table’s data.
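You can also pin a read to a specific committed snapshot. A hedged sketch: the snapshot id is looked up from the snapshots metadata table, and the catalog/table names follow the earlier snippet (they may differ in your setup):

# Look up the latest committed snapshot id from the snapshots metadata table
snap_id = spark.sql(
    "SELECT snapshot_id FROM demo.default.sales_delta_managed_iceberg.snapshots "
    "ORDER BY committed_at DESC LIMIT 1"
).first()[0]

# Read the table pinned to that snapshot
df_pinned = spark.read \
    .option("snapshot-id", snap_id) \
    .format("iceberg") \
    .load("demo.default.sales_delta_managed_iceberg")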

Iceberg Versioning support:

  • Table Level Versioning: Iceberg supports branching versions of a table. This capability allows for isolated work and experimentation on a single table.
  • Catalog Level Versioning: Nessie introduces versioning at the catalog level, enabling branching and tagging semantics across multiple tables.
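With a Nessie catalog configured as above (registered under the name nessie), branching is exposed through SQL extensions. A hedged sketch; the branch name is an assumption and the exact syntax can vary across Nessie versions:

# Create an isolated branch of the whole catalog, work on it, then merge it back
spark.sql("CREATE BRANCH IF NOT EXISTS etl_experiment IN nessie FROM main")
spark.sql("USE REFERENCE etl_experiment IN nessie")
# ... writes made on this branch are invisible to readers of main ...
spark.sql("MERGE BRANCH etl_experiment INTO main IN nessie")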

Hudi

Index — mapping between record key and file group/file id

Timeline — event sequence of all actions performed on the table at different instants.

Datafiles — Actual data file in parquet format.

Hudi seems a good choice if your data lake files are consumed by varied tools and there is a need to manage mutating datasets.

from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Hudi with PySpark") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()

df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")
df_sales.write \
    .format("hudi") \
    .mode("overwrite") \
    .saveAsTable("sales_delta_managed_hudi")

# Time-travel read as of a given instant
df = spark.read \
    .format("hudi") \
    .option("as.of.instant", "2021-07-28") \
    .load(basePath)

# Append/upsert using the write options defined above
df_sales.write.format("hudi") \
    .options(**hudi_options) \
    .mode("append") \
    .saveAsTable("sales_delta_managed_hudi")


# create streaming df
df = spark.readStream \
    .format("hudi") \
    .load(basePath)

# write stream to new hudi table
df.writeStream.format("hudi") \
    .options(**hudi_streaming_options) \
    .outputMode("append") \
    .option("path", baseStreamingPath) \
    .option("checkpointLocation", checkpointLocation) \
    .trigger(once=True) \
    .start()
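Hudi's index (the record key to file group mapping mentioned above) is visible through the bookkeeping columns it adds to every record; a quick way to inspect them:

# Hudi stores commit time, record key, and file name as metadata columns on every row
spark.read.format("hudi").load(basePath) \
    .select("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name") \
    .show(truncate=False)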

Summary

Delta Lake is ideal for data lakes and data pipelines, Iceberg is best suited for data warehousing and analytics, while Hudi excels in its intended use cases of real-time data processing and streaming analytics.

Note: There are projects like Delta UniForm or XTable that are trying to bring interoperability between Delta Lake, Apache Hudi, and Apache Iceberg. XTable's (formerly OneTable) approach is to have the same data be queryable from different systems.

For example, a table ingested with Hudi can be exposed as an Iceberg and/or Delta Lake table without copying or moving the underlying data files, while maintaining a similar commit history to enable proper point-in-time queries.

Open table formats are essentially wrappers around our data store. They commonly use a series of files to:

1. Track schema/partition (DDL) changes on your table.
2. Track table’s data files & their column statistics.
3. Track all the Inserts/Updates/Deletes (DML) on your table.

git clone https://github.com/apache/incubator-xtable.git
cd incubator-xtable
mvn clean package

---
# my_config.yaml (dataset config, passed via --datasetConfig)
sourceFormat: HUDI
targetFormats:
  - DELTA
  - ICEBERG
datasets:
  -
    tableBasePath: s3://tpc-ds-datasets/1GB/hudi/call_center
    tableDataPath: s3://tpc-ds-datasets/1GB/hudi/call_center/data
    tableName: call_center
    namespace: my.db
  -
    tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
    tableName: catalog_sales
    partitionSpec: cs_sold_date_sk:VALUE
  -
    tableBasePath: s3://hudi/multi-partition-dataset
    tableName: multi_partition_dataset
    partitionSpec: time_millis:DAY:yyyy-MM-dd,type:VALUE
  -
    tableBasePath: abfs://container@storage.dfs.core.windows.net/multi-partition-dataset
    tableName: multi_partition_dataset
---
# converters.yaml (passed via --convertersConfig)
tableFormatConverters:
  HUDI:
    conversionSourceProviderClass: org.apache.xtable.hudi.HudiConversionSourceProvider
  DELTA:
    conversionTargetProviderClass: org.apache.xtable.delta.DeltaConversionTarget
    configuration:
      spark.master: local[2]
      spark.app.name: xtable
---
# catalog.yaml (passed via --icebergCatalogConfig)
catalogImpl: io.my.CatalogImpl
catalogName: name
catalogOptions: # all other options are passed through in a map
  key1: value1
  key2: value2
---
java -jar xtable-utilities/target/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml [--hadoopConfig hdfs-site.xml] [--convertersConfig converters.yaml] [--icebergCatalogConfig catalog.yaml]

Happy Reading!!
