5. Learning Hadoop with ML-100k — Multiple Joins

Vaishnavi Nandakumar
4 min read · Dec 2, 2023


Introduction

Over the last few tutorials, we implemented different types of MapReduce programs. From a simple counter program to joins using the distributed cache, I hope this series has covered a decent range of standard problem statements you might come across.

In this post, we will tackle another join-related problem, diving into more specialized processing logic.

Data

We will be using three files from the ML-100k dataset for this problem statement.

Movie ratings data in u.data

User data in u.user

Movie data in u.item
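
For reference, here is roughly how the relevant record layouts look (fields in u.item beyond the first two are omitted). The user id and movie id fields are what the joins will run on.

u.data : user id \t movie id \t rating \t timestamp (tab-separated)
u.user : user id | age | gender | occupation | zip code (pipe-separated)
u.item : movie id | movie title | ... (pipe-separated)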

Problem Statement

Produce a list of the top movies for every age group, with each group spanning a decade

There are two important things to note here. First, the problem statement asks for the top movie names, not their IDs. To correlate the IDs with their titles, we will have to implement a join between u.item and u.data. Similarly, we will also require a join between u.data and u.user to build the age-distribution buckets.

Second, although this example uses two joins, the final keys we reduce to (age buckets) are not an attribute of any dataset we have. So we will need some additional processing to derive the keys before writing them and their corresponding values to the context.
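
For example, a 32-year-old reviewer lands in the "30 - 40" bucket, since integer division by 10 gives the decade:

int age = 32;
int category = age / 10;                                        // 3
String bucket = (category * 10) + " - " + (category + 1) * 10;  // "30 - 40"

This is exactly the computation the mapper below performs before writing its output key.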

Solution

Driver

Since we are implementing a join operation in this example, we add the two lookup files (u.user and u.item) to the distributed cache so that they are available on every worker node.

public class PopularMovieByAgeDistributionDriver {
    public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException, URISyntaxException {
        Configuration conf1 = new Configuration();
        Job job1 = Job.getInstance(conf1, "job1");
        job1.setJarByClass(PopularMovieByAgeDistributionDriver.class);
        job1.setMapperClass(PopularMovieByAgeDistributionMapper.class);
        job1.setReducerClass(PopularMovieByAgeDistributionReducer.class);

        // Ship u.user and u.item to every worker node via the distributed cache
        DistributedCache.addCacheFile(new URI(args[1]), job1.getConfiguration());
        DistributedCache.addCacheFile(new URI(args[2]), job1.getConfiguration());

        job1.setOutputKeyClass(Text.class);
        job1.setOutputValueClass(Text.class);

        // u.data is the primary input; clear the output directory if it already exists
        FileInputFormat.addInputPath(job1, new Path(args[0]));
        FileSystem fs = FileSystem.get(conf1);
        fs.delete(new Path(args[3]), true);
        FileOutputFormat.setOutputPath(job1, new Path(args[3]));

        job1.waitForCompletion(true);
    }
}
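
A quick note on the API: DistributedCache is deprecated in Hadoop 2.x and later. If you are on a newer release, the same effect can be achieved directly on the Job object; a minimal sketch, assuming the same argument order as above:

// Hadoop 2.x+ equivalent of the two DistributedCache calls
job1.addCacheFile(new URI(args[1])); // u.user
job1.addCacheFile(new URI(args[2])); // u.item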

Mapper

As described earlier, the mapper builds two hashmaps in its setup method from the files in the distributed cache: one mapping userId to age (from u.user) and one mapping movieId to movie title (from u.item). The userId field joins u.data with u.user, and the movieId field joins u.data with u.item.

Once setup has run, the mapping takes place. Here, we do not emit a predefined attribute from one of the datasets as the key. Instead, after looking up the reviewer's age by userId, a little extra processing assigns that age to a decade bucket, and the bucket becomes the output key.

The value corresponding to that key is the movie name, obtained by joining the movieId in the ratings record with the title stored from u.item.

public class PopularMovieByAgeDistributionMapper extends Mapper<LongWritable, Text, Text, Text> {
    Text outputKey = new Text();
    Text outputValue = new Text();
    HashMap<String, String> userAgeMap = new HashMap<>();    // userId -> age, from u.user
    HashMap<String, String> movieTitleMap = new HashMap<>(); // movieId -> title, from u.item

    @Override
    protected void setup(Context context) throws IOException {
        Path[] files = DistributedCache.getLocalCacheFiles(context.getConfiguration());
        for (Path file : files) {
            if (file.getName().equals("u.user")) {
                // u.user is pipe-separated: user id | age | gender | occupation | zip code
                try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) {
                    String line = reader.readLine();
                    while (line != null) {
                        String[] cols = line.split("\\|");
                        userAgeMap.put(cols[0], cols[1]);
                        line = reader.readLine();
                    }
                }
            }

            if (file.getName().equals("u.item")) {
                // u.item is pipe-separated: movie id | movie title | ...
                try (BufferedReader reader = new BufferedReader(new FileReader(file.toString()))) {
                    String line = reader.readLine();
                    while (line != null) {
                        String[] cols = line.split("\\|");
                        movieTitleMap.put(cols[0], cols[1]);
                        line = reader.readLine();
                    }
                }
            }
        }
    }

    public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // u.data is tab-separated: user id, movie id, rating, timestamp
        String[] data = value.toString().split("\t");
        if (userAgeMap.get(data[0]) != null) {
            int age = Integer.parseInt(userAgeMap.get(data[0]));
            int category = age / 10;
            // Bucket the age into a decade, e.g. 25 -> "20 - 30"
            String keyValue = (category * 10) + " - " + (category + 1) * 10;
            outputKey.set(keyValue);
            outputValue.set(movieTitleMap.get(data[1]));
            context.write(outputKey, outputValue);
        }
    }
}
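
To make the intermediate data concrete: assuming a 25-year-old user rated Toy Story (1995), the mapper emits the key "20 - 30" with the value "Toy Story (1995)", one such pair per rating record.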

Reducer

After the map phase, we are presented with several age buckets, each corresponding to a list of movies. Our next objective is to find the most popular movie in each list.

So, in the reducer, we build a hashmap of each movie name and its occurrence count. The movie with the maximum count is considered the most popular and is written to the context.

public class PopularMovieByAgeDistributionReducer extends Reducer<Text, Text, Text, Text> {
    Text result = new Text();

    public void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        // Count how many times each movie was rated within this age bucket
        HashMap<String, Integer> map = new HashMap<>();
        for (Text movie : values) {
            String name = movie.toString();
            int val = (map.get(name) != null) ? map.get(name) : 0;
            map.put(name, val + 1);
        }

        // Pick the movie with the highest count; ties resolve to whichever entry is seen last
        String topMovie = "";
        int maxValueInMap = Collections.max(map.values());
        for (Map.Entry<String, Integer> entry : map.entrySet()) {
            if (entry.getValue() == maxValueInMap) {
                topMovie = entry.getKey();
            }
        }

        result.set(topMovie);
        context.write(key, result);
    }
}

Running the Solution

Run the job with the command below. Once it completes, you can view the output in the directory passed as the final argument.

bin/hadoop jar ml-100k.jar org.example.movie.distribution.PopularMovieByAgeDistributionDriver /user/vaishnavink/u.data /user/vaishnavink/u.user /user/vaishnavink/u.item /user/vaishnavink/output-1
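
Assuming the default output settings, the reducer's results end up in a part file inside the output directory, which you can print with:

bin/hadoop fs -cat /user/vaishnavink/output-1/part-r-00000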

Output

Conclusion

With this tutorial, we've finished a five-part series on learning Hadoop with the ML-100k dataset. I'd appreciate any suggestions and ideas you have to make it better. You can access the code for this tutorial here.
