EXPEDIA GROUP TECHNOLOGY — SOFTWARE

Part 5: How to Resolve Common Errors When Switching to Cost-Efficient Apache Spark Executor Configurations

How to resolve memory issues that happen when switching to efficient executor configs

Brad Caffey
Expedia Group Technology

--

An image representing the word ‘ERROR’ in binary code
Photo by vchal on Shutterstock

If you like this blog, please check out my companion blogs about how to use persist and coalesce effectively to lower job cost and improve performance.

When switching to the cost-efficient executor configuration, your tasks will sometimes fail due to memory errors. In this blog, I will walk through three fixes you can try whenever you face these errors. Before that, I will cover how to look up task failures in the Spark UI. If you are already comfortable with identifying task failures in the Spark UI, jump to the Overhead memory error section.

Researching Failed Tasks

A quick refresher on the hierarchy of a Spark job: a Spark job (application) is divided into smaller jobs, each job is divided into stages, and each stage is divided into tasks.

Spark job -> smaller jobs -> stages -> tasks.

When you have failed tasks, you need to find the stage that the tasks belong to. To do this, click on Stages in the Spark UI and then look for the Failed Stages section at the bottom of the page. If an executor runs into memory issues, the task fails and Spark retries it. If that task fails after 3 retries (4 attempts total by default), the stage fails and causes the Spark job as a whole to fail. Memory issues like this also slow down your job, so you will want to resolve them to improve performance.
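For reference, this retry limit is Spark's spark.task.maxFailures setting, which defaults to 4 attempts per task. A minimal sketch of passing it explicitly on spark-submit:

--conf spark.task.maxFailures=4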

To see your failed tasks, click on the failed stage in the Spark UI. The webpage will then refresh with the tasks that ran during that stage. Scroll down to the bottom of the Tasks section to see task details.

Dynamic recording of the procedure outlined in the text to find your failed tasks

Next, click on the Status column header in the Tasks section to re-sort the page so that all of your failed tasks appear at the top of the task list.

Continuing the dynamic recording of the procedure outlined in the text to find your failed tasks

In the far-right column of the Tasks section you’ll find an Error column. In this column, you’ll see the errors that caused your tasks to fail.

Now you know how to determine which tasks failed and why they failed.

Overhead memory error

Whenever you see an error like the one below in your failed task…

ExecutorLostFailure (executor X exited caused by one of the running tasks) Reason: Container killed by YARN for exceeding memory limits. 34.4 GB of 34.3 GB physical memory used. Consider boosting spark.yarn.executor.memoryOverhead.

…consider boosting the memory overhead as suggested in the error message. Keep in mind that when you do this, you will need to decrease your executor memory by the same amount you increase your memory overhead. Doing this ensures the executor container consumes the same amount of node memory as it did before. You might think this decrease will cause memory pressure inside the executor itself, but it won’t.

Example of 36GB total memory: overhead is increased from 2GB to 4GB, executor memory decreased from 34GB to 32GB
Decrease executor memory by the same amount you increase overhead memory.
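Using the numbers from the example above, the container footprint on the node stays the same:

34GB executor memory + 2GB overhead = 36GB container (before)
32GB executor memory + 4GB overhead = 36GB container (after)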

Memory overhead defaults to 2,389MB on Qubole, so I typically increase memory overhead to 4,000MB as shown below. If you don’t use Qubole, you can find the exact overhead memory used in your job by clicking on the Environment tab in the Spark UI and searching for “overhead”.

--executor-memory 32G --conf spark.executor.memoryOverhead=4000

The exact parameter for adjusting overhead memory varies by Spark version (older versions use spark.yarn.executor.memoryOverhead, as in the error message above). Check your Environment tab to see which overhead parameter your job uses.

If you get the error message again, keep increasing memory overhead in increments of 1,000MB while decreasing executor memory by the same amount.
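For example, continuing from the 32G / 4,000MB configuration above, the next two steps would look like this:

--executor-memory 31G --conf spark.executor.memoryOverhead=5000
--executor-memory 30G --conf spark.executor.memoryOverhead=6000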

Shuffle memory errors

Sometimes your job may fail with memory errors like this one when reading data during shuffles…

ExecutorLostFailure (executor X exited caused by one of the running tasks) Reason: Container marked as failed: container_1541210250016_0002_01_003278 on host: 10.120.8.88. Exit status: -100. Diagnostics: Container released on an *unhealthy or unreachable* node

To tell if your job failed during a shuffle stage, look to see if there’s data in the Shuffle Read column for that particular stage. If there is, then confirm that the stage has 200 tasks (the default number of shuffle partitions). If you are using a different number of shuffle partitions, confirm the stage’s task count matches the adjusted amount.

Check the number of shuffle partitions in stages in the Spark admin interface

If these two conditions are true, then you can eliminate this error by increasing the shuffle partitions used during this stage so that the data read by each executor is divided into smaller pieces for consumption. To do this, just pass the parameter below with X being the new number of shuffle partitions. I recommend sizing the shuffle partitions to twice the number of Spark cores running in your job.

X = num-executors * executor-cores * 2

Also, spark.sql.shuffle.partitions applies to DataFrame and Spark SQL shuffles; if your job uses RDD operations as well, set parallelism to the same number.

--conf spark.sql.shuffle.partitions=X
--conf spark.default.parallelism=X
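Putting it together with hypothetical numbers: a job running 50 executors with 4 cores each would size its shuffle partitions at 50 * 4 * 2 = 400.

--num-executors 50 --executor-cores 4 --conf spark.sql.shuffle.partitions=400 --conf spark.default.parallelism=400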

The number of shuffle partitions will increase in the Spark UI during your next run to match the value you passed for shuffle partitions. If the issue persists, I recommend doubling the number of shuffle partitions until the error goes away.

When all else fails…

Lower the core count for your executors by one. This will increase the memory-to-core ratio for your executors. This particular change should be made as a last resort because it will decrease the number of Spark cores running on your node and therefore decrease node efficiency.
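For example, if your executors run with 5 cores and 32GB of executor memory (hypothetical numbers), dropping to 4 cores raises the memory-to-core ratio from 6.4GB per core to 8GB per core:

--executor-cores 4 --executor-memory 32G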

Series contents

Learn more about technology at Expedia Group
