Delta Lake: build modern Lakehouses

Alessandro Gangi
Data Reply IT | DataTech
19 min read · Dec 28, 2023

Introduction

In the age of data-driven decision-making, organizations across various industries are increasingly reliant on efficient and reliable methods to store, manage, and analyze vast amounts of data. Traditional data lakes, which serve as repositories for diverse and large-scale data, have often encountered challenges related to data quality, consistency, and reliability. However, the landscape of data lakes has been transformed with the emergence of new technologies which have revolutionized the way modern data lakes are constructed and operated.

Delta Lake logo — delta.io

Delta Lake is one of the most popular projects that addresses the shortcomings of traditional data lakes and introduces powerful capabilities that allow organizations to build robust and scalable data lakes capable of supporting their data-driven initiatives. It is defined as:

an open-source storage framework that enables building a Lakehouse architecture with compute engines including Spark, PrestoDB, Flink, Trino, and Hive and APIs for Scala, Java, Rust, Ruby, and Python. (delta.io)

It runs on top of existing storage systems, such as the Apache Hadoop Distributed File System (HDFS) or cloud-based object stores like Amazon S3, Azure Data Lake Storage, and Google Cloud Storage. Delta Lake offers ACID (Atomicity, Consistency, Isolation, Durability) transactions, schema enforcement, and metadata management capabilities, making it a powerful framework for big data analytics and data engineering workflows.

This article aims to provide an overview of Delta Lake's key features with the help of a Jupyter Notebook: first, it will go through the main operations that are made available by the framework and then it will discuss its key features.

Working with Delta Lake

In this section we discuss the main operations of Delta Lake, combined with Spark, with the help of a Jupyter Notebook: first, we create a sample dataset and configure the Spark session, then we run some basic CRUD operations and finally, we test other Delta Lake-specific commands.

Initial setup

Let’s start by defining a simple dataset that will be used later in this article. It contains 10 records representing random people with some features:

Initial dataset

Now we can initialize the Spark session: in this article, we are using the PySpark library with Spark 3.3 and Delta Lake 2.3.0 (see the Delta Lake and Spark compatibility matrix).

from pyspark.sql import SparkSession

# NOTE: BASE_PATH must be defined beforehand (root folder containing the jars and the data)

# Init Spark session with some Delta Lake-specific configurations
spark = (
    SparkSession.builder.appName("spark_delta")
    .config("spark.jars", f"{BASE_PATH}/jars/delta/delta-core_2.12-2.3.0.jar,{BASE_PATH}/jars/delta/delta-storage-2.3.0.jar")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .config("spark.sql.sources.partitionOverwriteMode", "dynamic")
    .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
    .config("spark.sql.repl.eagerEval.enabled", True)
    .getOrCreate()
)

We are using some custom configurations. The first one loads the delta-core and delta-storage jars (which can be downloaded from the official Delta Lake website), while the second and third register Delta Lake’s SQL extension and catalog with Spark, which the framework needs in order to work. The remaining ones will come in handy later in the article.

Now that we have both a CSV dataset and a working Spark session, we can read the data and write it as a Delta table (whose data is stored in parquet files), partitioning it on the city column:

# Read dataset
df = spark.read.format("csv").option("header", True).option("inferSchema", True).load(people_dataset_path)

# Write partitioned Delta table (data files are stored as parquet)
df.write.format("delta").mode("overwrite").partitionBy("city").save(f"{BASE_PATH}/data/delta/people")

This is the content of the target folder where the Delta table has been saved to:

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet (NEW)
-> city=ROMA/
----> part-00000-ec07f30b-6e78-4b31-9cbb-02dd5349f58c.c000.snappy.parquet (NEW)
-> city=TORINO/
----> part-00000-bd2f6919-0811-4895-a38b-7b0d76d6476a.c000.snappy.parquet (NEW)
-> _delta_log/
----> 00000000000000000000.json (NEW)

There are 3 folders (one for each distinct city in our dataset) containing the parquet data files, plus an additional _delta_log folder. This last folder holds the transaction log, which gets a new entry every time we perform an operation on the table: these logs are fundamental, as all of the Delta Lake key features rely on them. Let’s see the content of the first and only log:

{
"commitInfo": {
...
"operation": "WRITE",
"operationParameters": {
"mode": "Overwrite",
"partitionBy": "[\"city\"]"
},
...
}
}
...
{
"metaData": {
"id": "0d1346a9-f428-4109-a55f-36b5ccd5f49b",
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"surname\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"height\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"weight\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"city\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts_ins\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": [
"city"
],
...
}
}
{
"add": {
"path": "city=MILANO/part-00000-35792e36-956c-4172-b7e8-13f71170414c.c000.snappy.parquet",
"partitionValues": {
"city": "MILANO"
},
...
}
}
{
"add": {
"path": "city=ROMA/part-00000-ec07f30b-6e78-4b31-9cbb-02dd5349f58c.c000.snappy.parquet",
"partitionValues": {
"city": "ROMA"
},
...
}
}
{
"add": {
"path": "city=TORINO/part-00000-bd2f6919-0811-4895-a38b-7b0d76d6476a.c000.snappy.parquet",
"partitionValues": {
"city": "TORINO"
},
...
}
}

It clearly shows the commit information related to the write operation we just ran and the schema of the table. It’s also important to notice how the log keeps track of the parquet data files currently referenced by the table (see the add objects).
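To make the log structure more concrete, here is a minimal sketch in plain Python (no Spark required) that parses a commit file. On disk, each action of a commit occupies a single line of JSON; the log above is pretty-printed for readability. The sample commit below is a hypothetical, shortened stand-in for the real file (file names are abbreviated), and parse_commit is an illustrative helper, not part of the Delta Lake API.

```python
import json

# Hypothetical, abbreviated commit file: one JSON action per line.
# The structure mirrors the log shown above; file names are shortened.
SAMPLE_COMMIT = "\n".join(
    json.dumps(action)
    for action in [
        {"commitInfo": {"operation": "WRITE", "operationParameters": {"mode": "Overwrite"}}},
        {"metaData": {"partitionColumns": ["city"]}},
        {"add": {"path": "city=MILANO/part-a.parquet", "partitionValues": {"city": "MILANO"}}},
        {"add": {"path": "city=ROMA/part-b.parquet", "partitionValues": {"city": "ROMA"}}},
        {"add": {"path": "city=TORINO/part-c.parquet", "partitionValues": {"city": "TORINO"}}},
    ]
)


def parse_commit(text):
    """Group the actions of one commit by type (commitInfo, metaData, add, remove, ...)."""
    actions = {}
    for line in text.splitlines():
        if not line.strip():
            continue  # skip blank lines
        for action_type, payload in json.loads(line).items():
            actions.setdefault(action_type, []).append(payload)
    return actions


actions = parse_commit(SAMPLE_COMMIT)
added_paths = [add["path"] for add in actions["add"]]

print(actions["commitInfo"][0]["operation"])
print(added_paths)
```

The same helper could be pointed at a real file under _delta_log/ by passing it the file’s text content.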

Now it’s time to perform some basic CRUD operations on our Delta table. First of all, we need to be aware that there are two different ways to operate on a Delta table:

  • using Spark APIs
  • using DeltaTable APIs

The Spark DataFrame reader and writer APIs allow us to perform read and write operations, while the DeltaTable APIs can be used to perform update, delete, and other commands.

Write (append) — Spark APIs

Adding new records to an existing Delta Lake table is easy: we only need to create a Spark DataFrame containing the new records and then write it in append mode using the delta format.

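from datetime import datetime
from pyspark.sql.types import StructType

# Read the Delta table to reuse its schema
# (assumed definition of df_delta, which the original snippet leaves implicit)
df_delta = spark.read.format("delta").load(f"{BASE_PATH}/data/delta/people")

# Create a Spark dataframe with the new record
fields = df_delta.schema.fields

df_append_record = spark.createDataFrame([
    [11, "Michelle", "Blu", "F", 159, 49.0, "TORINO", datetime.strptime('2019-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')]
], StructType(fields))

# Append the new record to the existing delta table
df_append_record.write.format("delta").mode("append").partitionBy("city").save(f"{BASE_PATH}/data/delta/people")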

This is the resulting table: we can see the new record at the bottom.

The resulting dataset after the append operation

We can check the content of the Delta table folder again: we have an additional parquet file (containing the record we just added) inside the city=TORINO partition and a new delta log.

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00000-ec07f30b-6e78-4b31-9cbb-02dd5349f58c.c000.snappy.parquet
-> city=TORINO/
----> part-00000-bd2f6919-0811-4895-a38b-7b0d76d6476a.c000.snappy.parquet
----> part-00007-c5095d4b-9d80-4829-a0d8-96518966d913.c000.snappy.parquet (NEW)
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json (NEW)

If we inspect the new delta log we can clearly see how the new parquet file is being referenced by the Delta table:

{
"commitInfo": {
...
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[\"city\"]"
},
...
}
}
{
"add": {
"path": "city=TORINO/part-00007-c5095d4b-9d80-4829-a0d8-96518966d913.c000.snappy.parquet",
"partitionValues": {
"city": "TORINO"
},
...
}
}

Write (overwrite) — Spark APIs

In the previous example, we saw what happens after an append operation. Let’s now try to insert a new record using overwrite mode:

# Create a Spark dataframe with the new record
fields = df_delta.schema.fields

df_overwrite_record = spark.createDataFrame([
[12, "Andrea", "Arancione", "M", 195, 87.4, "ROMA", datetime.strptime('2019-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')]
], StructType(fields))

# Insert overwrite the new record into the existing delta table
df_overwrite_record.write.format("delta").mode("overwrite").partitionBy("city").save(f"{BASE_PATH}/data/delta/people")

This is the resulting table: we can see how the new record has been added overwriting all the previous records in the same partition (city=ROMA).

NOTE: only the records in the same partition have been overwritten because we set spark.sql.sources.partitionOverwriteMode to dynamic in the Spark session. Otherwise, the whole table’s content would have been replaced by the new record.

The resulting dataset after the overwrite operation
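The difference between the two overwrite behaviors mentioned in the NOTE can be sketched in a few lines of plain Python. This is only a conceptual model, not how Spark implements it: the table is represented as a dictionary that maps each partition value to its rows, the overwrite function and its mode values ("dynamic", "static") are illustrative names, and the ids other than 1, 11 and 12 are made up.

```python
def overwrite(table, new_rows, partition_col, mode):
    """Return the table content after an overwrite.

    `table` maps a partition value to the list of rows stored in that partition.
    """
    incoming = {}
    for row in new_rows:
        incoming.setdefault(row[partition_col], []).append(row)

    if mode == "static":
        # Static mode: the whole table is replaced by the incoming rows
        return incoming
    if mode == "dynamic":
        # Dynamic mode: only the partitions present in the incoming data are replaced
        result = {key: list(rows) for key, rows in table.items()}
        result.update(incoming)
        return result
    raise ValueError(f"unknown mode: {mode}")


# Hypothetical table content (only the id and city columns are modeled)
table = {
    "MILANO": [{"id": 2, "city": "MILANO"}],
    "ROMA": [{"id": 3, "city": "ROMA"}, {"id": 4, "city": "ROMA"}],
    "TORINO": [{"id": 1, "city": "TORINO"}, {"id": 11, "city": "TORINO"}],
}
new_rows = [{"id": 12, "city": "ROMA"}]

dynamic_result = overwrite(table, new_rows, "city", "dynamic")
static_result = overwrite(table, new_rows, "city", "static")

print(sorted(dynamic_result))  # partitions that still exist with dynamic mode
print(sorted(static_result))   # partitions that still exist with static mode
```

With the dynamic mode, MILANO and TORINO are untouched and ROMA contains only the new record; with the static mode, only the new ROMA record survives.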

Let’s check the content of the Delta table folder: we have an additional parquet file (containing the record we just added) inside the city=ROMA partition but the previous parquet file is still there (soon we’ll understand why). There is also a new delta log.

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00000-ec07f30b-6e78-4b31-9cbb-02dd5349f58c.c000.snappy.parquet
----> part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet (NEW)
-> city=TORINO/
----> part-00000-bd2f6919-0811-4895-a38b-7b0d76d6476a.c000.snappy.parquet
----> part-00007-c5095d4b-9d80-4829-a0d8-96518966d913.c000.snappy.parquet
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json
----> 00000000000000000002.json (NEW)

If we inspect the new delta log we can clearly see how the new parquet file is being referenced by the Delta table while the previous file reference has been removed (check the remove object):

{
"commitInfo": {
...
"operation": "WRITE",
"operationParameters": {
"mode": "Overwrite",
"partitionBy": "[\"city\"]"
},
...
}
}
{
"add": {
"path": "city=ROMA/part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet",
"partitionValues": {
"city": "ROMA"
},
...
}
}
{
"remove": {
"path": "city=ROMA/part-00000-ec07f30b-6e78-4b31-9cbb-02dd5349f58c.c000.snappy.parquet",
"dataChange": true,
"extendedFileMetadata": true,
"partitionValues": {
"city": "ROMA"
},
...
}
}

Update — DeltaTable APIs

DeltaTable APIs allow us to update the content of an existing delta table. First, we need to read the table:

from delta.tables import DeltaTable

# Read table with DeltaTable APIs
table = DeltaTable.forPath(spark, f"{BASE_PATH}/data/delta/people")

Now we can perform an update: Mario Rossi (id=1) gained some weight (from 77.2 kg to 78.5 kg) in the last month so let’s update his record.

from pyspark.sql import functions as f

# Update "weight" column of records with specific "id" column value
table.update(
    condition=f.col("id") == 1,
    set={"weight": f.lit(78.5)},
)

# Show the table (as Spark DataFrame)
table.toDF().sort("id")

We can see the new value in the first record:

The resulting dataset after the update operation

We can check the content of the Delta table folder: we have an additional parquet file inside the city=TORINO partition, which contains the record with the weight we just updated. Delta Lake does not delete parquet files during a CRUD operation: as anticipated, the key is the use of delta logs.

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00000-ec07f30b-6e78-4b31-9cbb-02dd5349f58c.c000.snappy.parquet
----> part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet
-> city=TORINO/
----> part-00000-46be2786-e183-439f-b4d5-be411a09ea52.c000.snappy.parquet (NEW)
----> part-00000-bd2f6919-0811-4895-a38b-7b0d76d6476a.c000.snappy.parquet
----> part-00007-c5095d4b-9d80-4829-a0d8-96518966d913.c000.snappy.parquet
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json
----> 00000000000000000002.json
----> 00000000000000000003.json (NEW)

Let’s see the content of the new delta log: as expected, the parquet file containing the record with the old weight value is no longer referenced, while a new parquet file has been added. There are also some useful operation metrics that help us understand the process.
NOTE: the “added” parquet file contains all the records of the “removed” one: the untouched records are copied as they are (numCopiedRows), while the matching ones are written with their new values (numUpdatedRows), in this case only Mario Rossi’s weight.

{
"commitInfo": {
...
"operation": "UPDATE",
"operationParameters": {
"predicate": "(id#3342 = 1)"
},
"operationMetrics": {
"numRemovedFiles": "1",
"numCopiedRows": "3",
"numAddedFiles": "1",
"numUpdatedRows": "1",
...
},
...
}
}
{
"remove": {
"path": "city=TORINO/part-00000-bd2f6919-0811-4895-a38b-7b0d76d6476a.c000.snappy.parquet",
"dataChange": true,
"extendedFileMetadata": true,
"partitionValues": {
"city": "TORINO"
},
...
}
}
{
"add": {
"path": "city=TORINO/part-00000-46be2786-e183-439f-b4d5-be411a09ea52.c000.snappy.parquet",
"partitionValues": {
"city": "TORINO"
},
...
}
}
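The copy-on-write behavior behind these metrics can be illustrated with a small plain-Python sketch. It assumes a data file is just a list of rows; copy_on_write_update is a hypothetical helper, and every row except Mario Rossi’s (id=1) is invented for the example.

```python
def copy_on_write_update(rows, predicate, changes):
    """Rewrite one data file: return (new_rows, metrics) without mutating the input."""
    new_rows = []
    updated = 0
    copied = 0
    for row in rows:
        if predicate(row):
            # Matching rows are written to the new file with the new values
            new_rows.append({**row, **changes})
            updated += 1
        else:
            # Non-matching rows are copied to the new file unchanged
            new_rows.append(dict(row))
            copied += 1
    metrics = {"numUpdatedRows": updated, "numCopiedRows": copied}
    return new_rows, metrics


# Content of the old TORINO data file (ids 101-103 are hypothetical)
old_file = [
    {"id": 1, "weight": 77.2},
    {"id": 101, "weight": 60.0},
    {"id": 102, "weight": 81.5},
    {"id": 103, "weight": 55.3},
]

new_file, metrics = copy_on_write_update(
    old_file,
    predicate=lambda row: row["id"] == 1,
    changes={"weight": 78.5},
)

print(metrics)
print(new_file[0])
```

The old file is left untouched, which is exactly what makes it possible to read earlier versions of the table later on.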

Delete — DeltaTable APIs

We are only missing the delete operation: again, this can be performed through DeltaTable APIs. Let’s delete from our dataset all people from TORINO:

# Delete records where "city" is equal to a specific value
table.delete(
f.col("city") == "TORINO"
)

table.toDF().sort("id")

This is the resulting dataset, from which people from TORINO have been removed:

The resulting dataset after the delete operation

Even after a delete operation, no parquet files have been deleted:

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00000-ec07f30b-6e78-4b31-9cbb-02dd5349f58c.c000.snappy.parquet
----> part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet
-> city=TORINO/
----> part-00000-46be2786-e183-439f-b4d5-be411a09ea52.c000.snappy.parquet
----> part-00000-bd2f6919-0811-4895-a38b-7b0d76d6476a.c000.snappy.parquet
----> part-00007-c5095d4b-9d80-4829-a0d8-96518966d913.c000.snappy.parquet
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json
----> 00000000000000000002.json
----> 00000000000000000003.json
----> 00000000000000000004.json (NEW)

In fact, only their references have been removed from the table as we can see in the new delta log:

{
"commitInfo": {
"operation": "DELETE",
"operationParameters": {
"predicate": "[\"(city = 'TORINO')\"]"
},
"operationMetrics": {
"numRemovedFiles": "2",
"numDeletedRows": "5",
...
},
...
}
}
{
"remove": {
"path": "city=TORINO/part-00000-46be2786-e183-439f-b4d5-be411a09ea52.c000.snappy.parquet",
"dataChange": true,
"extendedFileMetadata": true,
"partitionValues": {
"city": "TORINO"
},
...
}
}
{
"remove": {
"path": "city=TORINO/part-00007-c5095d4b-9d80-4829-a0d8-96518966d913.c000.snappy.parquet",
"dataChange": true,
"extendedFileMetadata": true,
"partitionValues": {
"city": "TORINO"
},
...
}
}

Key Features

Now that we have looked at how Delta Lake handles standard CRUD operations with the help of delta logs, we can finally discuss some of the key features that make it a very appealing tool for building modern lakehouses.

Time Travel

Delta Lake’s time travel feature leverages metadata management and versioning to allow users to access and analyze historical snapshots of data stored in the data lake. By specifying a timestamp or version number, users can query data as it existed at a specific point in time and track the changes made to a dataset over time. This capability provides organizations with valuable insights into the evolution of their data, supports auditing and compliance needs, and allows for easy rollback to previous versions if necessary.

This is possible thanks to:

  • delta (transaction) logs that keep track of the operations performed on the table
  • data files which are kept even after being unreferenced from the delta table
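How these two ingredients combine can be sketched in plain Python: replaying the add and remove actions of every commit up to a given version yields the set of data files that make up the table at that version. The commit list below is a simplified model of the five commits created so far in this article (file names are abbreviated), and snapshot_at is an illustrative helper, not a Delta Lake API.

```python
def snapshot_at(commits, version):
    """Replay commits 0..version and return the data files referenced at that version."""
    if not 0 <= version < len(commits):
        raise ValueError(f"version {version} does not exist")
    live = set()
    for actions in commits[: version + 1]:
        for action in actions:
            if "add" in action:
                live.add(action["add"]["path"])
            elif "remove" in action:
                live.discard(action["remove"]["path"])
    return live


# Simplified model of the log built so far (abbreviated, hypothetical file names)
commits = [
    # version 0: initial write
    [{"add": {"path": "city=MILANO/a"}}, {"add": {"path": "city=ROMA/b"}}, {"add": {"path": "city=TORINO/c"}}],
    # version 1: append
    [{"add": {"path": "city=TORINO/d"}}],
    # version 2: overwrite of the ROMA partition
    [{"add": {"path": "city=ROMA/e"}}, {"remove": {"path": "city=ROMA/b"}}],
    # version 3: update (the file is rewritten)
    [{"remove": {"path": "city=TORINO/c"}}, {"add": {"path": "city=TORINO/f"}}],
    # version 4: delete of all TORINO records
    [{"remove": {"path": "city=TORINO/f"}}, {"remove": {"path": "city=TORINO/d"}}],
]

before_delete = snapshot_at(commits, 3)
latest = snapshot_at(commits, 4)

print(sorted(before_delete))
print(sorted(latest))
```

Reading version 3 only works as long as the files it references still exist on storage, which is why unreferenced files are kept around.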

Let’s have a look at a quick example that shows how to read a specific version of our Delta table. First, we retrieve all available versions:

# Retrieve all versions of the table
table_history_df = table.history().select("version", "timestamp", "operation")
table_history_df

Table versions history

Then, we can simply use Spark APIs with a custom option to get the table status at a specific version:

# Read a specific version of the table, before the previous deletion of some records
old_table_df = spark.read.format("delta").option("versionAsOf", "3").load(f"{BASE_PATH}/data/delta/people/")
old_table_df.sort("id")

As we can see, we are able to read the state of the table as it was before the delete operation we ran earlier:

Table version #3 (before delete operation)

NOTE: it’s also possible to restore a specific version of a table. In this case, a new delta log and a new version will be created (previously we only read a specific version without restoring it and thus without creating new delta logs and versions).
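Conceptually, a restore can be thought of as a new commit that re-adds the files referenced by the target version and removes the ones that are referenced only by the current version. The sketch below models this idea in plain Python, under the assumption that a table version is just a set of file paths; plan_restore is a hypothetical helper and the file names are abbreviated.

```python
def plan_restore(current_files, target_files, files_on_disk):
    """Return the add/remove actions of a commit that brings the table back to the target version."""
    missing = sorted(target_files - files_on_disk)
    if missing:
        # The target version cannot be rebuilt if its data files were physically deleted
        raise FileNotFoundError(f"cannot restore, data files no longer exist: {missing}")
    return {
        "add": sorted(target_files - current_files),
        "remove": sorted(current_files - target_files),
    }


# Hypothetical, abbreviated file names
current = {"city=MILANO/a", "city=ROMA/e"}                                  # latest version
target = {"city=MILANO/a", "city=ROMA/e", "city=TORINO/d", "city=TORINO/f"}  # version before the delete
on_disk = current | target | {"city=ROMA/b", "city=TORINO/c"}

plan = plan_restore(current, target, on_disk)
print(plan)

# If the unreferenced files had been physically deleted, the restore would fail
try:
    plan_restore(current, target, files_on_disk=current)
    restore_failed = False
except FileNotFoundError as error:
    restore_failed = True
    print(error)
```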

Vacuum

Delta Lake does not delete (parquet) data files even after delete operations, so over time storage costs can grow. The vacuum command addresses this: it identifies and deletes the data files that are no longer referenced by the latest version of the table and that are older than the specified retention period. Such files are typically left behind by operations like updates, deletes, overwrites, and compactions performed on the Delta table.
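The selection rule just described can be sketched in plain Python. This is a simplification of what the real command does: it assumes we already know the files present on storage with their modification times and the files referenced by the latest version. The vacuum_candidates helper, the file names and the timestamps are all hypothetical.

```python
from datetime import datetime, timedelta


def vacuum_candidates(files_on_disk, live_files, retention_hours, now):
    """Return the files that are unreferenced by the latest version and older than the retention period."""
    cutoff = now - timedelta(hours=retention_hours)
    return sorted(
        path
        for path, modified_at in files_on_disk.items()
        if path not in live_files and modified_at < cutoff
    )


now = datetime(2023, 12, 28, 12, 0)      # hypothetical "current" time
one_hour_ago = now - timedelta(hours=1)  # every file was written one hour ago

files_on_disk = {
    "city=MILANO/a": one_hour_ago,
    "city=ROMA/b": one_hour_ago,
    "city=ROMA/e": one_hour_ago,
    "city=TORINO/c": one_hour_ago,
    "city=TORINO/d": one_hour_ago,
    "city=TORINO/f": one_hour_ago,
}
live_files = {"city=MILANO/a", "city=ROMA/e"}

to_delete_now = vacuum_candidates(files_on_disk, live_files, retention_hours=0, now=now)
to_delete_default = vacuum_candidates(files_on_disk, live_files, retention_hours=168, now=now)

print(to_delete_now)      # four unreferenced files
print(to_delete_default)  # nothing is old enough with a 7-day retention
```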

Let’s see it in action: we simply need to provide the retention time (in hours) in order to delete unreferenced data files that are older than it.

NOTE: by default, Delta Lake prevents users from using a retention time shorter than the default of 7 days (168 hours). For the purpose of this article, we disabled this safety check by setting spark.databricks.delta.retentionDurationCheck.enabled to false in the Spark session.

# delete all unused parquet files
hours_to_keep = 0
table.vacuum(hours_to_keep)

Let’s see what happens inside the table folder after running the command:

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet
-> city=TORINO/
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json
----> 00000000000000000002.json
----> 00000000000000000003.json
----> 00000000000000000004.json
----> 00000000000000000005.json (NEW)
----> 00000000000000000006.json (NEW)

The unreferenced parquet files are gone: instead, we have two new delta logs that we can inspect to verify that the operation was performed successfully.

NOTE: after running the vacuum operation, some of the old table versions may become unavailable. This is expected, as we are deleting some of the underlying data files.

{
"commitInfo": {
"operation": "VACUUM START",
"operationParameters": {
"retentionCheckEnabled": false,
"defaultRetentionMillis": 604800000,
"specifiedRetentionMillis": 0
},
"readVersion": 4,
"operationMetrics": {
"numFilesToDelete": "4",
"sizeOfDataToDelete": "6927"
},
...
}
}
{
"commitInfo": {
"operation": "VACUUM END",
"operationParameters": {
"status": "COMPLETED"
},
"readVersion": 5,
"operationMetrics": {
"numDeletedFiles": "4",
"numVacuumedDirectories": "4"
},
...
}
}

Schema evolution

Schema evolution is a feature that allows users to easily change a table’s schema to accommodate data that is changing over time. Most commonly, it’s used when performing an append (or overwrite) operation, to automatically adapt the schema to include one or more new columns.

Keeping in mind the risks of a flexible schema, schema evolution is worth considering when there is a need for:

  • adding a new column to the table
  • dropping an existing column from the table
  • other (changing a data type of an existing column, renaming an existing column, …)

In most cases, the table will only need a change in the metadata without having to rewrite data files.

NOTE: This is not the default Delta Lake behavior: unless told otherwise, it sticks to Schema Enforcement. We will see that feature later.
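As a rough mental model of what happens when a column is added through schema evolution, the sketch below merges an incoming schema into the table schema in plain Python. Schemas are modeled as dictionaries mapping column names to type names, and merge_schemas is a hypothetical helper. It is deliberately stricter than the real feature: any type difference is rejected here, and the exact rules Delta Lake applies are not modeled.

```python
def merge_schemas(table_schema, data_schema):
    """Return the evolved schema: existing columns keep their position, new ones are appended."""
    merged = dict(table_schema)
    for name, data_type in data_schema.items():
        if name in merged and merged[name] != data_type:
            raise TypeError(
                f"column {name!r}: table type {merged[name]!r} differs from data type {data_type!r}"
            )
        merged.setdefault(name, data_type)
    return merged


# Schema of the people table, as reported by the delta log
TABLE_SCHEMA = {
    "id": "integer",
    "name": "string",
    "surname": "string",
    "gender": "string",
    "height": "integer",
    "weight": "double",
    "city": "string",
    "ts_ins": "timestamp",
}

# Incoming data with one additional column
data_schema = {**TABLE_SCHEMA, "address": "string"}

evolved = merge_schemas(TABLE_SCHEMA, data_schema)
print(list(evolved))
```

Existing rows do not need to be rewritten: they simply have no value for the new column, so it is read back as null.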

Let’s try to add a new column to our table by appending a new record with an additional column named address:

from datetime import datetime
from pyspark.sql.types import StringType, StructField, StructType

# Create a Spark dataframe with the new record
fields = table.toDF().schema.fields
df_append_new_col = spark.createDataFrame([
    [13, "Gaspare", "Grigi", "M", 175, 67.4, "TORINO", datetime.strptime('2019-01-01 00:00:00', '%Y-%m-%d %H:%M:%S'), "Via Mazzini 14"]
], StructType(fields + [StructField('address', StringType())]))

When appending the new record, we need to specify the mergeSchema option to enable the Schema Evolution feature. We also need to read the table again to view the changes:

# Specify the mergeSchema option when writing
df_append_new_col.write.format("delta").mode("append").partitionBy("city").option("mergeSchema", "true").save(f"{BASE_PATH}/data/delta/people")

# Show table AFTER reading it again
table = DeltaTable.forPath(spark, f"{BASE_PATH}/data/delta/people")
table.toDF().sort("id")

The resulting table has the new address column: the record we just appended shows the expected value while all the previous ones have a null value.

The resulting table after adding a column with Schema Evolution enabled

As always, we can check the content of our table’s folder to verify that a new parquet file has been created in the city=TORINO partition.

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet
-> city=TORINO/
----> part-00007-a0d7c11c-cecf-4ad1-8d50-ac6c1cca6972.c000.snappy.parquet (NEW)
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json
----> 00000000000000000002.json
----> 00000000000000000003.json
----> 00000000000000000004.json
----> 00000000000000000005.json
----> 00000000000000000006.json
----> 00000000000000000007.json (NEW)

The new delta log clearly shows how the schema of the table has been updated with the new column (check the metaData object):

{
"commitInfo": {
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[]"
},
"operationMetrics": {
"numFiles": "1",
"numOutputRows": "1",
"numOutputBytes": "1974"
},
...
}
}
{
"metaData": {
"id": "72fab31a-753d-4a04-8ae8-7fa9eae3fc3a",
"format": {
"provider": "parquet",
"options": {}
},
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"surname\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"height\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"weight\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}},{\"name\":\"city\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}},{\"name\":\"ts_ins\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{}},{\"name\":\"address\",\"type\":\"string\",\"nullable\":true,\"metadata\":{}}]}",
"partitionColumns": [
"city"
],
...
}
}
{
"add": {
"path": "city=TORINO/part-00007-a0d7c11c-cecf-4ad1-8d50-ac6c1cca6972.c000.snappy.parquet",
"partitionValues": {
"city": "TORINO"
},
"dataChange": true,
...
}
}

Now we can try to drop the existing address column. To do that, we first need to set some table properties that enable column mapping: this creates a new version of the table with a new delta log.
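The statement used to set these properties is not shown in this article. A plausible reconstruction, assuming the same table name used by the DROP COLUMN statement later on and the three properties recorded in the resulting delta log, is sketched below. The set_tblproperties_sql helper is purely illustrative: it only renders the SQL text, which would then be passed to spark.sql.

```python
# Properties as they appear in the SET TBLPROPERTIES commit of the delta log
COLUMN_MAPPING_PROPERTIES = {
    "delta.minReaderVersion": "2",
    "delta.minWriterVersion": "5",
    "delta.columnMapping.mode": "name",
}


def set_tblproperties_sql(table_name, properties):
    """Render an ALTER TABLE ... SET TBLPROPERTIES statement from a dict of properties."""
    assignments = ",\n".join(f"  '{key}' = '{value}'" for key, value in properties.items())
    return f"ALTER TABLE {table_name} SET TBLPROPERTIES (\n{assignments}\n)"


# Table name assumed to match the one used by the DROP COLUMN statement
statement = set_tblproperties_sql("spark_catalog.default.people", COLUMN_MAPPING_PROPERTIES)
print(statement)

# With the Spark session created at the beginning of the article:
# spark.sql(statement)
```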

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet
-> city=TORINO/
----> part-00007-a0d7c11c-cecf-4ad1-8d50-ac6c1cca6972.c000.snappy.parquet
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json
----> 00000000000000000002.json
----> 00000000000000000003.json
----> 00000000000000000004.json
----> 00000000000000000005.json
----> 00000000000000000006.json
----> 00000000000000000007.json
----> 00000000000000000008.json (NEW)

As always, we can visualize its content:

{
"commitInfo": {
...
"operation": "SET TBLPROPERTIES",
"operationParameters": {
"properties": "{\"delta.minReaderVersion\":\"2\",\"delta.minWriterVersion\":\"5\",\"delta.columnMapping.mode\":\"name\"}"
},
"readVersion": 7,
"isolationLevel": "Serializable",
"isBlindAppend": true,
"operationMetrics": {},
"engineInfo": "Apache-Spark/3.3.2 Delta-Lake/2.3.0",
"txnId": "9818ab54-4f0e-4538-bec1-152727253dd1"
}
}
...

Now we can drop the address column:

spark.sql("""
ALTER TABLE spark_catalog.default.people
DROP COLUMN address
"""
)

We need to read the table again to see the schema changes:

# Show table AFTER reading it again
table = DeltaTable.forPath(spark, f"{BASE_PATH}/data/delta/people")
table.toDF().sort("id")

The resulting table does not have the address column anymore:

The resulting table after dropping a column with Schema Evolution enabled

If we check the content of our table’s folder, we notice that no new parquet files have been created; only a new delta log has been added:

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet
-> city=TORINO/
----> part-00007-a0d7c11c-cecf-4ad1-8d50-ac6c1cca6972.c000.snappy.parquet
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json
----> 00000000000000000002.json
----> 00000000000000000003.json
----> 00000000000000000004.json
----> 00000000000000000005.json
----> 00000000000000000006.json
----> 00000000000000000007.json
----> 00000000000000000008.json
----> 00000000000000000009.json (NEW)

If we analyze the new transaction log we can see that:

  • the schema has been updated in the metaData object to reflect the changes
  • no data files have been rewritten: dropping a column is a metadata operation

{
"commitInfo": {
"operation": "DROP COLUMNS",
"operationParameters": {
"columns": "[\"address\"]"
},
"engineInfo": "Apache-Spark/3.3.2 Delta-Lake/2.3.0",
...
}
}
{
"metaData": {
"schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":1,\"delta.columnMapping.physicalName\":\"id\"}},{\"name\":\"name\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":2,\"delta.columnMapping.physicalName\":\"name\"}},{\"name\":\"surname\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":3,\"delta.columnMapping.physicalName\":\"surname\"}},{\"name\":\"gender\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":4,\"delta.columnMapping.physicalName\":\"gender\"}},{\"name\":\"height\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":5,\"delta.columnMapping.physicalName\":\"height\"}},{\"name\":\"weight\",\"type\":\"double\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":6,\"delta.columnMapping.physicalName\":\"weight\"}},{\"name\":\"city\",\"type\":\"string\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":7,\"delta.columnMapping.physicalName\":\"city\"}},{\"name\":\"ts_ins\",\"type\":\"timestamp\",\"nullable\":true,\"metadata\":{\"delta.columnMapping.id\":8,\"delta.columnMapping.physicalName\":\"ts_ins\"}}]}",
"partitionColumns": [
"city"
],
"configuration": {
"delta.columnMapping.mode": "name",
"delta.columnMapping.maxColumnId": "9"
},
...
}
}

Schema enforcement

The Schema Enforcement feature in Delta Lake ensures that data written to the data lake adheres to a predefined schema: when data is written to Delta Lake, Schema Enforcement validates the data against the predefined schema. It checks if the data adheres to the specified data types, field names, and any other constraints defined in the schema. If the data violates the schema, Delta Lake rejects the write operation, preventing inconsistent or incompatible data from being stored.
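The validation described above can be approximated with a short plain-Python sketch. It models a schema as a dictionary of column names and type names and encodes three simplified rules: unexpected columns are rejected, type mismatches are rejected, and columns missing from the data are tolerated because they can be filled with nulls. The check_write_schema helper and the shortened schema are illustrative assumptions, not Delta Lake’s actual implementation.

```python
def check_write_schema(table_schema, data_schema):
    """Validate incoming data against the table schema.

    Raises ValueError on a mismatch; otherwise returns the table columns
    that are missing from the data (they would be filled with nulls).
    """
    extra = [name for name in data_schema if name not in table_schema]
    if extra:
        raise ValueError(f"schema mismatch: unexpected column(s) {extra}")
    for name, data_type in data_schema.items():
        if table_schema[name] != data_type:
            raise ValueError(
                f"schema mismatch: column {name!r} is {table_schema[name]!r} in the table, got {data_type!r}"
            )
    return [name for name in table_schema if name not in data_schema]


# Shortened version of the people table schema
TABLE_SCHEMA = {"id": "integer", "name": "string", "gender": "string", "city": "string"}

# Case 1: the data has an additional column, so the write is rejected
try:
    check_write_schema(TABLE_SCHEMA, {**TABLE_SCHEMA, "job": "string"})
    rejected = False
except ValueError as error:
    rejected = True
    print("Rejected:", error)

# Case 2: the data lacks a column, so the write is accepted
without_gender = {name: dtype for name, dtype in TABLE_SCHEMA.items() if name != "gender"}
filled_with_null = check_write_schema(TABLE_SCHEMA, without_gender)
print("Accepted, columns filled with null:", filled_with_null)
```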

By enforcing schema compliance, Delta Lake provides data quality and consistency within the data lake. It prevents data inconsistencies, reduces the risk of data errors, and facilitates seamless schema evolution. The Schema Enforcement feature ensures that data conforms to the defined structure, making it easier to manage, process, and analyze data within the Delta Lake environment.

All these benefits help us understand why it is the default behavior for Delta tables (instead of Schema Evolution). Let’s see this feature in action: we try to add a new column by appending a record with an additional job column:

# Create a Spark dataframe with the new record
fields = table.toDF().schema.fields
df_append_new_col = spark.createDataFrame([
[14, "Lucia", "Turchese", "F", 171, 66.4, "ROMA", datetime.strptime('2019-01-01 00:00:00', '%Y-%m-%d %H:%M:%S'), "Programmer"]
], StructType(fields + [StructField('job', StringType())]))

This time we don’t need any specific option when appending the new record as Schema Enforcement is the default behavior:

# Append new record to the table
try:
    df_append_new_col.write.format("delta").mode("append").save(f"{BASE_PATH}/data/delta/people")
except Exception as e:
    print("Error when appending new record: ", e)

We immediately receive an error that states a schema mismatch is preventing the append operation from being performed successfully.

Error when appending new record:  A schema mismatch detected when writing to the Delta table (Table ID: 882ceacc-adae-4877-a4f8-9fe9c7513a01).
To enable schema migration using DataFrameWriter or DataStreamWriter, please set:
'.option("mergeSchema", "true")'.
For other operations, set the session configuration
spark.databricks.delta.schema.autoMerge.enabled to "true". See the documentation
specific to the operation for details.

Table schema:
root
-- id: integer (nullable = true)
-- name: string (nullable = true)
-- surname: string (nullable = true)
-- gender: string (nullable = true)
-- height: integer (nullable = true)
-- weight: double (nullable = true)
-- city: string (nullable = true)
-- ts_ins: timestamp (nullable = true)


Data schema:
root
-- id: integer (nullable = true)
-- name: string (nullable = true)
-- surname: string (nullable = true)
-- gender: string (nullable = true)
-- height: integer (nullable = true)
-- weight: double (nullable = true)
-- city: string (nullable = true)
-- ts_ins: timestamp (nullable = true)
-- job: string (nullable = true)

Now let’s see what happens when we append a record that is missing one of the table’s columns (gender): the operation is performed without any error.

# Create a Spark dataframe with the new record, built without the "gender" column
# (filtering by name avoids relying on StructField equality)
fields = [field for field in table.toDF().schema.fields if field.name != "gender"]

df_append_drop_col = spark.createDataFrame([
[15, "Marcella", "Fucsia", 161, 56.4, "ROMA", datetime.strptime('2019-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')]
], StructType(fields))

# Write record
df_append_drop_col.write.format("delta").mode("append").partitionBy("city").save(f"{BASE_PATH}/data/delta/people")

If we read the table again, we can verify that the schema is still the same: the gender column has not been removed; for the new record it has simply been filled with a null value.

# Show table AFTER reading it again
table = DeltaTable.forPath(spark, f"{BASE_PATH}/data/delta/people")
table.toDF().sort("id")
The resulting table after dropping a column with Schema Enforcement enabled
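
The two behaviors seen so far (an extra column is rejected, a missing column is filled with nulls) can be summarized with a small plain-Python sketch. It is only a toy model of the rule, not Delta Lake's implementation: the function name align_to_table_schema is hypothetical, timestamps are kept as strings for brevity, and the real checks also cover data types and nullability.

```python
def align_to_table_schema(table_cols, data_cols, rows):
    """Toy model of schema enforcement on append (not Delta Lake's actual code)."""
    # Columns unknown to the table make the whole write fail
    extra = [c for c in data_cols if c not in table_cols]
    if extra:
        raise ValueError(f"schema mismatch, unexpected columns: {extra}")
    # Columns missing from the incoming data are filled with None (null)
    aligned = []
    for row in rows:
        by_name = dict(zip(data_cols, row))
        aligned.append([by_name.get(c) for c in table_cols])
    return aligned


table_cols = ["id", "name", "surname", "gender", "height", "weight", "city", "ts_ins"]

# Record without the gender column: accepted, gender becomes None
no_gender_cols = [c for c in table_cols if c != "gender"]
accepted = align_to_table_schema(
    table_cols,
    no_gender_cols,
    [[15, "Marcella", "Fucsia", 161, 56.4, "ROMA", "2019-01-01 00:00:00"]],
)
print(accepted[0])

# Record with the extra job column: rejected
try:
    align_to_table_schema(
        table_cols,
        table_cols + ["job"],
        [[14, "Lucia", "Turchese", "F", 171, 66.4, "ROMA", "2019-01-01 00:00:00", "Programmer"]],
    )
    rejected = False
except ValueError as e:
    rejected = True
    print("Error when appending new record:", e)
```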

As always, we can check the table’s folder to verify that a new parquet file has been created, this time only in the city=ROMA partition: it contains just the new record, with a null value for the gender column. A checkpoint file has also been written to the _delta_log folder at version 10.

-> /
-> city=MILANO/
----> part-00000-83a6308b-1301-48f2-9764-df574e50d3d7.c000.snappy.parquet
-> city=ROMA/
----> part-00007-57b11a8e-6c13-42f4-b94a-3cb6014b3ac3.c000.snappy.parquet (NEW)
----> part-00007-e75e8418-347b-416d-84bc-9ac1bcaed385.c000.snappy.parquet
-> city=TORINO/
----> part-00007-a0d7c11c-cecf-4ad1-8d50-ac6c1cca6972.c000.snappy.parquet
-> _delta_log/
----> 00000000000000000000.json
----> 00000000000000000001.json
----> 00000000000000000002.json
----> 00000000000000000003.json
----> 00000000000000000004.json
----> 00000000000000000005.json
----> 00000000000000000006.json
----> 00000000000000000007.json
----> 00000000000000000008.json
----> 00000000000000000009.json
----> 00000000000000000010.checkpoint.parquet (NEW)
----> 00000000000000000010.json (NEW)
----> _last_checkpoint (NEW)

We can check the new delta log for more details. We can see that:

  • the schema has NOT been updated (as we are preventing it)
  • the new parquet file is now being referenced by the table.
{
"commitInfo": {
"operation": "WRITE",
"operationParameters": {
"mode": "Append",
"partitionBy": "[\"city\"]"
},
"engineInfo": "Apache-Spark/3.3.2 Delta-Lake/2.3.0",
...
}
}
{
"add": {
"path": "city=ROMA/part-00007-57b11a8e-6c13-42f4-b94a-3cb6014b3ac3.c000.snappy.parquet",
"partitionValues": {
"city": "ROMA"
},
"dataChange": true,
"stats": "{\"numRecords\":1,\"minValues\":{\"id\":15,\"name\":\"Marcella\",\"surname\":\"Fucsia\",\"height\":161,\"weight\":56.4,\"ts_ins\":\"2019-01-01T00:00:00.000+01:00\"},\"maxValues\":{\"id\":15,\"name\":\"Marcella\",\"surname\":\"Fucsia\",\"height\":161,\"weight\":56.4,\"ts_ins\":\"2019-01-01T00:00:00.000+01:00\"},\"nullCount\":{\"id\":0,\"name\":0,\"surname\":0,\"gender\":1,\"height\":0,\"weight\":0,\"ts_ins\":0}}",
...
}
}
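
The same check can be done programmatically. Each commit file in _delta_log stores one JSON action per line, and the stats field is itself a JSON document encoded as a string, so it needs a second parsing step. Below is a minimal sketch using only the standard library: the helper name summarize_commit and the abbreviated sample_commit are assumptions made for illustration, with the sample mimicking the structure of the entries shown above.

```python
import json


def summarize_commit(commit_text):
    """Extract the operation and the added files from one Delta commit file."""
    summary = {"operation": None, "added": []}
    for line in commit_text.splitlines():
        if not line.strip():
            continue
        action = json.loads(line)  # one action per line
        if "commitInfo" in action:
            summary["operation"] = action["commitInfo"].get("operation")
        elif "add" in action:
            add = action["add"]
            # "stats" is a JSON string nested inside the JSON action
            stats = json.loads(add["stats"]) if add.get("stats") else {}
            summary["added"].append(
                (add["path"], stats.get("numRecords"), stats.get("nullCount", {}))
            )
    return summary


# Abbreviated, hypothetical sample mirroring the entries shown above
sample_commit = "\n".join([
    json.dumps({"commitInfo": {"operation": "WRITE",
                               "operationParameters": {"mode": "Append"}}}),
    json.dumps({"add": {
        "path": "city=ROMA/part-00007-57b11a8e-6c13-42f4-b94a-3cb6014b3ac3.c000.snappy.parquet",
        "partitionValues": {"city": "ROMA"},
        "dataChange": True,
        "stats": json.dumps({"numRecords": 1, "nullCount": {"gender": 1}}),
    }}),
])

summary = summarize_commit(sample_commit)
print(summary["operation"], summary["added"])
```

On a real table, the text of a commit file such as 00000000000000000010.json could be passed to the same function after reading it from the _delta_log folder.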

ACID Transactions

Now that we have taken a detailed look at how Delta Lake handles data and schema operations, we can discuss ACID (Atomicity, Consistency, Isolation, Durability) transactions, as they play a crucial role in ensuring data integrity and consistency in data processing workflows. In particular:

  1. Atomicity: it ensures that a transaction is treated as a single indivisible unit of work. In Delta Lake, a write first produces new data files and then records them in the transaction log as a single commit file (one of the numbered JSON files in _delta_log). The commit either appears in the log in full or not at all: if the job fails before the commit file is written, the data files it produced are never referenced by the table and remain invisible to readers. The log thus maintains a sequential record of all the modifications made to the table.
  2. Consistency: it guarantees that a transaction brings the system from one valid state to another. In Delta Lake, consistency is achieved through schema enforcement. Delta Lake allows users to define a schema for the data stored in the data lake. When writing data, Delta Lake enforces the defined schema and ensures that the data being written adheres to the schema constraints. If any data violates the schema, the transaction is rolled back, and the data is not committed to the lake, maintaining consistency.
  3. Isolation: it ensures that concurrent transactions do not interfere with each other and that each transaction operates in isolation as if it were executed sequentially. Delta Lake achieves isolation by leveraging optimistic concurrency control (OCC). Multiple transactions can be executed concurrently on Delta Lake, and OCC allows them to proceed independently without locking the entire dataset. It performs validation checks at commit time to ensure that the modifications made by different transactions do not conflict. If another transaction has committed in the meantime but the changes do not conflict, the commit is retried against the latest table version; if they do conflict, the transaction fails with a concurrent modification error and has to be re-run.
  4. Durability: it guarantees that once a transaction is committed, its changes are permanent and will survive subsequent failures. In Delta Lake, the transaction log, which captures all the changes made during a transaction, is stored alongside the data files in the data lake. This ensures that the log and data are both durable and persisted in a fault-tolerant manner. In the event of a system failure, Delta Lake uses the transaction log to recover the data and bring the system back to a consistent state.
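
The interplay between atomic commits and optimistic concurrency control can be illustrated with a toy model that uses only the standard library. It is a sketch under simplifying assumptions, not Delta Lake's implementation: the function names are hypothetical, the log lives in a local temporary folder, and every write is treated as a non-conflicting append, so a writer that loses the race simply retries on the next version (the real protocol checks for logical conflicts first). The key idea is that a commit file can only be created if it does not exist yet.

```python
import json
import os
import tempfile


def try_commit(log_dir, version, actions):
    """Create the commit file for `version` only if it does not exist yet."""
    path = os.path.join(log_dir, f"{version:020d}.json")
    try:
        # O_EXCL makes the creation fail if another writer got there first
        fd = os.open(path, os.O_CREAT | os.O_EXCL | os.O_WRONLY)
    except FileExistsError:
        return False
    with os.fdopen(fd, "w") as f:
        f.write("\n".join(json.dumps(a) for a in actions))
    return True


def latest_version(log_dir):
    """Return the highest committed version, or -1 for an empty log."""
    versions = [int(n.split(".")[0]) for n in os.listdir(log_dir) if n.endswith(".json")]
    return max(versions, default=-1)


def commit_append(log_dir, actions, read_version, max_retries=5):
    """Optimistically commit on top of `read_version`, retrying on a lost race."""
    version = read_version + 1
    for _ in range(max_retries):
        if try_commit(log_dir, version, actions):
            return version
        # Someone else committed this version: re-read the log and retry
        version = latest_version(log_dir) + 1
    raise RuntimeError("could not commit after several retries")


log_dir = tempfile.mkdtemp()

# Two writers both start from an empty table (read_version = -1)
v_a = commit_append(log_dir, [{"add": {"path": "a.parquet"}}], read_version=-1)
v_b = commit_append(log_dir, [{"add": {"path": "b.parquet"}}], read_version=-1)
print(v_a, v_b, sorted(os.listdir(log_dir)))
```

The second writer finds that version 0 is already taken and lands on version 1, so both appends end up in the log in a well-defined order and neither overwrites the other.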

Conclusion

In conclusion, Delta Lake has ushered in a new era for modern data lakes, offering enhanced reliability, scalability, and performance. By leveraging Delta Lake, organizations can overcome the challenges that traditional data lakes often face, including data inconsistency, poor data quality, and limited scalability. Delta Lake empowers data engineers and data scientists to focus on extracting meaningful insights from their data, rather than dealing with the complexities of managing and maintaining the underlying data lake infrastructure.
