Apache Spark — Delta Lake

Delta Lake is an open-source storage layer that brings ACID transactions to Apache Spark and big data workloads. Other powerful features include unification of streaming and batch data processing, time travel (data versioning), and scalable metadata handling.

This article goes into some detail on how to start the Spark shell with Delta support, Delta transaction logs, time travel on Delta tables, and cleaning up data.

When to use delta vs parquet

The Delta storage format is nothing but Apache Parquet files backed by a transaction log (data versioning). Delta has write performance similar to plain Parquet. When it comes to read and data transformation operations, Delta is superior to the Parquet format.

So, when should we go for Delta? …

Well, when we have use cases that need frequent update, delete, and merge (upsert) operations on data. Delta is probably a bit much if we are just dumping in data and have no real updates or deletes planned.

Start spark shell with delta support

To explore Delta operations, we can start the Spark shell with Delta support enabled.

./pyspark --packages io.delta:delta-core_2.12:0.8.0 --conf "spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" --conf "spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension"

The above adds the delta-core jars to the driver and executor classpaths, resolving the package from the local Maven repository, any configured remote repositories, or Maven Central.
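If Spark is launched from a standalone PySpark script instead of the shell, the same settings can be applied while building the SparkSession. A minimal sketch, in which the application name is just a placeholder:

from pyspark.sql import SparkSession

# Build a SparkSession with the Delta catalog and SQL extensions enabled.
# The spark.jars.packages coordinate mirrors the --packages flag above.
spark = (SparkSession.builder
         .appName("delta-demo")
         .config("spark.jars.packages", "io.delta:delta-core_2.12:0.8.0")
         .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
         .config("spark.sql.catalog.spark_catalog",
                 "org.apache.spark.sql.delta.catalog.DeltaCatalog")
         .getOrCreate())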

Read and write

Reading data from and writing to a Delta table is straightforward. Just specify the format type as “delta”.

read and write — delta format
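As a rough sketch of what those read and write commands look like (the sample DataFrame and the /tmp/delta/events path are placeholders for illustration):

# Write a DataFrame in delta format; "overwrite" replaces any existing data.
df = spark.range(0, 5)
df.write.format("delta").mode("overwrite").save("/tmp/delta/events")

# Read it back; compared to parquet, only the format string changes.
events = spark.read.format("delta").load("/tmp/delta/events")
events.show()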

After executing the above commands, the data is stored in Delta format. When we do an “overwrite”, Spark’s API drops and recreates the table using the new DataFrame.

If the “userMetadata” option is set on the DataFrameWriter, or spark.databricks.delta.commitInfo.userMetadata is set on the SparkSession, it is recorded in the userMetadata column of the Delta Lake history.
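A small sketch of both ways to attach the metadata; the message strings are arbitrary examples:

# Per write: tag only this commit.
(df.write.format("delta")
   .mode("overwrite")
   .option("userMetadata", "backfill-2022-05-13")
   .save("/tmp/delta/events"))

# Session wide: every subsequent commit in this session carries the tag.
spark.conf.set("spark.databricks.delta.commitInfo.userMetadata", "nightly-job")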

Updating data

For instance, if there is a need to update some data in an existing table using Spark without Delta support, we may have to first select the data that needs to be updated into a new DataFrame, make changes to that data, then join the original DataFrame with the new one, and finally overwrite the table using the resulting DataFrame. With all these steps, all we achieve is an overwrite, not exactly an update.

Updates on Delta tables can be done using the DeltaTable update API. DeltaTable is the entry point for interacting with tables programmatically.

To do an update on a Delta table, access the table using the table path or table name and execute an update statement.

Access and update delta table
from delta.tables import DeltaTable

# Hive metastore based table name
deltaTable = DeltaTable.forName(sparkSession, tableName)
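An update on that handle might look like the following sketch; the country and price column names and the condition are made up for illustration:

# Apply a 10% price increase to every row matching the condition.
deltaTable.update(
    condition = "country = 'US'",
    set = { "price": "price * 1.1" }
)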

Merge

Upserts (update + insert) and inserts with deduplication can be performed using the merge API. According to the documentation, there are some constraints that need to be followed while using DeltaMergeBuilder. Merge basically updates the target table based on a match condition. Depending on the when clauses used (ranging from one to three), at most two whenMatched clauses and at most one whenNotMatched clause are allowed.

merge — whenMatched clause
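A minimal sketch of a merge with one whenMatched and one whenNotMatched clause, assuming a target deltaTable keyed by an id column and a source DataFrame updatesDF with matching columns (both names are placeholders):

# Update matching rows from the source, insert the rest.
(deltaTable.alias("t")
    .merge(updatesDF.alias("s"), "t.id = s.id")
    .whenMatchedUpdate(set = {"value": "s.value"})
    .whenNotMatchedInsert(values = {"id": "s.id", "value": "s.value"})
    .execute())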

Deleting data

Delta handles deletes using multi-version concurrency control, by allowing multiple versions of the same data to be present.

example — delete operation
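A delete through the same handle is a one-liner; the condition is an arbitrary example, and the removed rows remain in older table versions until vacuum runs:

# Delete every row matching the predicate from the current table version.
deltaTable.delete("country = 'US'")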

vacuum

vacuum is a table utility command that cleans up data files (files belonging to the Delta table) that are past the retention period and are no longer referenced by the Delta table, as well as any files in the table directory that are not managed by Delta. This command needs to be run manually; it is not triggered automatically. The default retention period is 7 days. It can be set as a table property in CREATE or ALTER commands.

VACUUM table_name/path [RETAIN num HOURS] [DRY RUN]
Example:
VACUUM db_name.table_name RETAIN 200 HOURS DRY RUN
# Table property to set data file retention.
delta.deletedFileRetentionDuration = "interval <interval>"
Example:
delta.deletedFileRetentionDuration = "interval 15 days"

vacuum does not delete log files. Log files are automatically deleted after the log retention period (30 days) is reached. When a vacuum operation is run on a Delta table, it ignores all directories starting with _ (_delta_log and any other directories that start with _ get ignored too). In the above example, once a data file is no longer referenced by the table and the 15-day retention has passed, it becomes a candidate for deletion.

Each table may require a different retention interval. Let's just say that if a Delta table does not receive commits very often, then a slightly larger retention interval may be a good option. Likewise, if a table has heavier commit traffic, increasing the retention period takes up more storage space, as it has to account for all the retained data files.

Time Travel

Time travel is the ability Delta Lake provides to access different versions of table data. In the transaction log (a JSON file per commit), the version number is recorded along with other information after every commit (update, delete, merge, insert). Time travel can be done by version or by timestamp. By default, time travel on a Delta table can go back up to 30 days.

# Based on timestamp
df = (spark.read.format("delta").option("timestampAsOf", "2022-05-13").load("/path/to/delta_table"))
example — time travel based on version
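To find the available versions and read one back by number, something like this sketch can be used (version 5 is arbitrary, and the path is the same placeholder as above):

from delta.tables import DeltaTable

# List commits with their version numbers, timestamps and operations.
deltaTable = DeltaTable.forPath(spark, "/path/to/delta_table")
deltaTable.history().select("version", "timestamp", "operation").show()

# Based on version
df = (spark.read.format("delta").option("versionAsOf", 5).load("/path/to/delta_table"))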

The ability to time travel back to an older version is lost after running vacuum, as the vacuum operation deletes data files according to the retention period. In the same way, time travel is also affected if the log file and data file retention periods are changed.

delta.logRetentionDuration = "interval <interval>"
Example:
delta.logRetentionDuration = "interval 10 days"

Delta transaction log

The transaction log keeps track of changes that are made to Delta tables. Atomicity of transactions is guaranteed because each commit is an isolated transaction that is recorded in the ordered transaction log.

Delta Lake also supports the strongest isolation level, Serializable (comparable to two-phase locking, which acquires both shared and exclusive locks), which ensures that concurrent operations appear to execute in sequence. In all of the above examples, the table's isolationLevel is shown as null; it can be changed with an ALTER statement. The default isolation level for writes is WriteSerializable.

ALTER TABLE <tableName> SET TBLPROPERTIES ('delta.isolationLevel' = 'Serializable')

When a table in Delta format is created, the table's transaction log (one JSON file per commit) is automatically created under a subdirectory named _delta_log. This subdirectory hosts the JSON commit files and checkpoint files. A checkpoint file is created by default for every 10 commits.

Checkpoint file after every 10 commits

The checkpoint file is created in Parquet format. It is a snapshot of the table state (the cumulative state up to that commit) at the point in time the file is created. Checkpointing can also be triggered manually. Checkpoint files play a key role in reconstructing the Delta table state: whenever a read operation is done, the Spark framework skips to the latest checkpoint file and applies the commits made after it to arrive at the current state. Checkpoint files save time during reads.
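Since the commit files are plain JSON, the log can be inspected directly with Spark; a sketch against the placeholder table path used above:

# Each commit file records actions such as add, remove, metaData and commitInfo.
log_df = spark.read.json("/path/to/delta_table/_delta_log/*.json")
log_df.where("commitInfo is not null").select("commitInfo.operation", "commitInfo.timestamp").show(truncate=False)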

As a final note, this article covered the basics of Delta Lake tables to get started, and I learned as I wrote it. There is a sea of documentation available from Databricks; keep referring back to it. One more area worth looking at is optimization: https://docs.databricks.com/delta/best-practices.html

Happy learning!

References:

https://docs.delta.io/latest/delta-update.html
