The following post is intended to summarize some of the Spark SQL optimization challenges faced by the Data team at JW Player and how we overcame these challenges. On average, we ingest 550 GB of data a day, comprising roughly 1.5 billion rows, and we must run complex Spark SQL jobs on this set of data in order to transform it into a more manageable size from which insights can be gleaned. Play Sessions is one such job, and the focus of this post. I’ve also included some helpful links next to some section headings — these pages were instrumental in learning Spark SQL.
What are Play Sessions?
Play Sessions are the bread and butter of the Data Pipelines engineering team at JW Player. They are an attempt to identify a single ‘unit of work’ of a video viewer by computing transformations and aggregations in Spark SQL to compact the data down into a much more manageable size. These query operations are performed across roughly 100 columns, which turns out to be a hefty query and the impetus for why we needed to tune & optimize our Spark SQL job.
At JW Player, we run our Spark SQL jobs on Amazon Elastic MapReduce (EMR) clusters that we spin up as needed for jobs to run. In this case, we were initially running our Play Sessions queries on a cluster with an Amazon EC2 c4.xlarge size master node and five c4.xlarge core nodes — each node having 7.5 GB of memory and 4 vCPUs. The cluster runs Hadoop version 2.7.3 and Spark version 2.1.0.
The Play Sessions Query
The Play Sessions query performs transformations against an upstream data source stored as Avro, then `JOIN`s to another, much smaller table, runs aggregations, and ultimately writes the results out to an Amazon S3 bucket as Parquet.
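The production query is far too large to reproduce here, but its overall shape looks roughly like the following sketch — all table and column names are hypothetical stand-ins, not our actual schema:

```sql
-- Hypothetical sketch of the Play Sessions query shape, not the production SQL.
-- Read raw pings from the upstream Avro table, join to a small dimension
-- table, then aggregate each viewer's activity into session-level rows.
CREATE TABLE play_sessions
STORED AS PARQUET
LOCATION 's3://bucket/play_sessions/'
AS
SELECT
    p.viewer_id,
    p.session_id,
    d.player_config,               -- enriched from the small joined-to table
    MIN(p.event_time) AS session_start,
    MAX(p.event_time) AS session_end,
    SUM(p.watch_ms)   AS total_watch_ms,
    COUNT(*)          AS ping_count
FROM raw_pings p
JOIN player_dim d
  ON p.player_id = d.player_id
GROUP BY p.viewer_id, p.session_id, d.player_config;
```

The real job does this across roughly 100 columns rather than the handful shown here.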
The Performance Problem
Our data pipeline has one core performance requirement: that each downstream job run faster than the job before it. If this requirement is not met, downstream jobs will slowly build up lag over time and never be able to process all the incoming data.
As part of a new data initiative at JW Player, we considerably expanded the input data set in our session job, and quickly realized that the job was running significantly slower than its upstream job (the one producing Avro, which is the input for Play Sessions) — we were building lag. Formerly, the job would run for roughly 16–18 minutes, but now we were seeing runtimes upwards of 23 minutes, with occasional spikes up into the 40 and 50 minute range!
We needed to find a way to optimize the sessions queries and shave enough time off to keep pace with the upstream job.
Where To Begin?
A fair starting point seemed to be the Spark SQL documentation itself, especially as they had a section dedicated to performance tuning. The next few sections of this post will be short summaries of the various learnings I gleaned from a short, deep dive into the waters of Spark SQL.
Great, Apache’s got docs dedicated to performance tuning — this will be a piece of cake!
Right off the bat, the docs note that tables can be cached in memory (we’ll come back to this later) and then list two properties related to caching. These seem fine at their defaults, so we move on to the other configuration options.
The first option that catches my eye is `spark.sql.shuffle.partitions`, as we are actually already setting a value for it. Currently, we’re setting a value of 12, but the default is 200. Since this controls the number of partitions Spark SQL uses when shuffling data for joins & aggregations (exactly what our session queries are doing), we likely want this value to be higher. I raise the value to 100 to be conservative and see some incremental performance benefits, but am still nowhere close to keeping pace with the upstream job. However, this option does eradicate the heavy spikes in runtime we had been seeing, where sporadically the job would take upwards of 40 or 50 minutes — the spikes are gone, but the queries are still far too slow.
The next option, `spark.sql.autoBroadcastJoinThreshold`, seems quite promising now that we are doing a `JOIN` in our sessions queries. If Spark can broadcast the smaller table into memory on every executor, we can avoid extra shuffles, as each partition will have the data it needs, in memory, to accomplish the `JOIN`. Since the table we are joining to is quite small, this option seems appetizing, and turns out to be… later on.
For now, it doesn’t seem to move the runtime needle. I set the value to roughly double the size of the joined-to table, and move on.
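Concretely, both settings can be adjusted at the top of the session script. The values below mirror the ones discussed; the broadcast threshold figure is purely illustrative, sized to roughly double a hypothetical small joined-to table:

```sql
-- Raise shuffle parallelism for joins & aggregations (default is 200; we had 12).
SET spark.sql.shuffle.partitions=100;

-- Allow tables up to this many bytes (~64 MB here, an illustrative value
-- roughly double the joined-to table) to be broadcast to every executor.
SET spark.sql.autoBroadcastJoinThreshold=67108864;
```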
Understanding The Logs
Runtime is a high-level indicator of whether your tweaks are making an impact. Run the same batch through sessionization over and over (and over) again, and if your overall runtime goes down, great! You’re doing something right.
However, it became increasingly important to understand the Spark SQL logs in a lot more detail, and discover other ways of digging deeper into what was going on behind the scenes.
Spark outputs the time taken after each command it runs — even commands that merely define functions. This proved useful in homing in on the bottleneck query, which, to perhaps no one’s surprise, was the query performing all the aggregations.
Here are some example log lines to illustrate:
Time taken: 815.051 seconds
18/05/05 14:45:30 INFO CliDriver: Time taken: 815.051 seconds
Tasks and Runtime
Spark will also output the time taken for each task within a given query, which ordinarily might not seem that helpful, as we already know the total runtime. However, stare at the logs long enough and they’ll start speaking to you — after a while, it became apparent that certain tasks within a stage (a set of parallel tasks) were taking much, much longer than others. Spark can’t move on to the next stage of a query until all of the previous stage’s tasks have completed, so we were losing out on much of the benefit of parallelization here.
This problem was greatly magnified when we had only a few shuffle partitions, as one task might take 500 seconds while the rest took only 200. Painful — and certainly not ideal.
Again, some example log lines help to illustrate:
18/05/05 14:39:18 INFO TaskSetManager: Finished task 75.0 in stage 14.0 (TID 470) in 39090 ms on ec2.internal (executor 2) (98/100)
18/05/05 14:39:19 INFO TaskSetManager: Finished task 60.0 in stage 14.0 (TID 456) in 57565 ms on ec2.internal (executor 1) (99/100)
18/05/05 14:39:20 INFO TaskSetManager: Finished task 12.0 in stage 14.0 (TID 411) in 72625 ms on ec2.internal (executor 1) (100/100)
Interestingly, Spark can output a query plan as well, by running an `EXPLAIN EXTENDED` command on a given query. While this didn’t ultimately prove useful in this exercise, understanding the query plan could likely be a source of discoveries for further optimizations.
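For example, prefixing a query with the command prints its parsed, analyzed, optimized, and physical plans — the query body here is just an illustrative stand-in:

```sql
-- Print the full set of query plans without executing the query.
EXPLAIN EXTENDED
SELECT viewer_id, COUNT(*) AS ping_count
FROM raw_pings
GROUP BY viewer_id;
```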
The Spark UI
The Spark UI provides a handy browser view to inspect jobs that have run in detail. For each job, you can see the various stages, tasks, and executors of the job, along with the DAG (Directed Acyclic Graph), how much garbage collection has taken place, and lots of other useful tidbits of information.
Optimizing the Query
Equipped with some understanding of what’s going on behind the scenes and a couple of good Spark SQL settings, it was time to turn toward ways of improving the query itself.
As mentioned, the query computes transformations and then aggregations across roughly 100 columns. The key point here is that after the aggregations are computed, there are three tables built off the resulting dataset, each applying their own filters and selecting their own subsets of the columns.
Let’s return to our `autoBroadcastJoinThreshold` setting, shall we? After scanning the logs and the Spark UI, it became apparent that Spark wasn’t actually broadcasting the join in memory — in particular, the UI showed extra stages in the DAG corresponding to additional table scans on the joined-to table. As it turns out, Hive supports an `ANALYZE TABLE` command that computes statistics on a table — most importantly, the size of the table. Spark must have this information in order to appropriately do a broadcast `JOIN`.
Once we analyzed the table, we saw two entire stages of the DAG disappear, a great victory and simplification of the overall job (later on, we learned that you can also give query hints to Spark that will force the broadcast).
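The fix itself is a one-liner run before the join (table names hypothetical). The second statement sketches the query-hint alternative mentioned above, which newer Spark versions (2.2+) accept inline:

```sql
-- Compute table-level statistics (including size on disk) so Spark can
-- determine that player_dim fits under autoBroadcastJoinThreshold.
ANALYZE TABLE player_dim COMPUTE STATISTICS;

-- Alternatively (Spark 2.2+), force the broadcast with a query hint:
SELECT /*+ BROADCAST(d) */ p.viewer_id, d.player_config
FROM raw_pings p
JOIN player_dim d ON p.player_id = d.player_id;
```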
We were getting close to a runtime that would fit within the timeframe we were allotted. There was one more big victory on the horizon.
The Big Win — Caching
Our session queries have always written their output to S3 — we want to persist the data so that we can load it into downstream applications. However, as it turns out, the writing and subsequent reading of data to and from S3 was quite an expensive set of tasks all on their own. Furthermore, the aggregations table itself does not need to be persisted on S3.
With these two thoughts in mind, I learned that Spark has an incredibly powerful command called `CACHE TABLE` that allows us to compute the results of a query and store them in memory, avoiding any need to write data to or read it back from a filesystem. Beyond that, we can lazily cache the table, so that each subsequent query will only evaluate the pieces it needs (and queries beyond that will be able to read from the cache where there is overlap). In total, this optimization shaved whole minutes off of our job and led to quite a nice performance improvement. However, it did come at a cost — we needed more hardware.
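A sketch of the pattern, with hypothetical names: the aggregation result is cached lazily, and the downstream tables then read from the in-memory cache rather than round-tripping through S3.

```sql
-- Lazily cache the aggregation output in memory; nothing is computed yet.
CACHE LAZY TABLE session_aggregates AS
SELECT viewer_id, session_id, SUM(watch_ms) AS total_watch_ms
FROM raw_pings
GROUP BY viewer_id, session_id;

-- Each downstream table evaluates only the pieces it needs; overlapping
-- reads are served from the cache instead of a filesystem.
CREATE TABLE completed_sessions STORED AS PARQUET
LOCATION 's3://bucket/completed_sessions/'
AS SELECT * FROM session_aggregates WHERE total_watch_ms > 0;
```

In our job, three such tables are built off the cached aggregation result, each with its own filters and column subsets.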
Choosing the Right Cluster
Prior to the significant expansion of our input dataset, our sessions job had been running on five core c4.xlarge Amazon EC2 nodes (with a c4.xlarge master node) — we had scaled up from three to four to five nodes with previous releases and the addition of hardware had always met our performance needs.
However, in this case, our initial attempts at throwing more hardware at the problem were not helping — we tried scaling up and saw virtually no improvements in runtime.
With the aforementioned tweaks to the Spark settings and various query optimizations, we did start seeing improvements as we scaled up hardware, but still needed to find the right cluster size and instance types to handle the new memory requirements imposed by the broadcast `JOIN` and the caching of the aggregation table.
After some experimentation and running the jobs on a variety of cluster sizes and instance types, we landed on m4.2xlarge size nodes in order to support the increased memory requirements as well as the additional processing power we needed to handle the larger incoming datasets.
Hopefully this post helps point folks in the right direction when they first set out on a journey of Spark SQL optimization. There are no easy answers out there, and I’ve just barely scratched the surface of ways to optimize the session job and reduce the overall runtime.
However, with the above adjustments in place, we have managed to bring the runtime nicely below the pace of the upstream job, and can safely move forward with computing play sessions on larger and larger datasets.
Dan is a software engineer on the Data Pipelines team at JW Player.