Spark UDAF could be an option!

Calculate average on sparse arrays

Wassim Almaaoui
Jan 29

In this fourth article of our Apache Spark series (see Part I, Part II and Part III), we present another real-life use case we faced at Teads and cover methods to consider when optimizing a Spark job. For those who are not familiar with Spark User Defined Aggregate Functions (UDAFs), we walk through a relatively simple but useful example of dealing with sparse arrays (arrays in which many elements are zero).

Business Context

We cluster our users: each user, identified by a vid, is assigned an array of 100 probabilities, one per cluster, describing how strongly they belong to each cluster. The raw data looks like this:

+-----+---------+--------------------------------+
| vid | country | probas // length = 100         |
+-----+---------+--------------------------------+
| 1   | US      | [0.1, 0, 0.02, 0.6, ..., 0]    |
| 2   | FR      | [0.5, 0, 0.15, 0.07, ..., 0.3] |
| 3   | FR      | [0.3, 0.04, 0.35, 0, ..., 0]   |
+-----+---------+--------------------------------+

This clustering has various applications, ranging from improving our prediction models to creating custom user segments (predefined lists of people sharing some known properties). For some of these applications, we need to normalize the resulting probabilities by their average, computed at different levels. In our example, we calculate the average probabilities by country.

Input data

Sparse arrays

Since our arrays are very sparse, we only store the significant values (those above a given threshold), using two arrays:

  1. Array 1: the indices of the non-negligible values
  2. Array 2: the values at those indices

For example, considering a threshold of 0.1 and supposing that the missing values in the above table are zeros, the sparse version would be:

+-----+---------+------------+------------------+
| vid | country | index      | probas           |
+-----+---------+------------+------------------+
| 1   | US      | [0, 3]     | [0.1, 0.6]       |
| 2   | FR      | [0, 2, 99] | [0.5, 0.15, 0.3] |
| 3   | FR      | [0, 2]     | [0.3, 0.35]      |
+-----+---------+------------+------------------+
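
As a side note, the sparse rows could be modeled as follows (a sketch only; the field names come from the tables above, the vid type is an assumption):

// Sparse representation of a user's cluster probabilities, before the Short
// encoding described in the next section.
case class SparseProbas(
  vid: Long,            // user identifier (type assumed)
  country: String,
  index: Seq[Int],      // indices of the non-negligible probabilities
  probas: Seq[Double])  // the corresponding probability values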

Short encoding

We store this data in snappy-compressed Parquet files and encode the probabilities as Short values to further reduce the size of the dataset.

The encode to short function
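
The encoding gist is not reproduced here. Below is a minimal sketch of what it could look like, reconstructed from the sample values in the tables (the exact formula used in our job is therefore an assumption): a probability in [0, 1] is mapped onto the full Short range.

// Sketch: map a probability in [0, 1] onto the 65536 possible values of a Short.
def encodeToShort(p: Double): Short =
  (p * 65535 + Short.MinValue).toShort

// The inverse, used later to decode the probabilities back to Double.
def decodeToDouble(s: Short): Double =
  (s.toDouble - Short.MinValue) / 65535

With this formula, 0.1 encodes to -26214 and 0.6 to 6553, which matches the table below.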

After this processing, our input dataset looks like this:

+-----+---------+------------+---------------------+
| vid | country | index      | probas              |
+-----+---------+------------+---------------------+
| 1   | US      | [0, 3]     | [-26214, 6553]      |
| 2   | FR      | [0, 2, 99] | [0, -22937, -13107] |
| 3   | FR      | [0, 2]     | [-13107, -9830]     |
+-----+---------+------------+---------------------+

Expected output

Step 1 — Probabilities are decoded back to Double.

+-----+---------+------------+------------------+
| vid | country | index      | probas           |
+-----+---------+------------+------------------+
| 1   | US      | [0, 3]     | [0.1, 0.6]       |
| 2   | FR      | [0, 2, 99] | [0.5, 0.15, 0.3] |
| 3   | FR      | [0, 2]     | [0.3, 0.35]      |
+-----+---------+------------+------------------+

Step 2 — Probabilities are normalized so that the sum of probas equals 1 for each user.

+-----+---------+------------+--------------------+
| vid | country | index      | probas             |
+-----+---------+------------+--------------------+
| 1   | US      | [0, 3]     | [0.14, 0.86]       |
| 2   | FR      | [0, 2, 99] | [0.52, 0.16, 0.32] |
| 3   | FR      | [0, 2]     | [0.46, 0.54]       |
+-----+---------+------------+--------------------+

Step 3 — We calculate, by country, the average membership of people to each cluster.

+---------+-------------------------------+
| country | probas // length = 100        |
+---------+-------------------------------+
| US      | [0.14, 0, 0, 0.86, 0, ..., 0] |
| FR      | [0.49, 0, 0.35, 0, ..., 0.16] |
+---------+-------------------------------+

Implementation 1: use Spark native `average` function on dense arrays

To use Spark's native avg, we first need to decode and normalize the probabilities and then convert the sparse arrays back into dense, 100-element arrays. We do this with two elementary UDFs, decodeAndNormalizeUdf and densifyUdf, which keeps each function small and readable. However, grouping some functions in the same UDF sometimes helps to avoid intermediate data structures and conversion operations between Spark's internal data representation and the JVM. There is a trade-off to make between performance and code complexity.

UDFs
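
The UDF gists are not embedded here; the following is a minimal sketch of what the two UDFs could look like (the names decodeAndNormalizeUdf and densifyUdf come from the rest of the article, the bodies are assumptions based on the steps described above):

import org.apache.spark.sql.functions.udf

val nbClusters = 100

// Decode the Short-encoded probabilities back to Double and normalize them
// so that they sum to 1 for each user.
val decodeAndNormalizeUdf = udf { probas: Seq[Short] =>
  val decoded = probas.map(s => (s.toDouble - Short.MinValue) / 65535)
  val total = decoded.sum
  decoded.map(_ / total)
}

// Rebuild a dense array of length nbClusters from the sparse (index, probas) pair.
val densifyUdf = udf { (index: Seq[Int], probas: Seq[Double]) =>
  val dense = Array.fill(nbClusters)(0.0)
  index.zip(probas).foreach { case (i, p) => dense(i) = p }
  dense
}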

Then we use Spark’s avg aggregation function to calculate the average by country for each cluster.

The job using the UDFs and Spark's native avg
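
The job gist is not reproduced either; under the same assumptions (column names taken from the tables above, input being the source DataFrame), it could look like this:

import org.apache.spark.sql.functions.{array, avg, col}

// Densify every row, then let Spark's native avg aggregate each cluster separately.
val densified = input
  .withColumn("probas", decodeAndNormalizeUdf(col("probas")))
  .withColumn("probas", densifyUdf(col("index"), col("probas")))

// One avg per cluster index, then repack the 100 averages into a single array column.
val clusterAvgs = (0 until nbClusters).map(i => avg(col("probas")(i)).as(s"avg_$i"))

val averagesByCountry = densified
  .groupBy("country")
  .agg(clusterAvgs.head, clusterAvgs.tail: _*)
  .select(col("country"),
    array((0 until nbClusters).map(i => col(s"avg_$i")): _*).as("probas"))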

Execution time:

Dataset size: 700GB in compressed parquet/snappy (a lot more when uncompressed, at least by a factor of 10) and 1.5 billion rows.

DAG and Execution Plan:


Let’s have a look at it, step by step:

WholeStageCodegen: since version 2.0.0, Spark can optimize some physical operations by collapsing them into a single Java function that it generates on the fly. In our case, this includes the Parquet read operation and the execution of our two UDFs. This is explicit in the execution plan shown in the SQL tab of the Spark UI.


Exchange: this is the shuffle triggered by the aggregations (avg).

HashAggregate: we can see a first aggregation operation before the shuffle and another one after it. This is how some aggregations are implemented in Spark: the data is pre-aggregated within each input partition before shuffling (also called partial or map-side aggregation), then further aggregated after the shuffle to obtain the global result.

In this first implementation, the densifying operation instantiates a 100-element array for each row, which is time- and memory-consuming. Converting all of these arrays from Java objects back to an InternalRow data structure (Spark SQL's optimized representation) when leaving densifyUdf is also a heavy operation.

Implementation 2: use a custom UDAF on sparse arrays

UDAF
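
The original gist is not reproduced here; below is a minimal sketch of what such a UDAF could look like with the UserDefinedAggregateFunction API (the class name and the method bodies are assumptions, consistent with the description in the list that follows):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

import scala.collection.mutable.WrappedArray

class SparseArrayAverage(nbClusters: Int) extends UserDefinedAggregateFunction {

  // One input row: the indices of the significant probabilities and their values.
  override def inputSchema: StructType = StructType(
    StructField("index", ArrayType(IntegerType)) ::
    StructField("probas", ArrayType(DoubleType)) :: Nil)

  // The aggregation buffer: a running sum per cluster plus a row count.
  override def bufferSchema: StructType = StructType(
    StructField("valuesSum", ArrayType(DoubleType)) ::
    StructField("count", LongType) :: Nil)

  override def dataType: DataType = ArrayType(DoubleType)

  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    buffer(0) = Array.fill(nbClusters)(0.0)
    buffer(1) = 0L
  }

  // Map-side (partial) aggregation: add one input row into the buffer.
  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val valuesSum = buffer.getAs[WrappedArray[Double]](0)
    val index = input.getSeq[Int](0)
    val probas = input.getSeq[Double](1)
    index.indices.foreach(i => valuesSum(index(i)) += probas(i))
    buffer(0) = valuesSum
    buffer(1) = buffer.getLong(1) + 1
  }

  // Reduce-side aggregation: merge two partial buffers.
  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    val sum1 = buffer1.getAs[WrappedArray[Double]](0)
    val sum2 = buffer2.getSeq[Double](0)
    sum2.indices.foreach(i => sum1(i) += sum2(i))
    buffer1(0) = sum1
    buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
  }

  // Final result: the average membership per cluster.
  override def evaluate(buffer: Row): Any = {
    val count = buffer.getLong(1)
    buffer.getSeq[Double](0).map(_ / count)
  }
}
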
  • The UDAF is generic with respect to nbClusters.
  • The update method is responsible for the map-side aggregation.
  • The merge method is responsible for further aggregation on the reduce side.
  • Note that we use getSeq[T] when we only read the data and getAs[WrappedArray[T]] when we need to increment or modify the array in place. Under the hood it is the same implementation (WrappedArray), but doing so gives us access to the mutable API of WrappedArray when needed and makes the code a little more readable and explicit (we can see immediately whether a mutable structure is required).

Then we use it in our job, so we don't need the densifyUdf anymore.
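
Under the same assumptions as before (column names from the tables, input being the source DataFrame), the job could look like this:

// decodeAndNormalizeUdf is still applied, but densifyUdf is gone.
val sparseAverage = new SparseArrayAverage(nbClusters)

val averagesByCountry = input
  .withColumn("probas", decodeAndNormalizeUdf(col("probas")))
  .groupBy("country")
  .agg(sparseAverage(col("index"), col("probas")).as("probas"))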

Execution time:

Dataset size: 700GB in compressed parquet/snappy and 1.5 billion rows.

DAG and Execution Plan:


The UDAF implementation is 2.5X faster than the first implementation.

Can we do better?

Avoid the decodeAndNormalizeUdf

We therefore tried to eliminate decodeAndNormalizeUdf and apply its logic inside the UDAF itself, to see whether it changes anything.

The inputSchema and the update methods of the UDAF become the following (the other methods remain the same):

The part that changed to include the decodeAndNormalize in the UDAF
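
The snippet itself is not included here; a sketch of the two changed methods could look like this (they replace the corresponding methods of the UDAF sketched above, the bodies being assumptions):

// The UDAF now receives the raw Short-encoded probabilities...
override def inputSchema: StructType = StructType(
  StructField("index", ArrayType(IntegerType)) ::
  StructField("probas", ArrayType(ShortType)) :: Nil)

// ...and decodes and normalizes them itself during the map-side update.
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
  val valuesSum = buffer.getAs[WrappedArray[Double]](0)
  val index = input.getSeq[Int](0)
  val decoded = input.getSeq[Short](1).map(s => (s.toDouble - Short.MinValue) / 65535)
  val total = decoded.sum
  index.indices.foreach(i => valuesSum(index(i)) += decoded(i) / total)
  buffer(0) = valuesSum
  buffer(1) = buffer.getLong(1) + 1
}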

The execution of the above took 10 min on the same cluster, so we can conclude it’s almost 10% faster.

An important difference that we noticed in the execution plan is the peak memory per task: it is higher when the UDF is applied separately from the UDAF.


Here we need to make a choice: keeping the UDF is more readable and granular, but a little less efficient. We finally chose to put the decode function in the UDAF.

The icing on the cake? Avoid the sort-based aggregation

Looking at the execution plan of the UDAF version, we noticed that Spark replaced the hash-based aggregation with a sort-based one. Why does Spark use it? Let's dig a bit further.

Honestly, this is not well documented and we had (as on several other occasions) to search Spark's code to find the answer.

Reading the code, we understand that all fields of our bufferSchema should have mutable data types. But wait, we thought we were actually using mutable types! As a reminder, our buffer schema uses an array of Double and a Long:

the bufferSchema
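
As in the sketch above (the field names are assumptions), the buffer holds an array of Double and a Long:

override def bufferSchema: StructType = StructType(
  StructField("valuesSum", ArrayType(DoubleType)) ::
  StructField("count", LongType) :: Nil)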

Looking at the code, we can see how Spark defines mutable data types here and here.

It shows that the hash aggregation was not selected because we are using an ArrayType in the aggregation buffer.

To make sure Spark is able to perform a hash aggregation, we slightly edited the UDAF to use 100 (nbClusters) Double columns instead of an array.

The UDAF using mutable structure
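
A minimal sketch of this variant is shown below (again, the class name, the helper names and the method bodies are assumptions; the key point is that the buffer now contains nbClusters Double columns plus a Long count, all mutable types):

import org.apache.spark.sql.Row
import org.apache.spark.sql.expressions.{MutableAggregationBuffer, UserDefinedAggregateFunction}
import org.apache.spark.sql.types._

class SparseArrayAverageMutable(nbClusters: Int) extends UserDefinedAggregateFunction {

  override def inputSchema: StructType = StructType(
    StructField("index", ArrayType(IntegerType)) ::
    StructField("probas", ArrayType(ShortType)) :: Nil)

  // nbClusters Double columns followed by one Long column for the row count.
  override def bufferSchema: StructType = StructType(
    (0 until nbClusters).map(i => StructField(s"sum_$i", DoubleType)) :+
      StructField("count", LongType))

  override def dataType: DataType = ArrayType(DoubleType)
  override def deterministic: Boolean = true

  override def initialize(buffer: MutableAggregationBuffer): Unit = {
    (0 until nbClusters).foreach(i => buffer(i) = 0.0)
    buffer(nbClusters) = 0L
  }

  override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
    val index = input.getSeq[Int](0)
    val decoded = input.getSeq[Short](1).map(s => (s.toDouble - Short.MinValue) / 65535)
    val total = decoded.sum
    index.indices.foreach(i => buffer.add(index(i), decoded(i) / total))
    buffer.incrementCount(nbClusters)
  }

  override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
    (0 until nbClusters).foreach(i => buffer1.add(i, buffer2.getDouble(i)))
    buffer1(nbClusters) = buffer1.getLong(nbClusters) + buffer2.getLong(nbClusters)
  }

  override def evaluate(buffer: Row): Any = {
    val count = buffer.getLong(nbClusters)
    (0 until nbClusters).map(i => buffer.getDouble(i) / count)
  }

  // Implicit helpers on MutableAggregationBuffer to keep the methods above readable
  // (the helper names are assumptions, not the ones from the original gist).
  implicit class RichBuffer(buffer: MutableAggregationBuffer) {
    def add(i: Int, value: Double): Unit = buffer(i) = buffer.getDouble(i) + value
    def incrementCount(i: Int): Unit = buffer(i) = buffer.getLong(i) + 1L
  }
}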

At the end of the class we added some methods to the MutableAggregationBuffer via an implicit conversion to make the code a little bit less verbose and more readable.

Hurray! We are now using a hash-based aggregation again.


Avoiding the array also avoids allocating a new array and copying all its data into the aggregation buffer for each row (on each update call), which was triggered by:

buffer(0) = valuesSum // in the UDAF with an array in the buffer

A thread dump of the previous implementation (with the array in the buffer), taken via the Spark UI, confirmed this behavior.


Execution time:

Yes! We gained another 8%.

Conclusions

  • UDAFs are sometimes worth considering, especially when you are creating intermediate structures (arrays, maps, ...) just to be able to use Spark's native aggregation functions.
  • When creating a UDAF, try to avoid Spark's "non-mutable" data types in the buffer schema (such as String and arrays). The full list of mutable data types is documented here.
  • Using elementary UDFs is more granular and more readable. However, we must not forget that a serialization/deserialization between Java objects and Tungsten data types can happen for each of them, so benchmark and decide according to the performance gain and the code complexity.

Thanks to all those who helped me work on this use case and write this post, especially Benjamin Davy, Cyrille Dubarry, Yann Moisan, Robert Dupuy, Han Ju, Louis Fruleux and Lucie Bontour. If you like this kind of challenge and diving into the internals of technologies, have a look at our other articles and our job opportunities.

