Spark, Hadoop, classloaders and dracones

Antonio Murgia
Agile Lab Engineering
Dec 7, 2023

In November 2023 I helped a colleague with the infamous task of upgrading the AWS EMR version used by an internal framework built to ease the use of Apache Spark.

Disclaimer: this is not a beginner tutorial. We will delve into Spark and Hadoop internals, so whenever I can I will link to relevant resources to fill in the background.

Background

Our data platform is built on AWS and makes extensive use of S3; in fact, ALL data is persisted on S3.
To do so, over the years we developed a custom AWSCredentialProvider to tailor the way we authenticate to AWS from our Apache Spark jobs. Let’s not dive into the details; you simply need to know that we do not obtain AWS credentials in a standard way, and that we wrote a couple of Java classes to achieve that.

These classes are part of our SDK and are shipped together with the Spark applications that run in the cluster (potentially every application can have a different version of the SDK).

Going back to the upgrade, we made a pretty big jump: from EMR 6.0.1 to 6.13.0.

The most notable version changes were:

So, we initially thought the culprit for the sudden change was Spark — but surprise, surprise, the Spark update turned out to be a piece of cake 🍰. We confidently deployed some sample applications to the new cluster, only to be met with the harsh reality that NOTHING was actually working.

A bizarre error threw us for a loop:

java.io.IOException: From option fs.s3a.aws.credentials.provider 
java.lang.ClassNotFoundException: Class awesome.company.aws.auth.v2.CustomCredentialsProvider not found

The confusion was real. Our code was running (main function and all), but the custom credential provider seemed to be playing hide and seek. How on earth could this happen?

We checked the deployed jar file, and the class was indeed there…

jar -tf myJar.jar | grep CustomCredentialsProvider.class

Then we started digging into Spark intricacies…

Spark and ThreadLocal

Spark goes all-in on ThreadLocal variables as a shortcut to keep its code clean and simple. Their assumption is straightforward: “Hey, you don’t need to worry about juggling multiple threads in a Spark application. We’ve got this covered with a nifty abstraction to parallelize your compute.”

Now, I’m not the biggest fan of ThreadLocals, but I’m not about to toss Spark out the window just for that. Sometimes you gotta embrace the quirks, right?

It turns out we want to do different things in parallel on the Spark driver, and we leverage cats effect to do so.

We suspected that Spark kept classloader instances (the classloader is that “invisible” thing in the JVM that loads classes) in ThreadLocal variables, and that running our code on top of cats effect IO moved it to a different thread, one whose classloader had not loaded our application classes.

Our suspicion was that we were capturing a classloader instance “too early” in our code, before our own class had been loaded. We added some code to debug the issue: in short, we tried instantiating the class directly (a new call), through reflection, and through the Hadoop configuration utilities (the ones throwing the exception in the first place). We did that in the main thread and in all the other threads, and the result was always the same: the class was loaded successfully.
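The kind of check described above can be sketched with the plain JDK. This is only an illustration, not the code we actually wrote: ClassLoaderProbe and its methods are hypothetical names, and a single-thread executor stands in for "some other thread" (such as one owned by an effect runtime).

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical diagnostic helper; not part of Spark, Hadoop or cats effect.
class ClassLoaderProbe {

    /** Tries to resolve className through the given loader and returns a short verdict. */
    static String tryLoad(String className, ClassLoader loader) {
        try {
            // initialize=false: we only care whether the class is visible to this loader.
            Class<?> c = Class.forName(className, false, loader);
            // A null defining loader means the JVM bootstrap loader.
            return "OK (defined by " + c.getClassLoader() + ")";
        } catch (ClassNotFoundException e) {
            return "NOT FOUND";
        }
    }

    /** Reports what the current thread can see through two different loaders. */
    static String probe(String className) {
        Thread t = Thread.currentThread();
        ClassLoader context = t.getContextClassLoader();       // per-thread state
        ClassLoader own = ClassLoaderProbe.class.getClassLoader();
        return "[" + t.getName() + "] context=" + tryLoad(className, context)
                + " own=" + tryLoad(className, own);
    }

    public static void main(String[] args) throws Exception {
        String name = args.length > 0 ? args[0] : "java.lang.String";
        System.out.println(probe(name));                        // main thread
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = () -> probe(name);          // same probe, other thread
            System.out.println(pool.submit(task).get());
        } finally {
            pool.shutdown();
        }
    }
}
```

If the verdicts differ between threads, the thread-related hypothesis is worth pursuing; if they are identical everywhere, as in our case, the problem lies elsewhere.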

In the end, ThreadLocals were not the problem, and neither was cats effect with its concurrency model.

Spark multiple classloaders

Spark is not an application server, but it has something in common with one: it more or less follows the Hollywood Principle when deploying your application.

What you usually do when deploying Spark applications on YARN in cluster-mode is the following:

  1. you prepare a fat-jar that contains all your custom dependencies (everything not Spark related) and copy it to a location reachable from every node (S3 and HDFS are examples)
  2. you perform a spark-submit call specifying the jar location, the main class and any Spark configuration you need

The spark-submit call will then act as a client to YARN, which will spawn a remote JVM that in turn will:

  1. run a predefined main called Launcher (this is possible because the cluster has Spark pre-installed, so the Launcher main class is always available; from now on I will call the classloader that loads it LauncherClassLoader)
  2. download your jar (from S3 or HDFS)
  3. create a new child classloader where your jar is loaded
  4. invoke your main class

Pretty neat, huh?
Usually this procedure has no downside: all custom code runs on top of this new classloader (called MutableClassLoader), and every class that needs to be loaded (directly or indirectly) is loaded from there.
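The parent/child relationship between the two classloaders can be illustrated with the plain JDK. The sketch below assumes the standard parent-first delegation of java.net.URLClassLoader; DelegationDemo is a hypothetical name, and the empty child loader merely stands in for the per-application classloader.

```java
import java.io.IOException;
import java.net.URL;
import java.net.URLClassLoader;

// Hypothetical demo class; the child loader here has no jars of its own.
class DelegationDemo {

    /** Loads className through a fresh child loader whose parent is this class's loader. */
    static Class<?> loadThroughChild(String className) {
        ClassLoader parent = DelegationDemo.class.getClassLoader();
        try (URLClassLoader child = new URLClassLoader(new URL[0], parent)) {
            // Parent-first: the child asks its parent before looking at its own URLs.
            return child.loadClass(className);
        } catch (ClassNotFoundException | IOException e) {
            throw new IllegalStateException("cannot load " + className, e);
        }
    }

    public static void main(String[] args) {
        Class<?> viaChild = loadThroughChild(DelegationDemo.class.getName());
        // The child hands back the very Class object the parent already defined.
        System.out.println(viaChild == DelegationDemo.class);                             // true
        System.out.println(viaChild.getClassLoader() == DelegationDemo.class.getClassLoader()); // true
    }
}
```

The key asymmetry is that a child can see everything its parent defines, while the parent knows nothing about classes that only the child can find.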

Another thing you need to know is that EMR is a “custom” distribution of the Hadoop stack, and that AWS takes good care of its customers by packing into this distribution all the dependencies needed to interact with most AWS services. As a matter of fact, it ships the following jars: hadoop-aws, which contains S3AFileSystem (the abstraction that treats S3 like any other filesystem), and the AWS SDK, which contains AwsCredentialsProvider, the class we are extending to provide the custom authentication that we need.

Having that packaged for us makes our life much easier, and it also implies that these jars are loaded in the LauncherClassLoader.

Digging deeper

We knew we needed to dig deeper, and the easiest way to do so was to dust off our remote debugging skills.

We started reading the S3AFileSystem code, stepping into, stepping over, placing breakpoints and losing track of time, until we finally stepped into THAT line of code:

// fix up the classloader of the configuration to be whatever
// classloader loaded this filesystem.
// See: HADOOP-17372
conf.setClassLoader(this.getClass().getClassLoader());

Obviously we then looked up HADOOP-17372, where we noticed two things:

  1. It was introduced in 3.3.1, which means it affects our version and did not affect 3.2
  2. Another user had the exact same issue (Brandon)

So, we understood where the problem stemmed from: S3AFileSystem was loaded by the LauncherClassLoader, so that line was setting the classloader of the Configuration object (later used to instantiate the credentials provider) to a classloader that knows nothing about our custom class!
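The failure mode can be reproduced in miniature without Hadoop. The following is a toy model under stated assumptions, not Hadoop code: MiniConf and MiniFileSystem are hypothetical stand-ins for Configuration and S3AFileSystem, AppLoader plays the application classloader, and demo.CustomCredentialsProvider is an empty class generated in memory so that the example needs no extra jar.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

class ConfClassLoaderModel {

    // Hypothetical name standing in for the custom credentials provider.
    static final String PROVIDER = "demo.CustomCredentialsProvider";

    /** Emits the class file of an empty public class that extends Object. */
    static byte[] emptyClassBytes(String name) throws IOException {
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(buf);
        out.writeInt(0xCAFEBABE);                                // magic
        out.writeShort(0);                                       // minor version
        out.writeShort(52);                                      // major version (Java 8)
        out.writeShort(5);                                       // constant pool count (4 entries + 1)
        out.writeByte(7); out.writeShort(2);                     // #1 Class -> #2
        out.writeByte(1); out.writeUTF(name.replace('.', '/'));  // #2 Utf8: this class
        out.writeByte(7); out.writeShort(4);                     // #3 Class -> #4
        out.writeByte(1); out.writeUTF("java/lang/Object");      // #4 Utf8: superclass
        out.writeShort(0x0021);                                  // ACC_PUBLIC | ACC_SUPER
        out.writeShort(1);                                       // this_class
        out.writeShort(3);                                       // super_class
        out.writeShort(0);                                       // interfaces
        out.writeShort(0);                                       // fields
        out.writeShort(0);                                       // methods
        out.writeShort(0);                                       // attributes
        return buf.toByteArray();
    }

    /** Child loader: the only one that knows PROVIDER, like the application loader. */
    static final class AppLoader extends ClassLoader {
        AppLoader(ClassLoader parent) { super(parent); }

        @Override
        protected Class<?> findClass(String name) throws ClassNotFoundException {
            if (!PROVIDER.equals(name)) throw new ClassNotFoundException(name);
            try {
                byte[] bytes = emptyClassBytes(name);
                return defineClass(name, bytes, 0, bytes.length);
            } catch (IOException e) {
                throw new ClassNotFoundException(name, e);
            }
        }
    }

    /** Minimal stand-in for a configuration object that resolves class names. */
    static final class MiniConf {
        private ClassLoader loader;
        MiniConf(ClassLoader loader) { this.loader = loader; }
        void setClassLoader(ClassLoader loader) { this.loader = loader; }
        boolean canResolve(String name) {
            try {
                Class.forName(name, false, loader);
                return true;
            } catch (ClassNotFoundException e) {
                return false;
            }
        }
    }

    /** Stand-in for the filesystem: defined by the parent loader, it overrides the conf's loader. */
    static final class MiniFileSystem {
        void initialize(MiniConf conf) {
            conf.setClassLoader(this.getClass().getClassLoader());
        }
    }

    public static void main(String[] args) {
        ClassLoader parent = ConfClassLoaderModel.class.getClassLoader();
        MiniConf conf = new MiniConf(new AppLoader(parent));
        System.out.println(conf.canResolve(PROVIDER)); // true: the child loader knows the class
        new MiniFileSystem().initialize(conf);
        System.out.println(conf.canResolve(PROVIDER)); // false: the parent loader does not
    }
}
```

The configuration starts out able to resolve the provider, and loses that ability the moment a class defined by the parent loader swaps in its own classloader.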

Now the problem was found, but how do we fix it?
Several options unfolded:

  1. patch the Hadoop jars at cluster provisioning
  2. add our credential provider jar alongside the Spark distribution jars at cluster provisioning
  3. add the jar to the Spark working dir and set spark.driver.extraClassPath and spark.executor.extraClassPath
  4. contribute a patch to mainline Hadoop that, through a configuration key, could skip the setClassLoader call

Options 1 and 2 have a couple of downsides:

  1. we would have shared the same credential provider across all applications running in the cluster, without any ability to tweak it when needed
  2. they need integration with the cluster provisioning mechanism, which sits outside our team, making them much harder to implement

Option 3 was clever since it overcame the problems of the previous options, but we would still have complicated our users’ lives by telling them to add a weird string to their submit code, fostering cargo-cult programming.

Option 4 was probably the best, but given the pace at which new Hadoop versions are released and the pace at which AWS includes them in EMR, it would’ve taken months to actually have the fix available in our environment.

After half an hour of discussing this with the team, we wrapped up the day; we were very happy because the problem was found, but it was bittersweet since no solution satisfied us 100%.

As usual, sleeping on it was a very good idea. The next morning I woke up with a plan in mind! If S3AFileSystem is loaded by the MutableClassLoader, the setClassLoader call will set the “right” classloader and everything will work. The problem is that we can’t remove a class from a classloader, short of deleting a jar file from the EMR cluster machines, which is still a suboptimal option (same as options 1 and 2).
Then a light bulb went on in my head: fs.s3a.impl!

Hadoop allows us to set the class that implements the filesystem interface (set by default to org.apache.hadoop.fs.s3a.S3AFileSystem), so the solution is just 2 lines of code.

First line of code, define a class that extends S3AFileSystem:

package org.apache.hadoop.fs.s3a;
class PatchedS3AFileSystem extends S3AFileSystem

Second line of code:

spark-submit --conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.PatchedS3AFileSystem

This works like a charm because the implementation is loaded by the MutableClassLoader, since we provide it as part of our app jar; when the setClassLoader call is performed, the classloader being set is the MutableClassLoader, which also contains the credentials provider. Bingo!

End of the story?

No sir! We will open a PR against Hadoop so that the setClassLoader call can be skipped when needed (i.e. when using Spark and custom credential providers).

In the end, what started as a seemingly routine upgrade turned into a valuable learning experience, reminding us that sometimes the most unexpected issues lead to innovative solutions. As we continue to evolve our data platform on AWS, we’ll keep embracing challenges, finding solutions, and, of course, sharing our insights with the community.

And with that, this episode of the saga of Spark, Hadoop, classloaders, and dracones comes to a close.
Until the next unexpected adventure in the world of data engineering!

Antonio Murgia
Agile Lab Engineering

Computer science engineer, scala addict, coding enthusiast, distributed systems lover... I also like beers, wine and good food. Quite a good chef after all.