Reliable and Fast Spark Tables


The anatomy of a table and its perils

A table is basically a convenient, stable identifier for an unwieldy, unstable path where a bunch of data files is located. You can use mart.orders instead of s3://acme-data-mart/fact-tables/orders/v3. These identifiers are usually stored in a metastore service that is part of Hive, even if you don’t use anything else from Hive. Tables can also contain a schema, usually both a Hive-compatible schema and a more detailed Spark schema.
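
For example, you can ask Spark what a table name actually resolves to (the table name below is just an illustration):

# PySpark; "mart.orders" is a hypothetical table name.
spark.sql("DESCRIBE FORMATTED mart.orders").show(truncate=False)
# The output includes a "Location" row with the underlying path
# (something like s3://acme-data-mart/fact-tables/orders/v3),
# alongside the column and provider information.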

Because readers and writers work with the files at that location directly, rewriting a table in place is risky:

  • The readers might see partially written new data
  • The readers might see duplicates, where both erroneous old rows and corrected new ones are present
  • In the worst case, the writer can crash in the middle, making the above inconsistencies permanent until the job is rerun

Overwriting a table

The easiest case is completely overwriting a table, which can be done with code like this:

df.write.mode("overwrite")
.option("path", "s3://bucket/dim_user")
.saveAsTable("mart.dim_user")

Under the hood, this write is neither atomic nor safe:

  • First, the table is completely removed
  • Then, the data is written to S3
  • Finally, a table is created again, pointing to this data

If the job dies after the table is dropped, or while the data is being written, readers are left with a missing or partially written table. A more reliable approach is to switch the table to the new data with a metadata operation (see the sketch after this list):

  • Write the data to a new path and a new table. For example, add a random suffix to the name and path of the existing table.
  • When the table is written, use SQL alter table to point the existing table to the new data path.
  • If everything above succeeds, either delete the old table or schedule the deletion for a later time. If anything fails, delete the temporary table and its data.
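
Here is a minimal PySpark sketch of these steps. The names and paths are made up, df is the DataFrame from the snippet above, and a production version would also need error handling and delayed cleanup of the old path:

import uuid

suffix = uuid.uuid4().hex[:8]
table = "mart.dim_user"                       # the table readers query
tmp_table = f"mart.dim_user_{suffix}"         # hypothetical temporary table
tmp_path = f"s3://bucket/dim_user_{suffix}"   # hypothetical temporary path

# 1. Write the new data to a fresh path, registered as a temporary external table.
df.write.mode("overwrite").option("path", tmp_path).saveAsTable(tmp_table)

# 2. Atomically point the existing table at the new data (a metadata-only operation).
spark.sql(f"ALTER TABLE {table} SET LOCATION '{tmp_path}'")

# 3. Drop the helper table entry (the data stays in place, since it is external),
#    and delete the old path now or schedule its deletion for later.
spark.sql(f"DROP TABLE {tmp_table}")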

Overwriting partitions

For large tables, we usually want to partition them by some column, such as date or country, and to be able to replace just one partition. For example, one might collect data for one day, and write it as follows

df.write.mode("overwrite")
.option("path", "s3://bucket/table")
.partitionBy("date")
.saveAsTable("mart.orders")
.config("spark.sql.sources.partitionOverwriteMode", "dynamic")

The reliable recipe for overwriting partitions follows the same pattern (a sketch of the metadata steps follows this list):

  • Write new data to a temporary table, making sure to set partition overwrite mode to “static”.
  • Enumerate the written partitions and, optionally, copy the data to the table location. Copying the data into the default partitioned table layout will allow Spark to recover metadata, should it get lost somehow.
  • For existing partitions, modify the table metadata to point at the new locations, and for new partitions, create a new record in metadata.
  • Delete old partition data, or schedule a deletion for later.
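
A hedged PySpark sketch of the metadata part of these steps; the table and path names are invented, and the optional copy step is omitted:

table = "mart.orders"
tmp_table = "mart.orders_tmp"            # temporary table written with "static" mode
tmp_path = "s3://bucket/orders_tmp"      # its location, partitioned by date

# Which dates did we just write?
dates = [row.date for row in spark.table(tmp_table).select("date").distinct().collect()]

for d in dates:
    # Create the partition record if the partition is new...
    spark.sql(f"ALTER TABLE {table} ADD IF NOT EXISTS PARTITION (date='{d}') "
              f"LOCATION '{tmp_path}/date={d}'")
    # ...and point an already existing partition at the fresh data.
    spark.sql(f"ALTER TABLE {table} PARTITION (date='{d}') "
              f"SET LOCATION '{tmp_path}/date={d}'")

# The data at the old partition locations can now be deleted, or scheduled for later.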

Committing the data

Imagine we’re back in January 2008, when iPhone 1 (2G version) and HDFS are both cool. You’re using MapReduce to process and write data, but you still need this data to appear in the destination directory in an atomic fashion. To do it, you use a two-stage approach

  • Each task writes data to a temporary directory specific to that task. When the task completes successfully, its temporary directory is atomically moved, via the rename operation, to the job’s temporary directory
  • When all tasks of a job are complete, the job’s temporary directory is renamed to the final output directory
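
A minimal Python sketch of this rename-based protocol, with the local filesystem standing in for HDFS and made-up directory names:

import os
import uuid

job_tmp = "/data/_temporary/job_0001"   # hypothetical job staging directory
final_out = "/data/output"              # hypothetical final output (must not exist yet)

def commit_task(task_id, rows):
    # Stage 1: the task writes into its own scratch directory...
    attempt_dir = f"{job_tmp}/_attempt_{task_id}_{uuid.uuid4().hex}"
    os.makedirs(attempt_dir)
    with open(f"{attempt_dir}/part-{task_id:05d}", "w") as f:
        f.writelines(rows)
    # ...and commits by renaming it into the job's temporary directory.
    os.rename(attempt_dir, f"{job_tmp}/task_{task_id}")

def commit_job():
    # Stage 2: once all tasks have committed, one rename publishes everything.
    # On HDFS this is a cheap, atomic metadata operation; on S3 there is no real
    # rename, which is exactly why the committers below exist.
    os.rename(job_tmp, final_out)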

Staging committer

The staging committer writes the task results to the local disk of executors. It also writes some metadata to a shared HDFS location. When all tasks are completed, the data is uploaded from the local disks to the cloud storage, directly to the final locations.
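
If you want to try it, the S3A committers are selected through Hadoop options. A hedged example of the usual settings (the exact option set depends on your Hadoop and Spark versions, and the spark-hadoop-cloud module must be on the classpath):

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    # "directory" and "partitioned" are the staging committer variants.
    .config("spark.hadoop.fs.s3a.committer.name", "directory")
    .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a",
            "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")
    # Route Spark SQL writes through the cloud-friendly commit protocol.
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    .getOrCreate())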

Magic committer

The magic committer uses an S3 mechanism called ‘multi-part upload’. When uploading a large object, it is often convenient to upload it in parts and then tell S3 that you’re done. Only after that does the object become visible. The magic committer employs this mechanism to achieve an atomic commit in the Hadoop/Spark world.
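
To make the mechanism concrete, here is a hedged boto3 illustration of a multi-part upload, with placeholder bucket and key names. Nothing becomes visible at the key until the final call:

import boto3

s3 = boto3.client("s3")
bucket, key = "my-bucket", "table/part-00000.parquet"   # placeholders
chunks = [b"x" * (5 * 1024 * 1024), b"tail"]            # toy payload: 5 MiB + remainder

# Start the upload; S3 hands back an upload id, but no object appears yet.
upload = s3.create_multipart_upload(Bucket=bucket, Key=key)

parts = []
for number, chunk in enumerate(chunks, start=1):
    part = s3.upload_part(Bucket=bucket, Key=key, PartNumber=number,
                          UploadId=upload["UploadId"], Body=chunk)
    parts.append({"PartNumber": number, "ETag": part["ETag"]})

# Only this call makes the object appear, whole, at its final location.
s3.complete_multipart_upload(Bucket=bucket, Key=key, UploadId=upload["UploadId"],
                             MultipartUpload={"Parts": parts})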

Spark vs. new committers

The new committers were a major improvement, but unfortunately, the Hadoop interfaces for them limited their utility. Specifically, we have one committer that can handle dynamic partition overwrite, another one that cannot, and a common Hadoop interface that is unable to communicate the difference. You can’t request or check dynamic partition behavior through this interface. As a result, Spark decides to play it safe and errors out if you try to use dynamic partition overwrite together with the new committers.

The workaround we ended up with combines the magic committer with the metadata tricks from earlier (a configuration sketch follows this list):

  • Write new data to a temporary table using the magic committer, with zero copying.
  • Discover partitions in the temporary table, copy them to the final table, and use metadata operations to point partitions to the new data.
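
A hedged PySpark sketch of such a job, with placeholder names; the magic committer options come from the S3A documentation and can vary between Hadoop versions, and df stands for the new day's data:

from pyspark.sql import SparkSession

spark = (SparkSession.builder
    # Assumed S3A magic committer settings (version-dependent).
    .config("spark.hadoop.fs.s3a.committer.name", "magic")
    .config("spark.hadoop.fs.s3a.committer.magic.enabled", "true")
    .config("spark.sql.sources.commitProtocolClass",
            "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
    .config("spark.sql.parquet.output.committer.class",
            "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
    # The temporary table is written whole, so static mode is enough.
    .config("spark.sql.sources.partitionOverwriteMode", "static")
    .getOrCreate())

# Step 1: write the temporary partitioned table straight to S3 (no renames, no copies).
(df.write.mode("overwrite")
    .option("path", "s3://bucket/orders_tmp")
    .partitionBy("date")
    .saveAsTable("mart.orders_tmp"))

# Step 2: discover the partitions of mart.orders_tmp and repoint the partitions of
# mart.orders with ALTER TABLE ... PARTITION ... SET LOCATION, as sketched earlier.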

New-generation table formats

The solution with a magic committer, temporary tables, and switching of partition paths was quite a lifesaver for us. It is nonetheless a non-trivial correctness-critical setup we’d rather not think about. If we go back to the discussion of Spark/Hive tables, we can pinpoint one aspect that causes the most trouble. Namely, the idea that we specify a table location or partition location, and every file in that location is assumed to be part of the table.

Delta vs. Iceberg

The two most popular new-generation table formats are Delta and Iceberg. They are very similar conceptually, and the choice between them is mostly a matter of preference, or specific use cases, such as bucketing (supported only by Iceberg) or z-ordering (supported only in Delta).

Delta support in Spark improved steadily over the releases:

  • In Spark 2, it was fairly awkward to use. Even writing a partitioned table required undue hacks.
  • In Spark 3.0, it became fairly usable, and we started to use it for our heaviest job. There were still cases where you had to change your code to use Delta.
  • Starting with Spark 3.2.1 and Delta 2.0.0, dynamic partition overwrite as described earlier just works, identically to classic Spark tables:

spark.conf.set("spark.sql.sources.partitionOverwriteMode", "dynamic")

df.write.mode("overwrite")
  .format("delta")
  .partitionBy("date")
  .option("path", "s3://bucket/table")
  .saveAsTable("mart.orders")

Conclusion

In this post, we’ve seen that Spark tables are not nearly as simple, reliable, and fast as you might think.

If you manage such tables yourself, a couple of takeaways:

  • Write your own code to atomically update the table via metadata operations. It might take a week, but it will save a lot of pain.
  • Use the magic committer to reduce the data copying in S3.
