Apache Spark: Secondary Sorting in Spark in Java

Chandra Prakash
Feb 18

We have all probably seen secondary sorting in MapReduce/Hadoop and its key-based implementation. There is plenty of information and many blog posts available on this. Let's learn how secondary sorting can be achieved using Spark in Java.

Before we go ahead, you can download the dataset from here — https://www.kaggle.com/bhoomika216/airline-dataset-for-analysis?select=FINALdata1.csv

By definition, the secondary sorting problem is about sorting the values associated with a key in the reduce phase; it is sometimes called value-to-key conversion. The technique lets us sort the values (in ascending or descending order) passed to each reducer. In simple terms, you can relate it to ORDER BY in SQL. It relies on a composite key containing all the values we want to sort on.

Using the dataset downloaded from Kaggle, we will now try to achieve secondary sorting. The data is airline on-time performance data. It contains many data points, but we'll focus on which airlines have the most late arrivals and at which airports those late arrivals occur. From that statement we can determine our sort order: airline (UNIQUE_CARRIER), airport (DEST_AIRPORT_ID), and delay time (ARR_DELAY); in SQL terms, ORDER BY UNIQUE_CARRIER ASC, DEST_AIRPORT_ID ASC, ARR_DELAY DESC.

Before we jump into code, we need to understand the approaches. There are at least two ways to sort the reducer values, and both apply to the MapReduce/Hadoop and Spark frameworks.

1st Approach —

Performing Secondary Sorting in Memory (In-Reducer Sort)

In this approach, the grouped data is collected at each reducer node/executor, buffered, and sorted in memory (using Java's Collections.sort() or an equivalent). There is a scalability issue here because of the in-memory sort: as soon as the grouped data for a key exceeds the executor's memory, an OutOfMemoryError is thrown and the job fails. It works only for small datasets.
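To make this concrete, here is a minimal sketch of the in-memory approach. It assumes a JavaPairRDD<String, Double> of (UNIQUE_CARRIER, ARR_DELAY) pairs named pairs; the simplified key and variable names are illustrative, not the article's final code.

// groupByKey() buffers every value for a key on one executor,
// which is exactly where this approach runs out of memory on large groups.
JavaPairRDD<String, List<Double>> sortedDelays = pairs
    .groupByKey()
    .mapValues(delays -> {
        List<Double> list = new ArrayList<>();
        delays.forEach(list::add);
        list.sort(Comparator.reverseOrder()); // biggest delay first
        return list;
    });

(Requires java.util.ArrayList, java.util.Comparator, and java.util.List.)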

2nd Approach —

Performing Secondary Sorting Using the Spark Framework (In-Shuffle Sort)

To understand this approach, we first need the notion of a composite key. A composite key is formed when a value from the payload is promoted and appended to the natural key. Let's discuss this in more detail.

Input Format

The input data is in CSV format and has the columns below; secondary sorting will be applied on UNIQUE_CARRIER, DEST_AIRPORT_ID, and ARR_DELAY:

DAY_OF_MONTH, DAY_OF_WEEK, FL_DATE, UNIQUE_CARRIER, CARRIER, ORIGIN_AIRPORT_ID, ORIGIN_CITY_MARKET_ID, ORIGIN_STATE_ABR, DEST_AIRPORT_ID, DEST_CITY_MARKET_ID, DEST_STATE_ABR, CRS_DEP_TIME, DEP_TIME, DEP_DELAY_NEW, TAXI_OUT, WHEELS_OFF, WHEELS_ON, TAXI_IN, CRS_ARR_TIME, ARR_TIME, ARR_DELAY, CANCELLED, CANCELLATION_CODE, DIVERTED

Secondary sorting will be applied similarly to — ORDER BY UNIQUE_CARRIER ASC, DEST_AIRPORT_ID ASC, ARR_DELAY DESC;

Composite Key

The 'natural' key here is UNIQUE_CARRIER; DEST_AIRPORT_ID and ARR_DELAY are the values we'll add to it. Defining the composite key (by appending DEST_AIRPORT_ID and ARR_DELAY to the natural key, i.e. UNIQUE_CARRIER) enables us to sort the reducer values using the Spark framework itself: the shuffle will sort first by the first part of the key and then by the rest. We will see how to achieve this in Spark; for now, just remember that Spark has provided repartitionAndSortWithinPartitions() since version 1.2. Let's look at the code now to understand exactly what has been done.

CustomKey class -
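The original post showed the class here; below is a minimal sketch of what such a composite key can look like. The field and accessor names are illustrative.

import java.io.Serializable;

// Composite key: the natural key (UNIQUE_CARRIER) plus the two
// values promoted from the record so the shuffle can sort on them.
public class CustomKey implements Serializable {

    private final String uniqueCarrier; // natural key, sorted ASC
    private final int destAirportId;    // promoted value, sorted ASC
    private final double arrDelay;      // promoted value, sorted DESC

    public CustomKey(String uniqueCarrier, int destAirportId, double arrDelay) {
        this.uniqueCarrier = uniqueCarrier;
        this.destAirportId = destAirportId;
        this.arrDelay = arrDelay;
    }

    public String getUniqueCarrier() { return uniqueCarrier; }
    public int getDestAirportId()    { return destAirportId; }
    public double getArrDelay()      { return arrDelay; }
}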

Creating the Key-Value Pairs

The data is in CSV format and will be converted into key-value form. The important part of secondary sorting is deciding which value(s) to include in the key to enable the additional ordering. The 'natural' key is UNIQUE_CARRIER; DEST_AIRPORT_ID and ARR_DELAY are the values we'll include in the key. In code this is done with JavaRDD and mapToPair(); see the sketch below.
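A minimal sketch of that conversion, assuming a JavaSparkContext named sc, the column order listed above (0-based indices 3, 8, and 20), and no header row or missing values; real data would need guards for both.

JavaRDD<String> lines = sc.textFile("FINALdata1.csv");

JavaPairRDD<CustomKey, String> pairs = lines.mapToPair(line -> {
    String[] cols = line.split(",");
    // Promote DEST_AIRPORT_ID and ARR_DELAY into the key;
    // keep the whole record as the value.
    CustomKey key = new CustomKey(cols[3],
                                  Integer.parseInt(cols[8]),
                                  Double.parseDouble(cols[20]));
    return new Tuple2<>(key, line);
});

(Requires org.apache.spark.api.java.JavaRDD, org.apache.spark.api.java.JavaPairRDD, and scala.Tuple2.)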

Partitioning and Sorting Code

Now we need to partition and sort our data. There are two points we need to consider.

  1. We need to group the data by UNIQUE_CARRIER so that all records for a carrier land in the same partition during the reduce phase. But our key is a composite key with 3 fields, so simply partitioning by key will not work. We'll therefore create a custom partitioner that knows which part of the key to use when determining the partition the data will arrive at. This is done by extending the Partitioner class; see the sketch after this list.
  2. We also need to tell Spark how we want the data sorted: ORDER BY UNIQUE_CARRIER ASC, DEST_AIRPORT_ID ASC, ARR_DELAY DESC. That is, we want ARR_DELAY in descending order so the flights with the biggest delays are listed first. This is done by implementing a custom Java Comparator, also shown in the sketch after this list.
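Here is a sketch of both pieces, under the same assumptions as the CustomKey class above (the class names are illustrative).

import java.io.Serializable;
import java.util.Comparator;
import org.apache.spark.Partitioner;

// Partition only on the natural key, so all records for a carrier
// land in the same partition no matter what the other key fields are.
public class CarrierPartitioner extends Partitioner {

    private final int numPartitions;

    public CarrierPartitioner(int numPartitions) {
        this.numPartitions = numPartitions;
    }

    @Override
    public int numPartitions() {
        return numPartitions;
    }

    @Override
    public int getPartition(Object key) {
        CustomKey k = (CustomKey) key;
        // floorMod keeps the result non-negative even for negative hashes.
        return Math.floorMod(k.getUniqueCarrier().hashCode(), numPartitions);
    }
}

// Encodes ORDER BY UNIQUE_CARRIER ASC, DEST_AIRPORT_ID ASC, ARR_DELAY DESC.
// Must be Serializable so Spark can ship it to the executors.
public class CustomKeyComparator implements Comparator<CustomKey>, Serializable {

    @Override
    public int compare(CustomKey a, CustomKey b) {
        int byCarrier = a.getUniqueCarrier().compareTo(b.getUniqueCarrier());
        if (byCarrier != 0) return byCarrier;
        int byAirport = Integer.compare(a.getDestAirportId(), b.getDestAirportId());
        if (byAirport != 0) return byAirport;
        return Double.compare(b.getArrDelay(), a.getArrDelay()); // reversed: DESC
    }
}

(Each public class goes in its own file.)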

Summary

Now it's time to put our partitioning and sorting into action. This is achieved with repartitionAndSortWithinPartitions, which is not a group-by operation: it only moves data with the same key to the same partition and sorts each partition with the supplied comparator (the method comes from the OrderedRDDFunctions class). The Spark Javadoc says:

Repartition the RDD according to the given partitioner and, within each resulting partition, sort records by their keys.

This is more efficient than calling repartition and then sorting within each partition because it can push the sorting down into the shuffle machinery.
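A minimal driver sketch tying the pieces together; the partition count and output path are placeholders.

JavaPairRDD<CustomKey, String> sorted = pairs.repartitionAndSortWithinPartitions(
        new CarrierPartitioner(8),      // partition by UNIQUE_CARRIER only
        new CustomKeyComparator());     // sort within each partition

sorted.values().saveAsTextFile("output/secondary-sorted");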

Output —


We can see our results sorted by UNIQUE_CARRIER and DEST_AIRPORT_ID in ascending order, with ARR_DELAY in descending order.


Chandra Prakash

Written by

Big Data Developer - Spark / Flink / Java 8 https://www.linkedin.com/in/chandra-prakash-28932652/

Analytics Vidhya

Analytics Vidhya is a community of Analytics and Data Science professionals. We are building the next-gen data science ecosystem https://www.analyticsvidhya.com
