EXPEDIA GROUP TECHNOLOGY — DATA
A Short Introduction to Apache Iceberg
The data lake team at Expedia Group starts working with table formats and adds Hive support to Apache Iceberg
Table formats have slowly been stealing the spotlight across the big data space as projects like Apache Hudi, Delta Lake and Apache Iceberg mature and disrupt the tried-and-tested legacy data lake technologies in use at most companies worldwide.
Expedia Group™ is one company making that transition, with a data lake backed by S3 that relies on Hive’s Metastore to store schemas and data locations consistently. The data lake wasn’t built to be “cloud native”, as it was originally developed to run on on-premises Hadoop. Because of this, a number of workarounds have been implemented (see projects like Waggle Dance, Circus Train and Beekeeper), which make the data lake more complex to maintain and use.
However, newer data lake technologies such as the table formats mentioned above solve many of the underlying issues with current data lake implementations. They have been built to take advantage of the cloud and also provide more advanced features such as ACID transactions, time travel (point-in-time) queries and partition spec evolution.
The data lake team at Expedia Group was tasked with researching these new technologies to determine whether we should adopt them. This blog post summarises what we learned about Apache Iceberg.
What is Apache Iceberg?
Apache Iceberg is an open table format designed for huge, petabyte-scale tables. The function of a table format is to determine how you manage, organise and track all of the files that make up a table. You can think of it as an abstraction layer between your physical data files (written in Parquet or ORC etc.) and how they are structured to form a table.
The project was originally developed at Netflix to solve long-standing issues with their usage of huge, petabyte-scale tables. It was open-sourced in 2018 as an Apache Incubator project and graduated from the incubator on the 19th of May 2020 🎉.
Iceberg has been designed from the ground up to be used in the cloud, and a key consideration was solving various data consistency and performance issues that Hive suffers from when used with data located in S3. Hive keeps track of data at the “folder” level (i.e. not the file level) and thus needs to perform file list operations when working with data in a table. This can lead to performance problems when many of these operations need to be executed. There is also the potential for data to appear missing when file list operations are performed on an eventually consistent object store like S3. Iceberg avoids this by keeping track of a complete list of all files within a table using a persistent tree structure. Changes to a table use an atomic object/file-level commit to update the path to a new metadata file containing the locations of all the individual data files.
Another advantage of Iceberg tracking individual files rather than folders is that expensive list operations are no longer needed, which improves the performance of operations such as querying the data in the table. The table state is stored in a number of different metadata files, which are represented in the diagram above and described in more detail below:
- Snapshot metadata file: contains metadata about the table, such as the table schema and the partition specification, as well as a path to the manifest list.
- Manifest list: contains an entry for each manifest file associated with the snapshot. Each entry includes a path to the manifest file and some metadata about the file, including partition stats and data file counts. These stats can be used to avoid reading manifests that aren’t required for an operation.
- Manifest file: contains a list of paths to related data files. Each entry for a data file includes some metadata about the file, such as per-column upper and lower bounds which can be used to prune files during query planning.
- Data file: the physical data file, written in formats like Parquet, ORC, Avro etc.
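To make the tree concrete, here is a minimal Java sketch of how the two levels of statistics let a query planner skip work. It is a hypothetical model, not Iceberg's implementation or API. The classes `ManifestEntry` and `DataFileEntry` and the method `planFiles` are invented for illustration. The model filters on a single long column (think `hotel_id`) and keeps only one min/max pair per manifest and per data file.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical model of Iceberg's metadata tree; not the real Iceberg API.
class MetadataTreeSketch {

    // A data file entry as tracked by a manifest: path plus per-column bounds.
    static final class DataFileEntry {
        final String path;
        final long lowerBound; // min value of the filtered column in this file
        final long upperBound; // max value of the filtered column in this file

        DataFileEntry(String path, long lowerBound, long upperBound) {
            this.path = path;
            this.lowerBound = lowerBound;
            this.upperBound = upperBound;
        }
    }

    // A manifest list entry: partition stats for one manifest plus its data files.
    static final class ManifestEntry {
        final long minPartition; // partition stats stored in the manifest list
        final long maxPartition;
        final List<DataFileEntry> files; // contents of the manifest file itself

        ManifestEntry(long minPartition, long maxPartition, List<DataFileEntry> files) {
            this.minPartition = minPartition;
            this.maxPartition = maxPartition;
            this.files = files;
        }
    }

    // Returns the paths of data files that may contain rows where column == value.
    static List<String> planFiles(List<ManifestEntry> manifestList, long value) {
        List<String> result = new ArrayList<>();
        for (ManifestEntry manifest : manifestList) {
            // Level 1: skip whole manifests using the manifest list's partition stats.
            if (value < manifest.minPartition || value > manifest.maxPartition) {
                continue;
            }
            // Level 2: skip individual data files using their per-column bounds.
            for (DataFileEntry file : manifest.files) {
                if (value >= file.lowerBound && value <= file.upperBound) {
                    result.add(file.path);
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<ManifestEntry> manifestList = Arrays.asList(
            new ManifestEntry(1, 100, Arrays.asList(
                new DataFileEntry("a.parquet", 1, 50),
                new DataFileEntry("b.parquet", 51, 100))),
            new ManifestEntry(101, 200, Arrays.asList(
                new DataFileEntry("c.parquet", 101, 200))));
        System.out.println(planFiles(manifestList, 42)); // prints [a.parquet]
    }
}
```

In this sketch, the second manifest is never opened when the query looks for `hotel_id = 42`. No directory listing is needed at any point because every file path is already recorded in the metadata.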
Benefits of using the table format
Using the snapshot pattern means that Iceberg can guarantee isolated reads and writes. Readers always see a consistent version of the data (i.e. no ‘dirty reads’) without needing to lock the table. Writers work in isolation without affecting the live table, and perform a metadata swap only when the write is complete, making the changes in one atomic commit. Snapshots also enable time travel, as users can run operations against different versions of the table by specifying the snapshot to use.
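The isolation guarantee rests on the fact that a commit is a single atomic swap of the "current metadata" pointer. The Java sketch below models that idea with an `AtomicReference` and compare-and-set. It is a hypothetical illustration of optimistic concurrency, not Iceberg's code. In Iceberg, the equivalent step is the atomic commit that updates the path to the current metadata file. The names `SnapshotIsolationSketch`, `Snapshot` and `commitAppend` are invented for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of snapshot isolation via an atomic pointer swap.
class SnapshotIsolationSketch {

    // An immutable snapshot: an ID plus the full list of data files it contains.
    static final class Snapshot {
        final long id;
        final List<String> files;

        Snapshot(long id, List<String> files) {
            this.id = id;
            this.files = Collections.unmodifiableList(new ArrayList<>(files));
        }
    }

    // The "current metadata" pointer that readers dereference.
    private final AtomicReference<Snapshot> current =
        new AtomicReference<>(new Snapshot(0, Collections.<String>emptyList()));

    // Readers take the current snapshot once and never see a partial write.
    Snapshot read() {
        return current.get();
    }

    // Writers build a new snapshot off to the side, then try to swap it in.
    // Returns false if another writer committed first; the caller would retry.
    boolean commitAppend(Snapshot base, String newFile) {
        List<String> files = new ArrayList<>(base.files);
        files.add(newFile);
        Snapshot next = new Snapshot(base.id + 1, files);
        return current.compareAndSet(base, next);
    }
}
```

A reader that called `read()` before a commit keeps a consistent view of its snapshot, because snapshots are never modified in place. If two writers start from the same base, only the first `compareAndSet` succeeds. The second writer must re-read the table and retry, which is how an optimistic scheme avoids table locks.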
There are huge performance benefits to using Iceberg as well. Instead of listing O(n) partitions in a table during job planning, Iceberg performs an O(1) RPC to read the snapshot. The file pruning and predicate pushdown can also be distributed to jobs so the Hive metastore is no longer a bottleneck. This also removes the barriers to using finer-grained partitioning. The file pruning available due to the statistics stored for each data file also speeds up query planning significantly.
Using the Iceberg Java API
Iceberg has APIs available in Java and Python. This post focuses on the Java API but the examples shown should be possible using Python too.
To create an Iceberg table, you’ll need a schema, a partition spec and a table identifier:
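import static org.apache.iceberg.types.Types.NestedField.optional;
import static org.apache.iceberg.types.Types.NestedField.required;
import org.apache.iceberg.PartitionSpec;
import org.apache.iceberg.Schema;
import org.apache.iceberg.catalog.TableIdentifier;
import org.apache.iceberg.types.Types;

Schema schema = new Schema(
    required(1, "hotel_id", Types.LongType.get()),
    optional(2, "hotel_name", Types.StringType.get()),
    required(3, "customer_id", Types.LongType.get()),
    required(4, "arrival_date", Types.DateType.get()),
    required(5, "departure_date", Types.DateType.get())
);
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .identity("hotel_id")
    .build();
TableIdentifier id = TableIdentifier.parse("bookings.rome_hotels");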
To create your table, you have a couple of catalog options:
- HadoopCatalog: supports tables that are stored in HDFS or your local file system.
- HiveCatalog: uses a Hive Metastore to keep track of your Iceberg table by storing a reference to the latest metadata file.
// Option 1: HadoopCatalog (the second argument is the warehouse root; tables are created beneath it)
Catalog catalog = new HadoopCatalog(conf, "hdfs://nn:8020/path/to/warehouse");
Table table = catalog.createTable(id, schema, spec);
// Option 2: HiveCatalog (conf is a Hadoop Configuration that can reach your Hive Metastore)
Catalog catalog = new HiveCatalog(conf);
Table table = catalog.createTable(id, schema, spec);
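If you are using Spark, you can build the catalog from the Spark session’s Hadoop configuration instead:
import org.apache.spark.sql.SparkSession;
SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
Catalog catalog = new HiveCatalog(spark.sparkContext().hadoopConfiguration());
Table table = catalog.createTable(id, schema, spec);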
To read a table from Spark:
// HiveCatalog: load the table by name
spark.read.format("iceberg").load("bookings.rome_hotels")
// HadoopCatalog: load the table by its location
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")
Features of Iceberg
Schema Evolution
Iceberg has excellent built-in support for schema evolution that guards against committing breaking changes to your table.
The examples below show usage of the Iceberg API to update a table’s schema in various ways, such as adding or deleting columns:
// Each snippet is an independent example against the original schema
// Add columns
table.updateSchema()
    .addColumn("hotel_geo_id", Types.LongType.get())
    .commit();
// Delete columns
table.updateSchema()
    .deleteColumn("hotel_name")
    .commit();
// Rename columns
table.updateSchema()
    .renameColumn("arrival_date", "check_in_date")
    .commit();
// Promote int -> long (assumes an int column named "price")
table.updateSchema()
    .updateColumn("price", Types.LongType.get())
    .commit();
// Promote float -> double (assumes a float column named "float")
table.updateSchema()
    .updateColumn("float", Types.DoubleType.get())
    .commit();
// Widen decimal precision (assumes a decimal(P, 2) column named "decimal", with P < 4)
table.updateSchema()
    .updateColumn("decimal", Types.DecimalType.of(4, 2))
    .commit();
// Rearrange columns
table.updateSchema()
    .moveAfter("customer_id", "hotel_id")
    .moveBefore("hotel_name", "hotel_id")
    .commit();
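The type changes in the examples above match the widening promotions that the Iceberg table spec allows: int to long, float to double, and increasing a decimal's precision while keeping its scale. The following is a rough, hypothetical sketch of a check that accepts only those changes. It is not Iceberg's implementation, and it reduces types to strings such as "int" or "decimal(4,2)".

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical validator for Iceberg-style primitive type promotions.
class TypePromotionSketch {

    private static final Pattern DECIMAL =
        Pattern.compile("decimal\\((\\d+),\\s*(\\d+)\\)");

    // Returns true if a column of type `from` may safely be changed to `to`.
    static boolean isAllowedPromotion(String from, String to) {
        if (from.equals(to)) {
            return true; // no-op change
        }
        if (from.equals("int") && to.equals("long")) {
            return true;
        }
        if (from.equals("float") && to.equals("double")) {
            return true;
        }
        Matcher f = DECIMAL.matcher(from);
        Matcher t = DECIMAL.matcher(to);
        if (f.matches() && t.matches()) {
            int fromPrecision = Integer.parseInt(f.group(1));
            int fromScale = Integer.parseInt(f.group(2));
            int toPrecision = Integer.parseInt(t.group(1));
            int toScale = Integer.parseInt(t.group(2));
            // Precision may only grow; the scale must stay the same.
            return toScale == fromScale && toPrecision >= fromPrecision;
        }
        return false; // e.g. long -> int could lose data
    }

    public static void main(String[] args) {
        System.out.println(isAllowedPromotion("decimal(3,2)", "decimal(4,2)")); // true
        System.out.println(isAllowedPromotion("long", "int"));                  // false
    }
}
```

A narrowing change such as long to int is rejected. Existing data files could hold values that no longer fit, so the change would break reads of old data.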
Iceberg guarantees that schema changes are independent and free of side effects. Iceberg uses a unique ID to track each field in a schema and maps each field name to an ID. This means you can change the name of a field, and Iceberg readers will still use the ID associated with each field.
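To see why a rename is safe, consider a reader that resolves columns by ID rather than by name. The sketch below is hypothetical and simplifies how Iceberg readers project columns. It models a row from a data file as a map from field ID to value, and the current schema as a map from column name to field ID. The ID 6 for the newly added `hotel_geo_id` column is an assumption made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of ID-based column resolution; not Iceberg's reader code.
class FieldIdSketch {

    // Reads a column by name: the current schema maps the name to a field ID,
    // and the row from the data file is looked up by that ID, never by name.
    static Object readColumn(Map<String, Integer> schema, Map<Integer, Object> row, String name) {
        Integer fieldId = schema.get(name);
        if (fieldId == null) {
            throw new IllegalArgumentException("Unknown column: " + name);
        }
        return row.get(fieldId); // null if the file was written before the column existed
    }

    public static void main(String[] args) {
        // A row written when field 4 was still called "arrival_date".
        Map<Integer, Object> row = new HashMap<>();
        row.put(1, 1001L);        // hotel_id
        row.put(4, "2020-05-19"); // arrival_date, as written to the file

        // Current schema after renameColumn("arrival_date", "check_in_date")
        // and addColumn("hotel_geo_id", ...), which is assumed to receive ID 6.
        Map<String, Integer> schema = new HashMap<>();
        schema.put("hotel_id", 1);
        schema.put("check_in_date", 4);
        schema.put("hotel_geo_id", 6);

        System.out.println(readColumn(schema, row, "check_in_date")); // 2020-05-19
        System.out.println(readColumn(schema, row, "hotel_geo_id"));  // null
    }
}
```

The old file never needs rewriting. The renamed column still resolves to ID 4, and the new column simply reads as null for files written before it existed.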
Partition Evolution
Because Iceberg implements hidden partitioning, it can also offer partition spec evolution. This means you can change the granularity or the column that you partition by without breaking the table. Partition evolution is a metadata operation and does not eagerly rewrite files, so old data can co-exist in the table with new data. This is possible because Iceberg implements split planning: it plans a query for the data written with the old spec, plans a second query for the data written with the new spec, and then combines the resulting files.
In the diagram above, the booking_table is initially partitioned by month(date) until 2009-01-01, when the partition spec changes to day(date). The old data stays in the old partition layout, and all new data is written using the new one. When the example query is run, Iceberg carries out split planning for each partition spec and can filter out partitions under both specifications by applying either the month or the day transform to the date column.
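Split planning can be sketched as follows. This is a hypothetical, simplified model rather than Iceberg's planner. Each data file records which spec wrote it ("month" or "day") and its partition value. A month partition is written as "yyyy-MM" and a day partition as an ISO date. A date-range filter is projected through each file's own transform before partition values are compared. The model checks files one at a time, which is equivalent to planning per spec and combining the results. The file names are illustrative.

```java
import java.time.LocalDate;
import java.time.YearMonth;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical sketch of split planning across two partition specs.
class SplitPlanningSketch {

    // A data file tagged with the spec ("month" or "day") that wrote it.
    static final class DataFile {
        final String path;
        final String spec;
        final String partition; // "yyyy-MM" for month, "yyyy-MM-dd" for day

        DataFile(String path, String spec, String partition) {
            this.path = path;
            this.spec = spec;
            this.partition = partition;
        }
    }

    // Keeps files whose partition may hold dates in [from, to] (inclusive).
    static List<String> plan(List<DataFile> files, LocalDate from, LocalDate to) {
        List<String> selected = new ArrayList<>();
        for (DataFile file : files) {
            boolean matches;
            if (file.spec.equals("month")) {
                // Apply the month transform to the filter bounds, then compare months.
                YearMonth p = YearMonth.parse(file.partition);
                matches = !p.isBefore(YearMonth.from(from)) && !p.isAfter(YearMonth.from(to));
            } else {
                // Apply the day transform: the partition value is the date itself.
                LocalDate p = LocalDate.parse(file.partition);
                matches = !p.isBefore(from) && !p.isAfter(to);
            }
            if (matches) {
                selected.add(file.path);
            }
        }
        return selected;
    }

    public static void main(String[] args) {
        List<DataFile> files = Arrays.asList(
            new DataFile("m-2008-11.parquet", "month", "2008-11"),
            new DataFile("m-2008-12.parquet", "month", "2008-12"),
            new DataFile("d-2009-01-01.parquet", "day", "2009-01-01"),
            new DataFile("d-2009-01-15.parquet", "day", "2009-01-15"));
        // Prints [m-2008-12.parquet, d-2009-01-01.parquet]
        System.out.println(plan(files, LocalDate.parse("2008-12-15"), LocalDate.parse("2009-01-10")));
    }
}
```

The month file for 2008-12 is kept even though the filter starts mid-month. At month granularity the planner cannot tell which days it contains, so it falls back to reading the file.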
Time travel
As explained earlier, Iceberg keeps a log of previous snapshots of the table, which allows time travel queries and table rollbacks to be performed. You can access the snapshot log from Spark:
spark.read
.format("iceberg")
.load("bookings.rome_hotels.snapshots")
.show(truncate = false)
This returns one row per snapshot, with columns such as committed_at, snapshot_id, parent_id, operation, manifest_list and summary.
To run a query against an older snapshot:
// Using a snapshot ID
spark.read
  .format("iceberg")
  .option("snapshot-id", 1588341995546L)
  .load("bookings.rome_hotels")

// Using a timestamp (milliseconds since the epoch)
spark.read
  .format("iceberg")
  .option("as-of-timestamp", "499162860000")
  .load("bookings.rome_hotels")
If you want to roll back your table to an earlier version:
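// Roll back to a specific snapshot ID
table.rollback()
    .toSnapshotId(1588341995546L)
    .commit();
// Roll back to the table state at a point in time (epoch milliseconds, as a long)
table.rollback()
    .toSnapshotAtTime(499162860000L)
    .commit();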
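Both the as-of-timestamp read option and toSnapshotAtTime have to turn a timestamp into a snapshot. This hypothetical sketch assumes a natural rule: pick the most recent snapshot committed at or before the given time. It is not Iceberg's code, and Iceberg's exact boundary handling may differ. The snapshot log is modelled as a sorted map from commit time in milliseconds to snapshot ID, and the timestamps and IDs are made up.

```java
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

// Hypothetical sketch: resolve a time-travel timestamp to a snapshot ID.
class TimeTravelSketch {

    // Snapshot log: commit timestamp (epoch millis) -> snapshot ID.
    private final NavigableMap<Long, Long> snapshotLog = new TreeMap<>();

    void recordCommit(long committedAtMillis, long snapshotId) {
        snapshotLog.put(committedAtMillis, snapshotId);
    }

    // Returns the latest snapshot committed at or before asOfMillis.
    long snapshotIdAsOf(long asOfMillis) {
        Map.Entry<Long, Long> entry = snapshotLog.floorEntry(asOfMillis);
        if (entry == null) {
            throw new IllegalArgumentException(
                "No snapshot committed at or before " + asOfMillis);
        }
        return entry.getValue();
    }

    public static void main(String[] args) {
        TimeTravelSketch log = new TimeTravelSketch();
        log.recordCommit(1_000L, 11L);
        log.recordCommit(2_000L, 22L);
        log.recordCommit(3_000L, 33L);
        System.out.println(log.snapshotIdAsOf(2_500L)); // 22
    }
}
```

A timestamp earlier than the first commit has no matching snapshot. In that case the sketch fails loudly instead of silently returning the oldest table state.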
Query Engine Support
As seen earlier in this blog post, Iceberg has good support for Apache Spark. You can read and write Iceberg tables using Spark DataFrames, and you can read them using Spark SQL if you create a temporary view of the table.
There is also a Trino connector that allows you to read and write Iceberg tables using Trino (formerly known as PrestoSQL). Support for the features mentioned above is still limited, but adding them is on an actively developed to-do list.
Hive Support
When we started our research project on Iceberg, we identified a need for some integration with Hive, as most of our data lake currently uses Hive and big-bang migrations of tables are never going to run smoothly. Our intention was to assess the feasibility of moving some data producers over to the Iceberg table format, while still allowing these tables to be read by other users so that the newly produced data would not be siloed.
In our initial evaluation of Iceberg there was no support for reading Iceberg tables from Hive, which was an essential use case for us. We agreed to spend some time attempting to implement an InputFormat that would provide a read path from Hive to Iceberg, which resulted in the creation of Hiveberg.
We initially created an InputFormat, a SerDe and a StorageHandler that could be used to link a Hive table to an existing Iceberg table. We submitted PRs for this work and started discussions with members of the Iceberg and Hive communities to help us add Hive functionality to Iceberg. Support for reading Iceberg tables from Hive was added in the 0.10.0 release!
Create a table
To read an Iceberg table from Hive, you must “overlay” an existing Iceberg table with a new, linked table in Hive. To do this, you will need the Iceberg Hive runtime jar, which can be added via the Hive shell:
add jar /path/to/iceberg-hive-runtime.jar;
You can link two types of Iceberg tables: tables created using HadoopTables and tables created using HiveCatalog.
HadoopTables
If the underlying Iceberg table uses HadoopTables, create your table in Hive using Hive DDL like so:
CREATE EXTERNAL TABLE table_a
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://some_bucket/some_path/table_a';
You can then query this table as normal. Both the MapReduce and Tez execution engines are supported.
SELECT * from table_a;
HiveCatalog
If the underlying Iceberg table was created using HiveCatalog, the table is already registered with Hive. To set up the Hive table so that it links to the Iceberg table, you need to enable Hive support through configuration, and there are two ways to do this.
The first requires setting the iceberg.engine.hive.enabled property to true in the Hive configuration file on the classpath of the application that is creating the table, for example by editing the hive-site.xml file. This can also be done programmatically:
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.hadoop.ConfigProperties;
Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();
hadoopConfiguration.set(ConfigProperties.ENGINE_HIVE_ENABLED, "true"); // iceberg.engine.hive.enabled=true
The other option is to set the engine.hive.enabled property to true in the table properties when creating the Iceberg table, which can be done like this:
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true"); // engine.hive.enabled=true
Table table = catalog.createTable(id, schema, spec, tableProperties);
Querying the table
To query the table, one more property needs to be set in Hive:
SET iceberg.mr.catalog=hive;
After this, you should be able to query the table normally, such as:
SELECT * from table_b;
Predicate pushdown
Predicate pushdown has also been added to Iceberg, which means the WHERE clause of a Hive query is pushed down to the Iceberg TableScan level as well as to the Parquet and ORC readers.
Future Work
Time-travel from Hive
During our initial work on adding Hive support, we created a project called Hiveberg, which was our ‘sandbox’ for developing Hive support features. One of the features still in development for inclusion in an Iceberg release allows users to write ‘time-travel’ queries from Hive. The initial work can be found in Hiveberg.
To perform a time-travel query, the user specifies the ID of the snapshot they want to run their query against:
SELECT * FROM bookings.rome_hotels WHERE snapshot__id = 1234567890;
However, we quickly realised that to make this functionality more user-friendly, we needed to include a way for users to access the snapshot metadata of an Iceberg table through Hive. To support this, we added an option to create a system table, which is connected to the original Iceberg data table but contains metadata about each table snapshot.
To do this, create another table within Hive using the Hiveberg library:
CREATE TABLE bookings.rome_hotels__snapshots
STORED BY 'com.expediagroup.hiveberg.IcebergStorageHandler'
LOCATION 'hdfs://nn:8020/path/to/table'
TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog');
Please note that it is important to use the same name as your original table plus the suffix __snapshots. This is what the InputFormat uses to distinguish between your original data table and the snapshot metadata table.
Querying this table exposes metadata about each snapshot of the original table.
This should be regarded as a ‘taster’ of functionality that we would like to contribute to the Iceberg project.
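The __snapshots naming convention can be illustrated with a small, hypothetical helper. It is not Hiveberg's actual code. It only shows how an InputFormat could use the suffix to decide whether to serve snapshot metadata, and which data table that metadata belongs to.

```java
import java.util.Optional;

// Hypothetical helper illustrating the "__snapshots" naming convention.
class SnapshotTableNames {

    static final String SUFFIX = "__snapshots";

    // Returns the base data table name if `tableName` is a snapshots table.
    static Optional<String> baseTableOf(String tableName) {
        if (tableName.endsWith(SUFFIX) && tableName.length() > SUFFIX.length()) {
            return Optional.of(tableName.substring(0, tableName.length() - SUFFIX.length()));
        }
        return Optional.empty(); // an ordinary data table
    }

    public static void main(String[] args) {
        System.out.println(baseTableOf("bookings.rome_hotels__snapshots")); // Optional[bookings.rome_hotels]
        System.out.println(baseTableOf("bookings.rome_hotels"));            // Optional.empty
    }
}
```

This is also why the snapshots table must keep exactly the original table's name plus the suffix. Any other name would not map back to the data table.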
Ongoing work
Hive support for Iceberg is a hot topic within the community and there are many features in the process of being added to the main Iceberg repository including:
- Support for the Hive write path
- Column projection
This blog post has shown that Iceberg has many useful features as a table format for large-scale data lakes. We found the community behind it very welcoming and easy to engage with, and we enjoyed working with them. We hope to continue our collaboration in the future.