2. Learning Hadoop with ML-100k — MapReduce Sort Program

Vaishnavi Nandakumar
4 min read · Nov 26, 2023


Introduction

In the previous post, we worked through a basic MapReduce problem: counting how many times each value of an attribute (movie rating) occurs in the ML-100k dataset. You can find the previous post here.

In this post, let’s go a step further and understand how we can sort this output.

Data

The data for this problem is the movie ratings file, u.data, from the ML-100k dataset. Each line is a tab-separated record: user ID, movie ID, rating, and timestamp.

Problem Statement

Generate a sorted list of the most popular movies

For this problem statement, the popularity of a movie depends only on the number of times it has been rated, not on the rating values.

Solving it takes two steps:

  1. Calculate the number of times each movie has been reviewed.
  2. Sort the results in descending order based on the calculated count.
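Before bringing Hadoop in, it can help to see the two steps on a handful of records. The following is only an in-memory sketch, not how MapReduce executes them: the class name PopularitySketch, its method names, and the sample rows are made up for illustration, and the rows simply follow the tab-separated u.data layout.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical in-memory illustration of the two steps; not part of the Hadoop solution.
class PopularitySketch {

    // Step 1: count how often each movie ID (second tab-separated field) appears.
    static Map<String, Integer> countRatings(List<String> lines) {
        Map<String, Integer> counts = new HashMap<>();
        for (String line : lines) {
            String movieId = line.split("\t")[1];
            counts.merge(movieId, 1, Integer::sum);
        }
        return counts;
    }

    // Step 2: order the (movieId, count) pairs by count, largest first.
    static List<Map.Entry<String, Integer>> sortByCountDescending(Map<String, Integer> counts) {
        List<Map.Entry<String, Integer>> entries = new ArrayList<>(counts.entrySet());
        entries.sort((a, b) -> Integer.compare(b.getValue(), a.getValue()));
        return entries;
    }

    public static void main(String[] args) {
        // Made-up sample rows: userId, movieId, rating, timestamp.
        List<String> sample = List.of(
            "1\t50\t5\t881250949",
            "2\t50\t4\t891717742",
            "3\t172\t3\t878887116",
            "4\t50\t5\t880606923",
            "5\t172\t4\t886397596",
            "6\t133\t1\t884182806");
        for (Map.Entry<String, Integer> e : sortByCountDescending(countRatings(sample))) {
            System.out.println(e.getKey() + "\t" + e.getValue());
        }
    }
}
```

With these sample rows, movie 50 comes first with three ratings, followed by 172 with two and 133 with one. The MapReduce version below produces the same kind of result, but splits the counting and the sorting across two jobs.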

Solution

Mapper 1 — Count movie ratings

We follow the same steps as in the previous tutorial: iterate through every record and emit the attribute value as the key, with a count of one as the value.

In this case, data[1] holds the movie ID.

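import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Emits (movieId, 1) for every rating record in u.data.
public class CountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable ONE = new IntWritable(1);
    private final Text movieId = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Fields are tab-separated: user ID, movie ID, rating, timestamp.
        String[] data = value.toString().split("\t");
        movieId.set(data[1]);
        context.write(movieId, ONE);
    }
}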

Reducer 1 — Aggregate count value

All records with the same key go to the same reducer, which sums their values to produce the total number of ratings each movie ID has received.

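import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// Sums the ones emitted for each movie ID to get its total rating count.
public class CountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable(0);

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for (IntWritable value : values) {
            sum += value.get();
        }
        result.set(sum);
        context.write(key, result);
    }
}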

Mapper 2 — Swap keys

Hadoop MapReduce automatically sorts the output of the map phase before it is passed to the reduce phase. The sorting is based on keys from the mappers. In our case, we need to sort the result based on the total count of each movie ID.

So for this, we introduce another mapper that swaps the key-value pair. Now we have the key corresponding to the count and the value corresponding to the movie ID.

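import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Reads the first job's output lines (movieId, tab, count) and emits (count, movieId).
public class KeySwapperMapper extends Mapper<Object, Text, IntWritable, Text> {
    private final IntWritable frequency = new IntWritable();
    private final Text movieId = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String[] data = value.toString().split("\t");
        frequency.set(Integer.parseInt(data[1])); // the count becomes the key
        movieId.set(data[0]);                     // the movie ID becomes the value
        context.write(frequency, movieId);
    }
}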

Comparator

The default sort order follows the natural (ascending) order of the keys. Since we need the list in descending order, we use a custom comparator to define the sorting logic.

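import java.nio.ByteBuffer;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparator;

// Sorts IntWritable keys in descending order by comparing their serialized bytes.
public class IntComparator extends WritableComparator {
    public IntComparator() {
        super(IntWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // Each key is a serialized int starting at offset s1 / s2.
        int v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
        int v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
        // Swapping the arguments reverses the natural ascending order.
        return Integer.compare(v2, v1);
    }
}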
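To see why decoding the bytes and reversing the comparison yields a descending order, here is a small Hadoop-free sketch. It assumes the key is serialized as a 4-byte big-endian int, which is what IntWritable writes; the class DescendingBytesSketch and its methods are hypothetical names used only for this illustration.

```java
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone illustration; it does not use any Hadoop classes.
class DescendingBytesSketch {

    // Mimics the serialized form of an int key: four bytes, big-endian.
    static byte[] serialize(int value) {
        return ByteBuffer.allocate(4).putInt(value).array();
    }

    // Decodes both ints from their byte ranges, then compares them in reverse.
    static int compareDescending(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        int v1 = ByteBuffer.wrap(b1, s1, l1).getInt();
        int v2 = ByteBuffer.wrap(b2, s2, l2).getInt();
        return Integer.compare(v2, v1);
    }

    // Serializes the counts, sorts the byte arrays with the comparator, and decodes them again.
    static List<Integer> sortDescending(List<Integer> counts) {
        List<byte[]> keys = new ArrayList<>();
        for (int count : counts) {
            keys.add(serialize(count));
        }
        keys.sort((a, b) -> compareDescending(a, 0, a.length, b, 0, b.length));
        List<Integer> sorted = new ArrayList<>();
        for (byte[] key : keys) {
            sorted.add(ByteBuffer.wrap(key).getInt());
        }
        return sorted;
    }

    public static void main(String[] args) {
        System.out.println(sortDescending(List.of(297, 583, 1, 452))); // prints [583, 452, 297, 1]
    }
}
```

Comparing on the raw bytes means the framework never has to deserialize keys into objects during the sort, which is the reason the comparator works on byte arrays rather than on IntWritable instances.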

Why don’t we need another reducer?

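Once the keys are swapped, no further aggregation is needed, so we do not write a reducer of our own. The job still has a reduce phase, though: when no reducer class is set, Hadoop uses the base Reducer class, which writes every key-value pair through unchanged. The shuffle hands it the pairs already sorted by key (using our comparator), so the output comes out sorted. With a single reduce task, which is the default, the result is one globally sorted file.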
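As a rough mental model of the second job, the sketch below swaps each pair, sorts by the new key in descending order, and then writes every pair out unchanged with no aggregation. It is a plain-Java approximation, not Hadoop code; the class PassThroughSketch, its method name, and the sample lines are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical in-memory model of "swap, sort, pass through"; not part of the Hadoop solution.
class PassThroughSketch {

    // Input lines look like the first job's output: movieId, tab, count.
    static List<String> swapSortAndPassThrough(List<String> countLines) {
        // Stand-in for the shuffle: values grouped by key, keys kept in descending order.
        Map<Integer, List<String>> grouped = new TreeMap<Integer, List<String>>(Comparator.reverseOrder());
        for (String line : countLines) {
            String[] data = line.split("\t");
            int count = Integer.parseInt(data[1]);
            grouped.computeIfAbsent(count, k -> new ArrayList<>()).add(data[0]);
        }
        // Stand-in for the pass-through step: emit each (count, movieId) pair as-is.
        List<String> output = new ArrayList<>();
        for (Map.Entry<Integer, List<String>> entry : grouped.entrySet()) {
            for (String movieId : entry.getValue()) {
                output.add(entry.getKey() + "\t" + movieId);
            }
        }
        return output;
    }

    public static void main(String[] args) {
        // Made-up counts; movies 172 and 181 tie with two ratings each.
        List<String> counts = List.of("50\t3", "172\t2", "133\t1", "181\t2");
        swapSortAndPassThrough(counts).forEach(System.out::println);
    }
}
```

Movies with the same count end up under the same key and are each written out on their own line. In this sketch tied movies keep their input order; in a real job the order of values within one key is not something to rely on.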

Driver

Now we have two jobs that we need to connect in the driver program. The idea behind chaining them is to feed the output of the first job to the second job as its input.

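import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class PopularMovieDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Job 1: count ratings per movie ID. args[0] = input, args[1] = intermediate output.
        Configuration conf1 = new Configuration();
        Job job1 = Job.getInstance(conf1, "job1");
        job1.setJarByClass(PopularMovieDriver.class);
        job1.setMapperClass(CountMapper.class);
        job1.setCombinerClass(CountReducer.class);
        job1.setReducerClass(CountReducer.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileSystem fs = FileSystem.get(conf1);
        // Remove any previous output so the job can be rerun.
        fs.delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        // Stop if the counting job failed; job 2 would have nothing valid to read.
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: swap keys and sort by count, descending. args[2] = final output.
        Configuration conf2 = new Configuration();
        Job job2 = Job.getInstance(conf2, "job2");
        job2.setJarByClass(PopularMovieDriver.class);
        job2.setMapperClass(KeySwapperMapper.class);
        job2.setSortComparatorClass(IntComparator.class);
        // One reduce task (also the default) keeps the result in a single sorted file.
        job2.setNumReduceTasks(1);
        job2.setOutputKeyClass(IntWritable.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        fs.delete(new Path(args[2]), true);
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}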

What happens if we don’t specify the OutputKeyClass and OutputValueClass?

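In a Hadoop MapReduce job, the setOutputKeyClass and setOutputValueClass methods specify the key and value types of the job's final output. Unless setMapOutputKeyClass and setMapOutputValueClass are also called, the same types are assumed for the map output, which lets the framework serialize and deserialize records correctly between the phases.

If you leave them out, Hadoop falls back to its defaults: LongWritable keys and Text values. Our mappers emit different types, so the job would fail at runtime with a type mismatch error.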

Running the Solution

Output

One thing to note while running this JAR is that we now have to provide an extra argument: the output path for the second job. The first output path holds the intermediate counts. Run the following command:

bin/hadoop jar ml-100k.jar org.example.PopularMovieDriver /user/vaishnavink/u.data user/vaishnavink/sorted-movies-output/pop-movie-data-output-1 user/vaishnavink/sorted-movies-output/pop-movie-data-output-2

You can expect to see two output folders:

  1. pop-movie-data-output-1 contains the count for every movie ID, without sorting.
  2. pop-movie-data-output-2 contains the sorted list.

Conclusion

You made it to the end! You can find the code I’ve used here. In my next post I’ll be writing about computing averages in a MapReduce program.
