Getting started with Apache Iceberg

The modern world is awash with unimaginable amounts of data created daily by devices, apps, and users. Consider the scale of a single day: roughly 8.5 billion Google searches, around 100 billion WhatsApp messages, and over 95 million Instagram posts. Alongside human users, millions of devices and sensors generate data of every kind.

With so much data to analyze, traditional data warehouses could not handle the volume and variety of the incoming data, giving rise to managed data lakes as a way to augment data warehouses. While data lakes have come a long way thanks to feature and technological improvements, they still suffer from several limitations. In this article, we’ll cover some of those limitations and how Apache Iceberg helps overcome them.

Data Lake File and Table Formats

At its core, a data lake is just a repository of millions, or even billions, of files. These files can contain data of many different types stored in different formats. File formats specify how tools should interpret the data in a file; some of the most popular are Parquet, Avro, ORC, JSON, and CSV. Table formats provide an abstraction of a table so that users and tools can efficiently interact with the many underlying data files.

The Hive table format was the standard for the last decade but has cons that are very limiting at scale. A few shortcomings of the Hive table format:

  • Requires directory listings for query planning; listing millions of files can take a long time, hurting performance
  • Is inefficient for updates; entire partitions must be rewritten even for very small changes
  • Lacks ACID compliance
  • Requires users to be aware of partitioning schemes and explicitly specify them in queries to avoid time-consuming full table scans

In addition, Hive does not support important requirements of modern data platforms such as:

  • Schema evolution
  • Time travel
  • Metadata tracking
  • Row level operations (deletes, upserts)
  • Cross platform support

Some of the most talked about table formats (Apache Iceberg, Apache Hudi, Delta Lake) provide support for these requirements in their own distinctive ways. For the rest of this article, we will focus on Iceberg.

What is Apache Iceberg?

Built by Netflix and donated to the Apache Software Foundation, Iceberg is an open-source table format built to store extremely large, slow-moving tabular data. Iceberg is quickly becoming the de facto standard for managing data in data lakes for several reasons. Iceberg solves many challenges around scale, usability, and performance degradation when working on extremely large data sets. Iceberg supports industry standard file formats such as Apache Parquet, ORC, and Avro. ACID compliance, schema evolution, hidden partitioning, time travel, and performance improvements are several other features that make Iceberg very desirable. Iceberg makes it possible for engines like Spark, Trino, Flink, Presto, Hive, and Impala to safely work with the same tables in parallel.

Learn more about the various reasons to choose Iceberg in these articles:

Iceberg Tables: Powering Open Standards with Snowflake Innovations

5 Compelling Reasons to Choose Apache Iceberg

How Does Apache Iceberg Work?

Unlike Hive, where a table is defined as the entire contents of one or more directories, Iceberg tracks the individual data files that make up a table. Metadata is tracked in three kinds of files (the metadata file, the manifest list, and manifest files), as described in more detail in this blog post.

This metadata design helps resolve many problems with Hive tables, for example:

  • Faster planning and execution since file listing is no longer used to plan jobs
  • Easier and more efficient small updates, eliminating the need to rewrite entire partitions
  • Safer transactions with snapshot isolation; reads and writes are atomic and independent
  • Easier management with schema evolution enabled by physical abstraction
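
To make this concrete, here is a rough sketch of how an Iceberg table is typically laid out in object storage (the file and folder names below are illustrative, not exact):

s3://<warehouse-path>/<database>/<table>/
  data/
    00000-0-<uuid>.parquet           # data files (Parquet, ORC, or Avro)
  metadata/
    00000-<uuid>.metadata.json       # metadata file: schema, partition spec, snapshots
    snap-<snapshot-id>-<uuid>.avro   # manifest list, one per snapshot
    <uuid>-m0.avro                   # manifest file: lists data files with stats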

Exploring Apache Iceberg Tables Using Spark

Let’s walk through an example to demonstrate several features of Iceberg. We will use a public data set, apply a few transformations, and see what Iceberg does behind the scenes. For the purposes of this demo, we’ll use Amazon EMR and S3, but a valuable attribute of Iceberg is the growing ecosystem of tools. Because Iceberg is an open spec, it is supported on any cloud object store and by a growing list of query and processing engines.

Get the data and upload to Amazon S3

You can copy NYC Taxi Data from here. For the demo, we only need files from the year 2022.

If you don’t already have an AWS account, sign up for the free tier. Create an S3 bucket and copy the files as shown below.

Supply your destination bucket and prefix:
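
The exact command depends on where you are copying the files from. As a sketch, here is one way to do a server-side copy with boto3; every bucket and prefix name below is a placeholder to replace with your own values:

import boto3

s3 = boto3.resource("s3")

# Placeholders: <source-bucket> is wherever the public NYC taxi files live;
# <your-bucket>/<your-prefix> is the destination you created above.
source_bucket = "<source-bucket>"
dest_bucket = "<your-bucket>"
dest_prefix = "<your-prefix>"

for month in ["01", "02"]:
    key = f"yellow_tripdata_2022-{month}.parquet"
    s3.Object(dest_bucket, f"{dest_prefix}/{key}").copy({"Bucket": source_bucket, "Key": key})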

Note: To avoid cross-region transfer issues, make sure the destination bucket is in the same region as the source data (US East, N. Virginia).

Alternatively, you can also download the files and upload them to your S3 bucket.

Spin up an EMR cluster

Our next step is to spin up an EMR cluster; please follow the steps outlined below.

  • From the AWS console, choose EMR.
  • Click Create cluster, then go to “Advanced options.”
  • For the demo, we need a Jupyter notebook and Spark, so choose JupyterHub, JupyterEnterpriseGateway, and Spark.
  • In the “Edit software settings” box, we need to supply configuration for the Iceberg runtime environment, SQL extensions, catalog, and so on.
  • Use the screenshot and code block below for reference.
[
  {
    "classification": "spark-defaults",
    "properties": {
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
      "spark.jars.packages": "net.snowflake:snowflake-jdbc:3.13.14,net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.1",
      "spark.sql.defaultCatalog": "prodiceberg",
      "spark.sql.catalog.prodiceberg": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.prodiceberg.catalog-impl": "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
      "spark.sql.catalog.prodiceberg.warehouse": "<Your S3 Path>",
      "spark.sql.catalog.prodiceberg.dynamodb.table-name": "prodiceberg_metastore"
    }
  },
  {
    "classification": "iceberg-defaults",
    "properties": {
      "iceberg.enabled": "true"
    }
  }
]

The only change required in the above code block is the S3 path, “<Your S3 Path>”. This path is used as your warehouse location (both data and metadata are written there).

  • Now let’s give the cluster a name, leaving everything else with the defaults.
  • Once you click “Create Cluster,” it takes about 10 minutes for the cluster to spin up. You will know the cluster is ready when the console shows its status as Waiting.

Create a Jupyter Notebook for PySpark

Once your cluster is ready, create a notebook to run some tests.

  • Click “Notebooks” on the left pane.
  • Supply a name and add a comment/description if you want.
  • For the Cluster, choose the EMR cluster that we just spun up.
  • We can leave the other options with the defaults.
  • Once the notebook is created and ready, click on “Open in JupyterLab.”
  • Double-click on the notebook in the left pane; a kernel selector should pop up. Switch the kernel to PySpark.

Note: If you are using your personal account to run this demo, you might receive the error “Forbidden workspace not attached to cluster” while switching the kernel. This happens because you are using your root credentials. Create an IAM user with EMR privileges, log in as that user, open the notebook, and switch the kernel.

With this, all our configuration is complete and we can get started on the demo.

Create a Table and Load Data

Let’s start a Spark session. We can do that by typing “spark” in a cell and running it.
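
The whole cell is just:

spark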

We now have a Spark session available; let us create a database to house our objects.

spark.sql("create database city_data")

Fetch the data into a DataFrame (substitute the path where you uploaded the Parquet files):

df = spark.read.option("header", True).parquet("<Your S3 Path>/yellow_tripdata_2022-01.parquet")

Check the record count:

df.count()

Persist the contents of the dataframe as a table:

df.writeTo("city_data.yellow_taxi_2022").create()

Verify row count from the table:

spark.sql("""
select count(1) from city_data.yellow_taxi_2022
""").show()
spark.sql("""
describe table city_data.yellow_taxi_2022
""").show()

We just created our first Iceberg table! Let’s take a break from Spark and check out what happened behind the scenes.

To view the data and Iceberg metadata, we need to navigate to the S3 location supplied while configuring the EMR cluster. At your S3 path, you will notice the database we created:

Inside the database city_data, you will find the table folder “yellow_taxi_2022.”

Each table folder contains a “data” and “metadata” subfolder.

  1. The data folder contains the actual data in Parquet format.
  2. The metadata folder contains a JSON file (the metadata file) and two Avro files (the manifest list and a manifest file).

To understand the metadata better, here is a screenshot of the metadata.json file.

This blog post goes into detail about metadata. For the sake of this short demo, let us focus on two elements from the JSON file: “schema” and “snapshots.”

The “schema” element tracks the table schema. When the table evolves, changes to the table schema are tracked in the “schemas” array.

The “snapshots” array keeps track of every change to the table state. Here is an expanded view. Every time the table state changes, a new snapshot is created.

Let’s go back to Spark to test this out:

df.count()

Our Spark DataFrame still holds the data from the public data set. Let’s append it to the existing table, thus doubling the number of rows.

df.writeTo("city_data.yellow_taxi_2022").append()

New row count:

spark.sql("""
select count(1) from city_data.yellow_taxi_2022
""").show()

Let’s go back to view the data folder. As expected, there is a new data file.

The metadata folder also has a new set of metadata files reflecting the changes made.

Let’s check out what the latest metadata.json file looks like. You will see that there are two snapshots now: 0 and 1 representing the table state before and after the append.

Note: The manifest list indicates the files that make up a snapshot.
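
You don’t have to browse S3 to see any of this. Iceberg’s Spark integration also exposes metadata tables that can be queried directly; here is a quick sketch using the snapshots and files metadata tables:

spark.sql("""
select committed_at, snapshot_id, operation from city_data.yellow_taxi_2022.snapshots
""").show(truncate=False)
spark.sql("""
select file_path, record_count, file_size_in_bytes from city_data.yellow_taxi_2022.files
""").show(truncate=False)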

Schema Evolution

Let’s go back to Spark and just alter the table without adding any new data.

spark.sql("""
alter table city_data.yellow_taxi_2022 add columns(misc_comments string)
""").show()
spark.sql("""
describe table city_data.yellow_taxi_2022
""").show()

The new column “misc_comments” showed up; let’s go back to look behind the scenes.

  • Only a new metadata.json file shows up
  • No change in the data folder
  • No change to the manifest list or manifest files

Iceberg schema changes are metadata changes, so data files don’t need to be rewritten to perform the update, sparing you the cost of rewriting or migrating the entire table as you might with other table formats.

Under the “schemas” array, you’ll see that a new schema has been generated; there are now two entries, with schema IDs 0 and 1, and the latter tracks the latest update to the schema.

Here are the two schemas side by side; you will see the new misc_comments field only in the latest schema (schema-id: 1).
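
Other schema changes behave the same way. With the Iceberg SQL extensions configured on the cluster, renaming or dropping a column is also a metadata-only operation. A sketch, not run as part of this walkthrough:

spark.sql("""
alter table city_data.yellow_taxi_2022 rename column misc_comments to comments
""").show()
spark.sql("""
alter table city_data.yellow_taxi_2022 drop column comments
""").show()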

Now that we’ve viewed how schema evolution works, let’s try time travel.

Time Travel

By simply appending .history to the table name, you can list every snapshot along with its parent snapshot.

spark.sql("""
select * from city_data.yellow_taxi_2022.history
""").show()

Notice that even though we changed the table twice (doubled the rows, added an extra column), there are only two snapshots — not three. This is because the physical table has only changed once; the other change was a metadata-only change.

Test time travel by reverting to the original state of the table. Let’s fetch the original snapshot:

df = spark.sql("select * from city_data.yellow_taxi_2022.history")
original_snapshot = df.head().snapshot_id

Now, call <catalog>.system.rollback_to_snapshot, supplying the original snapshot ID:

spark.sql(f"call prodiceberg.system.rollback_to_snapshot('city_data.yellow_taxi_2022',{original_snapshot})")

Let’s look at the snapshot history again:

spark.sql("""
select * from city_data.yellow_taxi_2022.history
""").show()

You will see that our table reverted to the original snapshot.

Let’s check the count from the table again; we are back to the original 2,463,931 rows.

spark.sql("""
select count(1) from city_data.yellow_taxi_2022
""").show()

If you were to look at the data and metadata folders, you would see no changes to the data folder; only the metadata.json file’s “current-snapshot-id” is updated to point back to the original snapshot.
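
Rolling back is not the only way to use snapshots. You can also read an older snapshot directly, without changing the table’s current state. Here is a sketch using the DataFrame reader’s snapshot-id option; depending on the Iceberg and Spark versions bundled with your EMR release, you may need format("iceberg").load() with a table path instead of table():

old_df = spark.read.option("snapshot-id", original_snapshot).table("city_data.yellow_taxi_2022")
old_df.count()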

Additional Useful Spark Procedures

There are several Spark procedures for Iceberg that support various functions. Let’s try one that adds existing Parquet files to an Iceberg table in place. If you already have an existing base of Parquet data, you can spare the compute cost of rewriting the files by simply calling the add_files procedure to register your existing Parquet files with an Iceberg table. Point source_table at the location where you uploaded the raw Parquet files.

spark.sql("""call prodiceberg.system.add_files(
table => 'city_data.yellow_taxi_2022',
source_table => '`parquet`.`s3://jedicetest/test/yellow_tripdata_2022-02.parquet`')
""")

Verify that the new count reflects additional data from the new file just added.

spark.sql("""
select count(1) from city_data.yellow_taxi_2022
""").show()

Conclusion

We covered a few features of the Iceberg table format and how it works behind the scenes. Iceberg, backed by companies like Netflix, Amazon, and Apple, is rapidly gaining adoption. Combining the capabilities of the Snowflake platform with open-source projects like Apache Iceberg and Apache Parquet can empower your organization to solve challenges and support your architecture of choice, be it a data lake, data lakehouse, or data mesh.

  • More on Snowflake’s implementation of Iceberg (currently in private preview) can be found here
  • More on how Apache Iceberg enables ACID compliance for data lakes here
  • More on data lakes and Apache Iceberg can be found here
