Open Table Formats — Delta, Iceberg & Hudi
Introductory post on new table formats
What is a Table Format?
Table formats are a way to organize data files. They aim to bring database-like features to the data lake.
Evolution of Table file formats:
Relational Table Format → Directory-oriented Table Formats → Log-oriented Table Format → Unified Open Table Format
Apache Hive is one of the earliest and most used table formats.
Different table formats store metadata differently. Apache Hive took a directory-based approach. The Hive table format suffers from the following issues:
Stale table statistics (partition lifecycle)
Users are forced to know the physical data layout
Lack of lineage/history
Lack of schema evolution
Hive was written in the pre-cloud era and was not designed for object storage (Hive's data organization is something of an anti-pattern for object stores), so it started taking performance hits as new usage patterns emerged in the cloud. Hive's metadata tables also grow rapidly, slowing down its performance further.
As the data domain grew, new use cases emerged that demanded a new set of capabilities from the data lake, mainly the following features:
Transactions (ACID)
Unified Batch & streaming
Data mutation (Merge/correction option for late data arrival)
Schema enforcement, evolution & versioning
Metadata scaling
Along with these, the following use cases also need to be supported:
Time Travel
Concurrent read & write
Independent consumption from storage
Data quality
Pluggable storage
To address these, communities started creating new open table formats. In this blog we will discuss the following three:
Delta table — Databricks
Iceberg — Netflix
Hudi — Uber
Delta Table
Delta tables are parquet tables with a transactional log.
Delta Log — It is a changelog of all the actions performed on the delta table. These actions are namely 1. Add file 2. Remove file 3. Update metadata 4. Set transaction 5. Change protocol and 6. Commit info. The delta log contains JSON files that reflect the state of the delta table at any given time.
Data Files — Parquet files that hold the actual data. These are immutable in nature. If a commit impacts row(s) of a file, all the rows of the current file are loaded into memory, the updates are applied, and the result is written out as a new file. The commit is then recorded in the delta log: the older parquet file is removed from the state and the newer parquet file is added.
This table format makes a great choice if you are invested in Databricks' offering.
from pyspark.sql import SparkSession
from delta import DeltaTable
spark = SparkSession.builder \
    .appName("Delta with PySpark") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.1.1") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()
df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")
df_sales.write \
.format("delta") \
.mode("overwrite") \
.option("mergeSchema", True) \
.saveAsTable("sales_delta_managed")
dt = DeltaTable.forName(spark, "sales_delta_managed")
dt.history().select("version", "timestamp").show(truncate=False)
%%sparksql
select * from default.sales_delta_managed limit 5;
update default.sales_delta_managed set amount = 428 where trx_id = 123
dt.history().select("version", "timestamp").show(truncate=False)
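To see the changelog described above, you can read the JSON commit files under the table's _delta_log directory. A minimal sketch, assuming the default spark-warehouse location used in this session:
import json
from pathlib import Path

# each commit is one JSON file; each line in it is a single action
log_dir = Path("spark-warehouse/sales_delta_managed/_delta_log")
for commit_file in sorted(log_dir.glob("*.json")):
    print(commit_file.name)
    for line in commit_file.read_text().splitlines():
        print("  ", list(json.loads(line).keys()))  # e.g. ['commitInfo'], ['metaData'], ['add'], ['remove']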
Iceberg
In Iceberg, metadata is tracked in three kinds of files: 1. Metadata files 2. Manifest lists 3. Manifest files.
Metadata.json
This file contains info about the metadata of the table. It has the following subsections:
- Snapshots — A snapshot represents the complete state (list of all data files) of the table at a point in time. It records the table schema, partition spec & the location of its manifest list.
- Schemas — It tracks the table schema. All table schema changes are tracked in the schemas array.
- partition-specs — track how the table is partitioned.
- sort-orders — track the sort order(s) used when writing data.
Key Components of a Manifest File
Here are some of the critical fields you’ll find inside a manifest file:
- file_path: This field records the location of the data file in the storage system. It is a string that provides the full path to the file, ensuring that Iceberg can quickly locate the data file when needed.
- partition_data: This field contains information about the partition values for the data in the file. Partitioning is a critical aspect of Iceberg's architecture, as it allows the data to be organized to make it easier to filter and query efficiently. The partition_data field ensures that Iceberg can apply the correct partitioning logic during query execution.
- file_format: This field specifies the format of the data file (e.g., Parquet, Avro, ORC). Knowing the format is crucial because it determines how Iceberg reads and writes the file, as well as how it can optimize queries against the data.
- record_count: The record_count field indicates the number of records contained in the data file. This metadata helps query engines estimate the size of the data set and make decisions about how to optimize query execution.
- file_size_in_bytes: This field provides the total size of the data file in bytes. Like the record_count, the file size is an essential metric for understanding the scale of the data and for planning efficient scans.
- value_counts, null_value_counts, nan_value_counts: These fields are metrics that provide detailed statistics about the data in the file. value_counts gives the total number of values in each column, null_value_counts tracks the number of null values, and nan_value_counts records the number of NaN (Not a Number) values. These metrics are invaluable for query optimization, as they help the query engine determine whether a file should be scanned based on the presence or absence of relevant data.
- lower_bounds and upper_bounds: These fields store the minimum and maximum values for each column in the data file. Query engines use these bounds to perform min/max pruning — skipping over data files that do not match the query's filter criteria. For example, if a query is looking for data within a specific date range, and the lower_bounds and upper_bounds of a data file fall outside it, the query engine can skip reading that file entirely.
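These per-file statistics are exposed through Iceberg's files metadata table, so you can look at them with a plain query. A minimal sketch with Spark SQL, assuming an Iceberg-enabled session (like the one shown later in this post) and a hypothetical table demo.db.sales:
spark.sql("""
    SELECT file_path, file_format, record_count, file_size_in_bytes,
           null_value_counts, lower_bounds, upper_bounds
    FROM demo.db.sales.files
""").show(truncate=False)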
Manifest list — The manifest list acts as an index that tracks all the manifest files associated with a particular snapshot, summarizing their contents and providing a high-level view of the dataset. It contains every manifest and its metrics (the range of partition-column values each manifest file spans), and acts as the link between manifests & a snapshot. Each entry in the manifest list has the following fields:
- manifest_path — location
- manifest_length — file length in bytes
- content — flag where 0 = data and 1 = deletes
- partitions — array of field summaries e.g. nullable, upper, lower
Data File — Actual physical file.
Delete Files — Delete files track records that have been deleted as part of “merge-on-read” updates. During queries, the engine reconciles these files with the base data, ensuring that deleted records are ignored.
Puffin Files — Puffin is a file format for storing statistics and index blobs alongside other metadata, designed to optimize queries for engines that choose to leverage them.
Partition Stats Files — These files summarize statistics at the partition level, enabling even greater optimization for queries that rely on partitioning.
Each Iceberg table snapshot is associated with a manifest list, which contains an array of structs. Each element tracks a single manifest file and includes information such as:
- The manifest file’s location
- The partition this manifest file belongs to
- The upper and lower bounds of the non-null partition field values, calculated across the data files tracked by this manifest file
Iceberg helps avoid performance issues related to S3 object listing and Hive Metastore partition enumeration.
Apache Iceberg attempts to create a portable format layer with spec and catalog replacement for Hive.
Note: There is a plugin called Hiveberg that allows us to read the Iceberg table from Hive Metastore.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Iceberg with PySpark") \
    .config("spark.jars.packages", "org.apache.iceberg:iceberg-spark-runtime-3.2_2.12:1.0.0") \
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkSessionCatalog") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .config("spark.sql.catalog.demo", "org.apache.iceberg.spark.SparkCatalog") \
    .config("spark.sql.catalog.demo.type", "hadoop") \
    .config("spark.sql.catalog.demo.warehouse", "path/to/warehouse") \
    .config("spark.sql.defaultCatalog", "demo") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()
df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")
df_sales.write \
.format("iceberg") \
.mode("overwrite") \
.saveAsTable("sales_delta_managed_iceberg")
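After the write, the metadata layers described above can be inspected through Iceberg's metadata tables. A minimal sketch; the fully qualified identifier is an assumption and may need adjusting to your catalog/namespace:
# snapshots, manifests and history of the table we just wrote
spark.sql("SELECT snapshot_id, operation, manifest_list FROM demo.default.sales_delta_managed_iceberg.snapshots").show(truncate=False)
spark.sql("SELECT path, added_data_files_count FROM demo.default.sales_delta_managed_iceberg.manifests").show(truncate=False)
spark.sql("SELECT made_current_at, snapshot_id, is_current_ancestor FROM demo.default.sales_delta_managed_iceberg.history").show()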
Configurations to work with Nessie
org.projectnessie:nessie-spark-extensions-3.3_2.12:0.44.0 # Library for working with Nessie-based catalogs like Dremio Arctic
org.projectnessie.spark.extensions.NessieSparkSessionExtensions
spark.sql.extensions="org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions" \
spark.sql.catalog.nessie.uri=$ARCTIC_URI \
spark.sql.catalog.nessie.ref=main \
spark.sql.catalog.nessie.authentication.type=BEARER \
spark.sql.catalog.nessie.authentication.token=$TOKEN \
spark.sql.catalog.nessie.catalog-impl=org.apache.iceberg.nessie.NessieCatalog \
spark.sql.catalog.nessie.warehouse=$WAREHOUSE \
spark.sql.catalog.nessie=org.apache.iceberg.spark.SparkCatalog \
spark.sql.catalog.nessie.io-impl=org.apache.iceberg.aws.s3.S3FileIO
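With the Nessie extensions enabled, branches of the whole catalog can be managed straight from Spark SQL. A minimal sketch (the branch name is illustrative, and the session is assumed to be built with the configs above):
spark.sql("CREATE BRANCH IF NOT EXISTS etl_dev IN nessie FROM main")  # new catalog-level branch
spark.sql("USE REFERENCE etl_dev IN nessie")                          # point this session at it
spark.sql("LIST REFERENCES IN nessie").show()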
Iceberg with Python
pip install -U "pyiceberg[pyarrow]"
pyiceberg --help
# There are three ways to configure PyIceberg:
# - Using the ~/.pyiceberg.yaml configuration file
# - Through environment variables
# - By passing in credentials through the CLI or the Python API
Content of pyiceberg.yaml
catalog:
sandbox:
type: sql
uri: sqlite:////tmp/warehouse/pyiceberg_catalog.db
warehouse: file:///tmp/warehouse
CLI Commands
pyiceberg list
pyiceberg --catalog sandbox list
pyiceberg --catalog sandbox create namespace quickstart
Python code
from pyiceberg.catalog import load_catalog
catalog = load_catalog('sandbox')
import pyarrow.parquet as pq
df = pq.read_table("/tmp/yellow_tripdata_2023-01.parquet")
table = catalog.create_table(
"quickstart.taxi_dataset",
schema=df.schema,
)
table.append(df)
len(table.scan().to_arrow())
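Once data is appended, PyIceberg can read it back with column projection and row filters pushed down to the file level. A minimal sketch; the column names are assumptions about the taxi dataset loaded above:
arrow_table = table.scan(
    row_filter="trip_distance > 10.0",                             # skip files/rows that cannot match
    selected_fields=("VendorID", "trip_distance", "fare_amount"),
).to_arrow()
print(arrow_table.num_rows)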
File formats in Iceberg
Data — Parquet
Metadata — JSON
Manifest list — Avro
Manifest — Avro
Iceberg ACID support:
In Iceberg, the writer initiates the update, creates the metadata files, and tries to commit the update by swapping the metadata file pointer from the current version to the new version. However, if the writer finds that the snapshot on which the update is based is no longer current, the writer must retry the update based on the new version.
An atomic swap of one table metadata file for another provides the basis for serializable isolation.
Reads are isolated from concurrent writes and always use a committed snapshot of a table’s data.
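For example, a row-level change issued through the Spark SQL extensions is validated and committed as a single new snapshot. A minimal sketch with hypothetical names (df_updates and demo.db.sales are not defined in this post):
df_updates.createOrReplaceTempView("updates")   # incoming rows to merge
spark.sql("""
    MERGE INTO demo.db.sales AS t
    USING updates AS u
    ON t.trx_id = u.trx_id
    WHEN MATCHED THEN UPDATE SET *
    WHEN NOT MATCHED THEN INSERT *
""")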
Iceberg Versioning support:
- Table Level Versioning: Iceberg supports branching versions of a table. This capability allows for isolated work and experimentation on a single table (see the SQL sketch below).
- Catalog Level Versioning: Nessie introduces versioning at the catalog level, enabling branching and tagging semantics across multiple tables.
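Table-level branches and tags can be created with SQL, though this needs a newer iceberg-spark-runtime than the 1.0.0 version pinned earlier; the names below are illustrative:
spark.sql("ALTER TABLE demo.db.sales CREATE BRANCH experiment")
spark.sql("ALTER TABLE demo.db.sales CREATE TAG end_of_q1")
# read the branch instead of the main table history
spark.read.option("branch", "experiment").format("iceberg").load("demo.db.sales").show()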
Read & Write paths of Iceberg
Write Path
- The writer writes new data files into storage.
- Creates new metadata files with updated file manifests
- Validates for conflicts with existing metadata
- Commits the new snapshot to the catalog atomically
Writers never modify existing data files. New data is invisible until the commit is complete.
Read Path
Readers continue using their current snapshot during writes. No partial or uncommitted data is ever visible to readers.
- Readers only see data through the catalog
- Catalog points to the latest committed metadata file
- Metadata file contains references to valid data files
- This ensures readers only see consistent, committed data
Conflict resolution during write
Conflict detection happens at commit time, and only one writer can successfully commit at a time. Failed commits are retried and don't affect readers or data consistency.
Hudi
Hudi uses four file types. The base files store the table's records in Parquet format (column-oriented), which optimizes reads. The log files capture changes to records on top of the associated base file; these files are in Avro format (row-oriented), which optimizes writes. Commit files record metadata about each commit operation, including the timestamp and file changes. Tombstone files mark deleted records.
Index — mapping between record key and file group/file id
Timeline — event sequence of all actions performed on the table at different instants.
Datafiles — Actual data file in parquet format.
Primary key — Each record in a Hudi table has a unique identifier called a primary key, consisting of a record key and the partition path to which the record belongs.
Table Types:
Hudi supports two major table types based on the type of workload:
- Copy on Write (CoW): This approach stores data as columnar files (e.g., Parquet) and, during updates, the file is rewritten completely. It is optimized for read-heavy workloads but can be slower for frequent updates due to the need to rewrite files.
- Merge on Read (MoR): This approach stores base data as columnar files (e.g., Parquet), but any changes (updates, deletes) are stored as log files (e.g., Avro). During reads, the base and log files are merged on the fly. MoR is optimized for write-heavy or incremental updates workloads, as it allows more efficient writing while delaying the full merge.
Hudi supports the following read types:
- Snapshot: The queries will see the latest snapshot of the table.
- Time Travel: The queries will read a snapshot of the past.
- Read Optimized: The queries will read the snapshot using only the columnar files.
- Incremental (Latest State): The queries only return new data written since an instant on the timeline.
- Incremental (CDC): This provides database-like change data capture log streams out of Hudi tables.
Hudi supports the following write types:
- Upsert: Hudi first looks up the index to tag each record as an insert (new) or an update (existing).
- Insert: This is similar to Upsert, except that it skips the index lookup step.
- Delete: Hudi supports two types of deletes on Hudi table data. Based on the record key, Hudi can soft delete where it retains only the record key and fills null for all the other fields. The other approach is hard delete, which entirely clears all evidence of a record.
- Bulk Insert: Insert and Upsert keep input data in memory; Bulk Insert instead uses a sort-based data writing algorithm that scales well for large initial loads.
- Insert Overwrite: Hudi will rewrite all the partitions that are present in the input.
- Insert Overwrite Table: Hudi will rewrite the whole table.
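The table type and the write type are both selected through write options. A minimal sketch of the relevant keys (the values shown are illustrative):
hudi_write_options = {
    "hoodie.datasource.write.table.type": "MERGE_ON_READ",  # or COPY_ON_WRITE (the default)
    "hoodie.datasource.write.operation": "upsert",          # insert / bulk_insert / delete / insert_overwrite / insert_overwrite_table
}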
Key Components
Delta Streamer
Hudi’s ingestion engine, the Delta Streamer, enables incremental ingestion of data into Hudi tables. It is responsible for handling:
- Data Ingestion: Delta Streamer can pull data from multiple sources like Kafka or databases, allowing Hudi to ingest real-time or batch data efficiently.
- Upserts and Deletes: Hudi handles both inserts and updates (upserts), and can delete old or obsolete data, enabling mutability in data lakes.
Delta Streamer supports event-based ingestion and ensures data freshness by applying incremental changes rather than reprocessing the entire dataset.
Write Client
It is responsible for all write operations, including inserts, updates, and deletes. It supports two types of operations:
- Bulk Inserts: Ingests a large volume of data into the table without any existing data constraints.
- Upserts/Deletes: Handles updates or deletes to specific records, which requires maintaining an index of records.
Indexing Layer
The indexing layer enables Hudi to efficiently track data and apply updates or deletes at a granular level.
- Bloom Filter Index: Tracks whether a record exists in a file using Bloom filters.
- Global and Partitioned Index: Hudi supports both global indexes (which track records across the entire dataset) and partitioned indexes (which track records within specific partitions). This index helps Hudi efficiently locate and update specific records.
Hudi’s index ensures that updates are applied efficiently without needing to scan the entire dataset, which is key for maintaining performance when dealing with large datasets.
Timeline Server
Hudi uses the Timeline Server to track the lifecycle of each write operation. It records the status of the data at various stages (commits, cleans, compactions, rollbacks, etc.). This enables:
- Time Travel Queries: The ability to query previous versions of the data by reading from older commit snapshots.
- Incremental Processing: Efficiently retrieving only changed data (deltas) since the last snapshot, which is particularly useful in streaming scenarios.
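An incremental read pulls only the records committed after a given instant. A minimal sketch, assuming basePath points at an existing Hudi table:
incremental_df = spark.read.format("hudi") \
    .option("hoodie.datasource.query.type", "incremental") \
    .option("hoodie.datasource.read.begin.instanttime", "20210728000000") \
    .load(basePath)
incremental_df.show()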
Table Services
Table services handle background maintenance tasks like compaction, cleaning, and archival.
- Compaction merges log files (Merge on read tables) with the base data files periodically to improve query performance. It balances the trade-off between write performance (by not updating base files immediately) and read performance (by eventually merging base and log files).
- Hudi provides automatic cleaning of old versions of data files and log files to reduce storage needs.
- Old metadata is archived into a more compact format.
Query Layer
Hudi supports a variety of query engines like Apache Spark, Apache Hive, and Presto/Trino.
Hudi is a good choice if you have varied tools consuming data lake files and a need to manage mutating datasets.
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("Hudi with PySpark") \
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.2-bundle_2.12:0.11.1") \
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.warehouse.dir", "spark-warehouse") \
    .config("spark.sql.catalog.spark_catalog.type", "hive") \
    .master("local[*]") \
    .enableHiveSupport() \
    .getOrCreate()
df_sales = spark.read.parquet("dataset/sales.parquet/*parquet")
df_sales.write \
.format("hudi") \
.mode("overwrite") \
.saveAsTable("sales_delta_managed_hudi")
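The snippets below reference basePath and hudi_options, which are not defined in the original code. A minimal sketch of plausible values; the record key and precombine columns are assumptions about the sales dataset:
basePath = "spark-warehouse/sales_delta_managed_hudi"  # assumed location of the managed table written above
hudi_options = {
    "hoodie.table.name": "sales_delta_managed_hudi",
    "hoodie.datasource.write.recordkey.field": "trx_id",   # assumed record key column
    "hoodie.datasource.write.precombine.field": "trx_ts",  # assumed ordering column for de-duplication
    "hoodie.datasource.write.operation": "upsert",
}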
df = spark.read. \
format("hudi"). \
option("as.of.instant", "2021-07-28"). \
load(basePath)
df_sales.write.format("hudi"). \
options(**hudi_options). \
mode("append"). \
saveAsTable("sales_delta_managed_hudi")
# create streaming df
df = spark.readStream \
.format("hudi") \
.load(basePath)
# write stream to new hudi table
df.writeStream.format("hudi") \
.options(**hudi_streaming_options) \
.outputMode("append") \
.option("path", baseStreamingPath) \
.option("checkpointLocation", checkpointLocation) \
.trigger(once=True) \
.start()
Summary
Delta Lake is ideal for data lakes and data pipelines, Iceberg is best suited for data warehousing and analytics, while Hudi excels in its intended use cases of real-time data processing and streaming analytics.
Note: There are projects like Delta UniForm or XTable that are trying to bring interoperability between Delta Lake, Apache Hudi, and Apache Iceberg. XTable's (formerly OneTable) approach is to have the same data be queryable from different systems.
For example, expose tables ingested with Hudi as an Iceberg and/or Delta Lake table without requiring a user to copy or move the underlying data files used for that table while maintaining a similar commit history to enable proper point-in-time queries.
Open table formats are essentially wrappers around our data store. They commonly use a series of files to:
1. Track schema/partition (DDL) changes on your table.
2. Track table’s data files & their column statistics.
3. Track all the Inserts/Updates/Deletes (DML) on your table.
git clone https://github.com/apache/incubator-xtable.git
cd incubator-xtable
mvn clean package
---
sourceFormat: HUDI
targetFormats:
- DELTA
- ICEBERG
datasets:
-
tableBasePath: s3://tpc-ds-datasets/1GB/hudi/call_center
tableDataPath: s3://tpc-ds-datasets/1GB/hudi/call_center/data
tableName: call_center
namespace: my.db
-
tableBasePath: s3://tpc-ds-datasets/1GB/hudi/catalog_sales
tableName: catalog_sales
partitionSpec: cs_sold_date_sk:VALUE
-
tableBasePath: s3://hudi/multi-partition-dataset
tableName: multi_partition_dataset
partitionSpec: time_millis:DAY:yyyy-MM-dd,type:VALUE
-
tableBasePath: abfs://container@storage.dfs.core.windows.net/multi-partition-dataset
tableName: multi_partition_dataset
---
tableFormatConverters:
HUDI:
conversionSourceProviderClass: org.apache.xtable.hudi.HudiConversionSourceProvider
DELTA:
conversionTargetProviderClass: org.apache.xtable.delta.DeltaConversionTarget
configuration:
spark.master: local[2]
spark.app.name: xtable
---
catalogImpl: io.my.CatalogImpl
catalogName: name
catalogOptions: # all other options are passed through in a map
key1: value1
key2: value2
---
java -jar xtable-utilities/target/xtable-utilities-0.1.0-SNAPSHOT-bundled.jar --datasetConfig my_config.yaml [--hadoopConfig hdfs-site.xml] [--convertersConfig converters.yaml] [--icebergCatalogConfig catalog.yaml]
Xcatalog (a feature of XTable) focuses on synchronizing metadata across different catalogs (HMS, Glue, Unity(WIP), Polaris(WIP)), enhancing interoperability and flexibility in data lakehouse environments.
Happy Reading!!