Spark UDAF could be an option!

Calculate average on sparse arrays

Wassim Almaaoui
Teads Engineering
Jan 29, 2020


In this fourth article of our Apache Spark series (see Part I, Part II and Part III), we present another real-life use case we faced at Teads and cover methods to consider when optimizing a Spark job. For those who are not familiar with Spark User Defined Aggregate Functions (UDAF), we walk through a relatively simple but useful example involving sparse arrays (arrays in which many elements are zero).

Business Context

At Teads, we deliver ads to 1.5 billion unique users (identified by a viewerId, or vid) every month. We use an unsupervised machine learning algorithm to create clusters of those users based on a set of features. This clustering is soft, meaning that for each user we compute the probability of belonging to each of our clusters (we have 100 clusters in this example):

+-----+---------+------------------------------+
| vid | country | probas // length = 100       |
+-----+---------+------------------------------+
| 1   | US      | [0.1, 0, 0.02, 0.6,...,0]    |
| 2   | FR      | [0.5, 0, 0.15, 0.07,...,0.3] |
| 3   | FR      | [0.3, 0.04, 0.35, 0,...,0]   |
+-----+---------+------------------------------+

This clustering has various applications, ranging from improving our prediction models to creating custom user segments (predefined lists of people sharing some known properties). For some of them, we need to normalize the resulting probabilities by their average at different levels. In our example, we calculate the average probabilities by country.

Input data

The input is the dataset described above. However, to reduce storage costs, we store it on S3 as sparse, Short-encoded arrays.

Sparse arrays

Since our arrays are very sparse, we only want to store the significant values, using two arrays:

  1. Array 1: indices of non-negligible values (values above a certain threshold)
  2. Array 2: values corresponding to those indices

For example, considering a threshold of 0.1 and supposing that the missing values in the above table are zeros, the sparse version would be:

+-----+---------+------------+------------------+
| vid | country | index      | probas           |
+-----+---------+------------+------------------+
| 1   | US      | [0, 3]     | [0.1, 0.6]       |
| 2   | FR      | [0, 2, 99] | [0.5, 0.15, 0.3] |
| 3   | FR      | [0, 2]     | [0.3, 0.35]      |
+-----+---------+------------+------------------+

Short encoding

We store this data in Snappy-compressed Parquet files and encode the probabilities as Short values to further reduce the size of the dataset.

The encode-to-Short function
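As a rough sketch (not the exact production code), an encoding that maps probabilities in [0, 1] linearly onto the full Short range would be consistent with the sample values below; encodeToShort and decodeToDouble are hypothetical helper names:

// Hypothetical helpers: map a probability in [0, 1] onto the full Short range, and back.
// Note: a real implementation would need to clamp p == 1.0 to avoid Short overflow.
def encodeToShort(p: Double): Short = (p * 65536.0 - 32768.0).toShort
def decodeToDouble(s: Short): Double = (s.toInt + 32768) / 65536.0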

After this processing, our input dataset looks like this:

+-----+---------+------------+---------------------+
| vid | country | index      | probas              |
+-----+---------+------------+---------------------+
| 1   | US      | [0, 3]     | [-26214, 6553]      |
| 2   | FR      | [0, 2, 99] | [0, -22937, -13107] |
| 3   | FR      | [0, 2]     | [-13107, -9830]     |
+-----+---------+------------+---------------------+

Expected output

We would like to decode these Short values back into probabilities, normalize each array so that its probabilities sum to 1, and then compute an average by array index (i.e., by cluster). Here is a step-by-step illustration of what we want to achieve:

Step 1 — Probabilities are decoded back to Double.

+-----+---------+------------+------------------+
| vid | country | index      | probas           |
+-----+---------+------------+------------------+
| 1   | US      | [0, 3]     | [0.1, 0.6]       |
| 2   | FR      | [0, 2, 99] | [0.5, 0.15, 0.3] |
| 3   | FR      | [0, 2]     | [0.3, 0.35]      |
+-----+---------+------------+------------------+

Step 2 — Probabilities are normalized so that the sum of probas equals 1 for each user.

+-----+---------+------------+--------------------+
| vid | country | index      | probas             |
+-----+---------+------------+--------------------+
| 1   | US      | [0, 3]     | [0.14, 0.86]       |
| 2   | FR      | [0, 2, 99] | [0.52, 0.16, 0.32] |
| 3   | FR      | [0, 2]     | [0.46, 0.54]       |
+-----+---------+------------+--------------------+

Step 3 — We calculate, by country, the average membership of people to each cluster.

+---------+-------------------------------+
| country | probas // length = 100        |
+---------+-------------------------------+
| US      | [0.14, 0, 0, 0.86, 0, ..., 0] |
| FR      | [0.49, 0, 0.35, 0, ..., 0.16] |
+---------+-------------------------------+

Implementation 1: use Spark's native `avg` function on dense arrays

Our first intuition was to create one UDF that decodes and normalizes the data and a second one that densifies the arrays. We could have used a single UDF for all three operations, but keeping UDFs elementary is a best practice; see the “Avoid UDFs or UDAFs that perform more than one thing” paragraph in Part I for more details.

However, grouping some functions in the same UDF sometimes helps to avoid intermediate data structures and conversion operations between Spark’s internal data representation and the JVM. There is a trade-off to make between performance and code complexity.

UDFs
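A minimal sketch of what these two UDFs could look like, reusing the hypothetical decodeToDouble helper from above and assuming nbClusters = 100:

import org.apache.spark.sql.functions.udf

val nbClusters = 100

// Decode the Short-encoded probas and normalize them so that they sum to 1
val decodeAndNormalizeUdf = udf { encoded: Seq[Short] =>
  val decoded = encoded.map(decodeToDouble)
  val total = decoded.sum
  decoded.map(_ / total)
}

// Expand the sparse representation (indices + values) into a dense array of length nbClusters
val densifyUdf = udf { (indices: Seq[Int], values: Seq[Double]) =>
  val dense = Array.fill(nbClusters)(0.0)
  indices.zip(values).foreach { case (i, v) => dense(i) = v }
  dense
}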

Then we use Spark’s avg aggregation function to calculate the average by country for each cluster.

The job using the UDFs and Spark's native avg function
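For illustration, and reusing the names from the sketches above, the job could look something like this, computing one avg per array position after densification:

import org.apache.spark.sql.functions.{avg, col}

// One avg aggregation per cluster index on the densified array
val avgPerCluster = (0 until nbClusters).map(i => avg(col("probas")(i)).as(s"cluster_$i"))

val result = input
  .withColumn("probas", decodeAndNormalizeUdf(col("probas")))
  .withColumn("probas", densifyUdf(col("index"), col("probas")))
  .groupBy("country")
  .agg(avgPerCluster.head, avgPerCluster.tail: _*)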

Execution time:

28 min on an EMR cluster of 50 r3.xlarge nodes (AWS instances).

Dataset size: 700 GB of Snappy-compressed Parquet (at least 10 times more when uncompressed) and 1.5 billion rows.

DAG and Execution Plan:

Let’s have a look at it, step by step:

WholeStageCodegen: since version 2.0.0, Spark can optimize some physical operations by collapsing them into a single Java function that it generates on the fly. In our case, this includes the Parquet file read and the execution of our two UDFs. This is explicit in the execution plan in the SQL tab of the Spark UI:

Exchange: this is the shuffle triggered by the aggregations (avg).

HashAggregate: we can see a first aggregation operation before the shuffle and another one after it. This is how some aggregations are implemented in Spark. The data is pre-aggregated within each input partition before shuffling (also called partial aggregation or map-side aggregation); it is then further aggregated after the shuffle to obtain the global result.

In this first implementation, the densifying operation instantiates a 100-element array for each row, which is time and memory consuming. Converting all these arrays back from Java objects to the InternalRow data structure (Spark SQL's optimized representation) when leaving densifyUdf is also a heavy operation.

Implementation 2: use a custom UDAF on sparse arrays

Following these first results, one way to identify optimization leads would have been to profile the JVM. In our case, we decided to directly try a UDAF that calculates the average on the sparse arrays:

UDAF
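A sketch of what such a UDAF could look like; the class and buffer field names (SparseArrayAverage, valuesSum, count) are illustrative:

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._
import scala.collection.mutable.WrappedArray

class SparseArrayAverage(nbClusters: Int) extends UserDefinedAggregateFunction {

  // Input: the sparse representation, with probas already decoded and normalized
  override def inputSchema: StructType = StructType(Seq(
    StructField("index", ArrayType(IntegerType)),
    StructField("probas", ArrayType(DoubleType))
  ))

  // Buffer: a dense array of running sums plus a row counter
  override def bufferSchema: StructType = StructType(Seq(
    StructField("valuesSum", ArrayType(DoubleType)),
    StructField("count", LongType)
  ))

  override def dataType: DataType = ArrayType(DoubleType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill(nbClusters)(0.0)
    buffer(1) = 0L
  }

  // Map-side (partial) aggregation: accumulate each sparse row into the dense buffer
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val valuesSum = buffer.getAs[WrappedArray[Double]](0)
    val indices = input.getSeq[Int](0)
    val values = input.getSeq[Double](1)
    indices.indices.foreach(i => valuesSum(indices(i)) += values(i))
    buffer(0) = valuesSum
    buffer(1) = buffer.getLong(1) + 1L
  }

  // Reduce-side aggregation: merge two partial buffers
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val valuesSum = buffer1.getAs[WrappedArray[Double]](0)
    val other = buffer2.getSeq[Double](0)
    other.indices.foreach(i => valuesSum(i) += other(i))
    buffer1(0) = valuesSum
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Final result: divide each running sum by the number of aggregated rows
  override def evaluate(buffer: Row): Seq[Double] = {
    val count = buffer.getLong(1)
    buffer.getSeq[Double](0).map(_ / count)
  }
}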
  • The UDAF is generic with respect to nbClusters.
  • The update method is responsible for the map-side aggregation.
  • The merge method is responsible for the further aggregation on the reduce side.
  • Note that we use getSeq[T] when we only read the data and getAs[WrappedArray[T]] when we need to increment/modify the array in place. Under the hood, it is the same implementation (WrappedArray), but this makes the code a little more readable and explicit: we get access to the mutable API of WrappedArray only when we need it, and we can see immediately whether a mutable structure is required.

Then we use it in our job so we don’t need the densifyUdf anymore:
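A sketch of the resulting job, reusing the names from the earlier snippets (the UDAF replaces both the densification step and the native avg):

val averageUdaf = new SparseArrayAverage(nbClusters)

val result = input
  .withColumn("probas", decodeAndNormalizeUdf(col("probas")))
  .groupBy("country")
  .agg(averageUdaf(col("index"), col("probas")).as("probas"))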

Execution time:

11 min on an EMR cluster of 50 r3.xlarge nodes. \o/

Dataset size: 700 GB of Snappy-compressed Parquet and 1.5 billion rows.

DAG and Execution Plan:

The UDAF implementation is 2.5X faster than the first implementation.

Can we do better?

Avoid the decodeAndNormalizeUdf

As discussed above, using a UDF alongside the UDAF may create unnecessary intermediate data structures, and we were not sure whether Spark is smart enough to avoid an extra serialization/deserialization between the UDF and the UDAF. We suspect it is not, since UDFs are black boxes for Catalyst and it won't even try to optimize them, according to The Internals of Spark SQL by Jacek Laskowski.

So we tried to eliminate the decodeAndNormalizeUdf and apply its logic in the UDAF itself to see if it changes anything.

The inputSchema and the update methods of the UDAF become the following (the other methods remain the same):

The part that changed to include the decodeAndNormalize in the UDAF
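A sketch of those two methods within the same class, with the decoding and normalization folded into update (decodeToDouble is the hypothetical helper from earlier):

// The input now carries the raw Short-encoded probas
override def inputSchema: StructType = StructType(Seq(
  StructField("index", ArrayType(IntegerType)),
  StructField("probas", ArrayType(ShortType))
))

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val valuesSum = buffer.getAs[WrappedArray[Double]](0)
  val indices = input.getSeq[Int](0)
  val decoded = input.getSeq[Short](1).map(decodeToDouble)
  val total = decoded.sum
  // Normalize on the fly while accumulating into the dense buffer
  indices.indices.foreach(i => valuesSum(indices(i)) += decoded(i) / total)
  buffer(0) = valuesSum
  buffer(1) = buffer.getLong(1) + 1L
}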

The execution of the above took 10 min on the same cluster, so we can conclude it’s almost 10% faster.

An important difference that we noticed in the execution plan is the peak memory per task: it is higher when the UDF is applied separately from the UDAF.

Here we need to make a choice: keeping the UDF is more readable and granular, but a little less efficient. In the end, we chose to put the decoding logic in the UDAF.

The icing on the cake? Avoid the sort-based aggregation

Even though the UDAF solution is much faster, it uses sort-based aggregation (cf. the execution plan), which is usually less efficient than hash-based aggregation since it involves extra sorting that is useless in our case.

Why does Spark use it? Let's dig a bit further.

Honestly, this is not well documented and, as on several other occasions, we had to dig into Spark's code to find the answer.

Reading the code, we understand that all the fields of our bufferSchema should have mutable data types. But wait! We thought we were already using mutable types! Here is a reminder of our buffer schema; we are using an array of doubles and a long:

the bufferSchema
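As in the sketch above:

override def bufferSchema: StructType = StructType(Seq(
  StructField("valuesSum", ArrayType(DoubleType)), // running sums, one slot per cluster
  StructField("count", LongType)                   // number of aggregated rows
))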

Looking at the code, we can see how Spark defines mutable data types here and here.

It shows that hash aggregation was not selected because we are using an ArrayType in the aggregation buffer.

To make sure Spark can perform a hash aggregation, we slightly edited the UDAF to use 100 (nbClusters) Double columns instead of an array.

The UDAF using mutable structure
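A sketch of how the buffer and the aggregation methods could change within the same class; the add helper provided through an implicit conversion is illustrative:

// Buffer: one Double column per cluster plus a Long counter, all mutable data types
override def bufferSchema: StructType = StructType(
  (0 until nbClusters).map(i => StructField(s"sum_$i", DoubleType)) :+ StructField("count", LongType)
)

override def initialize(buffer: MutableAggregationBuffer): Unit = {
  (0 until nbClusters).foreach(i => buffer(i) = 0.0)
  buffer(nbClusters) = 0L
}

override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val indices = input.getSeq[Int](0)
  val decoded = input.getSeq[Short](1).map(decodeToDouble)
  val total = decoded.sum
  // Each cluster now has its own mutable Double column in the buffer
  indices.indices.foreach(i => buffer.add(indices(i), decoded(i) / total))
  buffer(nbClusters) = buffer.getLong(nbClusters) + 1L
}

override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
  (0 until nbClusters).foreach(i => buffer1.add(i, buffer2.getDouble(i)))
  buffer1(nbClusters) = buffer1.getLong(nbClusters) + buffer2.getLong(nbClusters)
}

override def evaluate(buffer: Row): Seq[Double] = {
  val count = buffer.getLong(nbClusters)
  (0 until nbClusters).map(i => buffer.getDouble(i) / count)
}

// Added at the end of the class: a small helper to keep buffer updates readable
implicit class RichBuffer(buffer: MutableAggregationBuffer) {
  def add(i: Int, value: Double): Unit = buffer(i) = buffer.getDouble(i) + value
}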

At the end of the class we added some methods to the MutableAggregationBuffer via an implicit conversion to make the code a little bit less verbose and more readable.

Hurray! We are now using a hash-based aggregation again:

Avoiding the array also avoids allocating a new array and copying all the data into the aggregation buffer for each row (on each update call), which is triggered by:

buffer(0) = valuesSum // in the UDAF with an array in the buffer

A thread dump of the previous implementation (with array) via the Spark UI showed this:

Execution time:

The execution of this new version only takes 9.2 min.

Yes! We gained another 8%.

Conclusions

  • Always use the Spark UI to check the DAG and the physical execution plan. This is the easiest way to identify bottlenecks, see the amount of data being shuffled, make sure the data is uniformly distributed over the partitions (no skew) and that partitions are not too big (no spill to disk), check which algorithms are chosen for aggregations or joins (hash-based, sort-based, broadcast…), and learn many other things about your job.
  • UDAFs are sometimes worth considering, especially when you find yourself creating intermediate structures (arrays, maps…) just to be able to use Spark's native aggregation functions.
  • When creating a UDAF, try to avoid Spark's “non-mutable” data types in the buffer schema (such as String and arrays). The full list of mutable data types is documented here.
  • Using elementary UDFs is more granular and more readable. However, we must not forget that a ser/deser between Java objects and Tungsten data types can happen for each of them. So benchmark and decide based on the performance gain and the code complexity.

Thanks to all those who helped me work on this use case and write this post, especially Benjamin Davy, Cyrille Dubarry, Yann Moisan, Robert Dupuy, Han Ju, Louis Fruleux and Lucie Bontour. If you like this kind of challenge and enjoy diving into the internal workings of technologies, have a look at our other articles and our job opportunities.
