How to write a MapReduce framework in Python

An easy-to-follow guide

“I tried to explain as much as I could,” Poppet says. “I think I made an analogy about cake.” “Well, that must have worked,” Widget says. “Who doesn’t like a good cake analogy?” ― Erin Morgenstern, The Night Circus

I do not know why I decided to name this framework mapcakes, but I love that name and everybody loves cake… anyway…

MapReduce is an elegant model that simplifies processing data sets with lots of stuff (a.k.a. large datasets). As a result of a weekend project, here's an overly simplistic Python MapReduce framework implementation. In this post you will be guided through the steps I followed, along with an example implementation for counting words applied to "Unveiling a Parallel: A Romance", taken from Project Gutenberg. The finished version of the code is available as mapcakes on GitHub. Here are a couple of choices that were made for the implementation:

  • We are going to use CPython version 2.7.6.
  • The multiprocessing module is used to spawn processes by calling the start() method on a created Process object.
  • There is an output file corresponding to each reduce thread.
  • The outputs can be merged into one single file in the end.
  • The results of the map step (as well as the output files for each reduce thread) are stored on disk using JavaScript Object Notation (JSON).
  • One may choose to delete or leave these files in the end.

If you are not yet familiar with the MapReduce framework, you will find a gentle introduction to it via this link on Quora. Enjoy :)

Implementing the MapReduce class

First, what we will do is write a MapReduce class that will play the role of an interface to be implemented by the user. This class will have two methods, mapper and reducer, that must be implemented later on (an example implementation for a word count using MapReduce is presented below in the section Word Count Example). Hence we start by writing the following class:

import settings


class MapReduce(object):
    """MapReduce class representing the mapreduce model

    note: the 'mapper' and 'reducer' methods must be
    implemented to use the mapreduce model.
    """
    def __init__(self, input_dir=settings.default_input_dir, output_dir=settings.default_output_dir,
                 n_mappers=settings.default_n_mappers, n_reducers=settings.default_n_reducers,
                 clean=True):
        """
        :param input_dir: directory of the input files,
        taken from the default settings if not provided
        :param output_dir: directory of the output files,
        taken from the default settings if not provided
        :param n_mappers: number of mapper threads to use,
        taken from the default settings if not provided
        :param n_reducers: number of reducer threads to use,
        taken from the default settings if not provided
        :param clean: optional, if True temporary files are
        deleted, True by default.
        """
        self.input_dir = input_dir
        self.output_dir = output_dir
        self.n_mappers = n_mappers
        self.n_reducers = n_reducers
        self.clean = clean

    def mapper(self, key, value):
        """Outputs a list of key-value pairs, where the key is
        potentially new and the values are of a potentially different type.

        Note: this function is to be implemented.

        :param key:
        :param value:
        """
        pass

    def reducer(self, key, values_list):
        """Outputs a single value together with the provided key.

        Note: this function is to be implemented.

        :param key:
        :param values_list:
        """
        pass

For the explanation of the different settings, check the settings module section below. Next, we will need to add a run() method to the MapReduce class that will execute the map and reduce operations. For this we need to define a run_mapper(index) method (where index refers to the current thread), which will use the mapper and store the results on disk, and a run_reducer(index) method, which will apply the reducer to the results of the map and store the results on disk. The run() method will spawn the desired number of mappers, then the desired number of reducers. The Process object from the multiprocessing module is used as follows:

def run_mapper(self, index):
    """Runs the implemented mapper

    :param index: the index of the thread to run on
    """
    # read a key
    # read a value
    # get the result of the mapper
    # store the result to be used by the reducer
    pass

def run_reducer(self, index):
    """Runs the implemented reducer

    :param index: the index of the thread to run on
    """
    # load the results of the map
    # for each key reduce the values
    # store the results for this reducer
    pass

def run(self):
    """Executes the map and reduce operations"""
    # initialize mappers list
    map_workers = []
    # initialize reducers list
    rdc_workers = []
    # run the map step
    for thread_id in range(self.n_mappers):
        p = Process(target=self.run_mapper, args=(thread_id,))
        p.start()
        map_workers.append(p)
    [t.join() for t in map_workers]
    # run the reduce step
    for thread_id in range(self.n_reducers):
        p = Process(target=self.run_reducer, args=(thread_id,))
        p.start()
        rdc_workers.append(p)
    [t.join() for t in rdc_workers]

Now, we must complete our run_mapper and run_reducer methods. But since these methods require reading and storing data from an input file, we will first create a FileHandler class. This class will split the input file using a split_file(number_of_splits) method (where number_of_splits is the total number of chunks that we want as a result of the split). The FileHandler class will also join the outputs using a join_files(number_of_files, clean, sort, decreasing) method (where number_of_files is the total number of files to join; clean, sort and decreasing are all optional boolean arguments, effectively True by default in our case; clean indicates whether we want to delete the temporary files after the join, sort indicates whether or not to sort the results, and decreasing indicates whether we want to sort in reverse order). With this in mind, we start by writing the FileHandler class as follows:

class FileHandler(object):
    """FileHandler class

    Manages splitting input files and joining outputs together.
    """
    def __init__(self, input_file_path, output_dir):
        """
        Note: the input file path should be given for splitting.
        The output directory is needed for joining the outputs.

        :param input_file_path: input file path
        :param output_dir: output directory path
        """
        self.input_file_path = input_file_path
        self.output_dir = output_dir

    def split_file(self, number_of_splits):
        """Split a file into multiple files.

        :param number_of_splits: the number of splits.
        """
        pass

    def join_files(self, number_of_files, clean=None, sort=True, decreasing=True):
        """Join all the files in the output directory into a
        single output file.

        :param number_of_files: total number of files.
        :param clean: if True the reduce outputs will be deleted,
        by default takes the value of self.clean.
        :param sort: sort the outputs.
        :param decreasing: sort by decreasing order, high value to low value.

        :return output_join_list: a list of the outputs
        """
        pass

We then complete writing the split and join methods:

import os
import json

import settings


class FileHandler(object):
    """FileHandler class

    Manages splitting input files and joining outputs together.
    """
    def __init__(self, input_file_path, output_dir):
        """
        Note: the input file path should be given for splitting.
        The output directory is needed for joining the outputs.

        :param input_file_path: input file path
        :param output_dir: output directory path
        """
        self.input_file_path = input_file_path
        self.output_dir = output_dir

    def begin_file_split(self, split_index, index):
        """Initialize a split file by opening it and adding an index.

        :param split_index: the split index we are currently on, to be used for naming the file.
        :param index: the index given to the file.
        """
        file_split = open(settings.get_input_split_file(split_index - 1), "w+")
        file_split.write(str(index) + "\n")
        return file_split

    def is_on_split_position(self, character, index, split_size, current_split):
        """Check if it is the right time to split,
        i.e. the character is a space and the limit has been reached.

        :param character: the character we are currently on.
        :param index: the index we are currently on.
        :param split_size: the size of each single split.
        :param current_split: the split we are currently on.
        """
        return index > split_size * current_split + 1 and character.isspace()

    def split_file(self, number_of_splits):
        """Split a file into multiple files.
        note: this has not been optimized to avoid overhead.

        :param number_of_splits: the number of chunks to split the file into.
        """
        file_size = os.path.getsize(self.input_file_path)
        unit_size = file_size / number_of_splits + 1
        original_file = open(self.input_file_path, "r")
        file_content = original_file.read()
        original_file.close()
        (index, current_split_index) = (1, 1)
        current_split_unit = self.begin_file_split(current_split_index, index)
        for character in file_content:
            current_split_unit.write(character)
            if self.is_on_split_position(character, index, unit_size, current_split_index):
                current_split_unit.close()
                current_split_index += 1
                current_split_unit = self.begin_file_split(current_split_index, index)
            index += 1
        current_split_unit.close()
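
The join method is completed along the same lines. It is not reproduced from the repository here, but a minimal sketch, assuming each reduce output file contains a JSON list of (key, value) pairs as produced by the run_reducer method shown below, could look like this:

def join_files(self, number_of_files, clean=None, sort=True, decreasing=True):
    """Join all the reduce outputs into a single output file.

    Note: minimal sketch; assumes each reduce output file holds a
    JSON list of (key, value) pairs.
    """
    output_join_list = []
    for reducer_index in range(number_of_files):
        reduce_file = open(settings.get_output_file(reducer_index), "r")
        output_join_list += json.load(reduce_file)
        reduce_file.close()
        if clean:
            os.unlink(settings.get_output_file(reducer_index))
    if sort:
        # sort by the reduced value (e.g. the word count)
        output_join_list.sort(key=lambda pair: pair[1], reverse=decreasing)
    output_join_file = open(settings.get_output_join_file(self.output_dir), "w+")
    json.dump(output_join_list, output_join_file)
    output_join_file.close()
    return output_join_list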

Now, we can complete our run_mapper and run_reducer methods like this:

def run_mapper(self, index):
    """Runs the implemented mapper

    :param index: the index of the thread to run on
    """
    input_split_file = open(settings.get_input_split_file(index), "r")
    key = input_split_file.readline()
    value = input_split_file.read()
    input_split_file.close()
    if self.clean:
        os.unlink(settings.get_input_split_file(index))
    mapper_result = self.mapper(key, value)
    for reducer_index in range(self.n_reducers):
        temp_map_file = open(settings.get_temp_map_file(index, reducer_index), "w+")
        json.dump([(key, value) for (key, value) in mapper_result
                   if self.check_position(key, reducer_index)],
                  temp_map_file)
        temp_map_file.close()
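
def check_position(self, key, reducer_index):
    # Note: run_mapper above relies on this helper to decide which reducer a
    # key is routed to. Its definition is not part of this post; a simple
    # hash-based partition is assumed here as a minimal sketch (the MapCakes
    # repository may route keys differently).
    return reducer_index == (hash(key) % self.n_reducers)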

def run_reducer(self, index):
    """Runs the implemented reducer

    :param index: the index of the thread to run on
    """
    key_values_map = {}
    for mapper_index in range(self.n_mappers):
        temp_map_file = open(settings.get_temp_map_file(mapper_index, index), "r")
        mapper_results = json.load(temp_map_file)
        for (key, value) in mapper_results:
            if key not in key_values_map:
                key_values_map[key] = []
            try:
                key_values_map[key].append(value)
            except Exception as e:
                print "Exception while inserting key: " + str(e)
        temp_map_file.close()
        if self.clean:
            os.unlink(settings.get_temp_map_file(mapper_index, index))
    key_value_list = []
    for key in key_values_map:
        key_value_list.append(self.reducer(key, key_values_map[key]))
    output_file = open(settings.get_output_file(index), "w+")
    json.dump(key_value_list, output_file)
    output_file.close()
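
Note that, with this scheme, each temporary map file holds a JSON list of [word, count] pairs destined for one reducer, and each reduce output file holds a JSON list of the reduced (key, value) results; this is the format the join sketch above relies on.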

And finally, we slightly modify the run method to let the user specify whether or not to join the outputs. The run method becomes:

def run(self, join=False):
    """Executes the map and reduce operations

    :param join: True if we need to join the outputs, False by default.
    """
    # initialize mappers list
    map_workers = []
    # initialize reducers list
    rdc_workers = []
    # run the map step
    for thread_id in range(self.n_mappers):
        p = Process(target=self.run_mapper, args=(thread_id,))
        p.start()
        map_workers.append(p)
    [t.join() for t in map_workers]
    # run the reduce step
    for thread_id in range(self.n_reducers):
        p = Process(target=self.run_reducer, args=(thread_id,))
        p.start()
        rdc_workers.append(p)
    [t.join() for t in rdc_workers]
    if join:
        self.join_outputs()
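
The join_outputs method called here is not shown in this post. Assuming the MapReduce object keeps a FileHandler instance around as self.file_handler (an assumption of this sketch; the repository wires this up in its own way), it can simply delegate to join_files:

def join_outputs(self, clean=True, sort=True, decreasing=True):
    """Merge the reduce outputs into a single output file.

    Note: self.file_handler is assumed to be a FileHandler created in __init__.
    """
    return self.file_handler.join_files(self.n_reducers, clean, sort, decreasing)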

The final code is available in the MapCakes repository on GitHub: https://github.com/nidhog/mapcakes

The ‘settings’ Module

This module contains the default settings and utility functions to generate the path names for the input, output and temporary files. These utility methods are described in the comments of the code snippet below:

# set default directory for the input files
default_input_dir = "input_files"
# set default directory for the temporary map files
default_map_dir = "temp_map_files"
# set default directory for the output files
default_output_dir = "output_files"
# set default number for the map and reduce threads
default_n_mappers = 4
default_n_reducers = 4


# return the name of the input file to be split into chunks
def get_input_file(input_dir=None, extension=".ext"):
    if input_dir is not None:
        return input_dir + "/file" + extension
    return default_input_dir + "/file" + extension


# return the name of the current split file corresponding to the given index
def get_input_split_file(index, input_dir=None, extension=".ext"):
    if input_dir is not None:
        return input_dir + "/file_" + str(index) + extension
    return default_input_dir + "/file_" + str(index) + extension


# return the name of the temporary map file corresponding to the given index
def get_temp_map_file(index, reducer, output_dir=None, extension=".ext"):
    if output_dir is not None:
        return output_dir + "/map_file_" + str(index) + "-" + str(reducer) + extension
    return default_output_dir + "/map_file_" + str(index) + "-" + str(reducer) + extension


# return the name of the output file given its corresponding index
def get_output_file(index, output_dir=None, extension=".out"):
    if output_dir is not None:
        return output_dir + "/reduce_file_" + str(index) + extension
    return default_output_dir + "/reduce_file_" + str(index) + extension


# return the name of the joined output file
def get_output_join_file(output_dir=None, extension=".out"):
    if output_dir is not None:
        return output_dir + "/output" + extension
    return default_output_dir + "/output" + extension
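
For instance, with the defaults above these helpers generate the following paths:

>>> import settings
>>> settings.get_input_split_file(3)
'input_files/file_3.ext'
>>> settings.get_temp_map_file(2, 1)
'output_files/map_file_2-1.ext'
>>> settings.get_output_file(0)
'output_files/reduce_file_0.out'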

Word Count Example

In this example we assume that we have a document and we want to count the number of occurrences of each word in the document. To do this we need to define our map and reduce operations so that we can implement the mapper and reducer methods of the MapReduce class. The solution to the word count is pretty straightforward:

  • map: we split the text, keep only the words that contain ASCII characters, and lowercase them. Then we emit each word as a key with a count of 1.
  • reduce: we simply sum all the previous values for each word.
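
For instance, mapping the line "the cake is a lie the cake" would emit [("the", 1), ("cake", 1), ("is", 1), ("a", 1), ("lie", 1), ("the", 1), ("cake", 1)], and reducing the key "the" with the list of values [1, 1] would return ("the", 2).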

Hence, we implement the MapReduce class as follows:

from mapreduce import MapReduce
import sys


class WordCount(MapReduce):
    def __init__(self, input_dir, output_dir, n_mappers, n_reducers):
        MapReduce.__init__(self, input_dir, output_dir, n_mappers, n_reducers)

    def is_valid_word(self, word):
        """Check that the word contains only ASCII characters."""
        return all(ord(character) < 128 for character in word)

    def mapper(self, key, value):
        """Map function for the word count example
        Note: Each line needs to be separated into words, and each word
        needs to be converted to lower case.
        """
        results = []
        default_count = 1
        # separate the line into words
        for word in value.split():
            if self.is_valid_word(word):
                # lowercase the word and emit it with a count of 1
                results.append((word.lower(), default_count))
        return results

    def reducer(self, key, values_list):
        """Reduce function for the word count example
        Note: sum all the counts emitted for the given word.
        """
        return key, sum(values_list)
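
To run the example end to end, one could split the input, run the word count, and join the outputs roughly as follows. This is a minimal sketch: the module names filehandler and settings and the expected file layout follow the snippets above but are otherwise assumptions.

from filehandler import FileHandler
import settings

if __name__ == "__main__":
    # split the input book into one chunk per mapper
    file_handler = FileHandler(settings.get_input_file(), settings.default_output_dir)
    file_handler.split_file(settings.default_n_mappers)
    # run the word count and merge the reduce outputs into a single file
    word_count = WordCount(settings.default_input_dir, settings.default_output_dir,
                           settings.default_n_mappers, settings.default_n_reducers)
    word_count.run(join=True)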

That’s all folks! :D

Thanks for reading! :) If you enjoyed it, hit that heart button below. Would mean a lot to me and it helps other people see the story.

Love Python? Here’s a tutorial on how you can make a chatbot for slack in less than an hour.