Apache Spark : Secondary Sorting in Spark in Java

Chandra Prakash
Published in Analytics Vidhya, Feb 18, 2021

Most of us have seen secondary sorting in MapReduce/Hadoop and its key-based implementation, and there is plenty of information and many blog posts on the topic. Let’s learn how secondary sorting can be achieved using Spark in Java.

Before we go ahead, you can download the dataset from here: https://www.kaggle.com/bhoomika216/airline-dataset-for-analysis?select=FINALdata1.csv

By definition, the secondary sorting problem is about sorting the values associated with a key in the reduce phase. It is sometimes called value-to-key conversion. The technique lets us sort the values (in ascending or descending order) passed to each reducer. In simple terms, you can relate it to ORDER BY in SQL. It relies on a composite key that contains all the values we want to sort on.

Now let’s apply secondary sorting to the dataset you downloaded from Kaggle. The data is airline on-time performance data. It has many data points, but we’ll focus on which airlines have the most late arrivals and at which airports those late arrivals occur. From this statement we can determine our sort order: airline ID (UNIQUE_CARRIER), airport ID (DEST_AIRPORT_ID) and delay time (ARR_DELAY). In SQL terms: ORDER BY UNIQUE_CARRIER ASC, DEST_AIRPORT_ID ASC, ARR_DELAY DESC.

Before we jump into the code, we need to understand the approaches. There are at least two possible approaches for sorting the reducer values, and both can be applied in MapReduce/Hadoop as well as in Spark.

1st Approach —

Performing Secondary Sorting in Memory or In Reducer Sort

In this approach, the grouped data is collected at each reducer’s node/executor, buffered, and sorted in memory (using Java’s Collections.sort()). Because the sort happens in memory, this approach does not scale: as soon as the grouped data for a key exceeds the memory of the reducer’s node/executor, an out-of-memory error is thrown and the job fails. It works only for small datasets.
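As a rough illustration of the in-memory approach, here is a plain-Java sketch (no Spark involved) that groups delays by carrier and then sorts each group with Collections.sort(). The class name, method name and sample rows are made up for this example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

public class InMemorySecondarySort {

    // Groups arrival delays by carrier, then sorts each group's values in memory.
    // Each row is {carrier, arrivalDelay}.
    public static Map<String, List<Integer>> groupAndSort(List<String[]> rows) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String[] row : rows) {
            grouped.computeIfAbsent(row[0], k -> new ArrayList<>())
                   .add(Integer.valueOf(row[1]));
        }
        // The complete value list of every key has to fit in memory at this point,
        // which is exactly the scalability limit described above.
        for (List<Integer> delays : grouped.values()) {
            Collections.sort(delays, Collections.reverseOrder());
        }
        return grouped;
    }

    public static void main(String[] args) {
        List<String[]> rows = new ArrayList<>();
        rows.add(new String[] {"AA", "5"});
        rows.add(new String[] {"DL", "12"});
        rows.add(new String[] {"AA", "40"});
        rows.add(new String[] {"AA", "-3"});
        System.out.println(groupAndSort(rows)); // prints {AA=[40, 5, -3], DL=[12]}
    }
}
```

In a real job the grouping step would be done by the framework, but the sort of each value list would still happen inside a single task, as it does here.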

2nd Approach —

Performing Secondary Sorting Using Spark Framework or In shuffle sort

To follow this approach, we first need to understand the composite key. A composite key is formed when a value from the payload is promoted and appended to the natural key. Let’s look at this in more detail.

Input Format

The input data is in CSV format and has the columns below. Secondary sorting will be applied on UNIQUE_CARRIER, DEST_AIRPORT_ID and ARR_DELAY:

DAY_OF_MONTH, DAY_OF_WEEK, FL_DATE, UNIQUE_CARRIER, CARRIER, ORIGIN_AIRPORT_ID, ORIGIN_CITY_MARKET_ID, ORIGIN_STATE_ABR, DEST_AIRPORT_ID, DEST_CITY_MARKET_ID, DEST_STATE_ABR, CRS_DEP_TIME, DEP_TIME, DEP_DELAY_NEW, TAXI_OUT, WHEELS_OFF, WHEELS_ON, TAXI_IN, CRS_ARR_TIME, ARR_TIME, ARR_DELAY, CANCELLED, CANCELLATION_CODE, DIVERTED

The secondary sort is equivalent to: ORDER BY UNIQUE_CARRIER ASC, DEST_AIRPORT_ID ASC, ARR_DELAY DESC;

The composite key will look like this:

UNIQUE_CARRIER,DEST_AIRPORT_ID,ARR_DELAY
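To make the ordering on such a key concrete, here is a small plain-Java sketch (no Spark involved) that sorts a few made-up keys by carrier ascending, destination airport ascending and arrival delay descending. The Key class is a hypothetical, simplified stand-in for a composite key, and the comparator is built with Comparator chaining as one possible way to express the ORDER BY.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class CompositeKeyOrdering {

    // Hypothetical, simplified composite key: natural key plus the promoted values.
    public static final class Key {
        private final String carrier;
        private final String destAirportId;
        private final Integer arrivalDelay;

        public Key(String carrier, String destAirportId, Integer arrivalDelay) {
            this.carrier = carrier;
            this.destAirportId = destAirportId;
            this.arrivalDelay = arrivalDelay;
        }

        public String getCarrier() { return carrier; }
        public String getDestAirportId() { return destAirportId; }
        public Integer getArrivalDelay() { return arrivalDelay; }

        @Override
        public String toString() {
            return carrier + "::" + destAirportId + "::" + arrivalDelay;
        }
    }

    // carrier ASC, destination airport ASC, arrival delay DESC
    public static final Comparator<Key> ORDER =
            Comparator.comparing(Key::getCarrier)
                      .thenComparing(Key::getDestAirportId)
                      .thenComparing(Key::getArrivalDelay, Comparator.reverseOrder());

    // Returns the keys in sorted order, rendered as strings.
    public static List<String> sorted(List<Key> keys) {
        List<Key> copy = new ArrayList<>(keys);
        copy.sort(ORDER);
        List<String> result = new ArrayList<>();
        for (Key k : copy) {
            result.add(k.toString());
        }
        return result;
    }

    public static void main(String[] args) {
        List<Key> keys = new ArrayList<>();
        keys.add(new Key("AA", "10397", 5));
        keys.add(new Key("DL", "10397", 7));
        keys.add(new Key("AA", "10397", 30));
        keys.add(new Key("AA", "10140", 12));
        // prints [AA::10140::12, AA::10397::30, AA::10397::5, DL::10397::7]
        System.out.println(sorted(keys));
    }
}
```

Note that a comparator handed to Spark also has to be serializable, so a comparator built this way would most likely need an extra Serializable cast or a named class before it could be used in a job.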

The natural key here is UNIQUE_CARRIER; DEST_AIRPORT_ID and ARR_DELAY are the values we add to it. Defining the composite key this way (appending DEST_AIRPORT_ID and ARR_DELAY to the natural key UNIQUE_CARRIER) lets the Spark framework sort the reducer values: the shuffle sorts first by the first part of the key, then by the second, and then by the third. We will see how to achieve this in Spark; for now, just remember that Spark has provided repartitionAndSortWithinPartitions() since version 1.2. Let’s look at the code to understand exactly what is done.

package com.spark.rdd.tutorial;

import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

public class SecondarySorting {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setMaster("local[*]").setAppName("app");
        JavaSparkContext jsc = new JavaSparkContext(conf);

        JavaRDD<String> javaRDD = jsc.textFile("D:\\data\\airlineDataSmall.csv");

        // Split every CSV line into its fields.
        JavaRDD<String[]> map = javaRDD.map(new Function<String, String[]>() {
            @Override
            public String[] call(String s) throws Exception {
                String[] split = s.split(",");
                return split;
            }
        });

        // Build (composite key, remaining values) pairs.
        JavaPairRDD<CustomKey, List<String>> pairRDD = map.mapToPair(new PairFunction<String[], CustomKey, List<String>>() {
            @Override
            public Tuple2<CustomKey, List<String>> call(String[] s) throws Exception {
                CustomKey key = new CustomKey();
                key.setUniqueCarrier(s[3]);   // UNIQUE_CARRIER (natural key)
                key.setDestAirportId(s[8]);   // DEST_AIRPORT_ID
                // ARR_DELAY; Integer.valueOf throws NumberFormatException for a header row
                // or a non-integer value, so the input is assumed to be clean.
                key.setArrivalDelay(Integer.valueOf(s[20]));

                List<String> l = new ArrayList<>();
                l.add(s[2]);   // FL_DATE
                l.add(s[5]);   // ORIGIN_AIRPORT_ID
                l.add(s[6]);   // ORIGIN_CITY_MARKET_ID

                return new Tuple2<>(key, l);
            }
        });

        // Partition by carrier and sort by the full composite key during the shuffle.
        JavaPairRDD<CustomKey, List<String>> customKeyListJavaPairRDD = pairRDD.repartitionAndSortWithinPartitions(new CustomPartitioner(), new CustomComparator());

        List<Tuple2<CustomKey, List<String>>> collect = customKeyListJavaPairRDD.collect();
        for (Tuple2<CustomKey, List<String>> l : collect) {
            System.out.println(l._1().getUniqueCarrier() + "::" + l._1().getDestAirportId() + "::" + l._1().getArrivalDelay());
        }
    }

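    public static class CustomPartitioner extends Partitioner {
        @Override
        public int numPartitions() {
            // A single partition keeps the collected output in one sorted run.
            return 1;
        }

        @Override
        public int getPartition(Object key) {
            if (key instanceof CustomKey) {
                // Partition on the natural key only. floorMod keeps the result
                // non-negative even when hashCode() is negative.
                return Math.floorMod(((CustomKey) key).getUniqueCarrier().hashCode(), numPartitions());
            }
            // A partition index must lie in [0, numPartitions), so fail loudly instead of returning -1.
            throw new IllegalArgumentException("Unexpected key type: " + key);
        }
    }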

    public static class CustomComparator implements Comparator<CustomKey>, Serializable
    {
        @Override
        public int compare(CustomKey o1, CustomKey o2)
        {
            int value1 = o1.getUniqueCarrier().compareTo(o2.getUniqueCarrier());
            if (value1 != 0)
            {
                return value1;
            }
            else
            {
                int value2 = o1.getDestAirportId().compareTo(o2.getDestAirportId());
                if (value2 != 0)
                {
                    return value2;
                }
                else
                {
                    return o2.getArrivalDelay().compareTo(o1.getArrivalDelay());
                }
            }
        }
    }
}

CustomKey class -

package com.spark.rdd.tutorial;

import java.io.Serializable;

public class CustomKey implements Serializable
{
    private String uniqueCarrier;
    private String destAirportId;
    private Integer arrivalDelay;

    public String getUniqueCarrier() {
        return uniqueCarrier;
    }

    public void setUniqueCarrier(String uniqueCarrier) {
        this.uniqueCarrier = uniqueCarrier;
    }

    public String getDestAirportId() {
        return destAirportId;
    }

    public void setDestAirportId(String destAirportId) {
        this.destAirportId = destAirportId;
    }

    public Integer getArrivalDelay() {
        return arrivalDelay;
    }

    public void setArrivalDelay(Integer arrivalDelay) {
        this.arrivalDelay = arrivalDelay;
    }
}

Creating the Key-Value Pairs

The data is in CSV format and is converted into key-value pairs. The important part of secondary sorting is deciding which value(s) to include in the key to enable the additional ordering. The natural key is UNIQUE_CARRIER, and DEST_AIRPORT_ID and ARR_DELAY are the values we include in the key. This is done in the code above with javaRDD.map() and map.mapToPair().
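The conversion of one CSV line into a key-value pair can be sketched in plain Java as follows. The sample row is made up, and the key is kept as a simple comma-joined string purely for illustration; a real composite key would keep the delay as a number so that it sorts numerically.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class CsvToKeyValue {

    // Column positions taken from the header listed in the Input Format section.
    static final int FL_DATE = 2;
    static final int UNIQUE_CARRIER = 3;
    static final int ORIGIN_AIRPORT_ID = 5;
    static final int ORIGIN_CITY_MARKET_ID = 6;
    static final int DEST_AIRPORT_ID = 8;
    static final int ARR_DELAY = 20;

    // Turns one CSV line into (composite key, remaining values of interest).
    public static Map.Entry<String, List<String>> toPair(String line) {
        // The -1 limit keeps trailing empty fields, which a plain split(",") would drop.
        String[] f = line.split(",", -1);
        String key = f[UNIQUE_CARRIER] + "," + f[DEST_AIRPORT_ID] + "," + f[ARR_DELAY];
        List<String> value = Arrays.asList(f[FL_DATE], f[ORIGIN_AIRPORT_ID], f[ORIGIN_CITY_MARKET_ID]);
        return new AbstractMap.SimpleEntry<>(key, value);
    }

    public static void main(String[] args) {
        // Made-up row with 24 fields, in the column order of the header.
        String line = "1,4,2015-01-01,AA,AA,12478,31703,NY,12892,32575,CA,"
                + "900,855,0,17,912,1230,7,1230,1237,7,0,,0";
        Map.Entry<String, List<String>> pair = toPair(line);
        // prints AA,12892,7 -> [2015-01-01, 12478, 31703]
        System.out.println(pair.getKey() + " -> " + pair.getValue());
    }
}
```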

Partitioning and Sorting Code

Now we need to partition and sort our data. There are two points we need to consider.

  1. All records with the same UNIQUE_CARRIER must reach the same partition during the reduce phase. But our key is a composite key with 3 fields, so partitioning by the whole key will not work. Instead, we create a custom partitioner that knows which part of the key to use when determining the partition a record goes to. This is done in the code above by extending the Partitioner class.
  2. We also need to tell Spark how we want our data sorted: ORDER BY UNIQUE_CARRIER ASC, DEST_AIRPORT_ID ASC, ARR_DELAY DESC. That is, we want ARR_DELAY in descending order, so the flights with the biggest delay are listed first. This is done by implementing a custom Java comparator; see this class in the code above:
CustomComparator implements Comparator<CustomKey>, Serializable {}
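The first point, choosing the partition from the natural key only, can be sketched in plain Java like this. The class and method names are made up for the example. It uses Math.floorMod rather than the % operator, because hashCode() can be negative and % would then produce a negative, invalid partition index.

```java
public class NaturalKeyPartitioning {

    // Picks a partition from the natural key (the carrier) only, so records that
    // differ in airport or delay still land in the same partition.
    public static int partitionFor(String uniqueCarrier, int numPartitions) {
        return Math.floorMod(uniqueCarrier.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int partitions = 4;

        // Same carrier, different airports and delays: the partition depends on "AA" alone.
        System.out.println(partitionFor("AA", partitions) == partitionFor("AA", partitions)); // prints true

        // Why floorMod: the remainder operator keeps the sign of the dividend.
        System.out.println(-7 % 4);               // prints -3
        System.out.println(Math.floorMod(-7, 4)); // prints 1
    }
}
```

With more than one partition, each partition is sorted on its own, so the collected output is ordered within a partition rather than across the whole dataset.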

Summary

Now it’s time to put our partitioning and sorting into action. This is achieved with repartitionAndSortWithinPartitions, which is not a group-by operation. It only moves each record to the partition chosen by the partitioner and sorts the records within each partition by key, using the supplied comparator. In the Scala API the method is defined on the OrderedRDDFunctions class; in Java it is called on JavaPairRDD. The Spark Javadoc says:

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.

This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
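Because the records are sorted but not grouped, any per-group logic has to detect group boundaries itself while walking the sorted records. The plain-Java sketch below (no Spark involved; the names and sample rows are made up) keeps the first record of every (carrier, airport) group, which is the largest delay when delays are sorted in descending order. In a Spark job, similar logic could run over each partition’s iterator, for example inside mapPartitions.

```java
import java.util.ArrayList;
import java.util.List;

public class TopDelayPerGroup {

    // Expects rows already sorted by carrier ASC, airport ASC, delay DESC.
    // Each row is {carrier, destAirportId, arrivalDelay}.
    public static List<String> firstPerGroup(List<String[]> sortedRows) {
        List<String> result = new ArrayList<>();
        String previousGroup = null;
        for (String[] row : sortedRows) {
            String group = row[0] + "::" + row[1];
            // A change in (carrier, airport) marks the start of a new group.
            if (!group.equals(previousGroup)) {
                result.add(group + "::" + row[2]);
                previousGroup = group;
            }
        }
        return result;
    }

    public static void main(String[] args) {
        List<String[]> sorted = new ArrayList<>();
        sorted.add(new String[] {"AA", "10140", "12"});
        sorted.add(new String[] {"AA", "10397", "30"});
        sorted.add(new String[] {"AA", "10397", "5"});
        sorted.add(new String[] {"DL", "10397", "7"});
        // prints [AA::10140::12, AA::10397::30, DL::10397::7]
        System.out.println(firstPerGroup(sorted));
    }
}
```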

Output —

We can see our results sorted by UNIQUE_CARRIER, DEST_AIRPORT_ID and ARR_DELAY.
