Transactional solution to Apache Spark’s overwrite behavior

Himani Gadve
9 min read · Aug 23, 2021


Summary

Spark is a processing engine; it doesn't have its own storage or metadata store. Instead, it typically uses a storage layer such as AWS S3, and it relies on the Hive metastore when creating tables and views. That is the main reason Spark on its own does not provide the reliability guarantees of atomicity, consistency, isolation, and durability (ACID) transactions. This document describes a transactional solution to Spark's overwrite behavior and its implementation.

What is Spark's overwrite behavior?

In the Apache Spark framework, overwrite, as the name implies, rewrites all of the data at the path that you specify: the existing data files at the target path are removed and the data from the source is written in their place. You can think of this as a DELETE-and-LOAD scenario, where you read all the records from the data source (AWS S3), apply your transformations, delete the existing Parquet files, and write the new content to the target path.
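For illustration, here is a minimal PySpark sketch of this overwrite behavior; the bucket, path, and column names are placeholders, not details from the original pipeline.

```python
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("overwrite-demo").getOrCreate()

# Read the source data from S3 (hypothetical path).
source_df = spark.read.parquet("s3a://my-bucket/raw/events/")

# Hypothetical transformation.
transformed_df = source_df.filter("event_type IS NOT NULL")

# "overwrite" first deletes whatever is already at the target path,
# then writes the new files -- a non-transactional DELETE-and-LOAD.
transformed_df.write.mode("overwrite").parquet("s3a://my-bucket/curated/events/")
```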

Why is it problematic?

1. Atomicity problem with Spark

Atomicity means all or nothing: when we use the Spark DataFrame writer API, it should either write the full data set or no data at all. Spark's save modes (append, overwrite, ignore, error) do not use any locking on the data, so a failed job might leave an incomplete file behind and ultimately corrupt the data.

2. Consistency problem with Spark

At the most basic level, consistency ensures that the data is always in a valid state. With that definition, the overwrite mode of the Spark writer API deletes an existing file/folder and then creates a new one, and these two operations are not transactional. That means there is a window in between when the data does not exist: if the overwrite operation fails, we lose all of the data. Between the delete and the write there is a period when the data is not in a valid state, and that is how the Spark API lacks consistency.

3. Isolation problem with Spark

An in-progress, uncommitted read/write operation must remain isolated from other read/write activities. When we are writing a data set, a concurrent read/write on the same data set should not be affected by our write operation. Apache Spark does not have this feature; it has no strict notion of a commit. Hence, Spark does not offer isolation levels.

4. Durability problem with Spark

Committed data should be saved by the system such that, even in the event of a failure and a system restart, it is available in its correct state. Durability is offered by the storage layer, and AWS S3 is very good at this. However, because Spark does not correctly implement commits, the durability guarantees offered by the storage layer do not fully apply. We cannot reliably read a dataset while a different process is concurrently overwriting it, which is a considerable limitation of Apache Spark. Apache Spark also does not offer updates and deletes on the data.

5. Small File Problem in Spark

The data ingestion job appends a new batch of data to the destination location every hour, producing at least one new file per batch. File size depends on how much data is collected in that hour; it could be 1 GB, 100 GB, or just a few MB. In practice the ingestion job creates hundreds of files smaller than 1 GB every hour, which leads to too many small files. Small files are handled inefficiently by storage systems, and they are inefficient for Spark as well, because Spark must query the storage system (such as AWS S3) to get the list of files, and a large number of files slows down that listing operation. Spark also has to open each of these files and close it after reading, so the job spends much of its time opening and closing files rather than reading data, which slows it down further. A large number of small files is therefore one of the core problems.

Summarized problems using Apache Spark are:

  • Data “overwrite” on the same path can cause data corruption or data loss in case of job failure
  • Spark’s ‘overwrite’ save mode deletes all the data from the target path before loading the new data
  • ‘Upsert’ (‘update’ and ‘insert’) operations are not supported
  • No access to earlier versions of the data for audits or rollbacks
  • ACID non-compliance
  • Generation and processing of a large number of small files
  • Lack of schema enforcement

Once ACID compliance is addressed, Update, Delete, and Merge operations become functional on top of Apache Spark.

Solution

The proposed solution is Delta Lake, an open source layer on top of Apache Spark with Scala, Python, and SQL APIs. Delta Lake acts as an intermediary between Apache Spark and the storage system: instead of interacting with the storage layer directly, Spark talks to Delta Lake for reading and writing data, and Delta Lake takes responsibility for complying with the ACID properties. ACID support is the most important aspect of Delta Lake. Delta Lake also provides scalable metadata handling and batch data processing. It runs on top of an existing data lake such as AWS S3 and is fully compatible with the Apache Spark APIs.
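As a rough sketch (not the exact setup used in this project), this is how a PySpark session can be configured for Delta Lake and how a table is written in Delta format; the package coordinates, bucket, and paths are illustrative assumptions.

```python
from pyspark.sql import SparkSession

# Configure Spark with the Delta Lake extensions (illustrative; the delta-core
# package version must match your Spark/Scala version).
spark = (
    SparkSession.builder.appName("delta-demo")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:1.0.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog",
            "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

df = spark.read.parquet("s3a://my-bucket/raw/events/")  # hypothetical source

# Writing in Delta format: the overwrite is now transactional and is
# recorded as a commit in the _delta_log directory.
df.write.format("delta").mode("overwrite").save("s3a://my-bucket/delta/events/")
```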

Delta Lake key features:

  • Supports Atomicity, Consistency, Isolation, Durability
  • Enables Time travel
  • Enables UPSERT (update and insert) operations on data/tables
  • Makes storage more efficient by using Parquet files

How it works

The Delta Lake transaction log (also known as the DeltaLog) is an ordered record of every transaction that has ever been performed on a Delta Lake table since its inception. Delta Lake is built on top of Apache Spark to allow multiple readers and writers of a given table to work on the table at the same time. In order to show users a correct view of the data at all times, the Delta Lake transaction log serves as a single source of truth: the central repository that tracks every change that users make to the table.

Delta Lake Transaction Log

Multiple Concurrent Reads and Writes Handling:

Delta Lake breaks each read and write transaction down into its component parts and, once the transaction completes, adds it to the transaction log. Since Delta Lake is powered by Apache Spark, it is not only possible for multiple users to modify a table at once; it is an expected situation to handle. To handle it, Delta Lake employs optimistic concurrency control: a method of dealing with concurrent transactions that assumes the changes made to a table by different users can complete without conflicting with one another, enforcing mutual exclusion only when commits have to be serialized. This protocol allows Delta Lake to deliver on the ACID principle of isolation.

In the vast majority of cases, this reconciliation happens silently, seamlessly, and successfully. However, in the event that there’s an irreconcilable problem that Delta Lake cannot solve optimistically (for example, if User 1 deleted a file that User 2 also deleted), it throws an error.

As a final note, since all of the transactions made on Delta Lake tables are stored directly to disk, this process satisfies the ACID property of durability, meaning it will persist even in the event of system failure. The transaction log is the mechanism through which Delta Lake is able to offer the guarantee of atomicity. Delta Lake still retains atomic commits to ensure that in the event we need to audit our table or use “time travel” to see what our table looked like at a given point in time, we could do so accurately.

Delta Lake in action

The Delta Lake solution supports atomicity, consistency, isolation, and durability. It also facilitates schema enforcement, and ‘update’, ‘delete’, and ‘upsert’ (‘update’ and ‘insert’) operations on the data, along with compaction, which eliminates the small file issue and speeds up reads by organizing the data into larger files that can be read efficiently (see the sketch below). It also provides time travel, retrieving data by timestamp or version, and the ability to query the same tables from other engines such as Athena and Redshift Spectrum.
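Below is a hedged sketch of the compaction approach from the Delta Lake documentation: rewriting a table path into fewer, larger files without changing the data. The path, target file count, and the reuse of the `spark` session from the earlier sketch are assumptions.

```python
path = "s3a://my-bucket/delta/events/"  # hypothetical Delta table path
num_files = 16                          # target number of larger files

# Assumes the Delta-enabled `spark` session created in the earlier sketch.
(
    spark.read.format("delta").load(path)
    .repartition(num_files)
    .write
    .option("dataChange", "false")      # mark the rewrite as compaction, not new data
    .format("delta")
    .mode("overwrite")
    .save(path)
)
```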

How Delta Lake supports ACID

Delta Lake maintains a transaction log (the _delta_log directory) in the path where the data is written. The Delta log maintains details like:

  • Metadata, such as:
    - paths added in the write operation
    - paths removed in the write operation
    - data size
    - changes in the data
  • Data schema
  • Commit information, such as:
    - number of output rows
    - output bytes
    - timestamp
  • After a successful execution, a log file is created in the _delta_log directory. The important thing to note is that when you save your data as Delta, no files are removed once they are written. The concept is similar to versioning.
  • By keeping track of the paths removed and added, along with other metadata, in _delta_log, Delta Lake is ACID-compliant.
  • Versioning enables the time travel property of Delta Lake: we can go back to any previous state of the data because all of this information is maintained in _delta_log (see the history sketch below).
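As a small sketch, assuming the hypothetical Delta table written earlier and the same `spark` session, the commit history recorded in _delta_log can be inspected from PySpark:

```python
from delta.tables import DeltaTable

path = "s3a://my-bucket/delta/events/"  # hypothetical Delta table path

delta_table = DeltaTable.forPath(spark, path)

# Each row corresponds to one commit (one JSON file in _delta_log):
# version, timestamp, operation (WRITE, MERGE, DELETE, ...) and operation
# metrics such as the number of output rows and bytes.
delta_table.history().select(
    "version", "timestamp", "operation", "operationMetrics"
).show(truncate=False)
```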

How Delta Lake solves these problems

  • With the support for ACID, if the job fails during the “overwrite” operation, data is not lost, as the changes won’t be committed to the log file in the _delta_log directory.
  • Since Delta Lake does not remove old files in the “overwrite” operation, the old state of the data is maintained and there is no data loss.
  • Delta Lake supports the upsert (update + insert) operation, as mentioned above, which makes dealing with updates in the data easier, based on the unique predicates (keys) provided in the script.

When to use Delta Lake with Spark

  • When dealing with “overwrite” of the same dataset; this is the main challenge I have dealt with, and Delta Lake works wonders in such scenarios.
  • When dealing with data having updates (keys match between the target Delta table and the source DataFrame) and inserts (keys do not match between the target Delta table and the source DataFrame), the “merge” functionality of Delta Lake helps to update and insert into the target data.
  • To perform vacuuming, that is, removing old versions of the data files, with a retention period anywhere from 0 hours up to the default 168 hours.
  • To generate symlink manifest files so that Delta tables can be used on other engines such as Athena, Redshift Spectrum, and Presto.
  • To travel back in time and retrieve old versions of the data files (see the sketch below).
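A hedged sketch of the time travel and manifest generation calls, again assuming the hypothetical table path and `spark` session from the earlier sketches:

```python
from delta.tables import DeltaTable

path = "s3a://my-bucket/delta/events/"  # hypothetical Delta table path

# Time travel: read the table as of an earlier version or timestamp.
v0_df = spark.read.format("delta").option("versionAsOf", 0).load(path)
old_df = spark.read.format("delta").option("timestampAsOf", "2021-08-01").load(path)

# Generate symlink manifest files so engines such as Athena, Redshift Spectrum,
# or Presto can query the current snapshot of the Delta table.
DeltaTable.forPath(spark, path).generate("symlink_format_manifest")
```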

Upsert operation and how it works

The upsert operation on a DeltaTable allows for data updates: a DeltaTable can be merged with a new dataset on the basis of a unique join key, so that rows whose keys match are updated in the Delta table and rows whose keys do not match are inserted. Under the hood, the upsert operation works as shown below.
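Before walking through the two scans, here is a minimal sketch of the Delta Lake merge API that expresses this upsert; the `id` join key, paths, and DataFrame names are illustrative assumptions.

```python
from delta.tables import DeltaTable

target = DeltaTable.forPath(spark, "s3a://my-bucket/delta/events/")  # hypothetical target
updates_df = spark.read.parquet("s3a://my-bucket/raw/events_new/")   # hypothetical source

(
    target.alias("t")
    .merge(updates_df.alias("s"), "t.id = s.id")  # unique predicate / join key
    .whenMatchedUpdateAll()                       # update rows whose keys match
    .whenNotMatchedInsertAll()                    # insert rows whose keys do not match
    .execute()
)
```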

Scan 1 (inner join): On the basis of the unique predicates, this scan selects the files that contain matching records. In the image below, the grey boxes (file1, file3, and file5) represent files with no matches at all. The other two files (file2 and file4) have partial matches: some rows of the Parquet file match the predicates. The green parts represent the matching rows, while the red parts are non-matching rows; these partially matching files are the ones eligible to be rewritten, with their non-matching rows copied over as inserts.

fig 1. Upsert operation under the hood-scan 1

Scan 2 (outer join): Once the files have been identified (via Scan 1), Delta Lake rewrites them, because it never deletes or updates the underlying Parquet files in place.

fig 2. Upsert operation under the hood-scan 2

Rows that match the predicate (the green parts) are updated in the new files, while unmatched records (the red parts) are simply copied into those new files.

The old files that were replaced during the rewrite (tombstone files) are not physically deleted; instead, they are marked as “removed” in the transaction log (the JSON files referenced in fig 1). This helps maintain the version history and is used during time travel.
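To illustrate those “removed” markers, here is a small sketch that prints the files tombstoned by each commit; it assumes a locally accessible _delta_log directory at a hypothetical path.

```python
import glob
import json

log_dir = "/tmp/delta/events/_delta_log"  # hypothetical local table path

# Each commit is a newline-delimited JSON file of actions such as
# "commitInfo", "add", and "remove".
for log_file in sorted(glob.glob(f"{log_dir}/*.json")):
    with open(log_file) as fh:
        for line in fh:
            action = json.loads(line)
            if "remove" in action:
                print(log_file, "removed:", action["remove"]["path"])
```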

When and why to perform Vacuum?

  • To time travel to a previous version, you must retain both the log and the data files for that version. The data files backing a Delta table are never deleted automatically; data files are deleted only when you run VACUUM. VACUUM does not delete Delta log files; log files are automatically cleaned up after checkpoints are written.
  • By default you can time travel to versions of a Delta table up to 30 days old, unless you run VACUUM on the table.
  • VACUUM removes files no longer referenced by a Delta table. Vacuuming is the process of removing previous versions of the Delta data that are still physically present in the target directory. When a job reads from the target directory, it reads the latest version of the data by referring to the JSON log files, which indicate which files belong to the latest version and which belong to older versions.
  • By default, vacuum has a 7-day retention period, after which the previous versions of the data are deleted forever.
  • If you want to delete the unused files right away, run vacuum with a retention period of 0 hours; because that is below the safety threshold, you must first disable the Spark configuration ‘spark.databricks.delta.retentionDurationCheck.enabled’.
  • If you want to delete the previous versions of the data files right away at the end of a job, pass 0 hours to vacuum to remove the physical files from the Delta location, as shown in the sketch below.
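A hedged sketch of the vacuum calls described above, assuming the hypothetical table path and `spark` session from the earlier sketches (the 0-hour retention is shown only for illustration; it removes the ability to time travel to older versions):

```python
from delta.tables import DeltaTable

path = "s3a://my-bucket/delta/events/"  # hypothetical Delta table path

# Default vacuum: remove files older than the 7-day (168-hour) retention period.
DeltaTable.forPath(spark, path).vacuum()

# Aggressive vacuum: remove all unreferenced files immediately (0-hour retention).
# The safety check must be disabled first, otherwise Delta rejects the short retention.
spark.conf.set("spark.databricks.delta.retentionDurationCheck.enabled", "false")
DeltaTable.forPath(spark, path).vacuum(0)
```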

Reference:

https://docs.delta.io/latest/quick-start.html
