Lessons learned while optimizing Spark aggregation jobs

Spark from the trenches — Part III

Yann Moisan
Teads Engineering
8 min read · May 23, 2019


In this third article of our Apache Spark series (see Part I, Part II and Part IV), we focus on a real-life use case, where we tried several implementations of an aggregation job.

Business Context

At Teads, we distribute ads to over 1.5bn people every month within professionally-produced content.

One of the main components of our platform is responsible for handling bid requests (an opportunity to display an ad) and for sending back a bid response (the ad to display and the associated price).

An advertising campaign can be set up with delivery constraints:

  • target specific users, depending on their geolocation,
  • target specific devices, OS, browsers,
  • etc.

As a result, ads are filtered according to these requirements.

In order to analyse the ad delivery, we generate a log for each bid request, containing the reasons why an ad was filtered (we also use this log to train our prediction models). For example, a filtering reason can be geolocation, if an ad targets users from a specific country. For the sake of simplicity, we will use four filtering reasons, labeled A to D.

This article will focus on how to implement a reporting job that counts these filtering reasons for each ad. Such a job can be used to build troubleshooting tools.

Specification of the job

Input data

Here is the schema of the log.

root
|-- filtering_reasons: map (nullable = true)
| |-- key: string
| |-- value: array (valueContainsNull = true)
| | |-- element: string (containsNull = true)

The field filtering_reasons is a map with ad identifiers as keys and arrays of filtering reasons as values.

Expected output

Let’s take an example with two bid requests (a snippet that builds this sample follows the two lists below).

Request 1

  • ad1 is filtered by reasons A and B
  • ad2 is filtered by reasons A and C

Request 2

  • ad1 is filtered by reasons A and D
  • ad3 is filtered by reason C
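For reference, here is a minimal sketch that builds this sample DataFrame locally. The local SparkSession and the Log case class are assumptions of this sketch, not part of the original job; the case class simply mirrors the schema above and is reused in later snippets.

import org.apache.spark.sql.SparkSession

// Hypothetical case class mirroring the log schema shown above
case class Log(filtering_reasons: Map[String, Seq[String]])

val spark = SparkSession.builder().master("local[*]").appName("filtering-reasons").getOrCreate()
import spark.implicits._

// The two bid requests described above
val df = Seq(
  Log(Map("ad1" -> Seq("A", "B"), "ad2" -> Seq("A", "C"))),
  Log(Map("ad1" -> Seq("A", "D"), "ad3" -> Seq("C")))
).toDF()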
scala> df.show(false)
+---------------------------------------------------------+
|filtering_reasons                                        |
+---------------------------------------------------------+
|Map(ad1 -> WrappedArray(A, B), ad2 -> WrappedArray(A, C))|
|Map(ad1 -> WrappedArray(A, D), ad3 -> WrappedArray(C))   |
+---------------------------------------------------------+

So, ad1 is filtered twice for reason A, once for reason B (in request 1), never for reason C, and once for reason D (in request 2).

If we do the same exercise for ad2 and ad3, we see that the expected result is:

+-----+---+---+---+---+
|ad_id|A  |B  |C  |D  |
+-----+---+---+---+---+
|ad1  |2  |1  |0  |1  |
|ad2  |1  |0  |1  |0  |
|ad3  |0  |0  |1  |0  |
+-----+---+---+---+---+

Technical Solution

Preliminary notes

A variable named allFilteringReasons contains all possible filtering reasons.

Implementation 1

The idea is to explode the map to have an ad column and then group filtering reasons by ad with an aggregate function. The desired aggregate function doesn’t exist in Spark, so we have to write a custom one.

Let’s have a closer look, step-by-step.

Explode — The explode function returns two columns when applied to a MapType column: one for the key and one for the value. There is also an as function made for this specific case, which takes a sequence of aliases.
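For instance, a minimal sketch of this step on the sample df built earlier (the aliases ad_id and filtering_reasons are ours):

import org.apache.spark.sql.functions.explode

// explode the map: one output row per (ad_id, reasons) entry
val exploded = df.select(explode($"filtering_reasons").as(Seq("ad_id", "filtering_reasons")))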

Here is the resulting DataFrame:

+-----+-----------------+
|ad_id|filtering_reasons|
+-----+-----------------+
|ad1  |[A, B]           |
|ad2  |[A, C]           |
|ad1  |[A, D]           |
|ad3  |[C]              |
+-----+-----------------+

na.fill — An ad may never have a given filtering reason, resulting in null values. na.fill replaces all null values with 0.

Custom UDAF — The basic idea is to accumulate each filtering reason in a Map whose keys are the reasons and whose values are the counters (writing a UDAF is not the aim of this article).
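The snippet below is a minimal sketch of one possible such UDAF, using the Spark 2.x UserDefinedAggregateFunction API; the class name CountReasons and the column names are illustrative, not the original code.

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

// Illustrative sketch of the UDAF described above
class CountReasons extends UserDefinedAggregateFunction {

  // Input: the exploded array of filtering reasons of one row
  override def inputSchema: StructType =
    StructType(StructField("reasons", ArrayType(StringType)) :: Nil)

  // Buffer: a map from reason to its running count
  override def bufferSchema: StructType =
    StructType(StructField("counts", MapType(StringType, LongType)) :: Nil)

  // Output: the final reason -> count map for a given ad
  override def dataType: DataType = MapType(StringType, LongType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit =
    buffer(0) = Map.empty[String, Long]

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val counts  = buffer.getAs[Map[String, Long]](0)
    val reasons = input.getAs[Seq[String]](0)
    buffer(0) = reasons.foldLeft(counts) { (acc, reason) =>
      acc + (reason -> (acc.getOrElse(reason, 0L) + 1L))
    }
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val m1 = buffer1.getAs[Map[String, Long]](0)
    val m2 = buffer2.getAs[Map[String, Long]](0)
    buffer1(0) = m2.foldLeft(m1) { case (acc, (reason, count)) =>
      acc + (reason -> (acc.getOrElse(reason, 0L) + count))
    }
  }

  override def evaluate(buffer: Row): Any = buffer.getAs[Map[String, Long]](0)
}

Assuming allFilteringReasons is a Seq[String], the whole implementation then chains the steps described above (exploded comes from the explode sketch):

import org.apache.spark.sql.functions._

val countReasons = new CountReasons()

val result = exploded
  .groupBy($"ad_id")
  .agg(countReasons($"filtering_reasons").as("counts"))
  .select($"ad_id" +: allFilteringReasons.map(r => $"counts".getItem(r).as(r)): _*)
  .na.fill(0)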

SQL Plan

A note on Catalyst: When using the DataFrame/Dataset API, a query optimizer called Catalyst is used. This optimizer rewrites the query based on predefined rules.

For performance reasons, Spark tries to group together multiple operators inside a Whole-Stage CodeGen. In order to do that, Spark generates Java code on the fly and compiles it with Janino (see here for further details).

On the Web UI, in the SQL tab, we can click on a query to see a graphical representation of the physical plan.

Here is the plan for this implementation:

  • The Generate operator corresponds to the explode method in the implementation,
  • Generate breaks the WholeStageCodegen in two,
  • Exchange corresponds to a shuffle operation between two stages,
  • SortAggregate appears twice (before and after the shuffle) because Spark performs a partial aggregation locally on each mapper before sending the results to a reducer.

Implementation 2

Can we do better?

In this second iteration, the idea is to avoid using a UDAF by transforming each row with a UDF and then using built-in aggregate functions (which can be better optimized by Spark).

If we look at this step by step, we first have the same explode as in implementation 1.

User-defined function — For each row of the dataset, the filtering reasons are transformed into a Map. Hence, the built-in aggregate function sum can be used.

Aggregation — All values for a given key must be aggregated together.

  • There is a not-so-well-known syntax to access a given key of a MapType column: $"map.key".
  • Another possible way is $"map".getItem(key). Note: if the key doesn’t exist in the map, null is returned.

Hence, we can do: sum($"map.key")
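Putting these pieces together, a minimal sketch of this implementation; the names toCountMap, counts and exploded are ours, while allFilteringReasons comes from the preliminary notes and is assumed to be a Seq[String].

import org.apache.spark.sql.functions._

// sketch: turn each array of reasons into a Map(reason -> count)
val toCountMap = udf { reasons: Seq[String] =>
  reasons.groupBy(identity).map { case (reason, occurrences) => reason -> occurrences.size.toLong }
}

// one sum per known filtering reason
val aggregations = allFilteringReasons.map(r => sum($"counts".getItem(r)).as(r))

val result = exploded
  .withColumn("counts", toCountMap($"filtering_reasons"))
  .groupBy($"ad_id")
  .agg(aggregations.head, aggregations.tail: _*)
  .na.fill(0)

getItem is used here so that the reason columns can be generated programmatically from allFilteringReasons.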

SQL Plan

The only difference with implementation 1 is that the SortAggregate is replaced by a HashAggregate.

Implementation 3

This time we want to avoid the use of the explode method by mapping directly over the existing partitions with mapPartitions.
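A minimal sketch of this approach, assuming the Log case class from the first snippet and spark.implicits._ in scope (the exact shape of the original job may differ):

import scala.collection.mutable

// sketch with assumed names: logs, acc, partial, result
val logs = df.as[Log]

val partial = logs.mapPartitions { rows =>
  // ad_id -> (reason -> count), aggregated manually inside the partition
  val acc = mutable.Map.empty[String, mutable.Map[String, Long]]
  rows.foreach { log =>
    log.filtering_reasons.foreach { case (adId, reasons) =>
      val counters = acc.getOrElseUpdate(adId, mutable.Map.empty[String, Long])
      reasons.foreach(reason => counters(reason) = counters.getOrElse(reason, 0L) + 1L)
    }
  }
  acc.iterator.map { case (adId, counters) => (adId, counters.toMap) }
}

// a given ad can appear in several partitions: merge the partial maps per ad_id
val result = partial
  .groupByKey { case (adId, _) => adId }
  .mapGroups { (adId, partials) =>
    val total = mutable.Map.empty[String, Long]
    partials.foreach { case (_, counts) =>
      counts.foreach { case (reason, count) => total(reason) = total.getOrElse(reason, 0L) + count }
    }
    (adId, total.toMap)
  }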

Here, the aggregation by ad is made manually, inside the loop.

A given ad may appear in multiple partitions, so we need an additional step to aggregate the partial results by ad_id.

SQL Plan

The plan is quite different from implementations 1 and 2, with the appearance of two new operators: DeserializeToObject and SerializeFromObject.

  • DeserializeToObject: with the DataFrame API, a lot of operators work on InternalRow, the optimized in-memory representation of a row. But the MapPartitions operator works on standard Scala objects, so the DeserializeToObject operator is needed to convert data from an InternalRow to a Scala object.
  • SerializeFromObject: this operator is used to go back to the DataFrame world.

Implementation 4

We can also embrace a more functional programming style.

Here, we chain together the following methods (sketched just after the list):

  • flatMap: to replace the explode
  • groupByKey followed by mapValues
  • reduceGroups to aggregate values
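A minimal sketch, assuming the same logs Dataset as in implementation 3 and spark.implicits._ in scope:

// sketch: the counting logic mirrors the UDF of implementation 2
val result = logs
  .flatMap(_.filtering_reasons.toSeq)                  // replaces the explode
  .groupByKey { case (adId, _) => adId }
  .mapValues { case (_, reasons) =>
    reasons.groupBy(identity).map { case (reason, occurrences) => reason -> occurrences.size.toLong }
  }
  .reduceGroups { (m1, m2) =>
    (m1.keySet ++ m2.keySet).map(r => r -> (m1.getOrElse(r, 0L) + m2.getOrElse(r, 0L))).toMap
  }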

SQL Plan

Similar to implementation 3 with some additional operators.

Interestingly enough, even if we use groupByKey and reduceGroups (reduceByKey does not exist on Datasets), a partial aggregation is done on the map side (cf. SPARK-16391).

Other implementations — Array based

In all previous implementations, counters are stored in a Map. But a Map is not a very efficient data structure: it uses a lot of memory, it is not cache-friendly, and it requires boxing/unboxing in Scala to store Long values.

Given that all keys are known in advance, we can use an array instead: reason A is at position 0, reason B at position 1, and so on.

This doesn’t change our previous implementations a lot. For instance, take the array-based version of our 3rd implementation.
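A minimal sketch, under the same assumptions as the implementation 3 snippet, with allFilteringReasons assumed to be a Seq[String]:

import scala.collection.mutable

// precomputed lookup from filtering reason to array index
val reasonToIndex: Map[String, Int] = allFilteringReasons.zipWithIndex.toMap

val partial = logs.mapPartitions { rows =>
  // ad_id -> array of counters, one slot per filtering reason
  val acc = mutable.Map.empty[String, Array[Long]]
  rows.foreach { log =>
    log.filtering_reasons.foreach { case (adId, reasons) =>
      val counter = acc.getOrElseUpdate(adId, new Array[Long](allFilteringReasons.size))
      reasons.foreach(reason => counter(reasonToIndex(reason)) += 1)
    }
  }
  acc.iterator
}

// merge the per-partition arrays for each ad_id
val result = partial
  .groupByKey { case (adId, _) => adId }
  .reduceGroups { (a, b) => (a._1, a._2.zip(b._2).map { case (x, y) => x + y }) }
  .map { case (adId, (_, counter)) => (adId, counter) }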

We have precomputed a lookup (reasonToIndex) from filtering reason to array index.

Hence, lookups in the hashmap are replaced by direct accesses into the array: counter(reasonToIndex(reason)) += 1

Execution time comparison

The application used for this benchmark processes one hour of our real production log with each implementation. As we can see in the SQL plan, the log contains 50M input rows and the job produces 3,600 output rows.

This benchmark runs on an EMR cluster of 10 nodes (r3.xlarge), with 10 executors, 4 cores per executor and 20GB of RAM.

All possible implementations need at least one shuffle phase to put all the data for a given ad on the same node. In this case, we have only one shuffle, so our implementations are already optimized with respect to network exchange.

HashAggregate is more efficient than SortAggregate because we do not need to sort the data. Nevertheless, implementation 1 (with a SortAggregate) is more performant than implementation 2 (with a HashAggregate). It’s related to the complexity of both algorithms:

  • In impl. 1, only the filtering reasons present for a given ad are considered (2 or 3 on average), so the map or the array stays sparse.
  • In impl. 2, all filtering reasons are considered, so this triggers a lot more lookups.

In this use case, using the explode method costs a lot because the explode factor is around 20 (number of output rows in Generate / number of output rows in Scan, as seen in the SQL plan). This is due to the generation of multiple InternalRows per input row.

Although implementation 3 needs some deserialisation/serialisation (from Tungsten to standard Scala objects), it remains the fastest (as seen in the SQL plan).

Conclusions

  1. As seen previously, there are multiple ways to implement a given algorithm with Spark.
  2. Knowing and choosing the appropriate data structure is one of the most important skills for a software engineer, especially in a big data context.
  3. In order to improve the performance of your jobs, two things are required: knowing Spark's execution model and knowing the shape of your data. It is worth the effort because this can lead to significant improvements.

Thanks to Wassim Almaaoui, Han Ju, Amine Er-Raqabi, Benjamin Davy, Gaël Rico and Robert Dupuy for their help in writing this article.
