Don’t Ever Let the Facts Get in the Way of a Good Tale

Mickey Shnaiderman
Reco R&D
Published in
7 min readFeb 9, 2022

A story about how we (allegedly) found an undocumented or reported “bug” in Delta Lake, how we “mitigated” it, only to realize there wasn’t a bug in the first place. We still developed a cool solution, and we love it!

Originally posted on recolabs.dev

You know how sometimes you’re hungry, but there’s not much to work with? You make a basic dish with whatever’s in the fridge (who said cucumber omelet is not a thing?), and it comes out surprisingly delicious, only to discover all the while you had everything you needed to make a fancy risotto. After you’ve finished cooking, you feel amazing simply because you created a wonderful recipe by yourself. This is kind of what happened to us as we were trying to solve a non-existing bug in Delta Lake.

Tables in Delta Lake provide ACID transactions, which implies data may be accessed and edited without causing corruption. However, there are significant assumptions and problems in this claim. In this post, we’ll go through everything in detail: what is the table and format structure of our data lake; why and how we moved to Delta Lake (and why we love it!); what obstacles did we overcome (and believed they were bugs); how did we fix them; and how we learned we could have solved the problem right away by raising our thread’s exceptions (and RTFM!).

TL;DR

  • Delta Lake’s dataframe write API call turned out to be not thread-safe foolproof if the table does not exist yet;
  • The same applies when multiple threads try to evolve the schema of an existing table
  • The solution: Make sure that only one thread creates or modifies a table at a time, after which concurrent writing works flawlessly!
  • Another (probably less efficient) solution: Just retry on exceptions until success.
  • The best solution: RTFM and raise exceptions from threads in Python by calling result!

What does (part of) our data lake look like?

At @RecoLabs, we rely heavily on graphs to represent data. We also have a large number of distinct sorts of nodes and edges. As a result, each entity is represented by its own table. EmailAccount, CalendarEvent, and so on are examples of node tables, while InvitedTo, SentMessage, and so on are examples of edge tables.

Multiple sources are used to populate those tables at the same time: On a daily basis, raw data is pulled from SaaS platforms (such as GMail, GDrive, Calendar, and so on) and saved in an S3 (as JSON) bucket before being turned into the entities listed above (Also JSON at the time). For example, EmailAccount items can come from practically any of those services’ extracted data, implying that we write to the same table from different sources.

As a result, we need a technique to ensure the data in those tables is consistent. Furthermore, we want to ensure that if the schema of the entities changes, the tables would update as well (or alternatively crash if there is a schema incompatibility). Delta Lake is our guy! Not only does it not affect our formats significantly, but it also manages schemas, transactions, and concurrency. Or so we thought…

Coding through the migration to Delta Lake

Initially, our transformation code was built entirely in Python, and each raw JSON file taken from the SaaS was changed into a series of JSONs, each of which related to each of the entities in the original raw file. It’s worth noting that, despite the fact that this method is embarrassingly parallel (each input file results in a single output file), it produces very small files, causing a performance hit on the readers of those tables later on.

To use Delta Lake, we must convert the entities into Spark DataFrames, which necessitates rewriting some of the functionality in PySpark. That wasn’t that difficult. Here it is in nutshell:

  1. Read the raw JSON files directory as a dataframe and run the Python function on the contents, which produces a list of all entities found in the raw file.
  2. Divide the results into columns, each of which corresponds to a distinct entity (note that each column is of the type Struct);
  3. Expand each of the aforementioned columns into its own dataframe, which can then be appended to a Delta Lake table (this step can be done in parallel!).

Another thing to keep in mind is that our entire product is deployed as a single-tenancy, which means that all of those tables are “duplicated” for each of our clients. As a result, the data lake for a new onboarding client is empty, and we must generate them from scratch during run time. Another scenario in which a new table must be created from scratch is when a customer configures a new SaaS for us to extract from. That includes a new type of entity.

Testing (and failing)

After the transformation outlined above, we run an aggregation suite that de-duplicates the entities and aggregates their fields according to a set of predefined rules (such as collecting all the titles of a CalendarEvent or setting the start and end time of the event to the ones from the last edit).

At this point, we’ve run a number of data quality tests (based on data samples of our own SaaS accounts).

After implementing the above, we ran those tests, and one actually failed! The test verified all nodes present in the edge tables (from and to) are present in the nodes tables (and vise-versa).

Investigating

The aggregation stage’s code remained unchanged (save for reading Delta Lake instead of JSONs), so we decided it wasn’t the source of the problem and crossed it off the suspect list.

Following further study, we discovered that different test runs result in different rows in the tables! That is to say, the issue is one of concurrency: For some reason, multiple threads add to the same dataframe that is stored to a Delta Lake table, causing it to collide. We arrived to the following findings about the Delta Lake dataframe write API after a little more tinkering:

  • If the table does not exist yet, concurrent writes are not thread-safe.
  • If the table does exist, concurrency works flawlessly.

As a result, we built an init function that looks over all possible entities and relationships before creating an empty Delta Lake dataframe with dummy data (we generate a dummy schema for all tables because we enable schema evolution anyway):

We did all of the modifications and writings in parallel after the table initializations. Then there was some cleanup to be done. In the code above, there is a dummy partition (1970-01-01) with a dummy column. Those should not be included in any of our final dataframes. Furthermore, if we didn't touch any empty dataframes throughout this run of the transformation step, we don't want them.

So we wrote some cleanup functions:

These functions totally remove initialized and untouched tables (by deleting their directories) and reconstruct the initialized and written tables without the dummy partition and column.

All good, right?

Furthermore, for some reason, schema merge (recall that we initialized a dummy table with only one dummy column) did not appear to be thread-safe, so we thought: Can we create a thread queue per model (each model is saved into a single Delta Lake table), where the first item in the queue runs alone and all the other tasks run in parallel after it finishes (similar to a soldier squad, where the first one ensures that the area is clear)?

This way, the first task constructs the Delta Lake directory with the correct schema (or merges a new schema to an existing table) without causing any disturbance, and all of the other writers are known to work fine in parallel after that!

So, instead of starting with one thread, we built a new ThreadPool that starts with one and then releases all the others to run:

Not only do all tests pass now, but there is also much less code and much less writing!

  • No initialization of all tables — each first thread in each pool writes, initializes it, or merges the schema out of the box.
  • No rewrites and deletions at the end for cleanup of the init tables.

We are the champions (ARE WE?)

Not so fast… While writing this blog post we felt like we were missing something. How can Delta Lake fail so miserably in concurrent writing and not raise an exception? We then stumbled upon Concurrency control — Delta Lake Documentation which describes different exceptions that can occur when a transaction cannot be performed.

WAIT, WHAT? Did you say exceptions? We haven’t seen any 🤨… After a bit more investigation we found the reason was in ThreadPoolExecutor.submit function: if a thread throws an exception, it will not be raised unless Future.result function is called, which we didn’t use 🤦‍♂️. We’ve added result() calls to all submit() calls, and guess what?

delta.exceptions.ProtocolChangedException: The protocol version of the Delta table has been changed by a concurrent update. This happens when multiple writers are writing to an empty directory. Creating the table ahead of time will avoid this conflict. Please try the operation again.
Conflicting commit: {"timestamp":1638795307104,"operation":"WRITE","operationParameters":{"mode":Append,"partitionBy":["extraction_date"]},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"2536","numOutputRows":"66"}}
Refer to https://docs.delta.io/latest/concurrency-control.html for more details.

Yes, we are embarrassed, but at least we found the correct solution to the “problem” and we love our cucumber omelet because it is OUR cucumber omelet. Conclusion: Always raise or log errors — do not suppress them! Oh, and RTFM!

--

--