What’s inside DELTA LAKE
Delta Lake is the term you would have heard about or read about in 100s of blogs or you may have even used it in your project.
The intentions of this blog is not only talk about Delta Lake and its concepts, but to familiarise you on how does it work under the hood.
Before we get any deeper let’s set the base.
What is Delta Lake?
Delta Lake is an open format storage layer that delivers reliability, security and performance on your data lake — for both streaming and batch operations. By replacing data silos with a single home for structured, semi-structured and unstructured data, Delta Lake is the foundation of a cost-effective, highly scalable lakehouse.
Features of Delta Lake
· ACID Compliant
· Schema Enforcement
· Support for streaming data
· Time Travel
· Upsert and Delete compatible
Components of Delta Lake
- Delta lake Storage Layer
We store the data using delta lake and access the data using Spark. This Approach provides the performance boost and as well as the consistency of the data.
2. Delta Tables
Parquet files: Delta table stores the data in the Parquet file format.
Transaction logs: Its an ordered entry of every transactions has even been performed in this table
Upon firing any query on top of the delta table, spark checks the transaction logs to see what’s the new transactions which has been posted to the table. And updates the table with the new changes.
This ensures that the user table is always synchronised with the latest changes
metastore: Stores the schema and metadata.
3. Delta Engine
Delta Engine is a high performance, Apache Spark compatible query engine that provides an efficient way to process data in data lakes including data stored in open source Delta Lake. Delta Engine accelerate data lake operations, supporting a variety of workloads ranging from large-scale ETL processing to ad-hoc, interactive queries. Many of these optimisations take place automatically; you get the benefits of these Delta Engine capabilities just by using Databricks for your data lakes.
Lets get inside the Delta Tables
When we talk about Delta table, how is it different from the normal Table?
Delta Table mainly consists of below 2 components.
Parquet files: The file format by which the files/data are stored.
Transaction logs: It’s an ordered record of the transactions which occurred on the delta table. Consider this as a Source Of Truth which will be used by the delta engine to guarantee the atomicity.
Visualising the Delta Table path:
Let’s dig deeper inside the _delta_log folder.
delta_log folder contains JSON files which keeps track of all the transactions occurred for the given table.
Soon will have demo and shall talk about it more.
Delta Lake time travel
Using Delta lake time travel we can refer to the previous versions of the data.
Delta table, tracks every transaction and at a given point in time it gives the flexibility to refer to any previous snapshots.
Time travel would help in
Comparing the previous version of the data with current
In case of data corruption, we can go back to a previous snapshots.
Let’s create a simple dataframe by using a CSV file.
Lets convert the data to Delta by specifying the format as “delta”
We can also create the table in the metastore.
Delta supports the partitioning of the data. So lets use Year as our partitioning column.
Lets see what do we have under the delta path:
ROW 6,7,8,9: are the parquet files which got created when we inserted the data into the table for the first time.
ROW 1,2,3,4: are the partition folders which got created when we inserted the data into the same delta path but as partitioned.
ROW 5: _delta_log: which contains the transaction logs
Lets dig the the delta_log:
ROW 5,6: are the transaction logs which holds the transactions which occurred against the table
The four columns each represent a different part of the very first commit to the Delta Table, creating the table.
add:contains the information about the individual columns.
commitInfo:Contains the details about the type of the operation READ or Write. plus the timestamp and the user.
metaData:Contains the schema information.
protocol:Contains the delta version.
Since we have applied 2 write functions.
- Initial write by reading from CSV > version 0
- Write on the partitioned column > version 1
We can look at the version 1 file and confirm that it captured the change.
PartitionValue contains the column by which we have partitioned.
You can confirm the same in the below screenshot.
Let’s write our version 3, Put a filter on the same data and write it back to the same delta path.
So even after we did the overwrite, we can still see that the data pertaining to previous commits still exists.
Access previous versions of table using Time Travel
Until now we have performed 3 writes:
- Read the initial CSV file and written to delta path
- Written the csv file partitioned on YEAR
- Applied the filter on the BasePay column and written to the delta path
So as per our 3 writes we have 3 different versions available in the table.
We can travel back to the specified time or the version.
To access the specific version we can pass “versionAsOf” with the version number.
To travel back to the specific timestamp time we can pass “timestampAsOf” with the timestamp value.
Vaccum is the command which can be used to clean up our directory and delete any previous version of the data.
We get the error while we try to delete the previous version. This is by design to have a fail safe against the accidental deletes.
We can bypass the default retention period check by setting the “spark.databricks.delta.retentionDurationCheck.enabled” to false.
And to confirm that that the version 0 is deleted, let’s try running the read command for the version 0.
Now since we deleted the version 0, we should still see version 1 which is a partitioned write.
Recursively vacuums directories associated with the Spark table and remove uncommitted files older than a retention threshold. The default threshold is 7 days. Databricks automatically triggers
VACUUM operations as data is written.
VACUUM [ table_identifier | path] [RETAIN num HOURS]
To clean up uncommitted files left over from Spark jobs, use the
VACUUM command to remove them. Normally
VACUUM happens automatically after Spark jobs complete, but you can also run it manually if a job is aborted.
VACUUM ... RETAIN 1 HOUR removes uncommitted files older than one hour.
// recursively vacuum an output path
spark.sql("VACUUM '/path/to/output/directory' [RETAIN <N> HOURS]")
// vacuum all partitions of a catalog table
spark.sql("VACUUM tableName [RETAIN <N> HOURS]")
VACUUM (Databricks SQL)
Learn how to use the VACUUM syntax of the SQL language in Databricks SQL.
You may find the Above codes in my github repo:
Delta Lake on Databricks - Databricks
Delta Lake is an open format storage layer that delivers reliability, security and performance on your data lake - for…
Delta Lake - Reliable Data Lakes at Scale
Scalable Metadata Handling In big data, even the metadata itself can be "big data." Delta Lake treats metadata just…
Learn how to use the VACUUM syntax of the SQL language in Databricks for Spark and Delta tables.
Top 5 Reasons to Convert Your Cloud Data Lake to a Delta Lake
If you examine the agenda for any of the Spark Summits in the past five years, you will notice that there is no…
BigData Engineer — Love for Bigdata, Analytics, Cloud and Infrastructure.
Subscribe to my: Weekly Newsletter Just Enough Data