You Won’t Believe How Spark Shuffling Will Probably Bite You (Also Windowing)

I see this in most new to Spark use cases (which lets be honest is nearly everyone). Spark shuffle is something that is often talked about but it’s typically done with hand wavey advice to “minimize shuffling”

The Spark docs do share information on shuffling but leave out some proper nuance or giant warning symbols but I’ll share the important things from The Spark Programming Guide and the

The Shuffle is an expensive operation since it involves disk I/O, data serialization, and network I/O.

And the why?

During computations, a single task will operate on a single partition — thus, to organize all the data for a single reduceByKey reduce task to execute, Spark needs to perform an all-to-all operation. It must read from all partitions to find all the values for all keys, and then bring together values across partitions to compute the final result for each key

and

Internally, results from individual map tasks are kept in memory until they can’t fit. Then, these are sorted based on the target partition and written to a single file. On the reduce side, tasks read the relevant sorted blocks.

and

When data does not fit in memory Spark will spill these tables to disk, incurring the additional overhead of disk I/O and increased garbage collection.

Ok great which operations do this?

Operations which can cause a shuffle include repartition operations like repartition and coalesce, ‘ByKey operations (except for counting) like groupByKey and reduceByKey, and join operations like cogroup and join.

Ok so like every operation that everyone new to spark wants to do. I’ll add so does groupBy (without the ByKey). Ok great how can we avoid this. Some more hints (but I’d argue this should be in giant letters it bites so many people).

This means that long-running Spark jobs may consume a large amount of disk space. The temporary storage directory is specified by the spark.local.dir configuration parameter when configuring the Spark context.

Ok so lets drill into that last bit spark.local.dir or more likely the SPARK_LOCAL_DIRS parameter (since after 1.0 the latter overrides the former if set, but it could also be LOCAL_DIRS if you’re on YARN). On DSE this is typically set on install to /var/lib/spark and inside of there you’ll find:

  1. /var/lib/spark/rdd (the important bit)
  2. /var/lib/spark/worker (we will need to visit this in another post)

This default /var/lib/spark folder kills people all the time, and can be one of the more important tunings you set but almost no one gets this.

Ways This Can Kill You

Running Out of Space

So what happens if I have tiny SSD with only 10gb space left for /var/lib/spark (this really happens)? Lets say I combine this 10gig free spindle disk with say groupByKey where the key is State and there is 30 gigs in Texas and 40 gigs in California?

You guessed it those nodes that are responsible for Texas and California are never able to complete their work and you get fun errors like:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found

Lack Of Speed Kills

So now imagine you see this little drive you have before you and replace it with a bit fat old 1TB 7200 RPM SATA drive, everything is fixed right?

Wrong! Now your shufflng and windowing (windowing??!?! more on that later) jobs are trying to get on and off this disk with whatever speed they can, and you probably didn’t actually map the /var/lib/spark to a dedicated spindle, so you have contention with whatever else is happening. Plus you have to fight among any other jobs going on at the same time (while not contention the mixed workload will cause the spindle to get laggy). So your performance will plummet with this nice little bottleneck anytime you do a shuffle operation.

In the worst case the disk gets totally overwhelmed and you can even get fun errors like:

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found

Get this on a faster drive, preferably an SSD.

Spark TTL Cleaner Kills

This is more for long windowing operations or very large batch jobs that have to work on enough data to have to flush data to disk (guess where they flush it). For a long time in Spark and still for those of you running a version older than Spark 1.3 you still have to worry about the spark TTL Cleaner which will be removed in 2.0. Why? See https://github.com/apache/spark/pull/10534

Yep you saw that, it can delete data out from under the job. Say you have a 15 minute window that you calculate ever 1 minute and your ttl cleaner is set to 16 minutes, say the job runs long and doesn’t complete till minute 17. Guess what happens? You get fun errors like (see a trend)

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3967.0:0 failed 4 times, most recent failure: Exception failure in TID 43518 on host ********: java.lang.Exception: Could not compute split, block input-0-1416573258200 not found

Solutions

  1. Just set SPARK_LOCAL_DIRS on standalone and mesos based schedulers and LOCAL_DIRS on YARN based schedulers to point to a different location than the default.
  2. Make sure that location is an SSD and of a large enough size to fit your workload.
  3. Determine what your workloads LOOK like before you deploy them. Shuffle operations aren’t free and this is still distributed computing, if these operations are performance sensitive you should think long and hard about how you have your data partitioned from the get go. You can always do things like The Weather Channel (relevant slide below) and prepartition your data into groups

Summary

Spark is a powerful tool, but it’s far too easy to forget it’s still distributed technology and you still have to think about how you do things, and you may have to break old habits that were probably really slow before on a single computer with 25gb of data, but is unbearable with a 100 nodes and petbaytes of data.