Optimising Spark RDD pipelines
7 tips to save time and money, when processing lots of data with Spark.
Every day, in THRON, we collect and process millions of events regarding user-content interaction. The reason we do so is because we enrich user and content datasets, analise the timeseries, extract behaviour patterns and ultimately infer user interest and content characteristics from those; this is done to fuel lots of different cool benefits such as recommendations, Digital content ROI calculation, predictions and many more.
If we only had to use such data to present shiny BI reports it would have been a simple matter, but since this data has to fuel real-time recommentations and other user-facing services, we need to update both model and data in real-time and be able to query results with sub-second query times (well… hundredths of a second time, actually). We have also to grant a 2 year retention period and, for every update on data model, we need to reprocess all past events to ensure an accurate customer profile. Such requirements led us to focus on systems that can scale horizontally in a cost effective way.
Being a multitenant system we cannot even optimize the data processing for a specific workload (based on specific velocity, variety and/or volume) because it has to accomodate any workload of all our customers at the same time.
In THRON our primary language is Scala and we have been using it from the beginning to develop most of our services. When we started building our own recommender, evaluating Spark was natural and we found it very flexible to develop and, thanks to AWS EMR, which provides out-of-the-box Spark support, we had the chance to scale it without the need to know how to configure an Hadoop cluster and, at the same time, we could easily integrate Spark applications in our existing AWS Data Pipeline jobs.
As a big benefit, we could use lots of big machines for the enrichment process for a limited amount of time and keep costs under control by leveraging EC2 spot instances. The output of spark processing is then loaded into ElasticSearch clusters to achieve outstanding query performances. Unfortunately, at the time we tackled this problem, only RDD data structures were available on Spark (nowadays you could use DataSets and DataFrames, which are more powerful and easier).
From this experience we learned some lessons that we think you might appreciate.
Read all the data together
Our data is stored in S3 with different data formats: events in parquet, users in plain JSON, etc.. In the enrichment phase we need to join the events with the other data to get the results. Events have a different cardinality compared to the other data sources and at first we were not reading all the events together but we were using batches of few days. This has been good for a while because we didn’t encounter Spark scaling needs yet, but when the event count reached a certain treshold we observed that
- Spark cluster was idle between batches because of slow S3 writes;
- at the end of each batch most nodes were idle because the next batch couldn’t start until all the previous tasks weren’t completed: because our data (and yours too, we bet) is usually skewed this was a pretty common situation;
A result of those 2 issues was that we couldn’t scale the application horizontally: adding machines was not improving the performance because we were just increasing the idle time.
Before digging into complex optimizations, ensure you can minimise the cluster idle time.
To avoid the idle time between batches we naively tried to read all the data in a single step instead of reading it in multiple batches. We failed because we realised our data had a quite serious skew problem. Unskewing data allowed us to distribute it in a more even way across nodes and basically remove the idle times, increasing cluster efficiency and allowing horizontal scale to bring benefits.
Don’t be afraid of launching a lot of nodes: it’s better to have many nodes for a short time than having few nodes for a long time.
Know your data (and how to unskew it)
When you work with a lot of data it’s very important to know how your data is distributed and your application must be designed accordingly. Join is a very common transformation with Spark and if you are using hash join your data will be distributed in the nodes based on the hash of the join key:
every record with the same hash key will be stored in the same node.
This is very helpful because in this way each node has all the data needed for the join but, but what happens when you have some keys that cannot fit in a single node?
If your data is skewed you will probably end up having two different scenarios:
- one node working at 100% of cpu while all the others idle
- Java out of memory problems
I am not sure what’s the worst one but since we are unlucky we first encountered the former and, after, the latter.
When you need to join (or distribute your data across the nodes based on the hash key) you need to know if that key is well distributed and the maximum cardinality fits into a single node.
In case the keys aren’t safe to be distributed across the nodes you can always cheat by adding a step to analyse the dataset and add a salt in the unskewed keys: this way you can safely join the data and the application parallelism will gain in efficiency because all tasks will have, most likely, a similar size and duration.
Broadcast RDD carefully
If you have searched “how to optimize RDD join in Spark” in Google you probably already know what I mean: when you join a big RDD with a small one, broadcasting the smaller RDD in all nodes and then performing the join is very fast.
What’s the drawback? If your “small” RDD, at a given time, is not small anymore you will be presented with a memory problem in all your nodes and you need to update your code to handle the RDD properly. That’s what happened to us.
We now pay a lot of attention to small RDDs and we must be 100% sure their size will not grow in limit cases before leveraging the broadcasting join.
Default memory settings are always wrong
“Do not Rely on default memory settings” sounds as a very standard suggestion, but in Spark it has an additional reason to be repeated:
if you hit memory limits , the given message could cause a misunderstanding
the executor and driver default memory allocation is just 1GB and when you reach this limit the log message will suggest you to increase the memoryOverhead (a memory “safe margin” used by Spark) instead of increasing the memory allocation. Remember to always review Spark settings, some parameters depend on the machine characteristics like memory and cpu, others depend on your application like the parallelism.
Be careful writing algorithms that use a lot of memory because the Garbage Collector (Spark runs on JVM) has very poor efficiency in such cases
Working on parallelism may be used to lower memory usage, ensure you set the parallelism considering the size of your dataset.
A rule of thumb regarding memory settings configuration: it’s a good starting point to use, for the Spark driver, the same configuration as the executors.
Don’t write directly to S3
One of the common task in Spark is to read or write data from a data lake and, if you use AWS, you will read from and write to S3. It’s VERY simple to write the output to S3, the only thing to do is to specify the output location in the form “s3://bucket/prefix”. What we realised is that if you do so even after the application has finished to compute the result, the cluster will still keep running doing nothing.
This is because each Spark task writes the output to a temporary path and then will move the files into the destination folder at the end of the write process. S3 is not a file system and there is no native “move” operation, so what happens under the hood is that it creates a copy of the file in the destination folder and then (after the copy has finished) it deletes the source file. This step could take a long time and it looks like it isn’t executed in parallel even if you have to move just a few files. To avoid this we found that it’s quicker to write the application result in HDFS and to add a second step to copy the files from HDFS to S3 using S3DistCP.
We went from over 1 hour copy time to about 2 minutes.
In the public cloud world this translates to both time and money savings, because you pay for your cluster uptime, even when it does nothing.
Choose the correct file format
In Spark you can use many different file formats for input or output (text, CSV, JSON, Orc, Parquet, etc…) compressed with a wide range of encoders (bzip, gzip, etc…) and it’s not wise to choose randomly among the different file formats or encoders, the format should depend on how data is used.
You should consider that not all compression formats create splittable files: GZip, as an example, is very fast but it’s not splittable so when you have a single big file the read process cannot be parallelised. We found that we had better performance by keeping the file uncompressed and let Spark parallelise the read.
Another consideration was how we read the output. One of our data consumers is AWS Athena and we chose Parquet because it stores compressed data in a columnar format. Athena has a price model based on GB read. Using columnar format, saving costs, Athena query performance improves too because each query will address just the data it needs without needing to scan the whole dataset.
We never successfully wrote a Spark application at the first try. Spark application design can be very complex and unusual, so we often have to update the application logic to improve its scalability with the application workload:
Writing an application that brings the correct result on a sample dataset is easy, but delivering the same result with lots of data and distributed computing is just a different thing.
We found very useful writing a lot of unit tests: every time we have to optimize the computation logic and refactor code we make sure we don’t change our job result by simply running the tests. Validating this without automatic testing would be really expensive and error prone, this is why we spend extra time and effort to create many unit tests for Spark applications.
In these years we have learned a lot of lessons about writing and managing Spark applications and we are still learning a lot. One of the evolutions we plan to undertake, in order to further improve the performance and scalability of our code, is to move the application that uses the “old” RDD with the newer Datasets.
We are also developing an even more complex application with many different inputs and we want to use Spark accumulators to track how many records we are discarding to increase the input data validation.
We also plan to integrate in our applications some tools to analyse spark runs (such as Sparklens) and create better tests to catch issues without relying on huge test datasets.
Which Spark topics are you interested in hearing from us ?