Manipulating Nested Data Just Got Easier in Apache Spark 3.1.1

Farooq Qaiser
11 min read · Mar 8, 2021

In previous versions of Apache Spark, adding or dropping deeply nested fields could be a challenging programming exercise. In this article, we’ll be demonstrating how you can use the newly added Column.withField and Column.dropFields APIs to perform these kinds of operations trivially. In addition, we’ll also be sharing some of the thought-process that went into the design of these new APIs as well as taking a quick peek at how they work under-the-hood.

Spark 3.1.1 · Scala 2.12.10

Things to note

As a disclaimer of sorts, I contributed most of the code for these new APIs to the Apache Spark project (with significant help from the open-source community!) so this article will likely be biased towards my own views and you should note that all opinions expressed herein are purely my own.

The other thing to note is that although we’ll be using Scala throughout this article, the equivalent code in Python or R shouldn’t look much different. If you’re looking to follow along in your REPL, you can find all the code we’ll be using in this gist.

With those out of the way, let’s dive right in.

Sample Data

First, let’s create a DataFrame with some sample nested data to work with:

Generating a DataFrame with a StructType column.
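The original snippet lives in the gist linked above; here's a minimal sketch of how such a DataFrame can be built. The schema (a person struct with name, details.city and details.income) follows the description in the next paragraph, but the concrete values (Toronto, 80000) and the choice of IntegerType are placeholders of my own.

```scala
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._

val spark = SparkSession.builder().master("local[*]").appName("nested-data").getOrCreate()
import spark.implicits._

// person is a struct containing name plus a nested details struct (city, income).
val schema = StructType(Seq(
  StructField("person", StructType(Seq(
    StructField("name", StringType),
    StructField("details", StructType(Seq(
      StructField("city", StringType),
      StructField("income", IntegerType)
    )))
  )))
))

// One row of actual data for James and one row where person is entirely null.
val data = Seq(
  Row(Row("James", Row("Toronto", 80000))),
  Row(null)
)

val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema)
df.show(false)
```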

To keep things simple, we’ve created our DataFrame with only one row containing actual data for a fictional person named James. We also have another row to represent missing data (don’t worry about this too much for now, it will soon become clear why we need this). Our DataFrame has one top-level field (or column if you prefer) named person and the rest of the fields are all nested inside this. If you care about terminology, we say the name field is nested one level deep whereas the city and income fields are nested two levels deep.

Now, let’s say we want to add a new nested age field inside of person. Before we demonstrate how easy it is to do this in the latest version of Apache Spark, it’s helpful to see how we would have done this in previous versions of Spark and understand why it could be challenging.

Adding a new nested field prior to Apache Spark 3.1.1

In previous versions of Spark, the only built-in function you had at your disposal for modifying nested fields was the functions.struct method. Using this method, we can add a new nested age field like so:

Adding a new nested field using the functions.struct API.
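A sketch of how this might look, continuing with df and the imports from the first snippet: rebuild the whole person struct with functions.struct, copying each existing field across and appending the new one.

```scala
val dfWithAge = df.withColumn(
  "person",
  struct(
    $"person.name".as("name"),
    $"person.details".as("details"),
    lit(40).as("age")
  )
)

dfWithAge.show(false)
// Note: the row with a null person is silently turned into a struct of {null, null, 40}.
```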

As you can see, we didn’t need to write much code for this simple exercise, which is good news. Unfortunately, while the result looks reasonable for the first row of our DataFrame, the second row probably isn’t what we had intended. Our code has expanded what was originally a null value into a struct consisting of null values for each of the pre-existing fields in the schema. In general, we would probably want to preserve the null values as just that. This is easily remedied with a little more code:

Adding a new nested field whilst preserving null values using the functions.struct API.
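One way to preserve nulls, sketched below, is to wrap the rebuilt struct in a when/otherwise that only fires when person is not null.

```scala
val dfWithAge = df.withColumn(
  "person",
  when($"person".isNotNull,
    struct(
      $"person.name".as("name"),
      $"person.details".as("details"),
      lit(40).as("age")
    )
  ).otherwise(lit(null))  // keep null rows as null instead of expanding them
)

dfWithAge.show(false)
```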

That’s better, now our null values remain as null values. Looking more closely at our schema though, it would probably make more sense if this new age field was nested inside person.details, so let’s try to adapt our code to do that next:

Adding a deeply nested field using the functions.struct API.
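To push age down into person.details, every level of nesting has to be rebuilt and null-guarded by hand. A sketch of what that ends up looking like:

```scala
val dfWithAge = df.withColumn(
  "person",
  when($"person".isNotNull,
    struct(
      $"person.name".as("name"),
      when($"person.details".isNotNull,
        struct(
          $"person.details.city".as("city"),
          $"person.details.income".as("income"),
          lit(40).as("age")
        )
      ).otherwise(lit(null)).as("details")
    )
  ).otherwise(lit(null))
)

dfWithAge.show(false)
```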

Yikes, that’s a lot of code just to add one nested field! We’re starting to see the problem with using the functions.struct API for these kinds of operations. Specifically, the more deeply nested a field is, and the greater the number of fields at each level of nesting, the more code we have to write to modify it. As you can imagine, this quickly becomes tedious to write and hard to read.

Now, if you’re a competent programmer and willing to leave the comforts of the Spark-SQL DSL behind, you can reduce the amount of code you have to write, for example by referencing the DataFrame’s schema to recursively rebuild the struct with your new field inserted in. In fact, you can find a few different implementations of this idea floating around on StackOverflow (albeit of varying quality).

Unfortunately, for many Spark users this would be a challenging exercise. Think of your average Analyst or Product Manager who just wants to query some data quickly and doesn't necessarily use Spark on a daily basis. It's not uncommon to see these kinds of users reaching for suboptimal but perhaps easier-to-write alternatives such as UDFs or dropping down to the RDD level. In the worst case, these users may even demand that data be stored only with flattened schemas, purely for their convenience.

Now that we understand the limitations of the functions.struct API and the problems it can potentially lead to, let’s see how Apache Spark 3.1.1 fixes this issue.

Enter Apache Spark 3.1.1

As mentioned previously, Spark 3.1.1 introduced a couple of new methods on the Column class to make working with nested data easier. To demonstrate how easy it is to use these new methods, we’ll go through the same exercises as before, starting with adding an age field to the person struct:

Adding a nested field with the Column.withField API.
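In code, this is essentially a one-liner (a sketch, reusing the df built earlier):

```scala
val dfWithAge = df.withColumn("person", $"person".withField("age", lit(40)))

dfWithAge.show(false)
// The null row stays null; the James row gains a new age field inside person.
```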

Here we’re using our first new API: Column.withField. We can call the withField method on any StructType Column ($"person" in our case) and it takes exactly two arguments:

  1. fieldName: String
    This is the name of the field we want to add ("age" in our case)
  2. col: Column
    This is the value we want to assign to the new field (lit(40) in our case)

and it returns another StructType Column with the new field added to it. That’s all there is to it. And notice, with this API we get sane null handling behaviour out-of-the-box.

Now let’s try something more challenging; let’s try adding the age field so that it’s nested inside person.details:

Adding a deeply nested field with the Column.withField API.
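The same call as before, just with a dotted path as the field name (sketch):

```scala
val dfWithAge = df.withColumn("person", $"person".withField("details.age", lit(40)))

dfWithAge.show(false)
```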

The interesting thing to note here is that the code is almost the same as in the previous image. The only thing we’ve had to change is instead of passing in "age" as the fieldName argument, we’re now passing in "details.age". The reason this works is because the Column.withField API supports Spark’s “dot notation” feature which allows you to describe the path Spark should traverse to reach your target nested field. We’re able to use this naturally in the Column.withField API to easily modify deeply nested chains of structs.

In contrast, recall that when we were using the functions.struct method to achieve the same result, we were forced to rebuild the chain of structs in their entirety and this typically requires several lines of code (if you care about readability anyway). This is one of the main drawbacks the new API addresses. If you’ve ever had to modify deeply nested fields in the past, you’ll know this is going to save you a lot of code!

Before we move on to the remaining use-cases, let’s take a brief detour to try to understand what Spark is doing under-the-hood to execute our query.

Detour: What is Spark really doing?

As with any new technology, it’s always helpful to understand what is going on under-the-hood to make sure there aren’t edge cases you need to be wary of. Let’s take a look at the physical plan that Spark is generating for our previous query to get a high-level understanding of what’s happening here:

Spark plan for adding a deeply nested field with the Column.withField API.
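If you want to inspect the plan yourself, call explain on the query; passing true also prints the analyzed and optimized logical plans alongside the physical one. I won't reproduce the output here.

```scala
// Print the plans Spark generates for the withField query above.
df.withColumn("person", $"person".withField("details.age", lit(40))).explain(true)
```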

As you can see, Spark is ultimately translating our Column.withField method call into a bunch of nested if-not_null-named_struct-else-null operations and, if you look closely, you’ll see it’s very similar to what we were doing before with the functions.struct method, except we were writing that out manually. Spark can obviously do this automatically for us because it knows the schema structure of our Dataset. At the end of the day, this is a reminder that however magical Spark may appear to be, it’s really just a clever program.

That’s the end of our brief detour. The rest of this article will focus on demonstrating how you can easily perform other kinds of common operations involving nested fields.

Replacing nested fields

Now let’s say there’s a mistake in our data and James is actually located in New York. How can we fix this?

Fortunately, the Column.withField method allows us both to add new fields and to replace existing fields with the same name in our target struct. These are essentially the same semantics you get with the Dataset.withColumn method. In code, this looks like:

Replacing a deeply nested field with the Column.withField API.
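A sketch, assuming the corrected value is simply a string literal:

```scala
// details.city already exists, so withField overwrites it rather than adding a duplicate.
val dfFixed = df.withColumn("person", $"person".withField("details.city", lit("New York")))

dfFixed.show(false)
```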

That about covers everything you need to know about the new Column.withField API.

Dropping nested fields

Now, what if instead of replacing the contents of the city field, we just want to drop this unreliable field entirely? For this, we can use the new Column.dropFields method:

Dropping a deeply nested field with the Column.dropFields API.
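A sketch of the call, continuing with the same df:

```scala
val dfDropped = df.withColumn("person", $"person".dropFields("details.city"))

dfDropped.printSchema()
// person.details now only contains income; rows where person is null stay null.
```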

We can call the dropFields method on any StructType column ($"person" in our case). As its arguments, it takes one or more string values representing the names of the fields we want to drop (only "details.city" in our case), and it returns a new Column with the specified fields removed. Much like the Column.withField method, we get sane null handling behaviour out-of-the-box and the ability to refer to deeply nested fields directly via dot notation.

Another minor benefit of this new API is that it's clear which fields are being dropped (provided they existed in the first place). With the functions.struct API, dropping is done by omitting fields, so unless you know what the schema was prior to the functions.struct call, you generally can't tell from the code alone whether any fields are being dropped at all.

Now what if you want to add and drop fields at the same time?

Chaining withField and dropFields method calls

Since both withField and dropFields return a new Column, we can chain these method calls one after another in any order and as many times as we want, for example:

Chaining consecutive Column.withField and Column.dropFields methods.
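For example (the gender value and its placement under details are placeholders of my own):

```scala
val result = df.withColumn(
  "person",
  $"person"
    .withField("details.age", lit(40))      // add a new nested field
    .withField("details.gender", lit("M"))  // add another new nested field
    .dropFields("details.city")             // and drop an existing one, all in one flow
)

result.show(false)
```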

As you can see here, we’re adding two new nested fields (age and gender) and also dropping the nested city field. The nice thing about being able to chain these methods like this is it enables end-users to describe a sequence of Column level “transformations” to apply in a single flow. This is not only easier to write but also easier to read and understand.

Bonus: Modifying Nested Fields in an Array of Struct

Last but not least, let’s see how we can combine these new methods with the existing functions.transform method to manipulate structs nested inside an array. This is a common data science use-case (and in fact, the original problem that motivated me to write these new methods). We’ll need slightly different data to demonstrate this though:

Generating an array of structs column.
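A sketch of the new DataFrame, reusing the spark session and imports from the first snippet; the values are again placeholders.

```scala
// Same fields as before, but the person structs are now elements of a people array.
val arraySchema = StructType(Seq(
  StructField("people", ArrayType(StructType(Seq(
    StructField("name", StringType),
    StructField("details", StructType(Seq(
      StructField("city", StringType),
      StructField("income", IntegerType)
    )))
  ))))
))

val arrayData = Seq(
  Row(Seq(Row("James", Row("Toronto", 80000)))),
  Row(null)
)

val arrayDf = spark.createDataFrame(spark.sparkContext.parallelize(arrayData), arraySchema)
arrayDf.printSchema()
```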

All we’ve done here is taken what was previously our top-level (person) struct and nested it inside an array. And now, for the last time, let’s try to add a new field age to each of the structs nested inside the people array:

Adding a deeply nested field to structs nested inside an array.
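functions.transform maps a function over every element of an array column; since each element here is a struct, we can call withField on it directly. A sketch:

```scala
val result = arrayDf.withColumn(
  "people",
  transform($"people", person => person.withField("details.age", lit(40)))
)

result.show(false)
```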

Ta-da! Okay, the novelty of being able to add deeply nested fields easily is probably wearing off by now but that’s exactly how it should be. And as you can see, these new methods are fully compatible with all of the other existing functions Spark provides which means we can combine them in any way we want to get almost any desired result. This isn’t anything special; it’s just a natural consequence of having methods that both operate on and return Column objects.

Conclusion

To recap, we first looked at how we would traditionally add or drop nested fields in Spark and showed how this was painful to express using just the functions.struct API. We then demonstrated how we could dramatically simplify our code by using the new Column.withField and Column.dropFields APIs across a variety of different common use-cases.

One thing to note is that as awesome as these new APIs are, they aren’t intended to replace the functions.struct method, rather they fill in a previously missing gap. It’s easier to understand if you compare each of these APIs with their natural Dataset equivalents:

+--------------------+-------------------+
| Dataset            | StructType Column |
+--------------------+-------------------+
| Dataset.select     | functions.struct  |
| Dataset.withColumn | Column.withField  |
| Dataset.drop       | Column.dropFields |
+--------------------+-------------------+

Imagine a world where you didn’t have the Dataset.withColumn or Dataset.drop methods. This would mean any time you wanted to add or drop a top-level column, you would be forced to use the Dataset.select API. That would be a relatively painful experience right? This was part of the problem we faced when all we had was the functions.struct method for manipulating nested fields. We were missing convenient, purpose-built APIs for performing these relatively common operations.

That’s all I wanted to share in this article. I hope these new APIs will make your life easier as a Spark developer going forwards. Last but not least, I wanted to give a shout-out to the awesome committers in the open-source Apache Spark community who patiently worked with me over several months to optimize and generally improve the design of these new APIs into what you see today. And this is only one of the many cool new features introduced in Apache Spark 3.1.1 so if you haven’t already, go check it out today!

PS: What if you’re using an older version of Spark?

  1. Upgrade to Spark 3.1.1
  2. Assuming that’s not possible, you can check out the Make Structs Easy library. It offers similar functionality but, being essentially the proof-of-concept for the code I submitted to the Apache Spark project, it lacks some features: it is less optimized, doesn’t support dot notation for referring to deeply nested fields, and has no R support. So you can use it if you’re on an older version of Spark but really, you should just upgrade to Spark 3.1.1.
