1. Learning Hadoop with ML-100k — MapReduce Count Program

Vaishnavi Nandakumar
Nov 22, 2023

Introduction

Hadoop is an open source framework for storing and processing large datasets. It uses distributed storage and parallel processing to split the data into smaller chunks of work, which are then processed across a cluster using programming models such as MapReduce.

I have decided to document my progress in learning more about the Hadoop ecosystem through a series of blog posts. We will go over the basic concepts in Hadoop while working on problem statements at the same time.

To stay consistent, I will be using the same ML-100k dataset throughout the series. In this initial post, we’ll tackle a straightforward problem: using MapReduce to count how often each distinct value appears in the dataset. As we advance through the tutorial, we’ll also discuss conceptual questions, exploring the reasons and methods behind certain implementation choices.

I’ve formulated most of the problem statements and solutions independently. I welcome any suggestions on how I could make them better.

ML-100K Dataset

This ml-100k dataset consists of 100,000 movie ratings (1–5) from 943 users on 1682 movies. You can download the dataset from here.

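In this tutorial we will be using the movie rating dataset as defined in u.data, a tab-separated file in which each line holds a user id, an item (movie) id, a rating and a timestamp.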

Now that we have an idea of the type of data we’re dealing with, let’s get working.

Problem Statement

Analyze Rating Popularity in Movie Reviews

We want to figure out which ratings are most common in movie reviews. To do this, we need to count how many times each different rating shows up in the dataset.

Solution

Mapper

The mapper function takes input data in the form of key-value pairs, representing lines from your dataset. It has the following format:

class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>

The map method is called once for every line. We split the line into an array of columns, take the rating (the third column), and write it to the context as the key with a value of one.

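import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class RatingsCountMapper extends Mapper<Object, Text, Text, IntWritable> {
    private static final IntWritable one = new IntWritable(1);
    private final Text word = new Text();

    @Override
    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Each line of u.data is tab-separated; the rating is the third column
        String[] data = value.toString().split("\t");
        String rating = data[2];
        word.set(rating);
        // Emit (rating, 1) for every review
        context.write(word, one);
    }
}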
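
To see what the mapper emits without starting a cluster, here is a minimal plain-Java sketch of the same logic. It does not use the Hadoop API: the class name MapStepSketch, the method mapLine and the sample lines are made up for illustration, and it assumes the tab-separated u.data layout with the rating in the third column.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Plain-Java stand-in for the map step; no Hadoop classes are involved.
class MapStepSketch {

    // Mirrors the body of RatingsCountMapper.map: one line in, one (rating, 1) pair out.
    static Map.Entry<String, Integer> mapLine(String line) {
        String[] data = line.split("\t");
        return new SimpleEntry<>(data[2], 1);
    }

    public static void main(String[] args) {
        // Illustrative sample lines: user id, item id, rating, timestamp.
        List<String> lines = List.of(
                "196\t242\t3\t881250949",
                "186\t302\t3\t891717742",
                "22\t377\t1\t878887116");

        List<Map.Entry<String, Integer>> emitted = new ArrayList<>();
        for (String line : lines) {
            emitted.add(mapLine(line));
        }
        System.out.println(emitted); // prints [3=1, 3=1, 1=1]
    }
}
```

Note that the mapper does no counting of its own. It only tags each rating with a one and leaves the aggregation to later stages.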

Why are the IntWritable and Text classes used?

The main answer is serialization. Java’s built-in serialization is bulky and slow, which makes it costly in a Hadoop environment. IntWritable and Text instead implement interfaces such as Comparable, Writable and WritableComparable, all of which are useful for MapReduce. The Writable interface serializes objects in a lightweight way that keeps the process fast and compact, which matters given the huge number of remote calls that happen between the nodes in a cluster.
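
A rough way to get a feel for the size difference is to serialize the same integer both ways using only the JDK. This is a sketch, not a benchmark: it assumes that writing an int to a DataOutput is a fair stand-in for what a Writable such as IntWritable does, and the class and method names are made up for illustration.

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.ObjectOutputStream;
import java.io.UncheckedIOException;

class SerializationSizeSketch {

    // Bytes produced by Java's built-in object serialization for one Integer.
    static int javaSerializedSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(Integer.valueOf(value));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    // Bytes produced when the int is written directly to a DataOutput, Writable-style.
    static int rawSize(int value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.size();
    }

    public static void main(String[] args) {
        System.out.println("Java serialization: " + javaSerializedSize(1) + " bytes");
        System.out.println("Raw DataOutput:     " + rawSize(1) + " bytes");
    }
}
```

The raw form is just the four bytes of the int, while the built-in form also carries stream and class metadata. Multiplied across every key-value pair a job emits, that overhead adds up.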

What does Object key do here?

Hadoop uses TextInputFormat to process input files by default. In this format, files are broken into lines. The key is the byte offset at which the line starts in the file, and the value is the line of text. The key is of type LongWritable by default; declaring it as Object works here because our mapper never reads it.
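
The offsets are easy to picture with a small plain-Java sketch. It only imitates the keys and values described above for a short in-memory string; it assumes "\n" line endings, and the names LineOffsetSketch and offsets are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.LinkedHashMap;
import java.util.Map;

class LineOffsetSketch {

    // Maps the byte offset of each line's start to the line text.
    static Map<Long, String> offsets(String fileContent) {
        Map<Long, String> result = new LinkedHashMap<>();
        long offset = 0;
        for (String line : fileContent.split("\n")) {
            result.put(offset, line);
            // +1 accounts for the newline that terminated this line
            offset += line.getBytes(StandardCharsets.UTF_8).length + 1;
        }
        return result;
    }

    public static void main(String[] args) {
        // Two illustrative lines in the u.data layout.
        String content = "196\t242\t3\t881250949\n186\t302\t3\t891717742\n";
        offsets(content).forEach((key, value) -> System.out.println(key + " -> " + value));
    }
}
```

Here the first line is 19 bytes plus a newline, so the second line gets the key 20. Since the count only depends on the rating column, the mapper can safely ignore this key.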

What is a Context?

Essentially, Context provides an interface for the mapper or reducer to interact with the Hadoop framework during the execution of a MapReduce job. Besides collecting the output you write, it also gives access to task information, configuration details, status reporting, counters, etc.

Reducer

The reducer function is responsible for aggregating the values received into another set of key-value pairs.

In our mapper function, we emitted a value of one for every instance of a rating. To get the total count for each rating, we sum those ones.

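import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class RatingsCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    private final IntWritable result = new IntWritable(0);

    @Override
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Add up all the ones emitted for this rating
        int sum = 0;
        for (IntWritable i : values) {
            sum += i.get();
        }
        result.set(sum);
        // Emit (rating, total count)
        context.write(key, result);
    }
}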
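
Between the mapper and the reducer, the framework groups all values that share a key and hands each group to one reduce call. The following plain-Java sketch imitates that grouping and the summing loop in memory. It is an illustration under simplifying assumptions (a single machine, a sorted map standing in for the shuffle), and the names ShuffleReduceSketch, shuffle and reduce are made up for the example.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

class ShuffleReduceSketch {

    // Stand-in for the shuffle: collect a one for every emitted key, grouped and sorted by key.
    static Map<String, List<Integer>> shuffle(List<String> emittedKeys) {
        Map<String, List<Integer>> grouped = new TreeMap<>();
        for (String key : emittedKeys) {
            grouped.computeIfAbsent(key, k -> new ArrayList<>()).add(1);
        }
        return grouped;
    }

    // Same summing loop as RatingsCountReducer.reduce, on plain integers.
    static int reduce(Iterable<Integer> values) {
        int sum = 0;
        for (int v : values) {
            sum += v;
        }
        return sum;
    }

    public static void main(String[] args) {
        // Hypothetical mapper output keys: one rating per review.
        List<String> emitted = List.of("3", "3", "1", "5", "3");
        shuffle(emitted).forEach((rating, ones) ->
                System.out.println(rating + "\t" + reduce(ones)));
    }
}
```

For the five sample keys this prints one line per distinct rating: 1 with a count of 1, 3 with a count of 3, and 5 with a count of 1.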

Driver

Now that we have defined our Mapper and Reducer classes, we can define the Driver class that puts everything together.

The job object holds information about the input and output formats and the various other parameters of the MapReduce job.

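import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class RatingsCountDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);
        Job job = Job.getInstance(conf, "rating-count");
        job.setJarByClass(RatingsCountDriver.class);
        job.setMapperClass(RatingsCountMapper.class);
        // The reducer doubles as the combiner
        job.setCombinerClass(RatingsCountReducer.class);
        job.setReducerClass(RatingsCountReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // args[0] is the input path, args[1] is the output path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // Remove any previous output, since the job fails if the output directory already exists
        fs.delete(new Path(args[1]), true);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}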

What is a Combiner class?

A combiner serves as an intermediate processing step between the Mapper and Reducer in Hadoop. It runs on the output of each mapper and aggregates values locally before they are sent over the network to the reducers. This cuts the amount of data moved between nodes, which helps prevent network congestion and reduces the workload on the reducer. In our driver, the reducer class doubles as the combiner.

However, it’s important to note that using a combiner introduces potential challenges. It may temporarily increase the load on local disk storage during its operation, and it can produce incorrect results if the reduce operation is not associative and commutative. Summing counts satisfies both properties, which is why reusing the reducer as a combiner is safe here.
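
The point about associativity is easier to see with numbers. The plain-Java sketch below compares a sum and a mean, each computed directly and then again with a per-mapper pre-aggregation step standing in for a combiner. The values, the two-mapper split and the class name CombinerSketch are all hypothetical.

```java
import java.util.List;

class CombinerSketch {

    static int sum(List<Integer> values) {
        int total = 0;
        for (int v : values) {
            total += v;
        }
        return total;
    }

    static double mean(List<Double> values) {
        double total = 0;
        for (double v : values) {
            total += v;
        }
        return total / values.size();
    }

    public static void main(String[] args) {
        // Hypothetical values for one key, split across two mappers.
        List<Integer> mapperA = List.of(1, 1, 1);
        List<Integer> mapperB = List.of(1);

        // Sum: combining per mapper first gives the same total.
        int direct = sum(List.of(1, 1, 1, 1));
        int combined = sum(List.of(sum(mapperA), sum(mapperB)));
        System.out.println(direct + " == " + combined); // prints 4 == 4

        // Mean: averaging the per-mapper averages gives a different answer.
        double directMean = mean(List.of(5.0, 5.0, 5.0, 1.0));
        double combinedMean = mean(List.of(mean(List.of(5.0, 5.0, 5.0)), mean(List.of(1.0))));
        System.out.println(directMean + " vs " + combinedMean); // prints 4.0 vs 3.0
    }
}
```

A job that needs an average would therefore have to use a different combiner than its reducer, for example one that passes along partial sums and counts, or skip the combiner altogether.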

Running the Solution

Output

After creating a JAR file and ensuring all input files are present in HDFS, you can run the program as follows:

bin/hadoop jar ml-100k.jar org.example.RatingsCountDriver /user/vaishnavink/u.data /user/vaishnavink/rating-count-output

If all goes well, your output should look like this:

Conclusion

Congrats on making it to the end! You can access the entire code here. Over the course of this tutorial, you have learnt how to implement a simple counting job in MapReduce. In the next post, I will be writing about how we can chain multiple MapReduce jobs in a program.
