Solving 5 Mysterious Spark Errors

yhoztak
6 min read · Sep 7, 2018

On the ML team at Coupa, our big data infrastructure looks like this:

https://raw.githubusercontent.com/jupyter-incubator/sparkmagic/master/screenshots/diagram.png

It involves Spark, Livy, Jupyter Notebook, Luigi, and EMR, backed by S3 in multiple regions. It’s powerful and great (this post explains how great it is), but it’s sometimes hard to debug when there’s an issue.

Why is Spark hard to debug?

from: https://cwiki.apache.org/confluence/display/SPARK/PySpark+Internals

Lazy evaluation is an excellent way to optimize performance: Spark accumulates tasks until it really needs to perform them. But it also makes debugging hard. All transformations in Spark are lazy. It may look like a complex job ran fast, but Spark is actually just accumulating the computations until the result is really needed, such as when df.show or df.write is called.
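The effect of lazy evaluation on debugging can be sketched without a cluster. The snippet below is an analogy in plain Python, not Spark itself: a generator stands in for a chain of lazy transformations, and parse and rows are made-up names. Defining the pipeline succeeds instantly, and the bad record only raises once something consumes the result, much like a Spark action.

```python
def parse(value):
    # Hypothetical transformation that fails on bad input.
    return int(value)

rows = ["1", "2", "oops", "4"]

# "Transformation": nothing is computed here, so nothing can fail yet.
lazy_parsed = (parse(r) for r in rows)

# "Action": consuming the generator finally runs parse() on the rows,
# and the error surfaces here, far from the line that defined the logic.
try:
    result = list(lazy_parsed)
    error = None
except ValueError as exc:
    result = None
    error = exc

print("result:", result)
print("error raised at action time:", error)
```

In Spark the same thing happens at a larger scale: the line that fails is the action, while the actual mistake may sit several transformations earlier.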

Distributed computation can get complex. On top of that, with PySpark the Spark infrastructure runs on the JVM and uses py4j to translate calls from Python, as in the diagram above, so error messages look Java/Scala-related and can be really long.

Anyway, the error messages are sometimes mysterious, and I’d like to introduce a few of them.

Mysterious (..or weird) Spark Errors:

Here are some of the tricky ones I have run into that I had no idea how to interpret at first… can you guess what they mean?

Problems & Solutions

Problem 1: Resolved attribute(s) your_field#xx missing from …..

The actual error looks like this, and it is noted in the Spark Jira:

Exception in thread "main" org.apache.spark.sql.AnalysisException: resolved attribute(s) surname#20 missing from id#0,birthDate#3,name#10,surname#7 in operator !Project [id#0,birthDate#3,name#10,surname#20,UDF(birthDate#3) AS birthDate_cleaned#8];

This error usually happens when you have two dataframes, apply a UDF on some columns to transform or aggregate them, and then rejoin the result to add new fields to a new dataframe.

The solutions:

Explicitly selecting the columns sometimes works:

left = left.select(specific_columns_from_left)
left.cache()
right = right.select(specific_columns_from_right)
right.cache()
left.join(right, ['column_to_join'])

But sometimes I still have the issue. Following the comments on https://issues.apache.org/jira/browse/SPARK-14948, if I clone the dataframe like the following, it always works:

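# toDF takes column names as separate arguments, so unpack the list
left_cloned = left.toDF(*columns_in_order_renamed_to_avoid_confusion)
left_cloned.join(right, ['column_to_join'])
## or in sql (both dataframes must be registered as temp views first)
left_cloned.createOrReplaceTempView("left_cloned")
right.createOrReplaceTempView("right_df")
sqlContext.sql("""
SELECT right_df.* FROM
left_cloned
JOIN right_df
ON right_df.col1 = left_cloned.col2...
""")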

A more common join issue

Usually, if you are joining multiple tables that share a field name, you get a much cleaner error like:

Reference ‘name’ is ambiguous, could be: name#8484, name#8487.

OK, I can see what it means, and the solution is also simple.

The solution:

If you are joining two dataframes on multiple keys with the same names, code like the following works pretty well. ['column1', 'column2'] are the columns you are joining on, and you’ll have only one column1 and one column2 in the result:

left.join(right, ['column1', 'column2'])

Problem 2: An error occurred while calling o64.cacheTable.

or An error occurred while calling o206.showString.

The real error message may be: the query may have NULL values in the columns you are joining on.

The stack trace is really long, and part of it may look like this. It doesn’t say much other than that the job crashed and the context shut down:

An error occurred while calling o206.showString.: org.apache.spark.SparkException: Job 25 cancelled because SparkContext was shut down
...
...

I encountered this error a few times. I hate it because the job would sometimes run for 2~4 hours and then crash. The real error is probably an OutOfMemoryError, but somehow that doesn’t show up in the stack trace.

When the field you are joining on contains nulls, the join creates a huge many-to-many relationship that produces pretty much all combinations of rows, and in the end it blows up because it can’t handle them anymore.

Obviously you should not have nulls in the field you are joining on, but when I’m joining many tables and expect a field not to be null, I may not notice.

A good practice is to always assert on the field before you use it in a join. Simply having a line like this is helpful:

assert df.filter("my_column is null").count() == 0

Even if you don’t have nulls, a many-to-many join may be creating too many rows. Check the top of the distribution of the join field like the following, and roughly estimate the number of joined rows:

from pyspark.sql import functions as F
df1.groupby('col1').agg(F.countDistinct('col2').alias('uniq_col2_count')).orderBy(F.desc('uniq_col2_count')).show()

In my case, the highest count was 40,000, and col1 had 500 unique values.

The join could end up producing 40,000 x 40,000 x 500 = 800,000,000,000 rows, which my cluster could not handle well, so I ended up tuning it up.
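The estimate above can be made a little more systematic. The following is a rough sketch in plain Python, assuming you have already collected per-key row counts from each side of the join (for example with a groupBy and count); estimate_join_rows and the sample numbers are made up for illustration. For an equi-join, each shared key contributes left_count x right_count output rows.

```python
def estimate_join_rows(left_counts, right_counts):
    """Estimate inner equi-join output rows from per-key row counts.

    Both arguments map a join-key value to the number of rows holding it.
    """
    shared_keys = left_counts.keys() & right_counts.keys()
    return sum(left_counts[k] * right_counts[k] for k in shared_keys)

# Hypothetical per-key counts; key "a" is heavily duplicated on both sides.
left_counts = {"a": 40000, "b": 3, "c": 1}
right_counts = {"a": 40000, "b": 2, "d": 7}

estimated = estimate_join_rows(left_counts, right_counts)
print(f"estimated join output: {estimated:,} rows")

# The back-of-the-envelope worst case from the text.
worst_case = 40000 * 40000 * 500
print(f"worst case: {worst_case:,} rows")
```

A single hot key dominates the total, which is why looking only at the top of the distribution is usually enough.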

Problem 3: “your_module not found” after successfully importing the module that contains your UDF

See the following code as an example. Doesn’t that look weird?

from lib.preprocess import find_keywords

worked fine, but then when I do…

df.show()

this error shows up.

The real error message: you need to distribute your function through spark_context.addPyFile

Solution:

There’s actually another way to add a module so that the worker nodes also get it:

sc.addPyFile('path_to_your_module.py')

This forces all worker nodes to have a reference to your module file. The error happens even when the module file is already distributed to the worker servers but the PYTHONPATH is not updated, so having

import sys
sys.path.append('/tmp/modules/lib')

so that there is no subfolder of modules also helps clarify where the issue may be.

The official documentation for addPyFile says:

Add a .py or .zip dependency for all tasks to be executed on this SparkContext in the future.
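Since addPyFile accepts a .zip as well as a single .py file, a package with subfolders can be shipped as one archive. The sketch below only builds the archive with the standard library; the lib/preprocess.py layout mirrors the import above, while build_dependency_zip and the temporary paths are assumptions for illustration. The final addPyFile call is left as a comment because it needs a live SparkContext.

```python
import os
import shutil
import tempfile
import zipfile

def build_dependency_zip(package_parent_dir, package_name, out_dir):
    """Zip one package so the package folder sits at the top of the archive."""
    archive_base = os.path.join(out_dir, package_name + "_deps")
    return shutil.make_archive(archive_base, "zip",
                               root_dir=package_parent_dir,
                               base_dir=package_name)

# Build a throwaway package that looks like lib/preprocess.py.
work_dir = tempfile.mkdtemp()
os.makedirs(os.path.join(work_dir, "src", "lib"))
for name in ("__init__.py", "preprocess.py"):
    with open(os.path.join(work_dir, "src", "lib", name), "w") as f:
        f.write("# placeholder module\n")

zip_path = build_dependency_zip(os.path.join(work_dir, "src"), "lib", work_dir)
with zipfile.ZipFile(zip_path) as zf:
    archived = sorted(n for n in zf.namelist() if n.endswith(".py"))
print(zip_path, archived)

# With a live SparkContext `sc`, the archive would then be shipped with:
# sc.addPyFile(zip_path)
```

Keeping the package folder at the top level of the zip is what should let from lib.preprocess import ... resolve on the workers.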

Problem 4: “Cannot have map type columns in DataFrame which calls set operations”

The real error message may be: please use StructType for complex formats instead.

We have a requirement to export a dataframe to JSON in a specific format, so I was writing a UDF with a specific return type.

The original value of the field is stored as

{'US': 3, 'EU': 0, 'UK': 0}

But we want to export it as an array of JSON objects with keys:

[{"country":"US", "count":3}, {"country":"EU", "count":0}, {"country":"UK", "count":0}]
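The reshaping itself is simple. Here is a minimal sketch in plain Python, where to_country_counts is a hypothetical helper name; in the pipeline this logic would live inside the UDF.

```python
import json

def to_country_counts(counts_by_country):
    """Turn {'US': 3, ...} into [{'country': 'US', 'count': 3}, ...]."""
    return [{"country": country, "count": count}
            for country, count in counts_by_country.items()]

original = {"US": 3, "EU": 0, "UK": 0}
exported = json.dumps(to_country_counts(original))
print(exported)
```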

Then I got a weird error at export:

Cannot have map type columns in DataFrame which calls set operations

...What does that mean?

After digging into the Spark code, I found out that this error is actually caused by the df.dropDuplicates() function, because it calls .match

Solutions:

So, I could apply this UDF right before exporting to JSON to avoid calling dropDuplicates() afterwards, but I found a better solution, which was to create the schema with StructType separately. It’s much more organized and flexible this way.

And if your JSON is much more complex and you need to twist it around, then it’s much easier to just

  • write a JSON formatter decorator class
  • iterate through a Pandas dataframe and format it.

Problem 5: IOPub data rate exceeded

The real error message may be: your docker file is maxed out

When I googled around and dug through the stack trace, it looked like setting the following option would solve it, but it didn’t:

--NotebookApp.iopub_data_rate_limit=10000000

In my case, this was caused by Docker maxing out its disk space…

On Mac, Docker has a dedicated file to store all of its data, which caps out at 64GB, and mine was full!

>ls -lh ~/Library/Containers/com.docker.docker/Data/com.docker.driver.amd64-linux/Docker.qcow2
-rw-r--r--@ 1 me  staff   64G Sep  7 16:00 /Users/me/Library/Containers/com.docker.docker/Data/com.docker.driver.amd64-linux/Docker.qcow2

The solution:

1. Run this to remove exited Docker containers and dangling images:
docker rm $(docker ps -q -f 'status=exited')
docker rmi $(docker images -q -f "dangling=true")

2. Remove the `Docker.qcow2` file

3. Restart Docker

4. Rebuild the Docker image for the sparkmagic Jupyter notebook

Now this error is magically gone.
