Lessons learned while optimizing Spark aggregation jobs

Spark from the trenches — Part III

Yann Moisan
May 23, 2019 · 8 min read

In this third article of our Apache Spark series (see Part I, Part II and Part IV), we focus on a real-life use case, where we tried several implementations of an aggregation job.

Business Context

At Teads, we distribute ads to over 1.5bn people every month within professionally-produced content.

One of the main components of our platform is responsible for handling bid requests (an opportunity to display an ad) and for sending back a bid response (the ad to display and the associated price).

An advertising campaign can be set up with delivery constraints, such as targeting users from a specific country.

As a result, ads are filtered according to these requirements.

In order to analyse the ad delivery, we generate a log for each bid request, containing the reasons why an ad was filtered (we also use this log to train our prediction models). For example, a filtering reason can be geolocation, if an ad targets users from a specific country. For the sake of simplicity, we will use four filtering reasons, named A to D.

This article will focus on how to implement a reporting job that counts these filtering reasons for each ad. Such a job can be used to build troubleshooting tools.

Specification of the job

Input data

Here is the schema of the log.

 |-- filtering_reasons: map (nullable = true)
 |    |-- key: string
 |    |-- value: array (valueContainsNull = true)
 |    |    |-- element: string (containsNull = true)

The field filtering_reasons is a map with ad identifiers as keys and arrays of filtering reasons as values.

Expected output

Let’s take an example with 2 bid requests.

Request 1

Request 2

scala> df.show(false)
|filtering_reasons |
|Map(ad1 -> WrappedArray(A, B), ad2 -> WrappedArray(A, C))|
|Map(ad1 -> WrappedArray(A, D), ad3 -> WrappedArray(C)) |

So, ad1 is filtered twice for reason A, once for reason B (in request 1), never for reason C, and once for reason D (in request 2).

If we do the same exercise for ad2 and ad3, we see that the expected result is:

|ad_id|A |B |C |D |
|ad1 |2 |1 |0 |1 |
|ad2 |1 |0 |1 |0 |
|ad3 |0 |0 |1 |0 |
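To make the expected result concrete, here is a minimal plain-Scala sketch that computes the same table from the two example requests with local collections, without Spark. The object name ExpectedOutput, the Log type alias and the Seq("A", "B", "C", "D") value for allFilteringReasons are assumptions made for illustration.

```scala
// Plain-Scala model of the reporting job (no Spark involved): for each ad,
// count how many times each filtering reason appears across all bid requests.
object ExpectedOutput {
  // Assumption: the four filtering reasons used in this article, in column order.
  val allFilteringReasons: Seq[String] = Seq("A", "B", "C", "D")

  // One bid request's log entry: ad id -> filtering reasons for that ad.
  type Log = Map[String, Seq[String]]

  // Returns ad id -> one counter per reason, in allFilteringReasons order.
  def countReasons(requests: Seq[Log]): Map[String, Seq[Long]] = {
    // Flatten every request into (ad, reason) pairs.
    val pairs: Seq[(String, String)] =
      for {
        request <- requests
        (ad, reasons) <- request.toSeq
        reason <- reasons
      } yield (ad, reason)

    pairs.groupBy(_._1).map { case (ad, adPairs) =>
      val reasons = adPairs.map(_._2)
      // A reason an ad never had simply counts as 0.
      ad -> allFilteringReasons.map(r => reasons.count(_ == r).toLong)
    }
  }

  // The two bid requests from the example above.
  val requests: Seq[Log] = Seq(
    Map("ad1" -> Seq("A", "B"), "ad2" -> Seq("A", "C")),
    Map("ad1" -> Seq("A", "D"), "ad3" -> Seq("C"))
  )
}
```

Calling ExpectedOutput.countReasons(ExpectedOutput.requests) returns one row per ad, for example Seq(2, 1, 0, 1) for ad1, matching the table.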

Technical Solution

Preliminary notes

A variable named allFilteringReasons contains all possible filtering reasons.

Implementation 1

The idea is to explode the map to get one row per ad, then group filtering reasons by ad with an aggregate function. Spark has no built-in aggregate function that does this, so we have to write a custom one (a UDAF).

Let’s have a closer look, step-by-step.

Explode — When applied to a MapType column, the explode function returns two columns: one for the key and one for the value. Spark also provides an as method made for this specific case, which takes a sequence of aliases, one per generated column.

Here is the resulting DataFrame:

|ad1 |[A, B] |
|ad2 |[A, C] |
|ad1 |[A, D] |
|ad3 |[C] |
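The explode step above can be modelled on local collections: each (key, value) entry of each map becomes its own row. This is a sketch, not Spark code. The Spark call in the comment uses the real explode function and the Column.as(Seq[String]) overload, but the alias names ad_id and reasons are hypothetical. The explodeFactor helper anticipates the "explode factor" metric discussed in the benchmark section.

```scala
// Local model of explode on a MapType column. In Spark, the step could look like
//   df.select(explode($"filtering_reasons").as(Seq("ad_id", "reasons")))
// where the aliases "ad_id" and "reasons" are hypothetical names.
object ExplodeModel {
  type Log = Map[String, Seq[String]]

  // Every (key, value) entry of every map becomes its own output row.
  def explodeMap(rows: Seq[Log]): Seq[(String, Seq[String])] =
    rows.flatMap(_.toSeq)

  // Output rows per input row, i.e. the "explode factor".
  def explodeFactor(rows: Seq[Log]): Double =
    explodeMap(rows).size.toDouble / rows.size
}
```

On the two example requests, explodeMap yields the four rows shown above, so the explode factor is 2.0.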

na.fill — An ad may never have a given filtering reason, which results in null values. na.fill(0) replaces all these null values with 0.
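The following plain-Scala sketch illustrates where the nulls come from and what filling them does. Reading one column per reason out of an ad's counters gives "no value" for reasons the ad never had; this appears as null in a DataFrame and is modelled as None here. Replacing those with 0 is the effect of na.fill(0). FillModel and its helpers are hypothetical names, and the reason list is assumed to be A to D.

```scala
object FillModel {
  // Assumption: the four filtering reasons of the article, in column order.
  val allFilteringReasons: Seq[String] = Seq("A", "B", "C", "D")

  // Selecting one column per reason from an ad's counters: a reason the ad
  // never had has no value (null in a DataFrame, None here).
  def rawRow(counters: Map[String, Long]): Seq[Option[Long]] =
    allFilteringReasons.map(counters.get)

  // Equivalent of na.fill(0): every missing value becomes 0.
  def fill(row: Seq[Option[Long]]): Seq[Long] =
    row.map(_.getOrElse(0L))
}
```

For ad3, whose only reason is C, rawRow gives Seq(None, None, Some(1), None) and fill turns it into Seq(0, 0, 1, 0).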

Custom UDAF — The UDAF code itself is not the aim of this article. The basic idea is to accumulate each filtering reason in a Map whose keys are the reasons and whose values are the counters.
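In Spark 2.x, a UserDefinedAggregateFunction is defined by initialize, update, merge and evaluate methods. The sketch below models only the buffer logic of such a UDAF in plain Scala, with an immutable Map[String, Long] as the buffer. It is not the article's UDAF, and the names CountingAggregation, zero, update and merge are assumptions.

```scala
object CountingAggregation {
  // Aggregation buffer: filtering reason -> counter.
  type Counters = Map[String, Long]

  // initialize: no reason seen yet.
  val zero: Counters = Map.empty

  // update: fold one input row (the reasons of one ad in one request) into the buffer.
  def update(buffer: Counters, reasons: Seq[String]): Counters =
    reasons.foldLeft(buffer) { (acc, reason) =>
      acc.updated(reason, acc.getOrElse(reason, 0L) + 1L)
    }

  // merge: combine two partial buffers, e.g. computed on different partitions.
  def merge(left: Counters, right: Counters): Counters =
    right.foldLeft(left) { case (acc, (reason, count)) =>
      acc.updated(reason, acc.getOrElse(reason, 0L) + count)
    }
}
```

Merging the buffers built from ad1's two rows, [A, B] and [A, D], gives Map(A -> 2, B -> 1, D -> 1).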

SQL Plan

A note on Catalyst: When using the DataFrame/Dataset API, a query optimizer called Catalyst is used. This optimizer rewrites the query based on predefined rules.

For performance reasons, Spark tries to group together multiple operators inside a Whole-Stage CodeGen. In order to do that, Spark generates Java code on the fly and compiles it with Janino (see here for further details).

On the Web UI, in the SQL tab, we can click on a query to see a graphical representation of the physical plan.

Here is the plan for this implementation:

Implementation 2

Can we do better?

In this second iteration, the idea is to avoid the UDAF by transforming each row with a UDF and then using built-in aggregate functions, which Spark may be able to optimize better.

Step by step, we first apply the same explode as in implementation 1.

User-defined function — For each row of the dataset, the filtering reasons are transformed into a Map from reason to count. Hence, the built-in aggregate function sum can be used.
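Here is a hedged sketch of what the body of such a UDF could look like: it turns the reasons of one exploded row into a Map from reason to count. RowToCounters and reasonsToMap are hypothetical names. The comment shows how the function could be wrapped with Spark's functions.udf, but the code itself is plain Scala.

```scala
object RowToCounters {
  // Hypothetical UDF body. In Spark it could be wrapped with
  //   org.apache.spark.sql.functions.udf(RowToCounters.reasonsToMap _)
  // to produce a MapType column from the exploded reasons column.
  def reasonsToMap(reasons: Seq[String]): Map[String, Long] =
    reasons.groupBy(identity).map { case (reason, occurrences) =>
      reason -> occurrences.size.toLong
    }
}
```

Counting occurrences, rather than mapping each reason to 1, also stays correct if a reason were repeated within one array.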

Aggregation — All values for a given key must be aggregated together across the rows of an ad.

Hence, for each filtering reason we can do: sum($"map.key"), where key is the reason (A, B, C or D).
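The aggregation step can be modelled locally as a group by ad followed by one sum per reason over the per-row maps. The sketch mimics SQL semantics: sum skips nulls, which here are missing map keys, and returns null (None here) when a group has no value at all. SumPerReason and the A to D reason list are assumptions for illustration.

```scala
object SumPerReason {
  // Assumption: the four filtering reasons of the article, in column order.
  val allFilteringReasons: Seq[String] = Seq("A", "B", "C", "D")

  // SQL sum over one column of a group: nulls (missing map keys) are skipped,
  // and the result is null (None here) if the group has no value at all.
  def sumColumn(maps: Seq[Map[String, Long]], reason: String): Option[Long] = {
    val present = maps.flatMap(_.get(reason))
    if (present.isEmpty) None else Some(present.sum)
  }

  // Models groupBy(ad_id) followed by one sum per reason over the UDF's map column.
  def aggregate(rows: Seq[(String, Map[String, Long])]): Map[String, Seq[Option[Long]]] =
    rows.groupBy(_._1).map { case (ad, adRows) =>
      val maps = adRows.map(_._2)
      ad -> allFilteringReasons.map(reason => sumColumn(maps, reason))
    }
}
```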

SQL Plan

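The only difference from implementation 1 is that the SortAggregate is replaced by a HashAggregate. Spark can only use HashAggregate when every field of the aggregation buffer has a fixed-width type, such as the Long used by sum. The Map buffer of our UDAF rules this out, so implementation 1 falls back to SortAggregate.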

Implementation 3

This time we want to avoid the use of the explode method by mapping directly on existing partitions.

Here, the aggregation by ad is done manually, inside the loop.

A given ad may appear in multiple partitions, so we need an additional step that merges the per-partition results by ad_id.
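The two steps above can be sketched with local collections. aggregatePartition is the kind of function one could pass to Dataset.mapPartitions: it walks the rows of one partition and aggregates by ad in a local mutable map, without exploding. mergeByAd models the additional step, which in Spark requires a shuffle. All names are hypothetical, and the counters use a Map as in the original implementations.

```scala
import scala.collection.mutable

object PartitionAggregation {
  type Counters = Map[String, Long]
  type Log = Map[String, Seq[String]]

  private def add(acc: Counters, reason: String, n: Long): Counters =
    acc.updated(reason, acc.getOrElse(reason, 0L) + n)

  // Step 1: aggregate one partition by ad, directly from the map column.
  def aggregatePartition(rows: Iterator[Log]): Iterator[(String, Counters)] = {
    val byAd = mutable.Map.empty[String, Counters]
    rows.foreach { request =>
      request.foreach { case (ad, reasons) =>
        val current = byAd.getOrElse(ad, Map.empty[String, Long])
        byAd(ad) = reasons.foldLeft(current)((acc, r) => add(acc, r, 1L))
      }
    }
    byAd.iterator
  }

  // Step 2: the same ad can appear in several partitions, so the partial
  // results are merged by ad id (in Spark, this step needs a shuffle).
  def mergeByAd(partials: Seq[(String, Counters)]): Map[String, Counters] =
    partials.groupBy(_._1).map { case (ad, parts) =>
      ad -> parts.map(_._2).foldLeft(Map.empty[String, Long]) { (acc, counters) =>
        counters.foldLeft(acc) { case (a, (r, n)) => add(a, r, n) }
      }
    }
}
```

If the two example requests sit in different partitions, ad1 produces one partial result per partition, and mergeByAd combines them into Map(A -> 2, B -> 1, D -> 1).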

SQL Plan

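The plan is quite different from implementations 1 and 2, with two new operators: DeserializeToObject and SerializeFromObject. They convert rows between Spark's internal (Tungsten) format and JVM objects, so that our Scala code can work on them.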

Implementation 4

We can also embrace a more functional programming style.

Here, we chain together typed Dataset transformations, ending with a groupByKey and a reduce on each group.

SQL Plan

The plan is similar to that of implementation 3, with some additional operators.

Interestingly enough, even though we use groupByKey and reduceGroups (the Dataset API has no reduceByKey), a partial aggregation is still performed before the shuffle. Cf. SPARK-16391
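The functional chain can be modelled on local collections. Each request is turned into (ad, counters) pairs, the pairs are grouped by ad, and each group is reduced with an associative, commutative merge. In Spark, the last two steps would correspond to groupByKey followed by KeyValueGroupedDataset.reduceGroups, which takes a (V, V) => V function. That shape is what allows partial aggregation before the shuffle. FunctionalStyle and its helpers are hypothetical names.

```scala
object FunctionalStyle {
  type Counters = Map[String, Long]

  // Associative and commutative merge, so partial results can be combined in any order.
  def mergeCounters(a: Counters, b: Counters): Counters =
    b.foldLeft(a) { case (acc, (r, n)) => acc.updated(r, acc.getOrElse(r, 0L) + n) }

  // flatMap each request into (ad, counters) pairs, group by ad, then reduce each group.
  def run(requests: Seq[Map[String, Seq[String]]]): Map[String, Counters] =
    requests
      .flatMap(_.toSeq)
      .map { case (ad, reasons) =>
        ad -> reasons.groupBy(identity).map { case (r, occ) => r -> occ.size.toLong }
      }
      .groupBy(_._1)
      .map { case (ad, pairs) => ad -> pairs.map(_._2).reduce(mergeCounters) }
}
```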

Other implementations — Array based

In all previous implementations, counters are stored in a Map. But Map is not a very efficient data structure. It uses a lot of memory, it is not cache friendly and it generates boxing/unboxing in Scala to store Long values.

Since all keys are known in advance, we can use an array instead: reason A is at position 0, reason B at position 1, and so on.

This doesn’t change our previous implementations much. For instance, the array-based version of our third implementation differs only in how counters are stored.

We have precomputed a lookup table (reasonToIndex) from each filtering reason to its index.

Hence, updating a counter no longer goes through a hash map of counters but through a direct array access: counter(reasonToIndex(reason)) += 1
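Here is a minimal plain-Scala sketch of the array-based per-partition loop. Each ad gets an Array[Long] with one slot per reason, so the counters are unboxed primitives in contiguous memory. In this sketch, reasonToIndex is still a Map lookup, and it throws for an unknown reason. ArrayCounters and its methods are hypothetical names, and the reason list is assumed to be A to D.

```scala
import scala.collection.mutable

object ArrayCounters {
  // Assumption: the four filtering reasons of the article; position = column index.
  val allFilteringReasons: Array[String] = Array("A", "B", "C", "D")

  // Precomputed once: reason -> position in the counter array.
  val reasonToIndex: Map[String, Int] = allFilteringReasons.zipWithIndex.toMap

  // Array-based variant of the per-partition loop: one Array[Long] per ad.
  def aggregatePartition(rows: Iterator[Map[String, Seq[String]]]): Map[String, Array[Long]] = {
    val byAd = mutable.Map.empty[String, Array[Long]]
    rows.foreach { request =>
      request.foreach { case (ad, reasons) =>
        val counter = byAd.getOrElseUpdate(ad, new Array[Long](allFilteringReasons.length))
        reasons.foreach(reason => counter(reasonToIndex(reason)) += 1)
      }
    }
    byAd.toMap
  }

  // Merging two partial results for the same ad is an element-wise sum.
  def merge(a: Array[Long], b: Array[Long]): Array[Long] =
    a.zip(b).map { case (x, y) => x + y }
}
```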

Execution time comparison

The application used for this benchmark processes one hour of our real production log with each implementation. As we can see in the SQL plan, the log contains 50M input rows and the job produces 3,600 output rows.

This benchmark runs on an EMR cluster of 10 nodes (r3.xlarge), with 10 executors, 4 cores per executor and 20GB of RAM.

All possible implementations need at least one shuffle phase, to put all data for a given ad on the same node. Each of our implementations has exactly one shuffle, so they are already optimal with regard to network exchange.

HashAggregate is generally more efficient than SortAggregate because it does not need to sort the data. Nevertheless, implementation 1 (with a SortAggregate) is faster than implementation 2 (with a HashAggregate). This is related to the complexity of both algorithms.

In this use case, the explode method is costly because the explode factor is around 20 (number of output rows in Generate divided by number of output rows in Scan, as seen in the SQL plan), so the 50M input rows become roughly 1 billion rows. This cost comes from generating many InternalRow objects.

Although implementation 3 needs some deserialisation/serialisation (between Tungsten's internal format and JVM objects), it remains the fastest (as seen in the SQL plan).


Thanks to Wassim Almaaoui, Han Ju, Amine Er-Raqabi, Benjamin Davy, Gaël Rico and Robert Dupuy for their help in writing this article.

Teads Engineering

