Five Ways to Perform Aggregation in Apache Spark

Aggregation is one of the most widely used operations in data analytics, and Spark provides a solid framework for it. Here are five different ways in which one can perform aggregation on big data using Spark.

Ajay Gupta · Published in The Startup · May 12, 2020 · 7 min read

GroupByKey or ReduceByKey Transformation on RDDs: RDDs are the earliest representation of a distributed data collection in Spark, where data is represented via arbitrary Java objects of type ‘T’. Aggregation on RDDs is similar to the reduce concept in the map-reduce framework, where a reducer function (acting on two input records to produce an aggregated record) is the key to aggregation. With RDDs, aggregation is performed by either the GroupByKey or the ReduceByKey transformation; however, these transformations are limited to Pair RDDs (collections of Tuple objects, each Tuple consisting of a Key object of type ‘K’ and a Value object of type ‘V’).

In the case of aggregation via GroupByKey, the transformation results in tuple objects, each having a key object and a collection of all value objects against that key. Therefore, a mapper (a map transformation via map, mapToPair or mapPartitions) needs to be applied afterwards in order to reduce the collection of value objects into an aggregated value object for each tuple object.

Aggregation on a Pair RDD (with 2 partitions) via GroupByKey followed by either map, mapToPair or mapPartitions

Mappers such as the map, mapToPair and mapPartitions transformations contain aggregation functions to reduce the collection of value objects of type ‘V’ into an aggregated object of type ‘U’. The aggregation function can be an arbitrary one and need not obey associative or commutative traits. The GroupByKey transformation has three flavors which differ in the partition specification of the RDD resulting from applying the GroupByKey transformation. GroupByKey can be summarized as:

GroupByKey (PairRDD<K,V>) => PairRDD<K,Iterable<V>> 
Map (PairRDD<K,Iterable<V>>) => PairRDD<K,U>
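As a minimal sketch of this two-step flow in Java (the class name, the word-count use case and the sample data are assumptions made purely for illustration):

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class GroupByKeyExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "group-by-key-example");

    // A hypothetical Pair RDD of (word, 1) tuples.
    JavaPairRDD<String, Integer> pairs = sc
        .parallelize(Arrays.asList("a", "b", "a", "c", "b", "a"))
        .mapToPair(w -> new Tuple2<>(w, 1));

    // Step 1: GroupByKey gathers all value objects of a key into an Iterable.
    JavaPairRDD<String, Iterable<Integer>> grouped = pairs.groupByKey();

    // Step 2: a mapper reduces the Iterable<V> into an aggregated object of type U (here, a sum).
    JavaPairRDD<String, Integer> counts = grouped.mapToPair(t -> {
      int sum = 0;
      for (Integer v : t._2()) {
        sum += v;
      }
      return new Tuple2<>(t._1(), sum);
    });

    counts.collect().forEach(System.out::println);
    sc.stop();
  }
}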

In the case of aggregation via ReduceByKey, the transformation directly results in tuple objects, each having a key object and an aggregated object against that key. No mapper is required after ReduceByKey, unlike in the case of GroupByKey. The ReduceByKey transformation takes an associative and commutative aggregation function so that records residing in the same partition can be aggregated locally before records are aggregated across partitions. Also, the aggregation function takes two value objects of, say, type ‘V’ and returns an object of type ‘V’. Similar to GroupByKey, the ReduceByKey transformation also has three flavors which differ in the partition specification of the RDD resulting from applying the ReduceByKey transformation. ReduceByKey can be summarized as:

ReduceByKey(PairRDD<K,V>, Function<V,V,V>) => PairRDD<K,V>
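A corresponding sketch with ReduceByKey on the same hypothetical word-count data; since Integer::sum is associative and commutative, no follow-up mapper is needed:

import java.util.Arrays;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class ReduceByKeyExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "reduce-by-key-example");

    // A hypothetical Pair RDD of (word, 1) tuples.
    JavaPairRDD<String, Integer> pairs = sc
        .parallelize(Arrays.asList("a", "b", "a", "c", "b", "a"))
        .mapToPair(w -> new Tuple2<>(w, 1));

    // Values are first combined locally within each partition and then across partitions.
    JavaPairRDD<String, Integer> counts = pairs.reduceByKey(Integer::sum);

    counts.collect().forEach(System.out::println);
    sc.stop();
  }
}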

Of GroupByKey and ReduceByKey, the former is more generic and can work with any aggregation function, whereas the latter is more efficient but suitable only for the class of aggregation functions described earlier.

MapPartitions on RDD or Dataset: mapPartitions, as described in one of my previous blogs, is a powerful narrow transformation available on both the RDD and the Dataset (data representations in Spark) to perform a variety of operations partition-wise. One such operation is aggregation. However, the one condition that needs to be satisfied is that records belonging to the same grouping key(s) should reside in a single partition. This condition could be implicitly satisfied in an RDD or Dataset (to be aggregated) materialized from a shuffling operation involving the grouping key(s). The condition can also be achieved explicitly by first repartitioning the RDD or Dataset on the basis of the grouping key(s).

Inside mapPartitions, a typical aggregation flow first instantiates a HashMap storing the aggregated value objects against the corresponding grouping key(s). This HashMap is then repeatedly updated while iterating over the data collection of the underlying partition. Finally, an iterator over the aggregated value objects (optionally along with the associated grouping key(s)) contained in the map is returned.
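A hedged sketch of this flow on a Pair RDD in Java (the class name, sample data and partition count are assumptions; partitionBy with a HashPartitioner explicitly co-locates all records of a grouping key in one partition):

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.spark.HashPartitioner;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import scala.Tuple2;

public class MapPartitionsAggregationExample {
  public static void main(String[] args) {
    JavaSparkContext sc = new JavaSparkContext("local[*]", "map-partitions-aggregation");

    // A hypothetical Pair RDD of (word, 1) tuples.
    JavaPairRDD<String, Integer> pairs = sc
        .parallelize(Arrays.asList("a", "b", "a", "c", "b", "a"))
        .mapToPair(w -> new Tuple2<>(w, 1));

    // Explicitly ensure all records of a grouping key reside in a single partition.
    JavaPairRDD<String, Integer> partitioned = pairs.partitionBy(new HashPartitioner(2));

    // Aggregate each partition with an in-memory HashMap keyed by the grouping key.
    JavaPairRDD<String, Integer> counts = partitioned.mapPartitionsToPair(iter -> {
      Map<String, Integer> agg = new HashMap<>();
      while (iter.hasNext()) {
        Tuple2<String, Integer> record = iter.next();
        agg.merge(record._1(), record._2(), Integer::sum);
      }
      // Return an iterator over the aggregated (key, value) pairs of this partition.
      List<Tuple2<String, Integer>> out = new ArrayList<>();
      agg.forEach((k, v) -> out.add(new Tuple2<>(k, v)));
      return out.iterator();
    });

    counts.collect().forEach(System.out::println);
    sc.stop();
  }
}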

Since mapPartitions based aggregation involves a HashMap maintained in memory to hold the keys and aggregated value objects, considerable heap memory would be required for the HashMap if a large number of unique grouping keys reside in the underlying partition, which could lead to the risk of out-of-memory termination of the corresponding executor. Hence, the grouping key distribution across partitions should not be skewed; otherwise executor memory is wasted through over-provisioning to handle the skew. Also, since a heap-based aggregation HashMap is required, the relative memory requirement is higher than that of the dedicated aggregation operators in Spark, but if memory is not a constraint, mapPartitions based aggregation can provide good performance gains.

UDAF for Dataframe or Dataset: Unlike the methods presented above, a UDAF (User Defined Aggregate Function) achieves aggregation based on the notion of an aggregation buffer and a set of methods operating on this buffer.

Aggregation buffer based aggregation flow in Spark (for Datasets and Dataframes)

UDAF has so far been the most common way to write aggregation logic for the Dataframe or Dataset representations of a distributed data collection in Spark. A UDAF works on an untyped view of the data collection, where a data record is considered as a row (of a table) with a schema defining the type and nullability of each column in the row. One can create a UDAF in Spark by extending the ‘UserDefinedAggregateFunction’ class present in the package ‘org.apache.spark.sql.expressions’ and overriding the implementation of the following methods in the base class:

/* Return schema for input column(s) to the UDAF, schema being built using StructType */
=> public StructType inputSchema()
/* Return schema of the aggregation buffer, schema being built using StructType */
=> public StructType bufferSchema()
/* DataType of the final aggregation result */
=> public DataType dataType()
/* Whether the UDAF always returns the same result for the same input */
=> public boolean deterministic()
/* Initialize the aggregation buffer */
=> public void initialize(MutableAggregationBuffer buffer)
/* Update the aggregation buffer with the untyped view (Row) of an input object */
=> public void update(MutableAggregationBuffer buffer, Row row)
/* Update the current aggregation buffer with a partially aggregated buffer */
=> public void merge(MutableAggregationBuffer buffer, Row partialBuffer)
/* Evaluate the final aggregation buffer and return the result of the DataType declared earlier */
=> public Object evaluate(Row buffer)

In addition to overriding the above methods, one can always declare extra fields (with optional initialization of these in the UDAF constructor) and additional methods in the customized UDAF class in order to use them inside the overridden methods to achieve the aggregation goal.
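As an illustrative sketch, here is a hypothetical ‘SampleUDAF’ (the name is reused in the registration snippet below) that computes the average of a double column; the averaging use case is an assumption made for illustration:

import org.apache.spark.sql.Row;
import org.apache.spark.sql.expressions.MutableAggregationBuffer;
import org.apache.spark.sql.expressions.UserDefinedAggregateFunction;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// A hypothetical UDAF computing the average of a double column.
public class SampleUDAF extends UserDefinedAggregateFunction {

  // Schema of the input column(s) fed to the UDAF.
  @Override
  public StructType inputSchema() {
    return new StructType().add("value", DataTypes.DoubleType);
  }

  // Schema of the aggregation buffer: a running sum and a count.
  @Override
  public StructType bufferSchema() {
    return new StructType()
        .add("sum", DataTypes.DoubleType)
        .add("count", DataTypes.LongType);
  }

  // DataType of the final aggregated result.
  @Override
  public DataType dataType() {
    return DataTypes.DoubleType;
  }

  // The UDAF returns the same output for the same input.
  @Override
  public boolean deterministic() {
    return true;
  }

  // Initialize the aggregation buffer.
  @Override
  public void initialize(MutableAggregationBuffer buffer) {
    buffer.update(0, 0.0);
    buffer.update(1, 0L);
  }

  // Update the buffer with the untyped view (Row) of one input record.
  @Override
  public void update(MutableAggregationBuffer buffer, Row input) {
    if (!input.isNullAt(0)) {
      buffer.update(0, buffer.getDouble(0) + input.getDouble(0));
      buffer.update(1, buffer.getLong(1) + 1L);
    }
  }

  // Merge a partially aggregated buffer into the current buffer.
  @Override
  public void merge(MutableAggregationBuffer buffer, Row partialBuffer) {
    buffer.update(0, buffer.getDouble(0) + partialBuffer.getDouble(0));
    buffer.update(1, buffer.getLong(1) + partialBuffer.getLong(1));
  }

  // Evaluate the final buffer into the declared DataType.
  @Override
  public Object evaluate(Row buffer) {
    return buffer.getLong(1) == 0L ? null : buffer.getDouble(0) / buffer.getLong(1);
  }
}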

Before using a UDAF, one has to register an instance of it with the Spark framework:

spark.udf().register("sampleUDAF", new SampleUDAF());

After registering, the UDAF can be used inside a Spark SQL query to aggregate either the whole Dataset/Dataframe or groups of records within it (grouped via one or more columns). In addition to using it directly inside a Spark SQL query, one can also use the UDAF via the Dataframe/Dataset aggregation APIs, such as ‘agg’.
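A hedged usage sketch, reusing the hypothetical SampleUDAF from above (the input path, the ‘sales’ view and the ‘category’/‘price’ columns are also hypothetical):

import static org.apache.spark.sql.functions.callUDF;
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class UdafUsageExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("udaf-usage").master("local[*]").getOrCreate();
    spark.udf().register("sampleUDAF", new SampleUDAF());

    // Hypothetical input data with 'category' and 'price' columns.
    Dataset<Row> df = spark.read().parquet("/path/to/sales");
    df.createOrReplaceTempView("sales");

    // 1) Using the registered UDAF inside a Spark SQL query.
    spark.sql("SELECT category, sampleUDAF(price) AS avg_price FROM sales GROUP BY category").show();

    // 2) Using the UDAF via the Dataframe/Dataset aggregation API ('agg').
    df.groupBy(col("category")).agg(callUDF("sampleUDAF", col("price"))).show();

    spark.stop();
  }
}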

UDAF, although a popular way to define custom aggregation, suffers from performance issues when complex data types (Array or Map) are used in the aggregation buffer. This is due to the fact that the conversion of Scala data types (user-facing types) to the corresponding Catalyst data types (Catalyst internal types), and vice versa, becomes very costly for complex data types during every update operation on the UDAF buffer. This cost is significant from both the memory and the compute perspective.

Aggregator for Dataset: Aggregator is the most recent way of performing aggregation on a Dataset and, similar to UDAF, it is also based on the notion of an aggregation buffer and a set of methods operating on this buffer. However, the Aggregator way of doing aggregation is called typed aggregation, as it involves operations on/with objects of various types. The input to the aggregator, the aggregation buffer and the final aggregated output (derived from the buffer) are all objects of certain types with corresponding Spark Encoders. Users can define their own custom Aggregator by extending the abstract generic ‘Aggregator<IN,BUF,OUT>’ class (present in the package ‘org.apache.spark.sql.expressions’) with a type defined for IN (the input record type), BUF (the aggregation buffer type) and OUT (the output record type), along with overriding the implementation of the following methods in the base class:

/* Return the Encoder for the aggregation buffer of type BUF. This is required for buffer ser/deser during shuffling or disk spilling */
=> public Encoder<BUF> bufferEncoder()
/* Return the Encoder for the output object of type OUT after aggregation is performed */
=> public Encoder<OUT> outputEncoder()
/* Return the updated aggregation buffer object of type BUF after aggregating the existing buffer object of type BUF with the input object of type IN */
=> public BUF reduce(BUF buffer, IN input)
/* Return the updated aggregation buffer of type BUF after merging two partially aggregated buffer objects of type BUF */
=> public BUF merge(BUF buffer1, BUF buffer2)
/* Return the output object of type OUT from the evaluation of the aggregation buffer of type BUF */
=> public OUT finish(BUF buffer)
/* Return the buffer object of type BUF after initializing the same */
=> public BUF zero()
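As an illustrative sketch, a hypothetical ‘LongSumAggregator’ with IN = Long, BUF = Long and OUT = Long (the summing use case and class name are assumptions):

import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.expressions.Aggregator;

// A hypothetical typed Aggregator summing Long values.
public class LongSumAggregator extends Aggregator<Long, Long, Long> {

  // Initial (zero) value of the aggregation buffer.
  @Override
  public Long zero() {
    return 0L;
  }

  // Fold one input record into the buffer.
  @Override
  public Long reduce(Long buffer, Long input) {
    return buffer + input;
  }

  // Merge two partially aggregated buffers.
  @Override
  public Long merge(Long buffer1, Long buffer2) {
    return buffer1 + buffer2;
  }

  // Produce the final output from the buffer.
  @Override
  public Long finish(Long buffer) {
    return buffer;
  }

  // Encoder used to serialize the buffer during shuffling or spilling.
  @Override
  public Encoder<Long> bufferEncoder() {
    return Encoders.LONG();
  }

  // Encoder of the final output value.
  @Override
  public Encoder<Long> outputEncoder() {
    return Encoders.LONG();
  }
}

On a typed Dataset<Long>, such an aggregator can be applied as a TypedColumn via its toColumn() method, for example ds.select(new LongSumAggregator().toColumn()).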

Since an Aggregator natively supports the aggregation buffer being an object, it is efficient and does not require the unnecessary overhead associated with converting Scala types to Catalyst types and vice versa (as with UDAF). Also, the Aggregator way of aggregation provides much more flexibility and programming elegance when writing aggregation logic. Aggregators are also being integrated into the untyped aggregation flow to support SQL-like querying in upcoming releases.
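In Spark 3.x this integration surfaces as the ‘udaf’ method of the ‘functions’ class, which wraps an Aggregator for untyped use. A hedged sketch, assuming Spark 3.x and reusing the hypothetical LongSumAggregator from above (the ‘events’ view and registered name ‘long_sum’ are assumptions):

import static org.apache.spark.sql.functions.udaf;

import java.util.Arrays;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.expressions.UserDefinedFunction;

public class AggregatorAsUdafExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("aggregator-as-udaf").master("local[*]").getOrCreate();

    // Wrap the typed Aggregator as an untyped aggregate function (Spark 3.x).
    UserDefinedFunction longSum = udaf(new LongSumAggregator(), Encoders.LONG());
    spark.udf().register("long_sum", longSum);

    // A small hypothetical Dataset registered as a temporary view; its single column is named 'value'.
    spark.createDataset(Arrays.asList(1L, 2L, 3L), Encoders.LONG())
        .createOrReplaceTempView("events");

    // The wrapped aggregator can now be used inside a Spark SQL query.
    spark.sql("SELECT long_sum(value) AS total FROM events").show();

    spark.stop();
  }
}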

Predefined Aggregation Functions: Spark provides a variety of pre-built aggregation functions which can be used in the context of the Dataframe or Dataset representations of a distributed data collection. These pre-built functions can be used inside a Spark SQL query expression, or with the aggregation APIs defined for Dataframe or Dataset. All pre-built aggregation functions are defined as static methods of the ‘functions’ class in the ‘org.apache.spark.sql’ package. A list of all such functions can be found in the documentation of the ‘functions’ class.
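A brief usage sketch of pre-built functions such as sum, avg and count (the input path, the ‘sales’ view and the ‘category’/‘price’ columns are hypothetical):

import static org.apache.spark.sql.functions.avg;
import static org.apache.spark.sql.functions.count;
import static org.apache.spark.sql.functions.sum;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class PrebuiltAggregationExample {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder()
        .appName("prebuilt-aggregations").master("local[*]").getOrCreate();

    // Hypothetical input with 'category' and 'price' columns.
    Dataset<Row> df = spark.read().parquet("/path/to/sales");
    df.createOrReplaceTempView("sales");

    // 1) Pre-built functions inside a Spark SQL query expression.
    spark.sql("SELECT category, sum(price), avg(price) FROM sales GROUP BY category").show();

    // 2) The same functions through the Dataframe/Dataset aggregation API.
    df.groupBy("category")
      .agg(sum("price"), avg("price"), count("price"))
      .show();

    spark.stop();
  }
}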

Predefined aggregation functions are highly optimized and in most cases work directly with the Spark Tungsten format. Therefore, a Spark programmer should always prefer a pre-built aggregation function if one exists in the ‘functions’ class. Only if the desired aggregation function does not exist there should one resort to writing a custom aggregation function.

In case you have more queries on the Spark aggregation framework, please feel free to ask in the comments section.
