By: Artem Shugayev
Several years have passed since I last debugged MapReduce jobs running on a remote big data stack instead of my local machine. Much has changed since then, but the stack and its design remain pretty much the same. I’m still following the same steps to configure the environment and adding specific properties that can help in investigation. I’ll try to describe each step using an example from real life: a recent issue we investigated and debugged. If you are using the Avro format for your data, with Java Avro library version < 1.10 (avro.jar), you may face the same trouble we did.
The technology stack that we use for our data engineering jobs is pretty conservative. Most tools are from the Apache Hadoop ecosystem, with Cloudera as the platform provider. Our data engineering team uses Apache Spark for batch processing. Spark is the Swiss-army knife of batch processing jobs and has its own system model that needs understanding, at least from a configuration point of view. A look at its architecture will show you its main restrictions and how it works.
Briefly, Spark forms computational stages that are described as a directed acyclic graph (DAG). Each stage is split into tasks that run on the worker nodes inside executors. Executors are configurable, and a worker is not limited to just one of them. Basically, we can run as many executors on one worker as needed, limited only by that worker’s resources (CPU and memory). So, executors work in a way that is similar to the processes in your favorite operating system, with their own life cycles and resource utilization.
Under the hood, Spark has tons of Java libraries that are part of the distribution. Shipping a consistent set of libraries is how Spark guarantees compatibility for each integration. In our specific case, one of the libraries that was part of the distribution had a defect, and that defect was the main reason a job got stuck.
At TrueCar, we utilize AWS as our main cloud platform. AWS, obviously, has one of the best storage solutions, S3, that can be used for our data: It is cheap, reliable, and there are many libraries available for Java, Scala, Python, and more.
Among the many data formats we deal with, an important one is Avro. There are a lot of posts about the advantages of the Apache Avro format for specific scenarios, like schema evolution or massive write operations, so I will not add anything new here.
When we use a data management system, as in our case, we always need to check the versions, the way the distribution provides hotfixes, how often the libraries are upgraded, and other things about the distribution itself. So, it is not enough to find and resolve the defect in a one-off way; we need to go through a full cycle that includes release and production rollout.
Our job got stuck while reading a stream from an Avro file stored in an S3 bucket.
At the very end of the stage execution, we arrived at a situation where almost all the executors had released their resources, and only one or two were still running. Those took most of the execution time and exceeded the expected runtime. Normally, the job would have finished in 30–40 minutes, but now two hours had passed, and no results had been obtained. Something was preventing the job from finishing.
As I said at the beginning of this post, there are certain steps that I follow most of the time. Let’s start with the logs.
The logs showed no ERROR records and nothing unusual that could help at that stage. The following message records a task starting:
INFO Executor: Running task 35.0 in stage 1.0 (TID 36)
The message below records the same task finishing:
INFO Executor: Finished task 35.0 in stage 1.0 (TID 36). 13897 bytes result sent to driver
Based on the logs, the number of starts should be equal to the number of finishes. But as we soon found, some executors were still running. That meant the executor hadn’t sent any data to the driver yet, in our specific case. Counting the number of “task beginning” records and the number of “finished task” records showed us that out of six tasks that began, four were complete, but two were still running. That told us that something was wrong and helped narrow down our search, but it still wasn’t enough information to tell us exactly what was going on. So then came the time to check the internal Spark tools. In many cases, these internal Spark tools can give us all that we are looking for.
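The counting described above can be automated. The sketch below is one possible way to do it in plain Java, assuming the executor log lines follow the two formats shown earlier. The class name `TaskLogCheck`, the method name, and the third sample line (TID 37) are made up for illustration.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

class TaskLogCheck {
    // Patterns mirror the two executor log messages quoted in this post.
    private static final Pattern STARTED =
        Pattern.compile("Running task \\S+ in stage \\S+ \\(TID (\\d+)\\)");
    private static final Pattern FINISHED =
        Pattern.compile("Finished task \\S+ in stage \\S+ \\(TID (\\d+)\\)");

    // Returns the TIDs that have a "Running task" record but no "Finished task" record.
    static Set<Long> unfinishedTaskIds(List<String> logLines) {
        Set<Long> started = new TreeSet<>();
        Set<Long> finished = new TreeSet<>();
        for (String line : logLines) {
            Matcher s = STARTED.matcher(line);
            if (s.find()) {
                started.add(Long.parseLong(s.group(1)));
            }
            Matcher f = FINISHED.matcher(line);
            if (f.find()) {
                finished.add(Long.parseLong(f.group(1)));
            }
        }
        started.removeAll(finished);
        return started;
    }

    public static void main(String[] args) {
        List<String> sample = Arrays.asList(
            "INFO Executor: Running task 35.0 in stage 1.0 (TID 36)",
            // Hypothetical line: a second task that never reports back.
            "INFO Executor: Running task 36.0 in stage 1.0 (TID 37)",
            "INFO Executor: Finished task 35.0 in stage 1.0 (TID 36). 13897 bytes result sent to driver");
        System.out.println(unfinishedTaskIds(sample)); // prints [37]
    }
}
```

Any TID left in the result is a candidate for a closer look in the Spark UI.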
Spark provides internal tools via UI that can help us to start our preliminary investigation. Thread dumps show us the call stack and basic information. That information helps us to set the breakpoints in the right places. Checking the thread dumps in this case, we saw:
"Executor task launch worker for task 187" daemon prio=5 tid=65 RUNNABLE at org.apache.avro.file.DataFileReader.openReader(DataFileReader.java:55)
Local Variable: byte#8199
Local Variable: org.apache.avro.generic.GenericDatumReader#15
Local Variable: org.apache.avro.Schema$Parser#1
Local Variable: org.apache.avro.mapred.FsInput#15
Local Variable: org.apache.hadoop.conf.Configuration#6
Looks like we were having some issues when reading the files. One of the readers was hitting line 55 and couldn’t stop.
Checking the Avro source code at that line, we saw:
for (int c = 0; c < magic.length; c = in.read(magic, c, magic.length-c))
Last but not least, we should also mention that:
- Our job was failing consistently in different environments, using the same distribution version of Spark lib. AWS EMR was affected too.
- Our jobs were getting stuck even with different versions of the dependency: Avro 1.7.6 and 1.8.2. Based on the codebase analysis, we figured out that the code at line 55 of DataFileReader is the same in both versions.
At this point, we had everything we needed to start debugging. We knew the library version (as part of the distribution) and the line where the code was getting stuck. However, our environment was not yet fully prepared for debugging; we had no port to connect to remotely with a Java debugger. (I prefer IntelliJ IDEA, but most IDEs will need a similar configuration.) GDB on its own is not verbose enough and is not really suited to debugging Java applications. So, we have the following requirements:
- Enable debugging for remote Java apps.
- Use your favorite IDE to connect to remote JVM and start debugging.
According to the Spark design, the driver distributes the work among the executors. When the work is done, the driver gets the result of the execution back from the executor. There is no limit on the number of executors running on the same worker node, so each worker can have more than one executor, bounded only by the available resources. For debugging purposes, such a configuration limits us: the debug port we set is the same for every executor running on the worker, so any additional executor will fail to start because of a port conflict. So, the best way to start debugging is to tune the cluster so that each worker node runs only one executor.
By applying the mentioned approach, we will avoid any port conflicts. Additionally, we need to pass Java options to the Spark executor that open a port for remote debugging.
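As a minimal sketch, executor options of this kind could be assembled as shown below. It assumes the standard JDWP agent built into the JVM and Spark's `spark.executor.extraJavaOptions` property. Port 5005 and the class `DebugOptions` are hypothetical choices for illustration, not the values used on our cluster.

```java
class DebugOptions {
    // Hypothetical port, chosen only for illustration.
    static final String DEBUG_ADDRESS = "5005";

    // Builds the JDWP agent option. With suspend=n the executor starts
    // normally and waits for a debugger to attach at any time.
    static String jdwpAgent(String address, boolean suspend) {
        return "-agentlib:jdwp=transport=dt_socket,server=y,suspend="
            + (suspend ? "y" : "n") + ",address=" + address;
    }

    // Formats the option as a spark-submit --conf argument.
    static String sparkSubmitConf(String address) {
        return "--conf spark.executor.extraJavaOptions=" + jdwpAgent(address, false);
    }

    public static void main(String[] args) {
        // On JDK 9 and later a bare port listens on localhost only;
        // "*:5005" would be needed there to accept remote connections.
        System.out.println(sparkSubmitConf(DEBUG_ADDRESS));
    }
}
```

Because every executor receives the same option string, two executors on one worker would compete for the same port, which is why the one-executor-per-worker setup matters here.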
Later, when the job gets stuck again, we can connect to the executor remotely and start debugging the exact task:
1. Spark will provide us with information about the stuck task and the executor that is running it, so we’ve got the address, and the port we set on our own.
2. Now we can connect to the JVM with our debugger. In IntelliJ IDEA, this can be done by selecting the appropriate launch configuration.
3. In our case, we would rather not step through decompiled code, so attaching the source code for the specific version of the library is a good idea too. As I’ve mentioned before, we know which library contains the code, and the version of that library can be checked in the Spark environment properties.
The library comes as part of the Spark distribution, and the version is also mentioned in the classpath entry.
4. The next step is to add that version of the library to our IDE.
5. We’ve reached the needed line, so we are ready to set the breakpoint and see what is going on there.
In our specific case, we found the issue described in the Apache Jira with the patch attached:
The logic that parses the file and looks for the “magic” Avro header was broken and was causing our jobs to get stuck. Debugging showed that the situation occurred when consuming the stream from S3: a read of the byte range starting at the end of the file left the thread waiting for the stream to continue. But the stream never continued, and the loop became infinite. The symptoms were the ones we already knew: the job queue was blocked, waiting for more data from a stream that had already reached its end.
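To illustrate the kind of loop that avoids this trap, here is a minimal sketch of a header read that accumulates the offset and fails fast at the end of the stream. It is not the patch from the Jira issue. It assumes a plain `java.io.InputStream` rather than Avro's own input classes, and `HeaderReader` and `readHeader` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.InputStream;

class HeaderReader {
    // Reads exactly `length` bytes or throws EOFException if the stream ends first.
    static byte[] readHeader(InputStream in, int length) throws IOException {
        byte[] header = new byte[length];
        int offset = 0;
        while (offset < length) {
            int bytesRead = in.read(header, offset, length - offset);
            if (bytesRead < 0) {
                // End of stream: stop instead of retrying forever.
                throw new EOFException("Unexpected end of stream with "
                    + (length - offset) + " header bytes still to read");
            }
            // Accumulate rather than overwrite, so partial reads make progress.
            offset += bytesRead;
        }
        return header;
    }

    public static void main(String[] args) throws IOException {
        byte[] magic = {'O', 'b', 'j', 1}; // the Avro container file magic bytes
        System.out.println(readHeader(new ByteArrayInputStream(magic), 4).length);
        try {
            readHeader(new ByteArrayInputStream(new byte[0]), 4);
        } catch (EOFException e) {
            System.out.println("EOF detected: " + e.getMessage());
        }
    }
}
```

The key difference from the loop at line 55 is that the read result is checked before it is used as the next offset, so an empty or truncated file produces an error instead of a stuck task.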
As next steps, we are applying the patch, building a new version of Avro lib, and deploying it as part of the distribution packages, but the way we are doing that is a story for another time.
In an SRE’s daily routine, debugging is one of the most important things; it helps to troubleshoot the issue and find the appropriate solution. Debugging Spark jobs is not trivial, especially on remote machines in a distributed multi-node environment, where the issue is not reproducible on a local machine. I hope the recommended approach can save you some time in configuring your Spark cluster and the MR job itself.