How Apache Iceberg enables ACID compliance for data lakes

In this article, we will look at how the Apache Iceberg table format allows concurrent read and write operations while providing ACID compliance. We will also walk through a simple demo of running concurrent operations and examine the results.

As data lakes evolved, one of their main constraints was the lack of support for ACID transactions. Without these transactional guarantees, data lakes were difficult to rely on: supporting updates and deletes, keeping data consistent, and handling many concurrent readers and writers all became challenges. Traditional RDBMSs addressed these requirements long ago, but they did so in the tightly controlled environment of a single database engine. A data lake needs to support ACID transactions over data at much larger scale, stored in open file formats, while allowing multiple engines to work on it concurrently, ensuring the data is always consistent and no operation corrupts it.

As an open table format, Apache Iceberg allows multiple writers and readers, including different engines, to perform ACID transactions on the same data set in a data lake simultaneously.

Iceberg also solves some other challenges faced by the existing table formats of the day, such as partition evolution, schema evolution, efficient pruning, and more. Those topics are outside the scope of this article, but if you want more information, watch this presentation from Iceberg co-creator Ryan Blue.

Iceberg Concepts for ACID Compliance

I’ll start by laying out some concepts, and then we will go into more depth.

A — Iceberg writes support removing and adding files in a single operation, and writes are never partially visible, providing atomicity of operations.

C — Iceberg uses optimistic concurrency control to make sure concurrent writes to the table do not leave it in an inconsistent state. Readers only ever see already-committed data, for a read-consistent view.

I — Iceberg uses snapshot and serializable isolation levels to ensure that reads and concurrent writes are isolated.

D — Apache Iceberg ensures transaction semantics are durable; when a transaction is committed, it is permanent. Iceberg also leverages the data durability of Object Storage to ensure transactions are durable.

Let’s look at what all this means and how Iceberg achieves this. We will begin with a brief overview of how Iceberg organizes table metadata and data files.

Iceberg Table Data and Metadata Organization

Let’s start from the bottom of this organization to simplify the explanation.

Data File: Data files contain the fields and values in an Iceberg table (rows and columns). Data files can be in Apache Parquet, Avro, or ORC format. (There are also Delete files, which I am not covering here. Delete files encode rows of a table that are deleted in existing data files.)

Manifest File: A manifest file lists a subset of the data files that make up a table, along with each file’s partition data and column-level metrics. Manifest files are stored in Avro format.

Manifest List: A manifest list contains the list of manifest files belonging to a snapshot of a table. There is one manifest list file for each snapshot. A snapshot represents the state of a table at a particular time. A new manifest list is written for each attempt to commit a snapshot and is used to access the complete set of data files in the table. A manifest list file is stored in Apache Avro format, and the file name starts with the snapshot ID that it tracks.

Metadata File: The metadata file keeps the locations of the manifest list files, along with the table’s schema, partition spec, snapshot history, and a pointer to the current snapshot. Every change to table state creates a new metadata file that replaces the old one with an atomic swap. This produces a linear history of table versions and provides the basis for isolation. Metadata files are stored in JSON format.

Catalog: At the top of the hierarchy of the Iceberg table layout is the catalog. The catalog is used for defining, creating, loading, and managing tables. It is what identifies a table in SQL queries, and it tracks the table by the location of its current metadata file. Any time the table is updated, the catalog records the location of the latest metadata file, and readers and writers consult the catalog to fetch and refresh that metadata.

Iceberg supports a number of catalogs, including Hive Metastore, AWS Glue, Nessie, and a table in an RDBMS through a JDBC connection. The essential requirement is that the catalog must support an atomic swap of the current metadata pointer, which is what makes Iceberg table commits atomic and allows reads to achieve serializable isolation.
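
If you want to see this hierarchy for yourself, Iceberg exposes metadata tables that Spark can query alongside the data. Below is a minimal sketch, assuming the citibike_demo_db.trips table and the prodiceberg catalog that are set up later in this article; adjust the names to your own environment.

# Inspect the metadata hierarchy of an Iceberg table via its metadata tables
spark.sql("SELECT * FROM citibike_demo_db.trips.snapshots").show()   # snapshots and their manifest lists
spark.sql("SELECT * FROM citibike_demo_db.trips.manifests").show()   # manifest files of the current snapshot
spark.sql("SELECT * FROM citibike_demo_db.trips.files").show()       # data files tracked by the current snapshot
spark.sql("SELECT * FROM citibike_demo_db.trips.history").show()     # history of current-snapshot changes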

You can refer to these Iceberg documentation links for an introduction and more information on table specifications:

Introduction

Iceberg Table Spec

Iceberg Table Isolation and Locking for ACID Transactions

Writes

Writes follow optimistic concurrency control, meaning a writer optimistically assumes that the current table version will not change before it can commit its update. The writer makes its changes, creates the new metadata files, and tries to commit the update by swapping the metadata file pointer from the current version to the new version.

As per the Iceberg Table Spec:

“An atomic swap of one table metadata file for another provides the basis for serializable isolation.”

However, if the writer finds that the snapshot on which the update is based is no longer current, the writer must retry the update based on the new version. Some operations that do not have to write new data files support retry by reapplying the metadata changes.
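
To make that retry loop concrete, here is a deliberately simplified sketch of the compare-and-swap pattern in plain Python. The class and function names are hypothetical illustrations of the idea, not Iceberg APIs; in a real deployment the catalog performs the atomic swap of the metadata pointer.

# Illustrative sketch only -- not Iceberg's implementation.
# The catalog holds a single pointer to the current metadata file; a commit
# succeeds only if that pointer has not moved since the writer started.
class InMemoryCatalog:
    def __init__(self, initial_metadata):
        self.current_metadata = initial_metadata

    def compare_and_swap(self, expected, new):
        # Atomically replace the pointer only if no other writer committed first
        if self.current_metadata != expected:
            return False
        self.current_metadata = new
        return True

def commit_with_retries(catalog, build_new_metadata, max_retries=3):
    for _ in range(max_retries):
        base = catalog.current_metadata        # version this write is based on
        candidate = build_new_metadata(base)   # write new manifests + metadata file
        if catalog.compare_and_swap(base, candidate):
            return candidate                   # commit succeeded
        # otherwise another writer won the race; retry against the new version
    raise RuntimeError("Commit failed after retries")

catalog = InMemoryCatalog("v1.metadata.json")
print(commit_with_retries(catalog, lambda base: "v2.metadata.json"))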

The success of concurrent write operations also depends on the isolation level in a table. Apache Iceberg supports two isolation levels: serializable and snapshot. By default, the isolation level is serializable but can be changed to snapshot.

Here is how concurrent write operations behave under each isolation level, as described in the Iceberg documentation:

“While serializable is the strongest isolation level in databases, snapshot isolation is beneficial for environments with many concurrent writers. The serializable isolation level guarantees that an ongoing UPDATE/DELETE/MERGE operation fails if a concurrent transaction commits a new file that might contain rows matching the condition used in UPDATE/DELETE/MERGE. For example, if there is an ongoing update on a subset of rows and a concurrent transaction adds a new file with records that potentially match the update condition, the update operation must fail under the serializable isolation but can still commit under the snapshot isolation.”
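
If your workload has many concurrent writers and you want the more permissive behavior described above, the isolation level can be set per operation with table properties. A minimal sketch, assuming the demo table created later in this article; the property names are those documented in the Iceberg configuration reference.

# Relax UPDATE/DELETE/MERGE on this table from the default serializable
# isolation level to snapshot isolation
spark.sql("""
    ALTER TABLE citibike_demo_db.trips SET TBLPROPERTIES (
        'write.update.isolation-level' = 'snapshot',
        'write.delete.isolation-level' = 'snapshot',
        'write.merge.isolation-level'  = 'snapshot'
    )
""")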

Reads

Reads are isolated from concurrent writes and always use a committed snapshot of a table’s data. Every reader works against the table state referenced by the metadata file it last loaded (which it obtains from the catalog). Until the reader refreshes (reloads) that metadata to pick up the new metadata location, all reads continue to be directed at the state referenced by the previously loaded metadata file. The metadata file points to the manifest list files for the table’s snapshots (the manifest list contains the locations of the manifest files, which in turn hold the locations of the data files). This ensures that a consistent version of the data is always read from a completed operation, never from an operation in progress.

Readers don’t acquire any locks on data.
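
Because every committed snapshot remains addressable through the metadata, a reader can also pin a query to an older snapshot explicitly (Iceberg’s time travel). A minimal sketch, again assuming the demo table created below:

# List the table's committed snapshots, then read the table as of one of them
snapshots = spark.sql(
    "SELECT snapshot_id, committed_at FROM citibike_demo_db.trips.snapshots ORDER BY committed_at"
).collect()

oldest_id = snapshots[0]["snapshot_id"]
df_at_snapshot = (
    spark.read
         .option("snapshot-id", oldest_id)   # pin the read to a specific committed snapshot
         .format("iceberg")
         .load("citibike_demo_db.trips")
)
df_at_snapshot.show(5)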

Demo of Iceberg Tables with ACID Compliance

Now let’s see these concepts in practice. If you’d like to follow along, you’ll need admin access to accounts on AWS (free trial) and Snowflake (free trial).

First, configure Amazon EMR to be able to run Iceberg libraries. You can also refer to this article for more info on configuring the EMR cluster with Iceberg.

Once you are in the Amazon EMR console, click Create cluster, then Go to advanced options, and apply the same software configuration shown in the image below.

Under Edit software settings, enter the following JSON in the box, substituting the URL of your desired S3 storage location.

[
  {
    "classification": "spark-defaults",
    "properties": {
      "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
      "spark.jars.packages": "net.snowflake:snowflake-jdbc:3.13.14,net.snowflake:spark-snowflake_2.12:2.10.0-spark_3.1",
      "spark.sql.defaultCatalog": "prodiceberg",
      "spark.sql.catalog.prodiceberg": "org.apache.iceberg.spark.SparkCatalog",
      "spark.sql.catalog.prodiceberg.catalog-impl": "org.apache.iceberg.aws.dynamodb.DynamoDbCatalog",
      "spark.sql.catalog.prodiceberg.warehouse": "URL of the S3 path of Iceberg warehouse",
      "spark.sql.catalog.prodiceberg.dynamodb.table-name": "prodiceberg_metastore"
    }
  },
  {
    "classification": "iceberg-defaults",
    "properties": {
      "iceberg.enabled": "true"
    }
  }
]
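
Once the cluster is up, you can sanity-check from a notebook that Spark picked up the catalog settings above. This is an optional verification step, not part of the official setup.

# Confirm the Iceberg catalog configuration was applied to the Spark session
print(spark.conf.get("spark.sql.defaultCatalog"))                 # expect: prodiceberg
print(spark.conf.get("spark.sql.catalog.prodiceberg"))            # expect: org.apache.iceberg.spark.SparkCatalog
print(spark.conf.get("spark.sql.catalog.prodiceberg.warehouse"))  # expect: your S3 warehouse path
spark.sql("SHOW NAMESPACES").show()                               # lists databases in the Iceberg catalog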

Now, let’s examine how the Iceberg table behaves with jobs coming from multiple clusters. We will also set up an Iceberg database and table in the process.

Notebook One

Let’s create a database in the Iceberg warehouse and a table inside it. I will populate this table with Citi Bike trips data from Parquet files available in a public S3 bucket provided by Snowflake.

1. Create a DB in the Iceberg warehouse, read the example data files from a public S3 bucket, and load them into a table inside the DB.

# Create the database in the Iceberg warehouse
spark.sql("CREATE DATABASE citibike_demo_db")
# Read the Citi Bike trips Parquet files from the public Snowflake bucket
df = spark.read.parquet("s3://snowflake-workshop-lab/citibike-trips-parquet/2022/01/09/")
df.show()
# Create the Iceberg table from the DataFrame
df.writeTo("citibike_demo_db.trips").create()

2. Sample the data we have just loaded.

%%sql
SELECT * FROM citibike_demo_db.trips LIMIT 100

3. Rename the duration column to tripduration for clarity.

%%sql
ALTER TABLE citibike_demo_db.trips RENAME COLUMN DURATION TO TRIPDURATION

4. Add a new column and call it tripduration_minutes.

%%sql
ALTER TABLE citibike_demo_db.trips ADD COLUMN tripduration_minutes float AFTER tripduration

5. Perform an update from cluster 1.

Next, we will perform the concurrent write test by trying to update the column tripduration_minutes from two clusters at the same time. Execute the following in notebook one (running on cluster one) and, simultaneously, the command shown under Notebook Two below (running on cluster two).

%%sql
UPDATE citibike_demo_db.trips
SET tripduration_minutes = tripduration/60

This command from notebook one succeeds.

Notebook Two

5. Perform a simultaneous update from cluster 2.

Execute the following command in notebook two (running on cluster two) to try to update the same column at the same time.

%%sql
UPDATE citibike_demo_db.trips
SET tripduration_minutes = tripduration/100

This time, the command fails with the error messages shown in the screenshot (truncated here). With optimistic concurrency control, both write operations start. The first write creates a new snapshot, and the table metadata is atomically swapped. The second write fails at commit time: it finds that the snapshot its update was based on is no longer current, and that the current snapshot contains new files potentially holding records that match the update condition. Since the default isolation level is serializable, the second update must fail. This is what provides safe concurrency and consistency.
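
You can also confirm from either notebook that only the successful write from notebook one produced a snapshot; the rejected update from notebook two leaves no trace in the table history. A quick optional check:

# The table history shows a new entry for the committed update from notebook one;
# the rejected update from notebook two never became a snapshot
spark.sql("""
    SELECT made_current_at, snapshot_id, is_current_ancestor
    FROM citibike_demo_db.trips.history
""").show(truncate=False)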

Verify the Table Content by a Read Operation

The power of an open table format like Iceberg is that other tools can safely work with the same table, and I don’t mean just Spark-based tools. For example, you can read the same files in Amazon S3 from Snowflake using an external table. Snowflake’s support for Iceberg with External Tables is in private preview at the time of publication and can be enabled by Snowflake on request.

6. To connect to Snowflake and run a test query with Snowflake’s engine from either of the PySpark notebooks, run the following code. Replace each ****** with the Snowflake account URL, username, password, database, schema, and warehouse from the Snowflake trial account you created for this demo.

# Connect to Snowflake from the PySpark notebook and run a query to retrieve the current database name
SNOWFLAKE_SOURCE_NAME = "net.snowflake.spark.snowflake"
sfOptions = {
    "sfURL" : "******",
    "sfUser" : "******",
    "sfPassword" : "******",
    "sfDatabase" : "******",
    "sfSchema" : "******",
    "sfWarehouse" : "******",
    "sfRole" : "accountadmin"
}
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "select current_database()") \
    .load()
df.show()

7. Create an Iceberg External Table in Snowflake.

Now, create an External Table in Snowflake with the code below in the notebook, pointing to the storage location of Iceberg in S3. Notice that you can specify the table format as Iceberg, in which case Snowflake will use the Iceberg metadata for efficient pruning.

For Snowflake to point to the storage location, you need to create an external stage as per this doc and then refer to that in the external table definition. The code below illustrates creation of the external stage and the external table.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("snowflake").getOrCreate()
# Create an external stage in Snowflake pointing at the Iceberg table's data folder in S3
spark.sparkContext._jvm.net.snowflake.spark.snowflake.Utils.runQuery(sfOptions, "create or replace stage external_stage_name url = 'replace with iceberg table data folder location' CREDENTIALS = ( cloud_specific_credentials );")
# Create the Iceberg external table in Snowflake on top of that stage
spark.sparkContext._jvm.net.snowflake.spark.snowflake.Utils.runQuery(sfOptions, "create or replace external table trips table_format='iceberg' location= @external_stage_name file_format=(type=PARQUET) snapshot_location = 'replace with latest iceberg table metadata file path';")
# Optional command to refresh the Snowflake Iceberg external table to the latest metadata file in case the external table was created before the update operation
# spark.sparkContext._jvm.net.snowflake.spark.snowflake.Utils.runQuery(sfOptions, "alter external table trips refresh snapshot 'replace with latest iceberg table metadata file path';")

8. Perform read operation from Snowflake.

# Query the Iceberg external table through Snowflake's engine
df = spark.read.format(SNOWFLAKE_SOURCE_NAME) \
    .options(**sfOptions) \
    .option("query", "select tripduration, tripduration_minutes from trips;") \
    .load()
df.show()

You will notice that the read from Snowflake reflects the new column with the updated values from the successful write operation. Also note that the reads are based on the table snapshot referenced by the metadata file used when creating or refreshing the external table. As a result, reads are isolated to that particular snapshot, always referring to a committed operation whose metadata has been atomically swapped, which yields a consistent view of the data.

Conclusion: Apache Iceberg and ACID Compliance

We looked at how the Iceberg table format supports ACID transactions by using optimistic concurrency control and serializable snapshot isolation. We also saw a practical example of the same.

As an open-source table format, Apache Iceberg offers important benefits to data lake users looking to handle the volume, concurrency, and scale of big data. Iceberg supports multiple file formats, metastores/catalogs, and processing engines. It is seeing broad adoption and is very much community driven, with contributions from several large organizations.

What questions do you have about Iceberg tables and ACID compliance? List them in the comments section. And, if you know someone who works with data lakes, consider sharing this article with them.

Attributions

https://iceberg.apache.org/spec/

https://iceberg.apache.org/docs/latest/

https://iceberg.apache.org/javadoc/0.11.1/org/apache/iceberg/IsolationLevel.html

https://www.youtube.com/watch?v=nWwQMlrjhy0&t=138s

https://tabular.io/blog/emr-spark-and-iceberg/
