Tables are fundamental to Spark jobs, and the documentation makes them look simple. As we shall see, they are full of perils. We’ll go deep on the issues, and review up-to-date solutions.
Let’s be honest up-front — if you a starting a new Spark project, use Spark 3.2, and is willing to listen to strangers on the internet, I’d advise that you use Delta 2.0. Although released just a few weeks ago, it’s a safe choice that gets you 90% there. If your situation is more complicated, or you want to learn the details, please read on. We’ll discuss how to safely overwrite a complete table, how to do it for a partitioned table, how to use “magic committers” to avoid expensive object copies, and then see which of the problems are solved by “next generation” table formats.
The anatomy of table and its perils
A table is basically a convenient stable identifier for an unwieldy unstable path where a bunch of data files is located. You can use
mart.orders instead of
s3://acme-data-mart/fact-tables/orders/v3. These identifiers are usually stored in a metastore service that is part of Hive, even if you don’t use anything else from Hive. Tables can also contain a schema, and usually both Hive-compatible schema and a more detailed Spark schema.
The key part for our purposes is that all files under the specified path are considered by Spark to be part of the table data. For a partitioned table, each combination of partition values has an associated path, for example, a given date partition might be a path of
s3://acme-data-mart/fact-tables/orders/v3/date=2022-01-05/. It is possible to change the path for a partition, even using a different bucket. However, it is still a path, and all files under that path are implicitly part of the table data.
Creating a table for the first time is easy. We write the data, and we register the identifier in the metastore. Since we haven’t told anybody about this new table, no reader will come over before we’re done. The problems pile up when a lot of jobs and interactive users might be using a table, while we need to update it —either to add new rows or to fix erroneous ones. What could go wrong? From the first principles, we can imagine several failure modes:
- The readers might see partially written new data
- The readers might see duplicates — where both erroneous old rows and corrected new ones are present
- In the worse case, the writer can crash in the middle, making the above inconsistencies permanent, until the job is rerun
If you are familiar with ACID terminology, cloud warehouses usually offer durability automatically, don’t care about consistency, and have, by default, major issues with atomicity and isolation. That’s what we’ll discuss in this post.
Let’s briefly talk about append vs overwrite. When appending to a table we’re adding new files with new data without touching any existing rows, and leaving it to the readers to make sense of it. When overwriting, we completely define the new content of the table or some of the partitions. We can combine new data with the existing, eliminate duplicates and apply any semantic changes. While there are some cases where append is appropriate, in general, there are more readers than writers, and spending extra effort on the writer side to make sure data is consistent is worthwhile. Therefore, we’ll be mostly talking about overwrite scenarios.
Overwriting a table
The easiest case is completely overwriting a table, which can be done with a code like this
Right away, we have a major gotcha:
- First, the table is completely removed
- Then, the data is written to S3
- Finally, a table is created again, pointing to this data
Any job, or interactive query, that is executed at the same time will likely fail because the table does not exist. Worse, if writing the table fails for any reason, we’ll end up with no table.
While Spark does not offer any solution, it can be easily implemented ourselves:
- Write the data to a new path and a new table. For example, add a random suffix to the name and path of the existing table.
- When the table is written, use SQL
alter tableto point the existing table to the new data path.
- If everything above succeeds, either delete the old table or schedule the deletion for a later time. If anything fails, delete the temporary table and its data.
alter table is purely a metadata operation, the window of time when concurrent operations might fail is reduced to a few milliseconds, which is usually acceptable.
For large tables, we usually want to partition them by some column, such as date or country, and to be able to replace just one partition. For example, one might collect data for one day, and write it as follows
Unfortunately, this code works similarly to the unpartitioned case above — it will start with deleting the entire table, with all historic partitions.
To avoid that, you’d use a configuration option
With it, Spark will only delete a partition if our dataframe has data for it. However, we don’t know what partition values are present in the dataframe. Therefore, Spark will first write data to a staging location, partitioning it. Then, it will determine what partition values are present, remove existing data in those partitions, and copy new data over.
The fundamental problem with this approach is that again, it’s not atomic. A concurrent reader can observe a state where partition data is deleted, but new data is not yet written, or partially written.
In all Spark versions prior to 3.1.0 there also was an important technical bug — the dynamic overwrite mechanism could not recover from any task failure. Normally, if one executor goes down for transient reasons, Spark can retry relevant tasks and complete the job. The bug meant that any such retries will fail all over until Spark gives up and fails the write operation. That was eventually fixed but highlights how fragile the dynamic overwrite is.
Fortunately, we can apply the same trick we used with full table overwrite.
- Write new data to a temporary table, making sure to set partition override mode to “static”.
- Enumerate the written partitions and, optionally, copy the data to the table location. Copying the data into the default partitioned table layout will allow Spark to recover metadata, should it get lost somehow.
- For existing partitions, modify the table metadata to point at the new locations, and for new partitions, create a new record in metadata.
- Delete old partition data, or schedule a deletion for later
In our specific case, a complete implementation runs around 300 lines of code and basically eliminates all reliability issues with standard Spark table write methods. However, this leaves the question of performance.
Committing the data
Imagine we’re back in January 2008, when iPhone 1 (2G version) and HDFS are both cool. You’re using MapReduce to process and write data, but you still need this data to appear in the destination directory in an atomic fashion. To do it, you use a two-stage approach
- Each task writes data to a temporary directory specific to that task. When the task completes successfully, its temporary directory is atomically moved, via the rename operation, to the job’s temporary directory
- When all tasks of a job are complete, the job’s temporary directory is renamed to the final output directory
Because on HDFS, rename operation is atomic, the approach is fairly reliable. It is known as Hadoop Output Committer version 1.
We’re now back to the present time. Using Spark tables with dynamic partition overwrite adds an additional round of copying, as explained above. When using S3, there is no atomic rename of directories — you have to move each object, and moving an object is basically copying every byte to the destination and removing the source. When using GCS, moving an object is in theory a metadata-only operation, and no content bytes are copied. In practice, the stopwatch barely records any speedup, it’s still very slow. So, we end up with 3 rounds of copying data for every byte.
Given the observation that sequential copying of objects will not give us any sort of atomic semantics per se, the first optimization was to merge the task temporary directory and job temporary directory into one, resulting in Hadoop Output Committer version 2, which is the standard recommendation for Spark with cloud storage. On the Spark side, the entry point for the commit mechanism is the HadoopMapReduceCommitProtocol class, which in particular has a lengthy comment describing those data movements.
The Hadoop community then moved on to create a better long-term solution, creating two next-generation committers — staging and magic.
The staging committer writes the task results to the local disk of executors. It also writes some metadata to a shared HDFS location. When all tasks are completed, the data is uploaded from the local disks to the cloud storage, directly to the final locations.
Since the shared HDFS location contains the list of all paths we’re planning to write, the staging committer can implement the dynamic partition overwrite logic itself. It can enumerate all partitions we’re trying to write to, and automatically delete existing data in those partitions.
Support for dynamic partition overwrite is a clear benefit of this committer. On the other hand, you need additional local disk space, the commit is not atomic, and if any executor is lost, the task results on the local disk are also lost, and the task needs to be retried. Finally, you need to maintain HDFS just for coordination.
The magic committer uses an S3 mechanism called ‘multi-part upload’. When uploading a large object, it is often convenient to load it in parts, and then tell S3 that you’re done. Only after that, the object becomes visible. Magic committer employs this mechanism to achieve atomic commit in Hadoop/Spark world.
Each task starts the multi-part upload operation, specifying the final destination for the object. When the job is complete, all the upload operations from individual tasks are committed.
It looks like the fastest possible solution — there is no data copying at all and no additional metadata is kept on HDFS. However, the absence of metadata means that we can’t do dynamic partition overwrite. There’s simply not enough information to know what files we’re writing.
It should be noted that initially, the magic committer required an additional component called S3Guard. The S3 metadata operations were not perfectly consistent — it was possible for object listing to not show an object that was just added. Such inconsistency broke magic committer, and S3Guard was a solution that uses an additional DynamoDB table to keep track of what objects we’ve just added. In 2022, it is no longer necessary, S3 is fully consistent itself.
Spark vs. new committers
The new committers were a major improvement, but unfortunately, the Hadoop interfaces for them limited their utility. Specifically, we have one committer that can handle dynamic partition overwrite, another one that cannot, and the common Hadoop interface for them that is unable to communicate the difference. You can’t request or check dynamic partition behavior through this interface. As result, Spark decides to play it safe, and errors out if you are trying to use dynamic partition overwrite together with new committers.
What we can do is combine our earlier approach of table metadata updates with the new committers:
- Write new data to a temporary table using the magic committer, with zero copying.
- Discover partitions in the temporary table, copy them to the final table, and use metadata operations to point partitions to the new data
This approach does include one unnecessary copying, but it’s fairly reliable and does one less copy than the default Spark approach.
New-generation table formats
The solution with a magic committer, temporary tables, and switching of partition paths was quite a lifesaver for us. It is nonetheless a non-trivial correctness-critical setup we’d rather not think about. If we go back to the discussion of Spark/Hive tables, we can pinpoint one aspect that causes the most trouble. Namely, the idea that we specify a table location or partition location, and every file in that location is assumed to be part of the table.
This is precisely the reason why we can’t run 100 tasks each writing a file to the final destination. As soon as one of them finishes, that one file, with 1/100 of all the data will be considered as part of the table by all the readers. There are several new table formats that redo this aspect and make a table explicitly list the data files.
Suppose we are overwriting a table. There are 100 existing files, and a manifest file listing those 100 data files. We can write data directly to the same location. We don’t update the manifest file, so any concurrent readers will continue using the old 100 data files without any trouble. After we’re done, we update the manifest file to contain the list of 100 new files. As long as we can atomically update that single manifest file— and it’s relatively easy — all concurrent readers will either use the old data files or the new data files. There is never a situation where we read an inconsistent state.
Why should we use a manifest file, as opposed to additional metadata database? That boils down to operational and development convenience. Sure, Hive metastore could be modified to have a list of files, but then any changes to table format would have to be coordinated with Hive's release schedule. The file list could be stored in yet another separate database, but that’s extra operational effort. Keeping all the metadata in a file alongside the data itself makes a table self-contained — as long as you know the S3 path, you can read this table.
Delta vs. Iceberg
The two most popular new-generation table formats are Delta and Iceberg. They are very similar conceptually, and the choice between them is mostly a matter of preference, or specific use cases, such as bucketing (supported only by Iceberg) or z-ordering (supported only in Delta).
There are some attempts to compare them, including performance comparison, but I would not care very much. Performance comparisons are pretty hard to do even on your own cluster and your own workloads, and when other people run “industry standard” benchmarks, it often means absolutely nothing for your heaviest jobs.
I have direct experience with Delta, and it made quite a great progress in the recent couple of years
- In Spark 2, it was fairly awkward to use. Even writing a partitioned table required undue hacks.
- In Spark 3.0, it became fairly useable, and we started to use it for our heaviest job. There were still cases, where you’d had to change your code to use Delta.
- Starting with Spark 3.2.1 and Delta 2.0.0, the dynamic partition overwrite as described earlier just works, identically to classic Spark tables
The last item is exactly where we were heading. It is now possible to write
As result, partitions for any dates present in
dfwill be replaced and all other partitions will be left alone. Importantly, because data is written directly to the specified paths, you don’t need to think about any custom committers; delta has its own one that works just fine.
In this post, we’ve seen that Spark tables are not nearly as simple, reliable, and fast as you might think.
If you are starting a new project and use Spark 3.2.1 or later, I would recommend starting with Delta 2.0. It solves almost all issues with reliability and performance, it does not require any committers to be set up, and basically gets your 90% there. Iceberg would also be a reasonable choice, but you probably should first speak to people who used it in production.
If you can use neither Delta nor Iceberg, then my recommendation would be
- Write your own code to atomically update the table via metadata operations. It might take a week, but will save a lot of pain
- Use the magic committer to reduce the data copying in S3
Hopefully, it was useful. If you have any questions, feel free to ask on Twitter, and if you’re interested in more data engineering and data science content, please follow me on Twitter and subscribe on Medium.