How delta works

William Smith
5 min read · Aug 1, 2019


Most existing big data storage systems based on HDFS lack an upsert feature (if the record exists, update it; otherwise, add it). This means you may run into several problems:

1. You simply cannot update records. For incremental data sync, the ability to upsert is required.
2. While you are updating data, other people may not be able to update or even read the data.
3. A huge number of small files will seriously hurt your storage memory and performance. To reduce the number of small files, you may create a job that compacts small files into big ones. But this brings back problem 2: the compaction job may take a long time, and the other jobs will be forced to stop, otherwise they may throw exceptions.

[Delta Lake](https://delta.io) is an open-source storage layer that brings ACID transactions to Apache Spark™ and big data workloads.

It includes the following key features:

1. ACID Transactions
2. Scalable Metadata Handling
3. Time Travel (data versioning)
4. Open Format (Parquet)
5. Unified Batch/Streaming Source and Sink
6. Schema Enforcement
7. Schema Evolution

With the help of these features, you can write/update/read the same data in parallel. You can run a compaction job without affecting other jobs that write or read the same data collection. This is really incredible. However, the bad news is that the latest version of delta is 0.2.0, and the Upsert/Delete/Compaction features are still a work in progress. The good news is that I have created a new open-source project, [delta-plus](https://github.com/allwefantasy/delta-plus), which has already added Upsert/Delete/Compaction features on top of delta 0.2.0.

So in this post, I will also talk about the features that are available in [delta-plus](https://github.com/allwefantasy/delta-plus).

The design of delta is really amazing; it's simple, but it works. A delta table is just a directory containing two kinds of files: a bunch of parquet data files and a bunch of metafiles in JSON/Parquet format. This can be explained by the following picture:
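To make this concrete, here is a rough sketch of what the directory of a small delta table might look like (the file names below are illustrative; real names are generated by Spark and delta):

```
my-table/
├── part-00000-....snappy.parquet                 <- data file (parquet)
├── part-00001-....snappy.parquet                 <- data file (parquet)
└── _delta_log/
    ├── 00000000000000000000.json                 <- commit 0 (metafile, JSON)
    ├── 00000000000000000001.json                 <- commit 1 (metafile, JSON)
    └── 00000000000000000010.checkpoint.parquet   <- periodic checkpoint (metafile, Parquet)
```

Each JSON commit file records which data files were added or removed, and the periodic Parquet checkpoint summarizes the log so readers do not have to replay every JSON file.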

The first question is, with this design, how can we add new data into the table?

How to add new records to a delta table

As described in the previous picture, there are three main steps (a short code example follows the list):

1. Create new parquet files to hold the new records. For now, readers are not affected, since they read the existing metafiles, and those metafiles do not reference the new parquet files.
2. Delta then creates a new JSON file named 000…0012.json, which lists the new parquet files and marks them as added.
3. Because delta has committed the new parquet files in commit 12, readers can now see the new records.
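For reference, a plain append with the Delta Lake batch API looks roughly like this (a minimal sketch assuming a SparkSession with the delta dependency on the classpath; the table path is illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("delta-append-example")
  .getOrCreate()

// Appending only adds new parquet files plus a new JSON commit in _delta_log;
// existing files are never modified, so concurrent readers are not affected.
val newRecords = spark.range(0, 100).toDF("id")
newRecords.write
  .format("delta")
  .mode("append")
  .save("/tmp/events") // illustrative table path

// Readers see the new records once they load the table after the commit.
val events = spark.read.format("delta").load("/tmp/events")
```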

Appending data is easy, so what about upsert? And why doesn't an upsert operation affect other readers?

How to upsert data into a delta table

Suppose we already have parquet files a1 and a2. Now a collection of records A arrives, and we perform the following steps to complete the upsert operation:

1. Create a new parquet file a3 from A. Note that in practice we may create more than one file.
2. Filter a1 and a2 against A to find the records that are not in A, and write those records into new files. Suppose this produces two new files, a4 and a5.
3. Finally, commit a new JSON file named 000…012.json, which records that a1 and a2 are removed and a3, a4, and a5 are added.
4. Before the final commit, we do not delete any files, so readers are not affected.
5. After the final commit, nothing is physically deleted (the files are only marked as removed); readers learn from 000…012.json that a1 and a2 are gone and that a3, a4, and a5 should be read instead.

Filtering all existing parquet files against A to find the records not in A and then creating new files from them is, of course, not a good idea; we only describe the steps that way for simplicity. In practice, we join A with the existing data to find just the files that contain records from A, so that we neither mark a massive number of files as deleted nor create too many new ones. We can also analyze the records in A and extract partition-range information, so that we do not even need to join all the existing data to find the files that contain records from A.
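To tie the steps together, here is a minimal, self-contained sketch of this copy-on-write bookkeeping. It is not the delta-plus API; the names (DataFile, Commit, upsert) and the key-set model are purely illustrative:

```scala
// Illustrative model of the copy-on-write upsert; not real delta/delta-plus code.
object UpsertSketch extends App {
  case class DataFile(name: String, keys: Set[String])
  case class Commit(version: Long, added: Seq[String], removed: Seq[String])

  def upsert(existing: Seq[DataFile], batchKeys: Set[String], version: Long): (Seq[DataFile], Commit) = {
    // Only files that actually contain keys from the incoming batch need rewriting
    // (this mirrors the join-based optimization described above).
    val (touched, untouched) = existing.partition(f => f.keys.exists(batchKeys.contains))

    // Step 1: write a new file holding the incoming batch (a3 in the text).
    val batchFile = DataFile(s"a-batch-$version.parquet", batchKeys)

    // Step 2: rewrite each touched file, keeping only records NOT in the batch (a4, a5).
    val rewritten = touched.zipWithIndex.map { case (f, i) =>
      DataFile(s"a-rewrite-$version-$i.parquet", f.keys -- batchKeys)
    }

    // Step 3: the commit marks the touched files as removed and the new files as added.
    // Nothing is physically deleted, so readers of the previous version are unaffected.
    val commit = Commit(version,
      added = (batchFile +: rewritten).map(_.name),
      removed = touched.map(_.name))

    ((untouched ++ rewritten) :+ batchFile, commit)
  }

  // Example: a1 holds keys {k1, k2}, a2 holds {k3}; the incoming batch updates k1.
  val (files, commit) = upsert(
    Seq(DataFile("a1.parquet", Set("k1", "k2")), DataFile("a2.parquet", Set("k3"))),
    batchKeys = Set("k1"),
    version = 12)
  println(commit) // a1 is removed and rewritten without k1; a2 is untouched.
}
```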

How to do compaction on a delta table

Compaction is easy, but for now there are three limitations in the compaction implemented in [delta-plus](https://github.com/allwefantasy/delta-plus):

1. The table you want to compact must not have upsert/delete operations performed on it.
2. Only one compaction job is allowed at a time.
3. A specific version needs to be specified. The compaction will process all files created before that target version.

The first limitation is caused by the current design of compaction. Compaction is a heavy operation: it deletes a lot of old files and creates many new ones. Delta supports streaming writes, which means a compaction job must not disturb the writes of the streaming job. To achieve this, we do not lock the table before the job starts; we first merge the old files and create the new ones, then lock the table, and finally commit the added and removed files.

If the table allowed upsert/delete operations while compaction was merging old files, those old files could also be marked as deleted (not physically deleted) by the concurrent operation, yet the new compacted files, which still contain the records that should have been deleted, would be committed in the end. This must not happen.

So the compaction consists of three main steps (a small sketch follows the list):

1. Find the files created before the specified version (these files will never be removed by any other operation, since upsert/delete is disallowed).
2. Merge these files into new, larger files.
3. Lock the table and try to commit.
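Here is a minimal sketch of that protocol with purely illustrative names (not the real delta-plus classes); the point is only the ordering: merge first, lock late, commit last:

```scala
// Illustrative model of the compaction protocol; not real delta-plus code.
object CompactionSketch extends App {
  case class FileEntry(name: String, createdAtVersion: Long, sizeBytes: Long)
  case class Commit(version: Long, added: Seq[String], removed: Seq[String])

  def compact(files: Seq[FileEntry], targetVersion: Long, newVersion: Long): Commit = {
    // 1. Only files created before the target version are candidates. Because the table
    //    never sees upsert/delete (limitation 1), no other job can remove them meanwhile.
    val candidates = files.filter(_.createdAtVersion < targetVersion)

    // 2. Merge the small candidate files into one big file. This is the expensive part
    //    and is done WITHOUT holding the table lock, so streaming writers keep going.
    val merged = FileEntry(s"compacted-$newVersion.parquet", newVersion,
      candidates.map(_.sizeBytes).sum)

    // 3. Only now lock the table and commit: candidates are marked removed and the
    //    merged file is marked added. Nothing is physically deleted.
    Commit(newVersion, added = Seq(merged.name), removed = candidates.map(_.name))
  }

  val commit = compact(
    Seq(FileEntry("a1.parquet", 3, 1024),
        FileEntry("a2.parquet", 5, 2048),
        FileEntry("a3.parquet", 9, 512)),
    targetVersion = 8, newVersion = 12)
  println(commit) // removes a1 and a2 (created before version 8), adds compacted-12.parquet
}
```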

Conclusions

You may have already noticed that no matter what operation you apply to a delta table, delta always creates a new commit file (a JSON file) recording which files were added and which were removed by that operation. That is why we call the metafiles the delta log. With the delta log, we can do time travel (or version travel), and we can perform upsert/delete/append operations without affecting readers.
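Since every commit is preserved in the delta log, reading an older snapshot is just a matter of replaying the log up to that commit. In Delta Lake releases that support time travel, this is exposed as a read option (the path and version number here are illustrative):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("delta-time-travel").getOrCreate()

// Load the table exactly as it looked at commit 12 by replaying the delta log
// only up to that JSON file; later commits are simply ignored.
val snapshot = spark.read
  .format("delta")
  .option("versionAsOf", 12)
  .load("/tmp/events") // illustrative table path
```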
