3. Learning Hadoop with ML-100k — MapReduce Average Program

Vaishnavi Nandakumar
4 min read · Nov 27, 2023

Introduction

In the previous tutorials, we learnt how to write MapReduce programs for simple counting and sorting problem statements. This article extends the same idea, except this time we will be computing an average.

You can check out the previous tutorials here.

Data

The data we will be using for this problem is the movie ratings data in the u.data file of the ML-100k dataset. Each record is a tab-separated line containing a user ID, a movie ID, a rating from 1 to 5, and a timestamp.
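For reference, a record looks like this (illustrative values, in the documented column order):

196    242    3    881250949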

Problem Statement

Generate a sorted list of the most highly rated movies

To compute the overall rating of a movie ID, we need to find the average of all its ratings. So, much like the previous tutorial, we have two steps here.

  1. Calculate the average rating value of each movie ID.
  2. Sort the results in descending order.

Solution

Mapper 1

In the previous counter examples, we iterated through every record and emitted the value one for each key.

In this case, however, we emit each movie ID with its corresponding rating value set inside an IntWritable object.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class AverageRatingsMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final Text movie = new Text();
    private static final IntWritable rating = new IntWritable();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // u.data is tab-separated: user ID, movie ID, rating, timestamp
        String[] data = value.toString().split("\t");
        movie.set(data[1]);                     // movie ID becomes the key
        rating.set(Integer.parseInt(data[2]));  // rating becomes the value
        context.write(movie, rating);
    }
}

Reducer 1

This reducer is where things get slightly tricky. There are two things to note here.

  1. To calculate the average, we need to find not only the cumulative sum of the ratings but also a count of the total number of ratings, tracked here by the countSum variable.
  2. The output of the average is of type FloatWritable, so we have to make sure it is declared accordingly everywhere, including in the driver.

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class AverageRatingsReducer extends Reducer<Text, IntWritable, Text, FloatWritable> {
    private static final FloatWritable avg = new FloatWritable();

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int ratingSum = 0;
        int countSum = 0;

        // Accumulate the rating total and the number of ratings for this movie
        for (IntWritable val : values) {
            ratingSum += val.get();
            countSum++;
        }

        // Cast to float so the division is not truncated to an integer
        avg.set((float) ratingSum / countSum);
        context.write(key, avg);
    }
}
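One thing to keep in mind: unlike a counting reducer, this reducer cannot double as a combiner. A combiner would emit partial averages, and an average of averages is not the overall average unless each partial result also carries its count.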

Mapper 2

As we discussed before, we need to swap the key and value to implement the sorting, since Hadoop sorts records by key. You can refer to the previous tutorial for a more detailed explanation.

import java.io.IOException;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class KeySwapperMapperFloat extends Mapper<Object, Text, FloatWritable, Text> {
    private final FloatWritable frequency = new FloatWritable();
    private final Text t = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Job 1's output is tab-separated: movie ID, average rating
        String[] data = value.toString().split("\t");
        frequency.set(Float.parseFloat(data[1]));  // average rating becomes the key
        t.set(data[0]);                            // movie ID becomes the value
        context.write(frequency, t);
    }
}

Comparator

The comparator class must also be compatible with FloatWritable. This custom comparator works directly on serialized float values: in the compare method, we read each float from its byte representation and multiply the result by -1 to arrange the keys in descending order.


import java.nio.ByteBuffer;

import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.WritableComparator;

public class FloatComparator extends WritableComparator {
    public FloatComparator() {
        super(FloatWritable.class);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
        // A serialized FloatWritable is the float's four-byte big-endian representation
        Float v1 = ByteBuffer.wrap(b1, s1, l1).getFloat();
        Float v2 = ByteBuffer.wrap(b2, s2, l2).getFloat();
        // Negate the natural ordering to sort in descending order
        return v1.compareTo(v2) * (-1);
    }
}

Driver

Putting all of this together, we have a driver class that chains the two jobs: job1 computes the averages and job2 sorts them.

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.FloatWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class AverageRatingsDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        // Job 1: compute the average rating for each movie ID
        Configuration conf1 = new Configuration();
        Job job1 = Job.getInstance(conf1, "job1");
        job1.setJarByClass(AverageRatingsDriver.class);
        job1.setMapperClass(AverageRatingsMapper.class);
        job1.setReducerClass(AverageRatingsReducer.class);
        // The map output value (IntWritable) and reduce output value (FloatWritable)
        // differ, so both must be declared explicitly
        job1.setMapOutputKeyClass(Text.class);
        job1.setMapOutputValueClass(IntWritable.class);
        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(FloatWritable.class);
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        // Delete the output directory if it already exists so the job can be rerun
        FileSystem fs = FileSystem.get(conf1);
        fs.delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job1, new Path(args[1]));
        if (!job1.waitForCompletion(true)) {
            System.exit(1);
        }

        // Job 2: swap key and value, then sort by average rating in descending order
        Job job2 = Job.getInstance(conf1, "job2");
        job2.setJarByClass(AverageRatingsDriver.class);
        job2.setMapperClass(KeySwapperMapperFloat.class);
        job2.setSortComparatorClass(FloatComparator.class);
        job2.setOutputKeyClass(FloatWritable.class);
        job2.setOutputValueClass(Text.class);
        FileInputFormat.addInputPath(job2, new Path(args[1]));
        fs.delete(new Path(args[2]), true);
        FileOutputFormat.setOutputPath(job2, new Path(args[2]));
        System.exit(job2.waitForCompletion(true) ? 0 : 1);
    }
}

Running the Solution

Run the solution with the following command:

bin/hadoop jar ml-100k.jar org.example.AverageRatingsDriver /user/vaishnavink/u.data /user/vaishnavink/avg-rating/output-1 /user/vaishnavink/avg-rating/output-2

Output

You can expect to see two output folders:

  1. The average rating value for each movie ID.
  2. A sorted list of movie IDs by average rating.
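
To spot-check either folder from the command line, you can print the reducer's part file with the HDFS shell (the paths below assume the command above):

bin/hadoop fs -cat /user/vaishnavink/avg-rating/output-2/part-r-00000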

Conclusion

You can access the code for this tutorial here. In the next article, I will be covering joins and distributed cache in HDFS.
