Hitchhiker's Guide to MapReduce with MRJob in Python

Vicky Bee Wu · Published in Datable · Mar 7, 2021

1. What is MapReduce?

MapReduce is a programming paradigm for big data processing, where data is partitioned into distributed chunks and processed by a series of transformations.

The MapReduce programming paradigm processes data in 2 operations: map() and then reduce(). map() applies a user-defined function to each record in the data collection. reduce() then combines the grouped output of map() using another user-defined function.

2. MapReduce Pipeline

MapReduce works on (key, value) pairs by performing the below steps:

INPUT: list of key-value pairs (k1, v1)

MAP: (k1, v1) >> [list of (k2, v2)]

SHUFFLE: combine (k2, v2) >> (k2, [list of v2])

REDUCE: (k2, [list of v2]) >> (k3, v3)

OUTPUT: list of (k3, v3)

To put things into perspective, let’s go through these steps with a simple example of WordCount. Say we have a document that contains two lines:

the article is about the mapreduce paradigm
mapreduce is part of big data ecosystem

We want to compute the frequency of each word in this document. We will work out this problem from the bottom up by first identifying what we want as the end output.

INPUT: (document: text line)

OUTPUT: (unique word, frequency)

The end output we want is a list of (unique word, frequency) pairs. To achieve that, we need an intermediate output that pairs each word in the document with a count of 1, i.e. (word, 1). Having a list of (word, 1) pairs will allow us to group by word and add all the 1s of the same word together.
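For our two-line document, this intermediate output 1 contains one (word, 1) pair per word occurrence:

(the, 1)
(article, 1)
(is, 1)
(about, 1)
(the, 1)
(mapreduce, 1)
(paradigm, 1)
(mapreduce, 1)
(is, 1)
(part, 1)
(of, 1)
(big, 1)
(data, 1)
(ecosystem, 1)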

Next, SHUFFLE is performed on intermediate output 1. It groups all the (word, 1) pairs by words and produces intermediate output 2:
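(the, [1, 1])
(article, [1])
(is, [1, 1])
(about, [1])
(mapreduce, [1, 1])
(paradigm, [1])
(part, [1])
(of, [1])
(big, [1])
(data, [1])
(ecosystem, [1])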

Finally a Reducer is used to sum up all the 1s together, which gives us the end output:
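(the, 2)
(article, 1)
(is, 2)
(about, 1)
(mapreduce, 2)
(paradigm, 1)
(part, 1)
(of, 1)
(big, 1)
(data, 1)
(ecosystem, 1)

Before bringing Hadoop into the picture, the same pipeline can be sketched in a few lines of plain Python (this is only an illustration of the stages above, not mrjob code; itertools.groupby plays the role of SHUFFLE):

from itertools import groupby

# INPUT: list of (k1, v1) pairs; here the key is ignored and the value is a line of text
records = [(None, "the article is about the mapreduce paradigm"),
           (None, "mapreduce is part of big data ecosystem")]

# MAP: (k1, v1) >> [list of (k2, v2)], one (word, 1) pair per word occurrence
mapped = [(word, 1) for _, line in records for word in line.split()]

# SHUFFLE: combine (k2, v2) >> (k2, [list of v2]), grouping the pairs by word
mapped.sort(key=lambda kv: kv[0])
shuffled = [(word, [v for _, v in group])
            for word, group in groupby(mapped, key=lambda kv: kv[0])]

# REDUCE: (k2, [list of v2]) >> (k3, v3), summing the 1s of each word
reduced = [(word, sum(counts)) for word, counts in shuffled]

print(reduced)   # [('about', 1), ('article', 1), ('big', 1), ..., ('the', 2)]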

3. MapReduce application in Python — Introducing mrjob

mrjob is a library that allows you to write Python programs that run on Hadoop. With mrjob, you can test your code locally without installing Hadoop or run it on a cluster of your choice. “If you don’t want to be a Hadoop expert but need the computing power of MapReduce, mrjob might be just the thing for you.”

First install mrjob by running:

pip install mrjob

Then import MRJob

from mrjob.job import MRJob

Afterwards you can write a job, which is defined by a class and contains a series of steps made up of mappers, combiners and reducers. Mappers, combiners and reducers are each optional, though a step must have at least one of them.

Example 1: Assuming we have a text file named book.txt. Our task is to count the frequency of words in the first 1000 lines of this document.

First we create a class called WordCount (remember to put MRJob in the parentheses so that it subclasses MRJob). Then we create a mapper function and a reducer function inside the class:

class WordCount(MRJob):

    def mapper(self, key_1, value_1):
        yield (key_2, value_2)

    def reducer(self, key_2, value_2):
        yield (key_3, value_3)

Inside the mapper function, our (key, value) input pairs are (_, line) and our output pairs should be (word, 1). Note that (_, line) tells MRJob to ignore the key and take each line of the document as the value.

def mapper(self, _, line):
    for word in line.split():
        if len(word) > 0:
            yield (word, 1)

Inside the reducer function, our (key, value) input pairs are (word, count) and our output pairs should be (word, sum(count)).

def reducer(self, word, count):
    yield (word, sum(count))

This class is then complete. Finally we just need to assign this class to a task.

task0 = WordCount(args = [])

To print out the output, we will read the txt file in streaming fashion, meaning row by row, and feed those rows to task0.
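One way to do this with mrjob's own runner API is sketched below (this assumes a recent mrjob where make_runner(), cat_output() and parse_output() are available; islice limits the input to the first 1000 lines of book.txt):

from io import BytesIO
from itertools import islice

# Read only the first 1000 lines of the book (as bytes, which mrjob expects on stdin)
with open('book.txt', 'rb') as f:
    first_lines = b''.join(islice(f, 1000))

task0 = WordCount(args=[])
task0.sandbox(stdin=BytesIO(first_lines))   # let the job read from an in-memory stream

with task0.make_runner() as runner:
    runner.run()
    for word, frequency in task0.parse_output(runner.cat_output()):
        print(word, frequency)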

Example 2: We have a file ‘citibike.csv’ which contains data on citibike trips in NYC. Each row of the data represents a unique trip by a rider from station A to station B. The columns we need for this task are “start_station_name”, “end_station_name” and “gender”.

Now we are asked to compute, for each gender, the station where the most trips started. In other words: what was the station with the highest number of bike pickups for female riders, for male riders, and for riders of unknown gender?

The output will be a list of tuples, each containing a gender label (as indicated below) and another tuple consisting of a station name and the total number of trips started at that station for that gender, e.g. (0, (station_A, 100)).

The label mapping for the gender column in citibike.csv is: (‘0’=Unknown; ‘1’=Male; ‘2’=Female).

Same as before, we should start thinking from the bottom up. The output we want in the end is a nested tuple:

#desired output
(gender_label, (max_start_station_name, max_number_of_trips))

where gender_label is the key and (start_station_name, max_number_of_trips) is the value. This can be achieved by using a reducer to extract the maximum value among all values, provided we first have an intermediate key-value pair of the form:

#intermediate output1
(gender_label, (start_station_name, number_of_trips_for_station))

In intermediate output 1, we have gender_label as the key and (start_station_name, number_of_trips_for_station) as the value. This means for each gender, we keep a count of each station’s number of pickups, which will look like this: (numbers are made up)

(0, (station_A, 4))
(0, (station_B, 1))
(0, (station_C, 210))
(1, (station_A, 1))
(1, (station_B, 378))
(1, (station_C, 3))
(2, (station_A, 5))
(2, (station_B, 4))
(2, (station_C, 0))

This can be rewritten (mapped) into intermediate output 2 by switching the key from gender_label to the (gender_label, start_station_name) pair.

# intermediate output2
((gender_label, start_station_name), number_of_trips_for_station)

Now we have something easy to achieve. We can just keep a record of each row’s gender_label and start_station_name with a count of 1, and sum up all the counts by key (gender_label, start_station_name), like the below:

((0, station_A), [1, 1, 1, 1])
((0, station_B), [1])
((0, station_C), [1, 1, 1, ...])
((1, station_A), [1])
((1, station_B), [1, 1, 1, ...])
((1, station_C), [1, 1, 1])
((2, station_A), [1, 1, 1, 1, 1])
((2, station_B), [1, 1, 1, 1])
((2, station_C), [])

This can be easily done using a reducer on our record of each row’s gender_label and start_station_name with a count of 1. That record will be our intermediate output 3:

# intermediate output3
((0, station_A), 1)
((0, station_A), 1)
((0, station_A), 1)
((0, station_C), 1)
((1, station_B), 1)
((0, station_B), 1)
((0, station_C), 1)
((2, station_A), 1)
((1, station_B), 1)
((1, station_C), 1)
...

To code this in MRJob, we will need a job with 2 steps, each step containing a mapper and a reducer. This requires us to add a steps() method to the MRJob class, using MRStep (imported from mrjob.step) to tell mrjob which mapper and reducer are used together.

def steps(self):
    return [MRStep(mapper=self.mapper1,
                   reducer=self.reducer1),
            MRStep(mapper=self.mapper2,
                   reducer=self.reducer2)]

Now we can write our mappers and reducers from the top down. For mapper1, we want to achieve intermediate output3:

def mapper1(self, key, row):
    yield ((row['gender'], row['start_station_name']), 1)

Then we can use a reducer to sum up all the counts to achieve intermediate output2:

def reducer1(self, gender_station, count):
    yield (gender_station, sum(count))

Then we use a mapper to restructure the key-value pair so that we have only gender as the key and the station-count pair as the value. This will be our intermediate output 1:

def mapper2(self, gender_station, count):
    gender, station = gender_station
    yield (gender, (station, count))

Finally we just need to pick out the maximum value from the key-value pair by applying a reducer to intermediate output1:

def reducer2(self, gender, station_count):
    genderMap = {'0': 'Unknown', '1': 'Male', '2': 'Female'}
    yield (genderMap[gender], max(station_count, key=lambda x: x[1]))
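With the made-up counts from earlier, this final step would emit:

('Unknown', ('station_C', 210))
('Male', ('station_B', 378))
('Female', ('station_A', 5))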

The entire code for this task:
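Below is a minimal sketch assembling the snippets above into one runnable job. Two assumptions are not part of the snippets: the class name is made up, and mapper1 here parses each raw CSV line itself with csv.reader and positional column indices, whereas the snippet above receives rows already parsed into dicts.

from mrjob.job import MRJob
from mrjob.step import MRStep
import csv

# Placeholder column positions; adjust them to match citibike.csv's header
START_STATION_COL = 4
GENDER_COL = 14

class BikeStartStations(MRJob):

    def steps(self):
        return [MRStep(mapper=self.mapper1, reducer=self.reducer1),
                MRStep(mapper=self.mapper2, reducer=self.reducer2)]

    def mapper1(self, _, line):
        # Parse the raw CSV line and skip the header row (its gender field is not a digit)
        row = next(csv.reader([line]))
        if len(row) > GENDER_COL and row[GENDER_COL].isdigit():
            yield ((row[GENDER_COL], row[START_STATION_COL]), 1)

    def reducer1(self, gender_station, count):
        # intermediate output 2: total number of trips per (gender, station)
        yield (gender_station, sum(count))

    def mapper2(self, gender_station, count):
        # intermediate output 1: re-key by gender only
        gender, station = gender_station
        yield (gender, (station, count))

    def reducer2(self, gender, station_count):
        # desired output: the station with the most pickups for each gender
        genderMap = {'0': 'Unknown', '1': 'Male', '2': 'Female'}
        yield (genderMap[gender], max(station_count, key=lambda x: x[1]))

if __name__ == '__main__':
    BikeStartStations.run()

You would run it with something like: python bike_start_stations.py citibike.csv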

You can find more information on how to write a job in the mrjob documentation. Thanks for reading and have fun creating jobs!
