Delta Lake, when reliability meets performance

Wassim Almaaoui, Untienots
Jan 11, 2022

Delta Lake is an open source library that brings useful properties to your data lake. It’s a young and wonderful project that is becoming increasingly popular and is supported by more and more cloud storage systems and frameworks.

Delta Lake provides ACID transactions, scalable metadata handling, and unifies streaming and batch data processing on top of existing data lakes, such as S3, ADLS, GCS, and HDFS.

File-based stores are usually preferred for data lakes, to store and process massive raw data, which is sometimes unstructured, in a cost-effective way. Databases, on the other hand, are usually used for data warehouses, to handle structured and preprocessed data for specific use cases. Delta Lake, in turn, is one of the technologies that promote a new pattern called the data lakehouse, which combines the best of the two approaches and thus simplifies the overall architecture. Check this blog if you want to learn more about this paradigm shift.

You can see it as a file format based on Parquet that adds further metadata, which makes it “smarter” than other formats (Parquet, CSV, JSON, ORC, etc.).

Delta may be interesting to you if you have ever wondered about:

  1. How to manage concurrent reads and writes into the same folder?
  2. What happens if a job fails after having already written some data?
  3. How to delete specific data from your folder?
  4. What did your folder look like a few days ago? For rollback, debug or audit needs.
  5. How to merge multiple small files? The infamous streaming problem!
  6. For a streaming job, how to achieve an “exactly once guarantee” when using a distributed processing engine? How to roll back already written files if the stream fails?
  7. When appending new files to a folder, how to make sure that their schema is the same as, or compatible with, the schema of the existing ones?
  8. What happens when reading a folder containing files with different schemas?

We can summarise the Delta features that answer the above questions as follows:

  • ACID transactions (questions 1 and 2)
  • Upserts and deletes (question 3)
  • Time travel (question 4)
  • Streaming and batch unification (questions 5 and 6)
  • Schema enforcement (questions 7 and 8)

In a series of small articles, we will go through these questions, highlight them using simple concrete code examples and explain how Delta, thanks to a simple and lightweight implementation, offers a more efficient and reliable data lake.

In this first article, we will focus on the first two questions and dig into the ACID transactions introduced by Delta.

For this we will be:

  • Comparing Delta with Parquet since it’s probably the most popular file format when it comes to data lakes
  • Using Spark to read and write data, as Delta Lake has been created for it
  • Referencing a Delta folder as a table, which is the most common designation.

Setting up Delta Lake

Setting up Delta is as simple as adding the Delta Lake dependency to your project via your package manager (Maven, sbt, etc.).

For the purpose of this article, we will add the dependency directly when running spark-shell:

spark-shell --packages io.delta:delta-core_2.12:1.0.0

To choose the right version of Delta, check this page.

Data consistency concerns in a data lake

Concurrent reads and writes could lead to multiple issues if you are not using Delta!

Reading intermediate empty table

During the writing process, readers may see an empty table (an intermediate state), because Spark first deletes the whole table before starting the last stage, the one that writes to the table. Below is an example that highlights this issue.

Write a parquet table for the first time.

List the written files

ls /tmp/parquet_table
_SUCCESS
part-00000-55cf09d0-cb7c-460c-95c6-dcdc9829fe9c-c000.snappy.parquet
part-00001-55cf09d0-cb7c-460c-95c6-dcdc9829fe9c-c000.snappy.parquet
part-00003-55cf09d0-cb7c-460c-95c6-dcdc9829fe9c-c000.snappy.parquet
part-00004-55cf09d0-cb7c-460c-95c6-dcdc9829fe9c-c000.snappy.parquet
part-00006-55cf09d0-cb7c-460c-95c6-dcdc9829fe9c-c000.snappy.parquet
part-00007-55cf09d0-cb7c-460c-95c6-dcdc9829fe9c-c000.snappy.parquet

Overwrite the table. This is what happens when you want to refresh the table and thus write a new version of it. The sleep simulates some processing that is slow enough to make the issue visible.

Listing the folder files during the writing

ls /tmp/parquet_table
_temporary

If any other processing job reads the table during this time, it will find an empty table. The previous table state has been removed before the new version was written.
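To make the window of inconsistency concrete, here is a minimal sketch in plain Python. It is not Spark code: it only imitates the "delete first, write later" behaviour described above on a local folder. The helper names (`visible_files`, `naive_overwrite`) and the `reader` callback are hypothetical and exist only for this illustration.

```python
import os
import shutil
import tempfile


def visible_files(table_dir):
    """Return the data files a reader would list (entries starting with "_" are ignored)."""
    return sorted(f for f in os.listdir(table_dir) if not f.startswith("_"))


def naive_overwrite(table_dir, new_files, reader=None):
    """Overwrite a folder the naive way: delete everything, then write the new files."""
    shutil.rmtree(table_dir, ignore_errors=True)
    os.makedirs(os.path.join(table_dir, "_temporary"))
    if reader is not None:
        # A concurrent reader shows up while the new data is still being computed.
        reader()
    for name, content in new_files.items():
        with open(os.path.join(table_dir, name), "w") as f:
            f.write(content)
    shutil.rmtree(os.path.join(table_dir, "_temporary"))


table = os.path.join(tempfile.mkdtemp(), "parquet_table")
naive_overwrite(table, {"part-00000": "old rows", "part-00001": "old rows"})

seen = {}
naive_overwrite(
    table,
    {"part-00000": "new rows"},
    reader=lambda: seen.update(files=visible_files(table)),
)
print(seen["files"])         # [] : the reader found an empty table
print(visible_files(table))  # ['part-00000'] : the new version, visible only at the end
```

The reader is not faulty here; the writer simply exposes an intermediate state that should never have been visible.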

Reading partial or corrupted table

Even if you were lucky enough not to read the table at that moment, you may still read a partially written table.

In fact, Spark, as a distributed processing framework, has multiple executors that work independently, and each one outputs its result (files) to the final table. For consistency, the executors first write their results to a _temporary folder (as we have seen above) and then move them to the final location if everything went well. This process is controlled by the Hadoop connectors and is called the commit. There are two built-in versions of the commit algorithm:

Hadoop commit V1: Move all files at the end of the job, once all temporary files have been completely written.

Hadoop commit V2: Move each file as soon as it has been completely written. Don’t wait for other files to be written.

Hadoop commit V2 is more performant (x5) because it starts moving files in parallel as soon as they are available. It is now the default version in Hadoop and is controlled by the parameter mapreduce.fileoutputcommitter.algorithm.version.

However, this introduces a new inconsistency issue: what happens if some executors (or tasks, to be more precise) succeed but not all of them?

Let’s simulate a spark job that writes half of the data before failing!

Brief explanation of the above job:

  • We create a dataframe composed of 100000 rows and split into 8 partitions (we expect 8 files in the output table if everything goes well).
  • We convert it to an RDD so that we can use mapPartitionsWithIndex, which is only available in the RDD API. The index goes from 0 to 7 (8 partitions).
  • We want to introduce a failure after half of the data has been processed and written. As all partitions could be processed in parallel by Spark, I add a sleep of 10s in the failing partitions to give the other ones enough time to finish!
  • We convert back to a dataframe to write it easily in Parquet.

Result:

The job fails as expected with the following message

21/12/31 11:46:04 ERROR Executor: Exception in task 4.0 in stage 7.0 (TID 60)
java.lang.Exception: Some failure happened

The folder state

ls /tmp/parquet_table/
part-00000-a95bcfe2-4f14-4ae6-a819-f7494ed9fc53-c000.snappy.parquet
part-00001-a95bcfe2-4f14-4ae6-a819-f7494ed9fc53-c000.snappy.parquet
part-00002-a95bcfe2-4f14-4ae6-a819-f7494ed9fc53-c000.snappy.parquet
part-00003-a95bcfe2-4f14-4ae6-a819-f7494ed9fc53-c000.snappy.parquet

Only 4 out of 8 expected files have been written. This means that if your job fails, you can end up with a corrupted table… which is not ideal at all!

And even if everything went well, you could have read partial data if you queried the table during the writing process.
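The difference between the two commit versions can be sketched in a few lines of plain Python. This is a deliberately simplified simulation and not the Hadoop implementation: tasks run one after the other, each task writes a single file, and the function `run_job` and its parameters are hypothetical names made up for the example.

```python
import os
import tempfile


def run_job(table_dir, n_tasks, failing_tasks, commit_version):
    """Simulate a job of n_tasks tasks, each writing one file into table_dir.

    commit_version=2: a file is moved into the table as soon as its task finishes.
    commit_version=1: files are moved at the end, only if every task succeeded.
    Returns the data files visible in the table once the job has ended.
    """
    tmp_dir = os.path.join(table_dir, "_temporary")
    os.makedirs(tmp_dir)
    pending = []  # (temporary path, final path) pairs waiting for the job commit
    try:
        for task in range(n_tasks):
            if task in failing_tasks:
                raise RuntimeError("Some failure happened")
            name = f"part-{task:05d}.parquet"
            tmp_path = os.path.join(tmp_dir, name)
            with open(tmp_path, "w") as f:
                f.write(f"rows of task {task}")
            final_path = os.path.join(table_dir, name)
            if commit_version == 2:
                os.rename(tmp_path, final_path)  # exposed immediately
            else:
                pending.append((tmp_path, final_path))
        for tmp_path, final_path in pending:  # job commit (V1 only)
            os.rename(tmp_path, final_path)
    except RuntimeError:
        pass  # the job failed: whatever was already moved stays in the table
    return sorted(f for f in os.listdir(table_dir) if f.startswith("part-"))


root = tempfile.mkdtemp()
failing = {4, 5, 6, 7}
v1_files = run_job(os.path.join(root, "table_v1"), 8, failing, commit_version=1)
v2_files = run_job(os.path.join(root, "table_v2"), 8, failing, commit_version=2)
print(len(v1_files))  # 0 : nothing was exposed
print(len(v2_files))  # 4 : half of the table is visible
```

V1 avoids the partial result here, but the moves at the end of the job are still not a single atomic operation, so a reader arriving during that final step can see an incomplete table as well.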

Concurrent writes

Let’s imagine the following scenario:

  • Job A reads a table “parquet_table” to update a column (table size at this point: 10 rows).
  • Job B adds 5 new rows to “parquet_table” (table size at this point: 15 rows).
  • Job A has finished the processing and writes back the updated data to the same location with mode “overwrite” (table size at this point: 10 rows again!!).

In such a scenario we would lose data (the 5 rows added in the meantime) silently, without any job failure or alert.
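The same lost update can be reproduced with an in-memory list standing in for the table. This is only an illustration of the scenario above, with hypothetical column names (`id`, `price`).

```python
# The "table" is a plain list of rows; each job works on what it has read.
table = [{"id": i, "price": 10} for i in range(10)]

# Job A reads the table to update a column (10 rows).
snapshot_a = [dict(row) for row in table]

# Job B appends 5 new rows while job A is still processing.
table = table + [{"id": i, "price": 10} for i in range(10, 15)]
size_after_b = len(table)

# Job A finishes and overwrites the table with its stale, updated snapshot.
table = [{**row, "price": row["price"] * 2} for row in snapshot_a]
size_after_a = len(table)

print(size_after_b)  # 15
print(size_after_a)  # 10 : the rows added by job B are gone, and nothing failed
```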

Let’s summarise the consistency issues.

If some processing jobs read a table while it’s being updated, they could find an empty, partial or corrupted table.

Also, concurrent writes to the same table could lead to lost updates.

This could result in a totally incorrect output, which could have huge business implications.

The same considerations and issues apply if we were updating only one partition and not the whole table.

Some may argue that this could be avoided simply with good scheduling. But that is not enough because:

  • Some business cases can require ad-hoc job runs. For example, BI users can trigger queries at any time, an end user can upload files at any time, or someone can run a machine learning experiment.
  • Operations (debug, replay, audit, migrations, fixes, etc.) can also require manual job runs.
  • Some workflows are independent by design. For example, if workflow B uses a mapping table updated by workflow A, but you can tolerate that the table is not always up to date, you can separate the scheduling of the two workflows for simplicity and reliability.

It’s always good practice to fix issues where they appear and not just move them elsewhere. For example, in data modelling, if we want uniqueness in a table, it’s preferable to enforce it at the database level rather than manage it at the application level.

How to avoid the above issues?

At Untienots, data pipelines are our core business and we have experienced all the above issues. To make our file-based data lake more reliable, we started thinking about developing an in-house library that:

  • Adds some “smart” properties to synchronise concurrent reads and writes, based on some kind of locking, snapshotting, table versioning, additional metadata, etc.
  • Is smart enough to look like a database, with a state, a protocol, etc.
  • Stays as simple and reliable as writing a file (without introducing a database or an always-on service).
  • Doesn’t have a significant performance overhead.
  • Ideally comes without an additional cost.

Fortunately, we discovered Delta Lake before we started developing such a tool, and it was a perfect fit for all our needs and requirements. So how does it work?

Inside Delta Lake

How does it work?

Let’s try to write a delta table. We can create only two partitions (two files expected) to make it easier to compare the folder state and follow the changes.

Folder state

tree /tmp/delta_table
/tmp/delta_table
├── _delta_log
│ └── 00000000000000000000.json
├── part-00000-e4c6e1f3-b488-f37d33e2da13-c000.snappy.parquet
└── part-00001-4d988aef-a043-2c6d15406043-c000.snappy.parquet

Delta writes the data in Parquet and, in addition, writes some metadata in a sub-folder named _delta_log.

This folder contains one json file per update operation (write, update, delete, etc.) that describes the changes made to the table. Let’s take a look at the file content. For clarity, we have hidden the fields that are not relevant so far (we will see more fields in the next articles).

Useful information we can see in the file:

  • The timestamp of the operation
  • The type (write) and parameters of the operation (overwrite, non partitioned)
  • Some metrics: we wrote 100 rows
  • The schema
  • The names of the two files added to the table.

The key concept behind the Delta implementation is that updating a table or a partition doesn’t delete the old data immediately. Instead, it marks the old files as “deleted” and writes the new files next to the old ones in the same folder. The workflow is as follows:

  • Writing new parquet files in the same folder
  • Adding, at the end of the operation, a new json metadata file where we indicate the newly added files and the ones that should be excluded from the table. This step is called commit. From that moment, the change is exposed to readers!
  • When reading, the reader will first read the metadata to list the “good” files and read only those ones.
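The workflow above can be sketched with a toy transaction log in plain Python. This is a simplified illustration, not the Delta implementation: it keeps only `add` and `remove` entries with a file path, the helper names (`commit`, `live_files`, `write_data_file`) are hypothetical, and the short file names stand in for real Parquet files.

```python
import json
import os
import tempfile

LOG_DIR = "_delta_log"


def write_data_file(table_dir, name):
    """Write a data file into the table folder; it is not visible to readers yet."""
    with open(os.path.join(table_dir, name), "w") as f:
        f.write("data")


def commit(table_dir, added, removed):
    """Publish a change by creating the next numbered json file in the log."""
    log_dir = os.path.join(table_dir, LOG_DIR)
    os.makedirs(log_dir, exist_ok=True)
    version = len(os.listdir(log_dir))
    # Mode "x" fails if the file already exists, so a version is never overwritten.
    with open(os.path.join(log_dir, f"{version:020d}.json"), "x") as f:
        for name in added:
            f.write(json.dumps({"add": {"path": name}}) + "\n")
        for name in removed:
            f.write(json.dumps({"remove": {"path": name}}) + "\n")
    return version


def live_files(table_dir):
    """Replay the log in order and return the files of the current table version."""
    log_dir = os.path.join(table_dir, LOG_DIR)
    files = set()
    for entry in sorted(os.listdir(log_dir)):
        with open(os.path.join(log_dir, entry)) as f:
            for line in f:
                action = json.loads(line)
                if "add" in action:
                    files.add(action["add"]["path"])
                elif "remove" in action:
                    files.discard(action["remove"]["path"])
    return sorted(files)


table = tempfile.mkdtemp()
old = ["part-00000-a.parquet", "part-00001-a.parquet"]
new = ["part-00000-b.parquet", "part-00001-b.parquet"]

for name in old:
    write_data_file(table, name)
commit(table, added=old, removed=[])

# Overwrite: the new files are written next to the old ones...
for name in new:
    write_data_file(table, name)
before_commit = live_files(table)  # ...but readers still see the old version
commit(table, added=new, removed=old)
after_commit = live_files(table)   # the change becomes visible in one step

on_disk = sorted(f for f in os.listdir(table) if f.endswith(".parquet"))
print(before_commit)
print(after_commit)
print(len(on_disk))  # 4 : the old files are still physically there
```

The only step that changes what readers see is the creation of the log file, which is why a slow or failed write never exposes an intermediate state.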

Let’s see this in practice. We will overwrite the above table which already contains 100 rows, by a dataset of 50 rows.

The folder now contains the following files:

  • Two new parquet files, besides the two older ones.
  • A new json file to describe the new transaction

tree /tmp/delta_table
/tmp/delta_table
├── _delta_log
│ ├── 00000000000000000000.json
│ └── 00000000000000000001.json
├── part-00000-d8ee29f8-a202-66cc6a25b845-c000.snappy.parquet
├── part-00000-e4c6e1f3-b488-f37d33e2da13-c000.snappy.parquet
├── part-00001-4d988aef-a043-2c6d15406043-c000.snappy.parquet
└── part-00001-7e2ec721-8f3e-540cd8d4d439-c000.snappy.parquet

The content of the new json/metadata file

According to the above file:

  • The operation type is an overwrite
  • Adds 50 rows
  • Adds 2 new files
  • Removes 2 old files.

The old files are marked as removed even if the files are still physically there. We will see later that this mechanism has many advantages. Readers, now, will skip these files when they want to read the current state of the table.

Here is an example of reading a delta table.

How does this help to avoid reading an empty, partial or corrupted table?

As long as the update of the table is not finished, the _delta_log folder is not updated. Meanwhile, readers will see the previous version of the table, which is not up to date but is always complete and coherent. Even if the new change is committed while a read is in progress, it will not affect that read, as the reader has already read the metadata and will ignore the new files.

Once the new change has been committed, all readers will read new files and ignore old ones.

If a job fails after writing some data, the commit won’t happen and the readers will continue to see the older version.

We speak about transactions, or atomic updates, because no matter how long writing the data takes, the update is instantaneous from a reader’s point of view.

To learn more about how transactional writes work in Delta, check this well explained blog.

How does it help to prevent errors on concurrent writes?

Delta uses an Optimistic Concurrency Control algorithm to figure out if two concurrent writes are compatible or not. Two concurrent writes could be:

  • Compatible: if they don’t change the same portion of data (the same files), for example if they update different partitions. In this case both operations can be applied without any issue.
  • Conflicting: if they try to update the same files. Delta can figure out which files are impacted by each commit, as well as the read version of each writing job (the table version at the time the job read the table). In this case one of the writers will succeed and the other will fail. If you have a retry policy, the failed job will be rerun on top of the latest version of the table and should then succeed.

In both cases we don’t get silent errors as we do with Parquet.
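Here is a minimal sketch of the optimistic approach in plain Python. It is an illustration under simplifying assumptions and not Delta's actual conflict detection, which has more detailed rules: here a commit is rejected only if a commit made after the writer's read version removed a file that the writer read or wants to remove. The names `try_commit` and `CommitConflict`, and the one-object-per-file log format, are hypothetical.

```python
import json
import os
import tempfile


class CommitConflict(Exception):
    """Raised when a concurrent commit touched the same files."""


def try_commit(log_dir, read_version, files_read, added, removed):
    """Commit on top of the latest version unless a newer commit conflicts with ours."""
    while True:
        latest = len(os.listdir(log_dir)) - 1
        # Check every commit made since the version this writer has read.
        for version in range(read_version + 1, latest + 1):
            with open(os.path.join(log_dir, f"{version:020d}.json")) as f:
                other = json.load(f)
            if set(other["removed"]) & (set(files_read) | set(removed)):
                raise CommitConflict(f"conflict with version {version}")
        try:
            # Mode "x" fails if another writer has just created this version.
            with open(os.path.join(log_dir, f"{latest + 1:020d}.json"), "x") as f:
                json.dump({"added": added, "removed": removed}, f)
            return latest + 1
        except FileExistsError:
            continue  # lost the race: check the new commit and try again


log = tempfile.mkdtemp()
try_commit(log, -1, [], added=["p=1/f1", "p=2/f2"], removed=[])  # version 0

# Writers A and B both read version 0 but rewrite different partitions: compatible.
version_a = try_commit(log, 0, ["p=1/f1"], added=["p=1/f1-a"], removed=["p=1/f1"])
version_b = try_commit(log, 0, ["p=2/f2"], added=["p=2/f2-b"], removed=["p=2/f2"])

# Writer C also read version 0 and rewrites partition 1, already replaced by A.
try:
    try_commit(log, 0, ["p=1/f1"], added=["p=1/f1-c"], removed=["p=1/f1"])
    conflict_detected = False
except CommitConflict:
    conflict_detected = True

print(version_a, version_b, conflict_detected)  # 1 2 True
```

Writer C fails loudly instead of silently overwriting A's work; rerunning it on top of the latest version would let it read A's file and commit cleanly.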

To learn more about the transaction log (the json file) and managing concurrent writes, check this wonderful blog

To learn more about which operations can conflict, check the official documentation

Data is being duplicated

Delta relies on duplicating data, which has a storage cost. You should clean up older versions yourself (vacuum) to keep the table at a reasonable size.

This will delete the two old files in our folder.

Keeping old versions can be useful to roll back the table or for debug/audit purposes. So the number of versions we should keep depends on:

  • How big is the table (cost)
  • How often we update it (cost)
  • How important is the data
  • How fast are jobs that read it. The retention time should be at least as long as the slowest job duration.
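The retention idea can be sketched as follows in plain Python. This is an illustration of the principle only, not the behaviour of Delta's vacuum command in every detail: the function name `vacuum`, its parameters, and the use of the file modification time as the age of a file are assumptions made for the example.

```python
import os
import tempfile
import time

DAY = 24 * 3600


def vacuum(table_dir, live, retention_seconds, now=None):
    """Delete data files that are no longer part of the table and are older than the retention."""
    now = time.time() if now is None else now
    deleted = []
    for name in sorted(os.listdir(table_dir)):
        path = os.path.join(table_dir, name)
        if name.startswith("_") or not os.path.isfile(path):
            continue  # never touch the log folder
        if name in live:
            continue  # still part of the current version
        if now - os.path.getmtime(path) > retention_seconds:
            os.remove(path)
            deleted.append(name)
    return deleted


table = tempfile.mkdtemp()
os.makedirs(os.path.join(table, "_delta_log"))
now = time.time()
ages_in_days = {"live.parquet": 10, "old-removed.parquet": 10, "recent-removed.parquet": 1}
for name, age in ages_in_days.items():
    path = os.path.join(table, name)
    with open(path, "w") as f:
        f.write("data")
    os.utime(path, (now - age * DAY, now - age * DAY))  # backdate the file

deleted = vacuum(table, live={"live.parquet"}, retention_seconds=7 * DAY, now=now)
remaining = sorted(f for f in os.listdir(table) if f.endswith(".parquet"))
print(deleted)    # ['old-removed.parquet']
print(remaining)  # ['live.parquet', 'recent-removed.parquet']
```

The recently removed file survives, so a slow job that started reading the previous version a day ago can still finish, and a rollback within the retention window remains possible.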

At Untienots we put 7 days for most tables and 1 month for small and critical ones. This still has a cost but in our case, the (GCS) cost overhead is acceptable.

Tip: It is good practice to implement a separate, regular job that applies the vacuum to all tables instead of doing it in the main jobs that update them. The vacuum, like any operation, could fail, and it would be a pity to fail the whole job because of a cleanup step.

Performance concerns

Delta is built on top of Parquet, so we benefit from all the Parquet optimisations (compression, columnar storage, partition and column pruning, row group skipping, statistics, etc.).

Besides, the Delta commit protocol is more performant than Hadoop commit V2 because:

  • It writes files directly to their final destination.
  • To replace existing files, it doesn’t physically delete them. It just marks them as removed in the metadata until a vacuum is executed, which makes the main job faster.

Coming Next

I hope this blog has clarified the advantages of transactional writes for batch workloads.

At Untienots we are very satisfied with our experience with Delta. It helped us improve the reliability, simplicity and performance of our pipelines. In the next blogs of this series, we will go through the other advantages of Delta (schema enforcement, schema evolution, data deletion, small files compaction) and also some issues that we faced using this technology.

Considering the simplicity of its integration and all the above advantages, I think Delta could be a good option to consider for many companies. Delta provides a command to migrate existing Parquet tables to Delta. This command adds the required metadata to the folder without rewriting all the files!
