How we reduced our Apache Spark cluster cost using best practices

Umberto Griffo
Jan 13 · 8 min read

It’s been about three months since I moved from Italy to Lisbon. I was offered the chance to work with one of the most exciting startups in Lisbon, tb.lx, where we are developing data-driven solutions for trucks and buses using a top-notch tech stack, which includes Apache Spark.

We use Apache Spark to implement scalable and reliable data transformation pipelines that feed event-driven applications. Apache Spark is fantastic and does what it promises to do, but you can’t treat it as a “Swiss Army Knife” that sorts your problem out without any effort on your part. Using Spark on massive datasets can become nontrivial, especially when you are dealing with a terabyte or more of data. The first idea that comes to mind might be to use a large cluster of hundreds of machines, with thousands of cores and terabytes of RAM, but such a super-sized cluster has a cost that can grow very quickly. In this article, I’m going to introduce some techniques we use to tune our Apache Spark jobs for optimal efficiency and save costs.


Use the Best Data Format

Apache Spark supports several data formats, including CSV, JSON, ORC, and Parquet, but just because Spark supports a given data storage format doesn’t mean you’ll get the same performance with all of them. Parquet is a columnar storage format designed so that a query reads only the columns it actually uses, skipping over those that are not requested. This dramatically reduces file sizes and makes Spark SQL queries more efficient. The picture in [1] about the Parquet file format explains what we can achieve with Column Pruning (projection push down) and Predicate Push Down.

As an example, we can imagine having a vehicle dataset containing information like:
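A minimal sketch of such a schema in Scala; apart from destination and arrdelay, which appear in the query plan further below, the column names are illustrative assumptions:

case class Vehicle(
  id: String,            // unique vehicle identifier (assumed)
  origin: String,        // departure location (assumed)
  destination: String,   // arrival location, used later for partitioning
  departureTime: Long,   // departure timestamp (assumed)
  arrivalTime: Long,     // arrival timestamp (assumed)
  depdelay: Double,      // departure delay (assumed)
  arrdelay: Double       // arrival delay, aggregated in the queries below
)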

Here is the code to persist a vehicles DataFrame as a table consisting of Parquet files partitioned by the destination column:
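A minimal sketch, assuming a vehicles DataFrame with the schema above and the /data/vehicles path that appears in the listing below:

vehicles
  .write
  .format("parquet")
  .partitionBy("destination")          // one directory per destination value
  .option("path", "/data/vehicles")
  .saveAsTable("vehicles")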

Below is the resulting directory structure as shown by a Hadoop list files command:

hadoop fs -ls /data/vehicles
/data/vehicles/destination=ATL
/data/vehicles/destination=BOS
/data/vehicles/destination=CLT
/data/vehicles/destination=DEN
/data/vehicles/destination=DFW
/data/vehicles/destination=EWR
/data/vehicles/destination=IAH
/data/vehicles/destination=LAX
/data/vehicles/destination=LGA
/data/vehicles/destination=MIA
/data/vehicles/destination=ORD
/data/vehicles/destination=SEA
/data/vehicles/destination=SFO

Given a table of vehicles containing the information above, column pruning means that if the table has 7 columns but the query lists only 2, the other 5 will not be read from disk. Predicate push down is a performance optimization that limits which values will be scanned, not which columns. So, if you apply a filter on the destination column to only return records with the value BOS, predicate push down will make Parquet read only the blocks that may contain the value BOS. Partitioning improves performance further by allowing Spark to read only a subset of the directories and files. For example, the following query reads only the files in the destination=BOS partition directory in order to compute the average arrival delay for vehicles destined for Boston:
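A sketch of such a query, assuming the vehicles table registered above (the arrdelay > 1.0 filter mirrors the PushedFilters entry in the plan below):

import org.apache.spark.sql.functions.{avg, col, desc}

val avgDelays = spark.table("vehicles")
  .filter(col("destination") === "BOS" && col("arrdelay") > 1.0)
  .groupBy("destination")
  .agg(avg("arrdelay").as("avg_arrdelay"))
  .orderBy(desc("avg_arrdelay"))

avgDelays.show()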

You can see the physical plan for a DataFrame by calling the explain method, as follows:
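For example, calling it on the avgDelays DataFrame sketched above prints a plan similar to the one below:

avgDelays.explain()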

== Physical Plan ==
TakeOrderedAndProject(limit=1001, orderBy=[avg(arrdelay)#304 DESC NULLS LAST], output=[destination#157, avg(arrdelay)#314])
+- *(2) HashAggregate(keys=[destination#157], functions=[avg(arrdelay#152)], output=[destination#157, avg(arrdelay)#304])
   +- Exchange hashpartitioning(destination#157, 200)
      +- *(1) HashAggregate(keys=[destination#157], functions=[partial_avg(arrdelay#152)], output=[destination#157, sum#321, count#322L])
         +- *(1) Project [arrdelay#152, destination#157]
            +- *(1) Filter (isnotnull(arrdelay#152) && (arrdelay#152 > 1.0))
               +- *(1) FileScan parquet default.vehicles[arrdelay#152,destination#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[dbfs:/data/vehicles/destination=BOS], PartitionCount: 1, PartitionFilters: [isnotnull(destination#157), (destination#157 = BOS)], PushedFilters: [IsNotNull(arrdelay), GreaterThan(arrdelay,1.0)], ReadSchema: struct<destination:string,arrdelay:double>

In the FileScan line, we can see the partition filter push down: PartitionFilters: [isnotnull(destination#157), (destination#157 = BOS)] means that the destination=BOS filter is pushed down into the Parquet file scan. This minimizes the files and data scanned and reduces the amount of data passed back to the Spark engine for the aggregation (the average of the arrival delay).

Cache Judiciously and use Checkpointing

Just because you can cache an RDD, a DataFrame, or a Dataset in memory doesn’t mean you should blindly do so. Depending on how many times the dataset is accessed and the amount of work involved in recomputing it, recomputation can be faster than the price paid by the increased memory pressure. If the data pipeline is long, meaning we have to apply several transformations to a huge dataset without allocating an expensive cluster, checkpointing could be your best friend. As we saw in the previous paragraph, here is an example of a query plan:

== Physical Plan ==
TakeOrderedAndProject(limit=1001, orderBy=[avg(arrdelay)#304 DESC NULLS LAST], output=[destination#157, avg(arrdelay)#314])
+- *(2) HashAggregate(keys=[destination#157], functions=[avg(arrdelay#152)], output=[destination#157, avg(arrdelay)#304])
   +- Exchange hashpartitioning(destination#157, 200)
      +- *(1) HashAggregate(keys=[destination#157], functions=[partial_avg(arrdelay#152)], output=[destination#157, sum#321, count#322L])
         +- *(1) Project [arrdelay#152, destination#157]
            +- *(1) Filter (isnotnull(arrdelay#152) && (arrdelay#152 > 1.0))
               +- *(1) FileScan parquet default.vehicles[arrdelay#152,destination#157] Batched: true, Format: Parquet, Location: PrunedInMemoryFileIndex[dbfs:/data/vehicles/destination=BOS], PartitionCount: 1, PartitionFilters: [isnotnull(destination#157), (destination#157 = BOS)], PushedFilters: [IsNotNull(arrdelay), GreaterThan(arrdelay,1.0)], ReadSchema: struct<destination:string,arrdelay:double>

Exchange means a shuffle occurred between stages, while HashAggregate means an aggregation occurred. If the query contains a join operator, you could also see ShuffleHashJoin, BroadcastHashJoin (if one of the datasets is small enough to fit in memory), or SortMergeJoin (the usual strategy when joining two large datasets). Checkpointing saves the data to disk and truncates the query plan, and this is a nice feature because each time you apply a transformation or perform a query on a Dataset, the query plan grows. When the query plan becomes huge, performance decreases dramatically, generating bottlenecks. In order to avoid the uncontrolled growth of the query lineage, we can add checkpoints at some strategic points of the data pipeline. But how can we decide where to put them? A possible rule of thumb is to define a complexity score based on the depth of the query plan: each time an Exchange, HashAggregate, ShuffleHashJoin, BroadcastHashJoin, or SortMergeJoin occurs, add one point to the score, and whenever the score reaches 9 or more, do a checkpoint.
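A minimal sketch of how checkpointing fits into a pipeline; the directory path and the names df, otherDF, id, and value are illustrative assumptions:

import org.apache.spark.sql.functions.{col, sum}

// Reliable checkpointing needs a directory on the cluster's file system
spark.sparkContext.setCheckpointDir("/tmp/checkpoints")

val enriched = df
  .join(otherDF, Seq("id"))              // adds a join + Exchange to the plan
  .groupBy("id")
  .agg(sum("value").as("total"))         // adds HashAggregate + Exchange
  .checkpoint()                          // materializes the result and truncates the lineage

// Subsequent transformations start from the checkpointed data,
// keeping the query plan small
val result = enriched.filter(col("total") > 0)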

In addition, using checkpointing will help you debug the data pipeline, because at each checkpoint you know precisely the status of the job.

Use the right level of parallelism

Clusters will not be fully utilized unless the level of parallelism for each operation is high enough. Spark automatically sets the number of partitions when reading an input file according to its size, and it does the same for distributed shuffles: when you shuffle a DataFrame or a Dataset, Spark creates a number of partitions equal to the spark.sql.shuffle.partitions parameter, which defaults to 200. Most of the time this works well, but when the dataset starts to become massive, you can do better by using the repartition function to balance the dataset across the workers.
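For instance, a sketch of raising the shuffle parallelism for a heavy job (the value 1000 is purely illustrative; tune it for your data volume and cluster size):

// Check the current value (defaults to 200)
println(spark.conf.get("spark.sql.shuffle.partitions"))

// Raise it before running shuffle-heavy transformations
spark.conf.set("spark.sql.shuffle.partitions", "1000")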

The picture below, which I made inspired by [6], shows how to use the Spark UI to answer the following questions:

· Any outlier in task execution?

· Skew in data size, compute time?

· Too many/few tasks (partitions)?

· Load balanced?

You can tune the number of partitions by asking those questions. The above example is a best-case scenario where all the tasks are balanced and there isn’t any skew in data size. In our specific use case, where we are dealing with billions of rows, we have found that partition counts in the range of 10k work most efficiently. Suppose we have to apply an aggregate function, like a groupBy or a Window function, to a dataset containing a time series of billions of rows, where the column id is a unique identifier and the column timestamp is the time. In order to balance the data well, you can repartition the DataFrame/Dataset as follows:
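A minimal sketch, assuming a timeSeries DataFrame with the id and timestamp columns described above; the partition count of 10,000 matches the range we found to work for billions of rows, but it is workload-dependent:

import org.apache.spark.sql.functions.col

// Repartition by the grouping key so that all rows of the same id
// end up in the same partition, spread across ~10k partitions
val balanced = timeSeries.repartition(10000, col("id"))

// Any aggregate or Window function over "id" now benefits from this layout
val aggregated = balanced
  .groupBy("id")
  .count()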

This repartition will balance the dataset and the load on the workers, speeding up the pipeline. Another rule of thumb [9] is that tasks should take at most 100 ms to execute. You can ensure that this is the case by monitoring the task duration in the Spark UI. If your tasks take considerably longer than that, keep increasing the level of parallelism by, say, a factor of 1.5, until performance stops improving.

Use Broadcast Joins

An additional technique to improve performance is analyzing the DataFrame sizes to pick the best join strategy. If the smaller DataFrame is small enough to fit into the memory of each worker, we can turn a ShuffleHashJoin or a SortMergeJoin into a BroadcastHashJoin. In a broadcast join, the smaller DataFrame is broadcast to all worker nodes. Using the broadcast hint guides Spark to broadcast the smaller DataFrame when joining it with the bigger one:
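A minimal sketch, assuming two DataFrames named largeDF and smallDF that share a join column called id (all three names are illustrative):

import org.apache.spark.sql.functions.broadcast

// The broadcast hint ships smallDF to every executor,
// so largeDF can be joined locally without being shuffled
val joined = largeDF.join(broadcast(smallDF), Seq("id"))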

This way, the larger DataFrame does not need to be shuffled at all.

Recently, Spark increased the maximum size of a broadcast table from 2GB to 8GB; thus, it is not possible to broadcast tables larger than 8GB.

If the smaller DataFrame does not fit fully into memory but its key set does, it is possible to exploit this. Since a join will discard all elements of the larger DataFrame that do not have a matching partner in the medium-sized DataFrame, we can use the medium DataFrame’s key set to filter the larger one before the shuffle. If a significant number of entries get discarded this way, the resulting shuffle will need to transfer a lot less data.
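A sketch of this pre-filtering, again assuming illustrative names largeDF, mediumDF, and a join column id:

import org.apache.spark.sql.functions.broadcast

// Broadcast only the (small) key set of the medium DataFrame
val keys = mediumDF.select("id").distinct()

// Keep only the rows of largeDF that will survive the join,
// before any expensive shuffle of the full rows happens
val prefiltered = largeDF.join(broadcast(keys), Seq("id"), "left_semi")

// The actual join now shuffles far less data
val joined = prefiltered.join(mediumDF, Seq("id"))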

It is important to note that the efficiency gain here depends on the filter operation actually reducing the size of the larger DataFrame. If not many entries are discarded (e.g., because the medium-sized DataFrame is some kind of large dimension table), there is nothing to be gained with this strategy.

Conclusion

In this article, I showed you some techniques we use daily to tune our Apache Spark jobs to save costs and speed up our data transformation pipelines. We learned about the benefits of the Parquet format, how and when to checkpoint a Dataset, a technique for choosing the right level of parallelism when dealing with massive datasets, and how to take advantage of broadcast joins. Nevertheless, there are many other optimization techniques that we can use, especially with rapidly evolving technologies like Spark. So stay tuned!

Acknowledgements

This article was revised by my dear colleagues Carlos Azevedo, Margarida Pedro, João Paulo Figueira, and Nelson Costa. Thank you for your amazing help. This article would not have been possible without you.

References

[1] Efficient Data Storage for Analytics with Parquet 2.0

[2] The Columnar Roadmap: Apache Parquet and Apache Arrow

[3] The Parquet Format and Performance Optimization Opportunities, Boudewijn Braams (Databricks)

[4] Tips and Best Practices to Take Advantage of Spark 2.x

[5] Aggregations execution in Apache Spark SQL

[6] Understanding Query Plans and Spark UIs

[7] Spark performance tuning from the trenches

[8] Optimize Spark SQL Joins

[9] Spark best practices

[10] Apache Spark-Best Practices and Tuning

Umberto Griffo works as a Data Engineer for tb.lx in Lisbon, Portugal.
