Getting hands dirty in Spark Delta Lake

Understanding the world of Delta Lake, hands on

Abid Merchant
Analytics Vidhya
8 min read · Nov 17, 2019


Hey Fellas,

This is the second part of my series; in the first part I gave an overview of Delta Lake and its use cases. In case you happened to land directly here, do give the first article a read to get some context. Here’s the link.

So, in this article we will go through the amazing work Delta Lake does; the code is in PySpark. Firstly, let’s see how to get Delta Lake into our Spark notebook.

pip install --upgrade pyspark
pyspark --packages io.delta:delta-core_2.11:0.4.0

The first command is not necessary if you already have an up-to-date Spark installation; the second starts the PySpark shell with the Delta package. Voila! Nothing more to do for the installation part. Please note that the minimum Spark version required for Delta Lake is Spark 2.4.2.
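If you would rather start from a plain Python script than the pyspark shell, the same package can be pulled in through the session builder. A minimal sketch, where the app name and builder layout are just illustrative choices:

from pyspark.sql import SparkSession

# Build a session that fetches the Delta Lake package on startup;
# spark.jars.packages takes the same Maven coordinate we passed to the shell.
spark = (
    SparkSession.builder
    .appName("delta-lake-hands-on")  # hypothetical app name
    .config("spark.jars.packages", "io.delta:delta-core_2.11:0.4.0")
    .getOrCreate()
)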

So, as per my previous article Delta Lake brings to us the following important operations:

  • Schema Enforcement
  • ACID transactions on Data Lake
  • Time Travel Capabilities

We will go through each of these functionalities; first, let’s understand Schema Enforcement.

Schema Enforcement

As I said, Delta Lake follows “schema on write”, so the schema is checked at write time and any discrepancy will raise an exception right there. The code below creates a DataFrame with the numbers 1 to 4 (spark.range’s upper bound is exclusive) and writes it out as a Delta table.

data = spark.range(1,5)
data.write.format("delta").mode("overwrite").save("/mnt/data/delta_sample")

Let’s check out the output files:

As you can see, the write format is Parquet. Alongside the part files, a _delta_log directory is maintained which holds the transaction log. The log is made up of JSON files containing metadata about the operations performed on our dataset, and Spark reads this log first rather than going straight to the part files.
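If you want to peek at what landed on disk yourself, here is a small sketch; it assumes the table path is reachable as an ordinary filesystem path from where Python runs:

import os

table_path = "/mnt/data/delta_sample"

# Parquet part files sit at the table root...
print(sorted(f for f in os.listdir(table_path) if f.endswith(".parquet")))

# ...while the transaction log lives in the _delta_log sub-directory.
print(sorted(os.listdir(os.path.join(table_path, "_delta_log"))))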

Now, I will make a DataFrame with the numbers 5 to 9, cast its column to String, and try to append it to our existing dataset.

import pyspark.sql.functions as fn
new_data = spark.range(5,10)
new_data = new_data.withColumn("id",fn.col("id").cast("String"))
new_data.write.format("delta").mode("append").save("/mnt/data/delta_sample")

Running the above snippet raised an error complaining about a schema mismatch: the incoming String column does not match the column type already in the table.
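If you want to trigger the failure without killing your notebook cell, you can guard the write; a small sketch, assuming (as in my runs) that Delta surfaces the schema check as an AnalysisException:

from pyspark.sql.utils import AnalysisException

try:
    # Attempt to append the String-typed ids onto the existing table.
    new_data.write.format("delta").mode("append").save("/mnt/data/delta_sample")
except AnalysisException as err:
    # Delta rejects the write instead of silently changing the table schema.
    print("Write rejected:", err)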

So, Delta Lake stopped the incorrect data from going into our Delta table. Moreover, on changing my column name from “id” to “id1”, I got another exception. So, as a whole, no schema change will be allowed while writing to an existing dataset. Finally, I will append the dataset with the correct schema.

new_data = spark.range(5,10)
new_data.write.format("delta").mode("append").save("/mnt/data/delta_sample")

Now, my dataset contains the following files:

Let’s take a look at the log file:

So, a new log file was generated when I wrote to the dataset; it contains the following content:

It directs Spark to add the part files that were newly written to the dataset, along with information such as the write mode and the modification time.
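You don’t have to squint at the raw JSON to follow along; since each commit file is line-delimited JSON, Spark itself can read it. A rough sketch, where the selected fields are just the ones I find most telling:

# Read every commit file in the transaction log as a DataFrame.
log_df = spark.read.json("/mnt/data/delta_sample/_delta_log/*.json")

# Each line holds one action: "add" entries carry the part-file details,
# while "commitInfo" records which operation produced the commit.
log_df.select("add.path", "add.modificationTime", "commitInfo.operation") \
      .show(truncate=False)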

So, this is how schema on write is enforced and how a Delta Lake table works. Now, we will dig deeper and understand the Merge, Update and Delete operations in Spark.

Deletion

Now, before performing the delete operation, let’s load our table in Delta format; we will use the dataset we just wrote. Follow the lines of code below.

from delta.tables import *

delta_df = DeltaTable.forPath(spark, "/mnt/data/delta_sample")

delta_df is a DeltaTable object, so we cannot run Spark DataFrame operations on it directly; that can be achieved by converting it into a Spark DataFrame through the delta_df.toDF() method.
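For example, a quick look at the current contents (a throwaway check, nothing Delta-specific beyond toDF()):

# Convert the DeltaTable handle into a regular DataFrame and inspect it.
delta_df.toDF().orderBy("id").show()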

Now, we will delete the data where id is ≤2.

delta_df.delete("id<=2")

That’s it. Delta tables need no manual refresh: any change we make here is reflected directly in the underlying dataset, so someone reading the dataset in another session right after this delete will no longer find any rows with id ≤ 2. After the deletion, one more commit log was created; let’s check out how the commit log is written for a delete operation.
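Before we open the log, a minimal check from a fresh reader, just to see the delete take effect (a plain Delta read, no special options):

# A brand-new read of the table no longer returns any row with id <= 2.
spark.read.format("delta").load("/mnt/data/delta_sample").orderBy("id").show()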

It guides Spark to remove the original part files (through remove entries) and then add the new part file, records the predicate id ≤ 2, and even specifies the operation performed (DELETE). So the commit log is exhaustive: a one-stop shop for all the metadata of our dataset. Now, we will update the dataset.

Updation

Now, in this segment we will try out the update operation on our data. We will load the table again and update the value 5 to 500. Check out the code snippet for the update operation.

delta_df = DeltaTable.forPath(spark, "/mnt/data/delta_sample")
delta_df.update(condition = "id = 5", set = { "id": "500" })

The above operation will set id to 500 wherever it is 5, and, as before, no refresh is needed: the table is updated in place. As you can see, the syntax is very simple and even a layman would understand it on the first go. So, we move to our final operation, Merge.
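Before we do, a quick look at the changed row if you want to double-check (purely illustrative):

# The row that used to hold 5 now holds 500.
delta_df.toDF().filter("id = 500").show()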

Merge

Now, we will perform the merge operation on our Delta table. Before performing the merge, I will read a new dataset containing Country, Year and Temperature columns and write it as a Delta table.

df = spark.read.csv("/mnt/data/dataset", inferSchema=True, sep=',', header=True)
df.write.format("delta").save("/mnt/data/delta_merge")

Our Delta table contains the following data…

Now, we have a delta feed, held in a Spark DataFrame called updatesDf, that has to be merged into this Delta table; the data is…

We received 2 new records, out of which Australia is to be inserted and India has to be merged into our Delta table.

# Load the target table as a DeltaTable; updatesDf is the incoming delta feed.
delta_merge = DeltaTable.forPath(spark, "/mnt/data/delta_merge")

delta_merge.alias("delta_merge").merge(
    updatesDf.alias("updates"),
    "delta_merge.country = updates.country") \
  .whenMatchedUpdate(set = {
      "temperature": "updates.temperature",
      "year": "updates.year"
  }) \
  .whenNotMatchedInsert(values = {
      "country": "updates.country",
      "year": "updates.year",
      "temperature": "updates.temperature"
  }) \
  .execute()

The merge syntax closely resembles the merge syntax in Hive. Firstly, we have given our Delta table the alias “delta_merge” and our Spark DataFrame the alias “updates”. Now, where the country column matches in both datasets, we update the temperature and year columns from the Spark DataFrame into our Delta table, and when it does not match we insert all the columns into our Delta table. Simple, isn’t it!

Final merged records will look like…
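A plain read of the merged table brings them up (nothing Delta-specific here beyond the format):

# Inspect the Delta table after the merge has been executed.
spark.read.format("delta").load("/mnt/data/delta_merge").orderBy("country").show()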

So, the temperature of India got updated to 100.0 and the year got updated to 2019, and that’s all about the merge operation.

Time Travel

This is the coolest part of Delta Lake. As explained in my first article, with Delta Lake we are able to maintain different versions of our datasets that can be reused when needed. In this section we will have a closer look at how we can get different versions of our dataset.

Before explaining how to get any version of the data, I will first show you how to know which version to get. How will I know when I last updated my data, what I updated, and how many versions actually exist? For that, a Delta table maintains a history, which can be fetched by calling the history() method; it returns a Spark DataFrame, so we can query it and find the version we want.
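Getting that history is a one-liner; a quick sketch, where the column selection is just my preference for readability:

# history() returns a DataFrame with one row per commit, newest first.
delta_df.history() \
    .select("version", "timestamp", "operation", "operationParameters") \
    .show(truncate=False)

The history data will look like: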

So, as you can see, it contains every detail we need to understand the operations performed on our data. Now, I will go back to the dataset we created initially, which contained only one column, “id”, consisting of numbers. We played around with that dataset, but while doing so all our activities were tracked and nothing went unseen by Delta Lake. Without further ado, I will read the dataset.

delta_df = DeltaTable.forPath(spark, "/mnt/data/delta_sample")

The final version of our dataset contained:

Now, I will get the dataset at version 1. As the history table shows, that was the append to our existing dataset; if you travel back in your mind, we appended the numbers 5 to 9. So, let’s get that data into our Spark DataFrame:

version_1 = spark.read.format("delta").option("versionAsOf", 1).load("/mnt/data/delta_sample")

This gives us a Spark DataFrame containing the numbers 1 through 9; you can specify whichever version you want and get your lost baby back in the stroller to play around with.
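Versions can also be addressed by time instead of number, assuming your Delta release supports the timestampAsOf reader option alongside versionAsOf; the timestamp below is only illustrative:

# Load the table as it stood at the latest commit made at or before this time.
old_df = spark.read.format("delta") \
    .option("timestampAsOf", "2019-11-17 00:00:00") \
    .load("/mnt/data/delta_sample")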

Apart from these things, one more important feature of Delta Lake is cleaning up old version data after some time, because we don’t want our storage to blow up. For that, we have the vacuum() method, which recursively deletes files and directories in the table that are no longer needed for maintaining older versions up to the given retention threshold. This method returns an empty DataFrame on successful completion.

delta_df.vacuum()      # vacuum files not required by versions more than 7 days old (default retention)

delta_df.vacuum(100)   # vacuum files not required by versions more than 100 hours old

So, that’s all, folks. I hope you found my article helpful.

In case you want to verify that what I wrote is actually true, please find the links below for your reference.
