GUIDE TO SPARK AGGREGATION

Apache Spark 3.0: Remarkable Improvements in Custom Aggregation

With the official release of Spark 3.0, Aggregator becomes the default mechanism for custom aggregation on Datasets, because Spark 3.0 addresses the key usability and coexistence concerns of the earlier Aggregator mechanism. Read on for the details.

Published in The Startup · 5 min read · Jun 27, 2020

Aggregation operators are heavily used across Spark applications for data mining and analytics. Spark therefore provides both a wide variety of ready-made aggregation functions and a framework for building custom aggregation functions. These aggregation functions can be used on Datasets in a variety of ways to derive aggregated results.

With the Custom Aggregation framework, a user can implement a specific aggregation flow for aggregating a set of records. For custom aggregation, earlier releases of Spark provided two approaches: the first is based on ‘UserDefinedAggregateFunction’ and the second on ‘Aggregator’.

The two approaches are explained in detail in a previous story titled “UDAF and Aggregators: Custom Aggregation Approaches for Datasets in Apache Spark”. Of the two, ‘UserDefinedAggregateFunction’, also called UDAF, was introduced first to support custom aggregation on DataFrames, the untyped view of data in Spark. Later, with the introduction of Dataset, which supports both typed and untyped views of data, the ‘Aggregator’ approach was added to support custom aggregation on the typed view of data.

Comparing the two approaches:

  • The UDAF approach targets the untyped view of data, so in a UDAF the input, the output, and the intermediate aggregated data (in the aggregation buffer) are all rows of columns of certain data types. The Aggregator approach targets the typed view of data, so in an Aggregator the input, the output, and the intermediate aggregated data are all objects of certain types.
  • The UDAF approach is inefficient when updating or retrieving a complex data type, such as a Map, in the aggregation buffer, because converting complex data types between user space and Catalyst space is costly. The Aggregator approach is efficient when updating and retrieving any data type in the aggregation buffer object, since it incurs no such conversion costs.
  • The UDAF approach is also less elegant programmatically when a sophisticated aggregation flow needs user-defined types (UDTs) in the aggregation buffer. With the Aggregator approach, such flows can be implemented naturally, using sophisticated data types for the aggregation buffer object.
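To make the typed-buffer idea in this comparison concrete, here is a minimal plain-Java sketch of the zero/reduce/merge/finish flow that an Aggregator follows. It deliberately does not use Spark: the `SimpleAggregator` interface, the `AvgBuffer` and `AverageAggregator` classes, and the `AggregatorSketch.run` helper are hypothetical names introduced only for illustration, and the nested lists merely stand in for partitions of a Dataset.

```java
import java.util.List;

// Hypothetical stand-in that mirrors the shape of an Aggregator<IN, BUF, OUT>;
// this is NOT the Spark class, only an illustration of the contract.
interface SimpleAggregator<IN, BUF, OUT> {
    BUF zero();                       // empty buffer to start from
    BUF reduce(BUF buffer, IN input); // fold one input record into a buffer
    BUF merge(BUF b1, BUF b2);        // combine two partial buffers
    OUT finish(BUF buffer);           // turn the final buffer into the result
}

// The buffer is an ordinary typed object, not a row of columns.
class AvgBuffer {
    long sum;
    long count;
}

class AverageAggregator implements SimpleAggregator<Long, AvgBuffer, Double> {
    public AvgBuffer zero() { return new AvgBuffer(); }

    public AvgBuffer reduce(AvgBuffer b, Long v) {
        b.sum += v;
        b.count++;
        return b;
    }

    public AvgBuffer merge(AvgBuffer b1, AvgBuffer b2) {
        b1.sum += b2.sum;
        b1.count += b2.count;
        return b1;
    }

    public Double finish(AvgBuffer b) {
        // Assumption for this sketch: an empty input yields NaN.
        return b.count == 0 ? Double.NaN : (double) b.sum / b.count;
    }
}

class AggregatorSketch {
    // Simulates partition-wise aggregation: reduce within each partition,
    // then merge the partial buffers and finish once.
    static <IN, BUF, OUT> OUT run(SimpleAggregator<IN, BUF, OUT> agg,
                                  List<List<IN>> partitions) {
        BUF total = agg.zero();
        for (List<IN> partition : partitions) {
            BUF partial = agg.zero();
            for (IN record : partition) {
                partial = agg.reduce(partial, record);
            }
            total = agg.merge(total, partial);
        }
        return agg.finish(total);
    }
}
```

Calling `AggregatorSketch.run(new AverageAggregator(), partitions)` with the values 100, 200, 300 and 400 split across two lists walks through the same four steps a real Aggregator goes through, with the buffer staying a plain object throughout.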

However, a UDAF is aligned with the SQL dialect and can coexist with other ready-made aggregation functions when aggregating the untyped view of Datasets, which makes UDAFs easy to use. In contrast, until Spark 3.0, an Aggregator was not aligned with the SQL dialect and could not coexist with other ready-made aggregation functions on the untyped view of Datasets, which made Aggregators difficult to use before Spark 3.0.

With Spark 3.0, Aggregator usage is aligned with the UDAF approach, so an Aggregator can additionally perform aggregation on the untyped view of Datasets. Aggregators can also now be used alongside the wide variety of ready-made aggregation functions in Spark. These changes, together with the inherent benefits of Aggregators over UDAFs, make Aggregator the default choice for custom aggregation in Spark 3.0. The earlier UDAF approach is deprecated going forward.

In Spark 3.0, the Aggregator framework is extended so that an Aggregator can also be registered as a user-defined function, similar to a UDAF:

Aggregator<IN, BUF, OUT> agg = // custom Aggregator
Encoder<IN> enc = // input encoder
// register a UDF based on agg and enc (Java-compatible API)
spark.udf().register("myCustomAgg", functions.udaf(agg, enc));

Once an Aggregator is registered as a user-defined function, the registered function can be used to perform aggregation on the untyped view of Datasets. An Aggregator-based user-defined function can also be combined with ready-made aggregation functions and with other user-defined functions (registered from other Aggregators) to perform multiple aggregations on the untyped view of Datasets.

Below is an example of using an Aggregator on the untyped view of a Dataset. In this example, an Aggregator, MyMedian, is defined and then registered as a user-defined aggregation function to calculate the median on the untyped view of a sample Dataset:

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions;

// Aggregation buffer: a Java bean holding the values seen so far and their count
public static class Median implements Serializable {
    private ArrayList<Long> arrLong;
    private Long count;

    public Median() {}

    public Median(ArrayList<Long> arrLong, long count) {
        this.arrLong = arrLong;
        this.count = count;
    }

    public ArrayList<Long> getArrLong() {
        return arrLong;
    }

    public void setArrLong(ArrayList<Long> arrLong) {
        this.arrLong = arrLong;
    }

    public Long getCount() {
        return count;
    }

    public void setCount(Long count) {
        this.count = count;
    }
}

public static class MyMedian extends Aggregator<Long, Median, Double> {
    // Initial (empty) buffer
    public Median zero() {
        return new Median(new ArrayList<Long>(), 0L);
    }

    // Folds one input value into the buffer
    public Median reduce(Median buffer, Long data) {
        // add() returns a boolean, so the list is updated in place
        buffer.getArrLong().add(data);
        buffer.setCount(buffer.getCount() + 1);
        return buffer;
    }

    // Combines two partial buffers
    public Median merge(Median b1, Median b2) {
        // addAll() returns a boolean, so the list is updated in place
        b1.getArrLong().addAll(b2.getArrLong());
        b1.setCount(b1.getCount() + b2.getCount());
        return b1;
    }

    // Computes the median from the final buffer
    public Double finish(Median buffer) {
        ArrayList<Long> arrLong = buffer.getArrLong();
        Collections.sort(arrLong);

        // get() takes an int index, so the Long count is narrowed first
        int count = buffer.getCount().intValue();
        double median;
        if (count % 2 == 0) {
            median = ((double) arrLong.get(count / 2) +
                (double) arrLong.get(count / 2 - 1)) / 2;
        } else {
            median = (double) arrLong.get(count / 2);
        }
        return median;
    }

    // Specifies the Encoder for the intermediate value type
    public Encoder<Median> bufferEncoder() {
        return Encoders.bean(Median.class);
    }

    // Specifies the Encoder for the final output value type
    public Encoder<Double> outputEncoder() {
        return Encoders.DOUBLE();
    }
}
// Register the Aggregator as a user-defined function so it can be called from SQL
spark.udf().register("myMedian", functions.udaf(new MyMedian(), Encoders.LONG()));

Dataset<Row> df = spark.read().json("data/src/samples.json");
df.createOrReplaceTempView("samples");
df.show();
// +----+-----+
// |  Id|Value|
// +----+-----+
// |1001|  100|
// |1002|  200|
// |1003|  300|
// |1004|  400|
// +----+-----+

/* Example showing use of only the Aggregator-based user-defined function */
Dataset<Row> output = spark.sql("SELECT myMedian(Value) as median_value FROM samples");
output.show();
// +------------+
// |median_value|
// +------------+
// |       250.0|
// +------------+

/* Example showing use of the Aggregator-based user-defined function along with the ready-made function avg from Spark */
Dataset<Row> output2 = spark.sql("SELECT myMedian(Value) as median_value, avg(Value) as average_value FROM samples");
output2.show();
// +------------+-------------+
// |median_value|average_value|
// +------------+-------------+
// |       250.0|        250.0|
// +------------+-------------+
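The even and odd branches of the median calculation in the example above can be checked without a Spark session. The following is a small plain-Java sketch of the same arithmetic; the `MedianCheck` class and its `median` method are hypothetical helpers written for this illustration, and rejecting an empty list is an assumption of the sketch rather than behaviour taken from the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Hypothetical helper, not part of Spark: reproduces the finish() arithmetic on a plain list.
class MedianCheck {
    static double median(List<Long> values) {
        // Assumption for this sketch: the median of no values is treated as an error.
        if (values.isEmpty()) {
            throw new IllegalArgumentException("median of an empty list is undefined");
        }
        // Sort a copy so the caller's list is left untouched.
        List<Long> sorted = new ArrayList<>(values);
        Collections.sort(sorted);

        int n = sorted.size();
        int mid = n / 2;
        if (n % 2 == 0) {
            // Even count: average of the two middle values.
            return (sorted.get(mid - 1) + sorted.get(mid)) / 2.0;
        }
        // Odd count: the single middle value.
        return sorted.get(mid);
    }
}
```

For the four sample values 100, 200, 300 and 400, the two middle values are 200 and 300, which is how the 250.0 in the output above comes about.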

Summary: Given the importance of custom aggregation in complex data analytics and mining jobs, Spark 3.0 addresses the long-standing demand for an efficient, comprehensive, and easy-to-use framework for custom aggregation in Spark applications. The Aggregator is therefore the new mantra for custom aggregation, while UDAF is deprecated as of Spark 3.0.

For feedback or queries on this story, please write in the comments section. I hope you find it useful. Here is the link to other comprehensive stories on Apache Spark.

Leading Data Engineering Initiatives @ Jio, Apache Spark Specialist, Author, LinkedIn: https://www.linkedin.com/in/ajaywlan/