Delta Lake: ACID transactions for Apache Spark

An open source storage layer from Databricks, the creators of Spark, for building simpler and more reliable enterprise data lakes, both on-premises and in the cloud.

This was one of the big announcements at this year's Spark + AI Summit. The feature was initially available only on the Databricks platform, but it has now been open sourced under the permissive Apache License 2.0.

Image from delta.io: how Delta Lake fits into your existing data lake

What is Delta Lake?

Delta Lake is basically a storage layer that sits on top of your existing on-prem HDFS cluster or your favourite cloud storage; you can even run it locally on your laptop (best part)! Data is stored on that underlying storage as versioned Parquet files. Any data that Spark can read can be written to and read back from Delta Lake. Delta Lake provides a unified platform that supports both batch processing and stream processing workloads.

Why do I need it?

Below are some of the key features that I found interesting!

  1. ACID transactions on Spark: In a typical data lake, many users read and write data at the same time, so it is really important that data integrity is maintained. ACID is a key feature of most databases, but with HDFS or S3 it is generally very hard to give the same guarantees that ACID databases provide. Delta Lake stores a transaction log that keeps track of all commits made to the table directory in order to provide ACID transactions. It provides the serializable isolation level to keep the data consistent across multiple users.
  2. Unified batch and stream processing: In a data lake with both stream processing and batch processing use cases, it is common to follow the Lambda architecture. In Delta Lake, data coming in as a stream (maybe from Kafka) and any historical data you have (say, on HDFS) land in the same table, which gives a unified view of these two paradigms. Streaming ingest, batch historic backfill, and interactive queries work out of the box without much extra effort.
  3. Schema enforcement: Delta Lake helps keep bad data out of your data lake by letting you specify a schema and enforcing it. It prevents data corruption by rejecting bad data, with sensible error messages, before it is ingested into the data lake.
  4. Time travel: Data in the data lake is versioned, and snapshots are provided so that you can query them as if that snapshot were the current state of the system. This lets us revert to older versions of our data for audits, rollbacks, and the like.
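
To get an intuition for how an ordered commit log can give both atomic commits and time travel, here is a toy, in-memory sketch. It is not Delta Lake's implementation: ToyLog, Commit, tryCommit and snapshot are names made up for illustration, and a real transaction log lives in files next to the data rather than in memory.

```scala
import scala.collection.mutable.ArrayBuffer

// Toy model only: every name here is hypothetical, not part of Delta Lake.
final case class Commit(version: Long, addedFiles: Seq[String], removedFiles: Seq[String])

final class ToyLog {
  private val commits = ArrayBuffer.empty[Commit]

  // The version number the next successful commit must carry.
  def nextVersion: Long = synchronized { commits.length.toLong }

  // Optimistic concurrency: a writer proposes a commit for the version it
  // expects to create. It wins only if nobody committed that version first.
  def tryCommit(c: Commit): Boolean = synchronized {
    if (c.version == commits.length) { commits += c; true } else false
  }

  // Time travel: replay the log up to `version` to get the live data files.
  def snapshot(version: Long): Set[String] = synchronized {
    commits.iterator
      .takeWhile(_.version <= version)
      .foldLeft(Set.empty[String]) { (files, c) =>
        files -- c.removedFiles ++ c.addedFiles
      }
  }
}
```

Two writers racing for the same version cannot both succeed, and because old commits are never rewritten, any earlier version can be rebuilt by replaying the log up to that point.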

Get started with it:

I will share some Scala examples here and add some Python examples in my next article.

First, add the library dependency to your sbt build file.

"io.delta" %% "delta-core" % "0.1.0",

If you are using spark-shell:

spark-shell --packages io.delta:delta-core_2.12:0.1.0
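
If you are building a standalone application with sbt rather than using spark-shell, the dependency line shown earlier could sit in a build.sbt along these lines. The project name, the Scala patch version and the Spark version are assumptions on my part (as far as I know, Delta Lake 0.1.0 was released against Spark 2.4.2), so adjust them to match your cluster.

```scala
// build.sbt (sketch; the name and version numbers are assumptions)
name := "delta-lake-demo"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  // Spark SQL provides SparkSession and the DataFrame reader/writer
  "org.apache.spark" %% "spark-sql" % "2.4.2",
  // Delta Lake itself
  "io.delta" %% "delta-core" % "0.1.0"
)
```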

We can use the DataFrameReader and DataFrameWriter to read data from and write data to Delta Lake.

import spark.implicits._ // brings toDF into scope; spark-shell does this for you
// Build a small DataFrame, save it as a Delta table, then read it back.
val df = Seq(
  (1, "IL", "USA"), (2, "IL", "USA"), (3, "MO", "USA"),
  (4, "IL", "USA"), (5, "KA", "INDIA"), (6, "MEL", "AUS")
).toDF("id", "state", "country")
df.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("delta/myDeltaTable")
val df1 = spark.read.format("delta").load("delta/myDeltaTable")
df1.show()

The overwriteSchema option comes in handy when overwriting a table: by default, overwriting the data in a table does not overwrite the schema, even when the mode is overwrite.

To query an older snapshot of a Delta Lake table

We can query the state of the table either based on a timestamp or the version number.

Let's say we update the above DataFrame.

import org.apache.spark.sql.functions.{col, lower} // already imported in spark-shell
val df2 = df1.withColumn("versioncountry", lower(col("country"))).drop("country")
df2.write.format("delta").mode("overwrite").option("overwriteSchema", "true").save("delta/myDeltaTable")
df2.show()
+---+-----+--------------+
| id|state|versioncountry|
+---+-----+--------------+
|  5|   KA|         india|
|  6|  MEL|           aus|
|  3|   MO|           usa|
|  2|   IL|           usa|
|  4|   IL|           usa|
|  1|   IL|           usa|
+---+-----+--------------+
spark.read.format("delta").option("versionAsOf", 0).load("delta/myDeltaTable").show()
+---+-----+-------+
| id|state|country|
+---+-----+-------+
|  5|   KA|  INDIA|
|  6|  MEL|    AUS|
|  1|   IL|    USA|
|  2|   IL|    USA|
|  3|   MO|    USA|
|  4|   IL|    USA|
+---+-----+-------+
spark.read.format("delta").option("timestampAsOf", "2019-04-27 15:18:37").load("delta/myDeltaTable").show()
+---+-----+--------------+
| id|state|versioncountry|
+---+-----+--------------+
|  5|   KA|         india|
|  6|  MEL|           aus|
|  3|   MO|           usa|
|  2|   IL|           usa|
|  4|   IL|           usa|
|  1|   IL|           usa|
+---+-----+--------------+

The timestampAsOf value can be either a date string or a timestamp string.
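
Time travel is also what makes audits and rollbacks practical. The following is a minimal sketch under a few assumptions: it uses a throwaway temp directory instead of a real table, readVersion is a hypothetical helper of mine, and the rollback is done by simply writing an old snapshot back as a new commit, which is one possible approach rather than the only one.

```scala
import java.nio.file.Files
import org.apache.spark.sql.{DataFrame, SparkSession}

// Local session for the sketch; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("delta-time-travel-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical scratch location so the demo does not touch a real table.
val path = Files.createTempDirectory("delta-time-travel").toString

// Two commits: version 0 has one row, version 1 has two rows.
Seq((1, "IL")).toDF("id", "state")
  .write.format("delta").mode("overwrite").save(path)
Seq((1, "IL"), (2, "MO")).toDF("id", "state")
  .write.format("delta").mode("overwrite").save(path)

// Hypothetical helper: read the table as of a given version number.
def readVersion(version: Long): DataFrame =
  spark.read.format("delta").option("versionAsOf", version).load(path)

// Audit: rows that exist in version 1 but not in version 0.
val addedRows = readVersion(1).except(readVersion(0))

// Rollback: write the old snapshot back, which creates version 2.
readVersion(0).write.format("delta").mode("overwrite").save(path)
val current = spark.read.format("delta").load(path)
```

Because the rollback is itself a new commit, versions 0 and 1 stay queryable afterwards.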

Schema Validation

Let's say we get another, similar DataFrame with a slightly different schema. The important difference is the data type of the id column: it is now a StringType, whereas the id we saved previously was an IntegerType.

val df = Seq( ("1", "IL", "USA")).toDF("id", "state", "country")

Delta Lake throws an error immediately when there is a schema mismatch.

df.write.format("delta").mode("append").save("delta/myDeltaTable")
Exception in thread "main" org.apache.spark.sql.AnalysisException: Failed to merge fields 'id' and 'id'. Failed to merge incompatible data types IntegerType and StringType;;
at org.apache.spark.sql.delta.schema.SchemaUtils$.$anonfun$mergeSchemas$1(SchemaUtils.scala:526)

When a column is missing from the incoming data and the rest of the schema matches, that column is set to null for the new rows.

val df = Seq( (100, "CA")).toDF("id", "state")
df.write.format("delta").mode("append").save("delta/myDeltaTable")

spark.read.format("delta").load("delta/myDeltaTable").show()
+---+-----+--------------+
| id|state|versioncountry|
+---+-----+--------------+
|  5|   KA|         india|
|  6|  MEL|           aus|
|  3|   MO|           usa|
|  2|   IL|           usa|
|  4|   IL|           usa|
|  1|   IL|           usa|
|100|   CA|          null|
+---+-----+--------------+

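But if the incoming DataFrame has a new column that the table does not have, the append is rejected immediately with an AnalysisException about the schema mismatch.

val df = Seq((100, "CA", "extra")).toDF("id", "state", "New")
df.write.format("delta").mode("append").save("delta/myDeltaTable")

Note that the DataFrame must be built from three-element tuples. With two-element tuples such as Seq((100, "CA")), toDF itself fails before Delta Lake is even involved:

Exception in thread "main" java.lang.IllegalArgumentException: requirement failed: The number of columns doesn't match.
Old column names (2): _1, _2
New column names (3): id, state, New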
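
If the new column is intentional, Delta Lake has a mergeSchema write option that opts a single write in to schema evolution instead of rejecting it. The sketch below uses a throwaway temp directory and made-up rows, so treat it as a starting point and check that your Delta Lake version supports the option.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for the sketch; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("delta-merge-schema-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical scratch location so the demo does not touch a real table.
val path = Files.createTempDirectory("delta-merge-schema").toString

// Initial table with two columns.
Seq((1, "IL")).toDF("id", "state")
  .write.format("delta").mode("overwrite").save(path)

// Append a DataFrame with an extra column, explicitly allowing the
// table schema to grow for this write.
Seq((100, "CA", "extra")).toDF("id", "state", "New")
  .write.format("delta").mode("append")
  .option("mergeSchema", "true")
  .save(path)

// Rows written before the change read back with null in the new column.
val merged = spark.read.format("delta").load(path)
```

Keeping the option off by default is what gives you the protective behaviour described above; you turn it on only for writes where the schema change is deliberate.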

Features not yet available:

Delta Lake does not support multi-table transactions or foreign keys; transactions are supported at the table level only.
Delta Lake does not support the DStream API.
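
Streaming with Delta Lake goes through Spark Structured Streaming rather than DStreams. As a rough sketch of the unified batch and streaming idea, the example below writes a table in batch mode, streams it into a second table, and reads the result back in batch mode. All paths are hypothetical temp locations, and Trigger.Once is used only so the demo stops by itself; a real job would normally keep running.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Local session for the sketch; in spark-shell `spark` already exists.
val spark = SparkSession.builder()
  .master("local[*]")
  .appName("delta-streaming-sketch")
  .getOrCreate()
import spark.implicits._

// Hypothetical scratch locations for the demo.
val base = Files.createTempDirectory("delta-streaming").toString
val sourcePath = s"$base/source"
val sinkPath = s"$base/sink"
val checkpointPath = s"$base/checkpoint"

// Batch write into the source table.
Seq((1, "IL"), (2, "MO"), (3, "KA")).toDF("id", "state")
  .write.format("delta").save(sourcePath)

// Read the same table as a stream and write it to another Delta table.
val query = spark.readStream.format("delta").load(sourcePath)
  .writeStream.format("delta")
  .option("checkpointLocation", checkpointPath)
  .trigger(Trigger.Once()) // process what is available, then stop
  .start(sinkPath)
query.awaitTermination()

// The streaming sink is an ordinary Delta table for batch readers.
val copied = spark.read.format("delta").load(sinkPath)
```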

Thanks for reading! Please do share the article, if you liked it. Any comments or suggestions are welcome! Check out my other articles here.