Expedia Group Technology

Stories from the Expedia Group Technology teams

EXPEDIA GROUP TECHNOLOGY — DATA

A Short Introduction to Apache Iceberg

11 min read · Jan 26, 2021


A photo of an Iceberg
“Iceberg” by Магадан is licensed under CC BY-NC-SA 2.0

What is Apache Iceberg?

A diagram showing the Iceberg table format
Figure 1: The Iceberg table format

Benefits of using the table format

Using the Iceberg Java API

// required(...) and optional(...) are static imports from org.apache.iceberg.types.Types.NestedField
Schema schema = new Schema(
    required(1, "hotel_id", Types.LongType.get()),
    optional(2, "hotel_name", Types.StringType.get()),
    required(3, "customer_id", Types.LongType.get()),
    required(4, "arrival_date", Types.DateType.get()),
    required(5, "departure_date", Types.DateType.get())
);
PartitionSpec spec = PartitionSpec.builderFor(schema)
    .identity("hotel_id")
    .build();
TableIdentifier id = TableIdentifier.parse("bookings.rome_hotels");
// Option 1: HadoopCatalog
Catalog catalog = new HadoopCatalog(conf, "hdfs://nn:8020/path/to/table");
Table table = catalog.createTable(id, schema, spec);

// Option 2: HiveCatalog
Catalog catalog = new HiveCatalog(conf);
Table table = catalog.createTable(id, schema, spec);
SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();
Catalog catalog = new HiveCatalog(spark.sparkContext().hadoopConfiguration());
Table table = catalog.createTable(id, schema, spec);
// HiveCatalog
spark.read.format("iceberg").load("bookings.rome_hotels")
// HadoopCatalog
spark.read.format("iceberg").load("hdfs://nn:8020/path/to/table")

Features of Iceberg

// Add columns
table.updateSchema()
    .addColumn("hotel_geo_id", Types.LongType.get())
    .commit();
// Delete columns
table.updateSchema()
    .deleteColumn("hotel_name")
    .commit();
// Rename columns
table.updateSchema()
    .renameColumn("arrival_date", "check_in_date")
    .commit();
// Promote int -> long
table.updateSchema()
    .updateColumn("price", Types.LongType.get())
    .commit();
// Promote float -> double
table.updateSchema()
    .updateColumn("float", Types.DoubleType.get())
    .commit();
// Widen decimal precision
table.updateSchema()
    .updateColumn("decimal", Types.DecimalType.of(4, 2))
    .commit();
// Rearrange columns
table.updateSchema()
    .moveAfter("customer_id", "hotel_id")
    .moveBefore("hotel_name", "hotel_id")
    .commit();
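Renaming or reordering a column as shown above does not require rewriting data files, because Iceberg tracks each column by the numeric field ID assigned when the schema is defined (the first argument to `required` and `optional`) rather than by name or position. The sketch below is a toy model of that idea in plain Java, not Iceberg's implementation: the class `FieldIdSchema` and its methods are hypothetical names invented for illustration, and a stored row is modelled as a simple map from field ID to value.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Toy model (hypothetical, not an Iceberg class): columns are resolved by field ID. */
final class FieldIdSchema {
    // Field ID -> current column name, kept in declaration order.
    private final Map<Integer, String> namesById = new LinkedHashMap<>();

    void addColumn(int id, String name) {
        if (namesById.containsKey(id)) {
            throw new IllegalArgumentException("Field ID already in use: " + id);
        }
        namesById.put(id, name);
    }

    /** Renaming only touches the schema; stored rows are left as they are. */
    void renameColumn(String oldName, String newName) {
        for (Map.Entry<Integer, String> entry : namesById.entrySet()) {
            if (entry.getValue().equals(oldName)) {
                entry.setValue(newName);
                return;
            }
        }
        throw new IllegalArgumentException("No such column: " + oldName);
    }

    /** Projects a row stored by field ID onto the current column names. */
    Map<String, Object> read(Map<Integer, Object> storedRow) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<Integer, String> entry : namesById.entrySet()) {
            // A column added after the row was written has no stored value, so it reads as null.
            result.put(entry.getValue(), storedRow.get(entry.getKey()));
        }
        return result;
    }
}
```

In this model, a row written before `arrival_date` was renamed still surfaces its value under `check_in_date` afterwards, and a column added later simply reads as null for older rows.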
A diagram showing a table named “booking_table” with two partition specifications.
Figure 2: Diagram showing a table with two partition specifications.
spark.read
    .format("iceberg")
    .load("bookings.rome_hotels.snapshots")
    .show(truncate = false)
A snapshot of the table data
Figure 3: Snapshot table data.
spark.read
    .format("iceberg")
    .option("snapshot-id", 1588341995546L) // Using snapshot ID
    .load("bookings.rome_hotels")
spark.read
    .format("iceberg")
    .option("as-of-timestamp", "499162860000") // Using timestamp (milliseconds)
    .load("bookings.rome_hotels")
table.rollback()
    .toSnapshotId(1588341995546L)
    .commit();
table.rollback()
    .toSnapshotAtTime(499162860000L) // long literal: the value does not fit in an int
    .commit();
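Both the `as-of-timestamp` read option and `toSnapshotAtTime` have to turn a point in time into one concrete snapshot. The sketch below assumes the rule is "the snapshot that was current at that moment", meaning the most recent snapshot committed at or before the given timestamp. It is a plain-Java toy model, not Iceberg code: `SnapshotLookup`, `SnapshotEntry` and `snapshotIdAsOf` are hypothetical names, and the history list stands in for the rows of the snapshots metadata table.

```java
import java.util.List;
import java.util.OptionalLong;

/** Toy model (hypothetical, not an Iceberg class) of resolving a timestamp to a snapshot. */
final class SnapshotLookup {

    /** Stand-in for one row of the snapshots metadata table. */
    static final class SnapshotEntry {
        final long snapshotId;
        final long committedAtMillis;

        SnapshotEntry(long snapshotId, long committedAtMillis) {
            this.snapshotId = snapshotId;
            this.committedAtMillis = committedAtMillis;
        }
    }

    /**
     * Returns the ID of the latest snapshot committed at or before timestampMillis,
     * or an empty result if the table had no snapshot yet at that time.
     */
    static OptionalLong snapshotIdAsOf(List<SnapshotEntry> history, long timestampMillis) {
        OptionalLong result = OptionalLong.empty();
        long bestTime = Long.MIN_VALUE;
        for (SnapshotEntry entry : history) {
            boolean notInFuture = entry.committedAtMillis <= timestampMillis;
            if (notInFuture && entry.committedAtMillis >= bestTime) {
                bestTime = entry.committedAtMillis;
                result = OptionalLong.of(entry.snapshotId);
            }
        }
        return result;
    }
}
```

With snapshots committed at times 100, 200 and 300, a lookup for time 250 resolves to the second snapshot, while a lookup for time 50 finds nothing, which a real engine would have to report as an error.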

Hive Support

Create a table

add jar /path/to/iceberg-hive-runtime.jar;
CREATE EXTERNAL TABLE table_a 
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://some_bucket/some_path/table_a';
SELECT * from table_a;
Configuration hadoopConfiguration = spark.sparkContext().hadoopConfiguration();
hadoopConfiguration.set(ConfigProperties.ENGINE_HIVE_ENABLED, "true"); // iceberg.engine.hive.enabled=true
Map<String, String> tableProperties = new HashMap<>();
tableProperties.put(TableProperties.ENGINE_HIVE_ENABLED, "true"); // engine.hive.enabled=true
SET iceberg.mr.catalog=hive;
SELECT * from table_b;

Predicate pushdown
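Pushing a filter down to Iceberg is worthwhile because the table metadata records per-file column statistics, such as lower and upper bounds, so whole data files can be ruled out before any of them is opened. The following is a toy illustration of that pruning step in plain Java, under simplifying assumptions: it handles only an equality filter on `hotel_id`, and `FilePruning`, `DataFileStats` and `filesToScan` are hypothetical names rather than Iceberg or Hive APIs.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model (hypothetical, not an Iceberg class) of skipping files using min/max statistics. */
final class FilePruning {

    /** Stand-in for the hotel_id bounds that the metadata keeps for one data file. */
    static final class DataFileStats {
        final String path;
        final long minHotelId;
        final long maxHotelId;

        DataFileStats(String path, long minHotelId, long maxHotelId) {
            this.path = path;
            this.minHotelId = minHotelId;
            this.maxHotelId = maxHotelId;
        }
    }

    /** Keeps only the files whose [min, max] range could contain hotel_id = hotelId. */
    static List<String> filesToScan(List<DataFileStats> files, long hotelId) {
        List<String> selected = new ArrayList<>();
        for (DataFileStats file : files) {
            if (file.minHotelId <= hotelId && hotelId <= file.maxHotelId) {
                selected.add(file.path);
            }
        }
        return selected;
    }
}
```

A file whose range does not cover the requested value is never read. Files that survive this check may still contain no matching rows, so the engine applies the filter again while scanning them.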


Future Work

SELECT * FROM bookings.rome_hotels WHERE snapshot__id = 1234567890;
CREATE TABLE bookings.rome_hotels__snapshots
STORED BY 'com.expediagroup.hiveberg.IcebergStorageHandler'
LOCATION 'hdfs://nn:8020/path/to/table'
TBLPROPERTIES ('iceberg.catalog'='hadoop.catalog');
A snapshot of the table data
Figure 4: Snapshot table metadata.

Ongoing work
