
GUIDE TO SPARK AGGREGATION

Apache Spark 3.0: Remarkable Improvements in Custom Aggregation

With the recent official announcement of Spark 3.0, Aggregator becomes the default mechanism for performing custom aggregation on Datasets, as Spark 3.0 addresses the key usability and coexistence concerns of the earlier Aggregator mechanism. Read on for the details.

Ajay Gupta
Jun 27 · 5 min read
  • The UDAF approach is inefficient when updating or retrieving a complex data type, such as a Map, in the aggregation buffer. This is due to the considerable cost of converting complex data types between user space and catalyst space. The Aggregator approach, in contrast, is efficient when updating and retrieving any data type in the aggregation buffer object, since it incurs none of the conversion costs of a UDAF.
  • Further, the UDAF approach is not programmatically elegant for implementing a sophisticated aggregation flow that uses user-defined types (UDTs) in the aggregation buffer. The Aggregator approach, in contrast, is programmatically elegant: one can implement a sophisticated aggregation flow involving arbitrarily complex data types for the aggregation buffer object.
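Spark 3.0 also resolves the coexistence concern: an Aggregator can now be registered as a user-defined aggregate function via the new functions.udaf API, after which it can be invoked from SQL just like any built-in aggregation function: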
Aggregator<IN, BUF, OUT> agg = // custom Aggregator
Encoder<IN> enc = // input encoder
// register a UDF based on agg and enc (Java-compatible API)
spark.udf().register("myCustomAgg", functions.udaf(agg, enc));
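The complete Java example below puts this into practice: a median aggregation implemented as an Aggregator, with an ArrayList-backed bean as the aggregation buffer, registered and then invoked from Spark SQL.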
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Collections;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.Aggregator;
import org.apache.spark.sql.functions;
// Aggregation buffer: a bean holding the collected values and a running count.
public static class Median implements Serializable {
  private ArrayList<Long> arrLong;
  private Long count;

  public Median() {}

  public Median(ArrayList<Long> arrLong, long count) {
    this.arrLong = arrLong;
    this.count = count;
  }

  public ArrayList<Long> getArrLong() {
    return arrLong;
  }

  public void setArrLong(ArrayList<Long> arrLong) {
    this.arrLong = arrLong;
  }

  public Long getCount() {
    return count;
  }

  public void setCount(Long count) {
    this.count = count;
  }
}
public static class MyMedian extends Aggregator<Long, Median, Double> {
  // A zero value for this aggregation: an empty buffer.
  public Median zero() {
    return new Median(new ArrayList<Long>(), 0L);
  }

  // Combine an input value with the current aggregation buffer.
  public Median reduce(Median buffer, Long data) {
    buffer.getArrLong().add(data);
    buffer.setCount(buffer.getCount() + 1);
    return buffer;
  }

  // Merge two intermediate aggregation buffers.
  public Median merge(Median b1, Median b2) {
    b1.getArrLong().addAll(b2.getArrLong());
    b1.setCount(b1.getCount() + b2.getCount());
    return b1;
  }

  // Transform the final buffer into the output: the median of the values.
  public Double finish(Median buffer) {
    ArrayList<Long> arrLong = buffer.getArrLong();
    Collections.sort(arrLong);
    int count = buffer.getCount().intValue();

    double median;
    if (count % 2 == 0) {
      median = ((double) arrLong.get(count / 2) +
                (double) arrLong.get(count / 2 - 1)) / 2;
    } else {
      median = (double) arrLong.get(count / 2);
    }
    return median;
  }

  // Specifies the Encoder for the intermediate value type
  public Encoder<Median> bufferEncoder() {
    return Encoders.bean(Median.class);
  }

  // Specifies the Encoder for the final output value type
  public Encoder<Double> outputEncoder() {
    return Encoders.DOUBLE();
  }
}
// Register the function to access it
spark.udf().register("myMedian", functions.udaf(new MyMedian(), Encoders.LONG()));
Dataset<Row> df = spark.read().json("data/src/samples.json");
df.createOrReplaceTempView("samples");
df.show();
// +----+-----+
// |  Id|Value|
// +----+-----+
// |1001|  100|
// |1002|  200|
// |1003|  300|
// |1004|  400|
// +----+-----+
/* Example showing use of only Aggregator based user defined function */
Dataset<Row> output = spark.sql("SELECT myMedian(Value) as median_value FROM samples");
output.show();
// +------------+
// |median_value|
// +------------+
// |       250.0|
// +------------+
/* Example showing use of the Aggregator based user defined function along with the ready-made avg function from Spark */
output = spark.sql("SELECT myMedian(Value) as median_value, avg(Value) as average_value FROM samples");
output.show();
// +------------+-------------+
// |median_value|average_value|
// +------------+-------------+
// |       250.0|        250.0|
// +------------+-------------+
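Note that the same Aggregator also remains usable through the typed Dataset API via toColumn(), so the new registration path coexists with the earlier typed style rather than replacing it. A minimal sketch follows, assuming the same samples data as above; the names values and medianCol are illustrative:

import org.apache.spark.sql.TypedColumn;

// Typed view over the Value column of the sample DataFrame (illustrative).
Dataset<Long> values = df.select("Value").as(Encoders.LONG());

// toColumn() turns the Aggregator into a TypedColumn for typed selects.
TypedColumn<Long, Double> medianCol = new MyMedian().toColumn().name("median_value");
values.select(medianCol).show();
// +------------+
// |median_value|
// +------------+
// |       250.0|
// +------------+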

Written by Ajay Gupta

Big Data Architect, Apache Spark Specialist, https://www.linkedin.com/in/ajaywlan/