4. Learning Hadoop with ML-100k — Joins using Distributed Cache

Vaishnavi Nandakumar
3 min readDec 2, 2023

--

Introduction

The tutorial previously covered, introduced you to using a single input dataset in a MapReduce program for a required problem statement. What if we wanted to handle more than one input source?

In this post, we will be covering MapReduce Joins and their implementation. You can checkout the previous tutorial here.

Data

We will be using two datasets from the ML-100k dataset for this problem statement.

Movie Ratings data in u.data

User data in u.user

For our given problem statement, we’d want to join these two datasets using the user_id key to connect them together.

Problem Statement

Find the age distribution of the movie reviewers

The initial dataset includes user IDs and corresponding movie ratings, while the second dataset comprises user information, specifically their ages.

Our objective is to analyze the age distribution of the reviewers. For each age group, we aim to determine the count of reviewers. Essentially, it follows the same MapReduce approach as our first tutorial counter program. The only difference here is, this time we need the join operation to access the key (age attribute) from the u.user dataset.

Solution

Driver

Unlike our previous tutorial, this time we will start by explaining the driver first as it includes a new topic — DistributedCache.

What is a Distributed Cache?

When our MapReduce program is executed, it will want a way to access u.user which is a relatively smaller file to perform the join operation. The Distributed Cache allows a way to provide a local copy of this file to all worker nodes and delete them once they’re done being used.

public class AverageDistributionDriver {
public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
Configuration conf1 = new Configuration();
Job job1 = Job.getInstance(conf1, "job1");
job1.setJarByClass(AverageDistributionDriver.class);
job1.setMapperClass(AverageDistributionMapper.class);
job1.setReducerClass(AverageDistributionReducer.class);
DistributedCache.addCacheFile(new URI(args[1]), job1.getConfiguration());
job1.setOutputKeyClass(Text.class);
job1.setOutputValueClass(IntWritable.class);
FileInputFormat.addInputPath(job1, new Path(args[0]));
FileSystem fs = FileSystem.get(new Configuration());
fs.delete(new Path(args[2]), true);
FileOutputFormat.setOutputPath(job1, new Path(args[2]));
job1.waitForCompletion(true);
}
}

Mapper

In our mapper, we are implementing two methods. The setup method is used to initialise the map reduce task with our custom processing logic.

In our scenario, we access the u.item data from the cached files and process it to generate a hashmap linking each user ID with its corresponding age.

Subsequently, we make use of this hashmap within our map method. For every user ID extracted from the Movie Rating data (u.data), we employ the hashmap to retrieve the associated age. This age attribute is then assigned a value of one and recorded in the context.

public class AverageDistributionMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
IntWritable outputValue = new IntWritable();
Text outputKey = new Text();
HashMap<String, String> hashmap = new HashMap<>();
@Override
protected void setup(Context context) throws IOException {
Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
for(Path file : files) {
if (file.getName().equals("u.user")) {
BufferedReader reader = new BufferedReader(new FileReader(file.toString()));
String line = reader.readLine();
while (line != null) {
String[] cols = line.split("\\|");
hashmap.put(cols[0], cols[1]);
line = reader.readLine();
}
}
}
}

public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] data = value.toString().split("\t");
if(hashmap.get(data[0])!=null){
outputKey.set(hashmap.get(data[0]));
outputValue.set(1);
context.write(outputKey, outputValue);
}
}
}

Reducer

The reducer is pretty straightforward, it gives the count for all key values which in this case is the age.

public class AverageDistributionReducer  extends Reducer<Text, IntWritable, Text, IntWritable> {
IntWritable result = new IntWritable();
public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
int sum = 0;
for(IntWritable i: values){
sum+=i.get();
}
result.set(sum);
context.write(key, result);
}
}

Running the Solution

Upon running this command,

bin/hadoop jar ml-100k.jar org.example.age.distribution.AverageDistributionDriver /user/vaishnavink/u.data /user/vaishnavink/u.user /user/vaishnavink/output-1

Output

You can view one output file with the given values.

Conclusion

You made it to the end! You can access the code for this example here. In our next tutorial, we will implement another example where joins are used and have some changes in the processing logic being applied.

--

--