Journey to MIT 6.824 - Lab 1 MapReduce

Qingping Meng
Published in CodeX · Apr 21, 2021

The motivation

Early this year I was randomly browsing our company’s discussion group when the MIT online course 6.824 Distributed Systems happened to catch my eye. After a quick look at the course syllabus, I was immediately intrigued and decided to follow the course as much as I could.

https://pdos.csail.mit.edu/6.824/schedule.html

At that time I was having a tough time at work dealing with a complex, highly distributed system, and I hoped this course would give me some inspiration as well as a systematic understanding of distributed systems, rather than the fragmented online articles I had been reading here and there.

Another reason was that all labs in this course use Golang, a language widely adopted in cloud-native applications and open-source projects like k8s and Dapr. I thought this would be a good opportunity for me to learn Go.

Now I have finally finished Lab 1, MapReduce, with all tests passing. I feel it would be helpful to share how I got there and the mistakes I made along the way, which I believe is a lot more valuable than the final code.

MapReduce Overview

Now, let’s step into the topic: Lab 1, MapReduce. Before we dive into it, you should at least read the MapReduce paper to get the basic idea.

To summarize the core idea of MapReduce: it takes a large set of key/value pairs as input and produces another, usually smaller, set of key/value pairs.

  • Map function: Each input key/value pair (k1, v1) is processed by a map function. The map function outputs a list of (k2, v2), where k2 is the intermediate key.
  • Reduce function: Each (k2, list(v2)) is passed to a reduce function, which usually merges the list of v2 into a smaller set of values as the final output. (Go signatures for both are sketched right after this list.)
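In Go terms, the lab hands these two functions to the worker as plugin functions. The type names MapFunc and ReduceFunc below are my own; the signatures match the ones used by the lab skeleton’s worker.go as far as I remember:

// The key/value pair type used throughout the lab.
type KeyValue struct {
	Key   string
	Value string
}

// (k1, v1) -> list of (k2, v2); for word count, k1 is a file name and v1 its contents.
type MapFunc func(filename string, contents string) []KeyValue

// (k2, list of v2) -> merged result for k2; for word count, the count of that word.
type ReduceFunc func(key string, values []string) string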

Implement MapReduce

When I first cloned the Lab 1 framework, honestly, I navigated around for a long time, confused about what I was supposed to do. After reading the hints and the paper back and forth, I finally started to understand how things were wired up.

The three files I needed to add my own code to are:

  • src/mr/coordinator.go to add the logic that maintains the job status.
  • src/mr/worker.go to add the logic that executes each map/reduce job.
  • src/mr/rpc.go to add the RPC argument and reply contracts.

The general idea of this lab is straightforward. The framework starts one coordinator and multiple workers. When a new worker is spawned, it requests a job from the coordinator via an RPC call -> executes the received job -> produces outputs -> reports the job result. These steps repeat until the entire MapReduce is done.

There are four major parts to be implemented here:

  • Design the data structure to hold MapReduce status
  • Design the communication between coordinator and worker
  • Handle map/reduce job in worker and produce proper output
  • Coordinator logic: hand out jobs, handle worker crashes, and decide when everything is done

Data Structure

First, let’s think about what status the coordinator needs to maintain and pick the right data structure to hold them:

  • The total number of reducers, denoted as nReducer int.
  • The status of map jobs. A mapStatus map[string]int keeps track of each input file name and its status. mapStatus["pg-grimm.txt"]=0 means the input file pg-grimm.txt hasn’t started running yet.
  • The status of reduce jobs. A reduceStatus map[int]int keeps track of each reducer and its status. reduceStatus[0]=1 means the first reducer is running.
  • A self-incrementing id for each map job. Every time a map job is assigned to a worker, this id increments by 1.
  • The intermediate file locations. An intermediateFiles map[int][]string keeps track of the intermediate files for each reducer.

That’s all the status the coordinator needs to maintain. Since workers talk to the coordinator concurrently, we need a mutex lock as well. The Coordinator struct looks like this:

type Coordinator struct {
	mapStatus         map[string]int
	mapTaskId         int
	reduceStatus      map[int]int
	nReducer          int
	intermediateFiles map[int][]string
	mu                sync.Mutex
}
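The integer status values are just a convention; a minimal sketch of what they could look like (the names are my own and are reused in the snippets below):

// Job status values stored in mapStatus / reduceStatus.
const (
	NotStarted = 0 // job has not been assigned to any worker yet
	Running    = 1 // job is assigned and (hopefully) being worked on
	Completed  = 2 // job has been reported as finished
)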

RPC

Once we have the data structure defined, let’s think about what messages need to flow between the coordinator and the workers.

Request a job:

When a worker is spawned, it requests a job from the coordinator via RPC. The coordinator iterates over mapStatus to find the first unassigned map job and replies with the following payload:

type MapJob struct {
	InputFile    string
	MapJobNumber int
	ReducerCount int
}
  • The InputFile is the input file to be opened.
  • The MapJobNumber is used as part of the intermediate file names.
  • The ReducerCount is used to decide the key partition.

If no unassigned map job is found but some map jobs are still running, the coordinator sends a nil map job to the worker so it can request again later, because reduce jobs can only start after all map jobs have completed.

If all map jobs are completed, the coordinator iterates over reduceStatus to find the first unassigned reduce job and replies with the following payload:

type ReduceJob struct {
	IntermediateFiles []string
	ReduceNumber      int
}
  • The IntermediateFiles are the intermediate files for reducer number ReduceNumber.

If all jobs are completed, a signal to exit is sent to the worker. Putting it all together, the job request reply looks like this:

type RequestTaskReply struct {
	MapJob    *MapJob
	ReduceJob *ReduceJob
	Done      bool
}
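With this reply type, the worker’s main loop is basically “ask, run, repeat”. Below is a rough sketch under a few assumptions of mine: the handler name Coordinator.RequestTask, the empty RequestTaskArgs type, and the doMapJob/doReduceJob helpers (covered in the next sections) are my own naming; call() is the RPC helper that ships with the lab skeleton’s worker.go, and the time package is imported:

type RequestTaskArgs struct{} // nothing to send; the coordinator picks the job

func Worker(mapf func(string, string) []KeyValue,
	reducef func(key string, values []string) string) {
	for {
		args := RequestTaskArgs{}
		reply := RequestTaskReply{}
		if !call("Coordinator.RequestTask", &args, &reply) {
			return // coordinator is gone, most likely everything is done
		}
		switch {
		case reply.Done:
			return // all jobs finished, exit
		case reply.MapJob != nil:
			doMapJob(mapf, reply.MapJob) // see "Map job" below
		case reply.ReduceJob != nil:
			doReduceJob(reducef, reply.ReduceJob) // see "Reduce job" below
		default:
			time.Sleep(time.Second) // map phase still in progress; ask again later
		}
	}
}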

Report a completed job

Every time a worker completes a job, it reports the completed job to the coordinator so the coordinator can update the status. The request payload depends on the job type, and the response payload is empty.

If the job is a map job, the worker reports the following information to the coordinator: InputFile identifies the map job, and IntermediateFile lists the locations of the files produced by this map job.

type ReportMapTaskArgs struct {
	InputFile        string
	IntermediateFile []string
}

If the job is a reduce job, the worker only reports the reducer number to the coordinator.
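A minimal sketch of that payload could look like this (the struct name is my own):

type ReportReduceTaskArgs struct {
	ReduceNumber int // which reducer finished
}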

The worker

Now it’s time to jump into the most challenging part: implementing the handlers for map and reduce jobs.

Map job

When a worker receives a map job, it opens the input file, reads all of its content, and then calls the map function: mapf(filename, string(fileContent)). Note that the filename is not actually used inside the map function of the word-count program, but mapf requires a key anyway.

Inside the mapf function, it simply splits the contents into an array of words and produces a key/value pair for each word, e.g. {"hello", 1}, so the final output of mapf is an array of key/value pairs, kva.
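For reference, the word-count map function in mrapps/wc.go looks roughly like this; I’m paraphrasing it from memory, so treat it as a sketch:

// Map splits the file contents into words and emits ("word", "1") for each one.
func Map(filename string, contents string) []mr.KeyValue {
	// anything that is not a letter separates words
	ff := func(r rune) bool { return !unicode.IsLetter(r) }
	words := strings.FieldsFunc(contents, ff)

	kva := []mr.KeyValue{}
	for _, w := range words {
		kva = append(kva, mr.KeyValue{Key: w, Value: "1"})
	}
	return kva
}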

A significant difference from real MapReduce is that here all the intermediate data lives in one place, kva. In real MapReduce we need to partition the intermediate data into N partitions, where N is the number of reducers.

So, the worker needs to iterate on kva. For each key value pair (k,v), use the provided hash function to decide which reducer this key belongs to:

partitionedKva := make([][]KeyValue, reduceCount)
for _, v := range kva {
	partitionKey := ihash(v.Key) % reduceCount
	partitionedKva[partitionKey] = append(partitionedKva[partitionKey], v)
}

At last, persist each partitionedKva[i] to disk with the file name mr-x-i, where x is the map job number. After this is completed, the worker can report the map job as completed via RPC.
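The lab hints suggest JSON-encoding the intermediate pairs and writing to a temp file first so a crash can never leave a half-written file behind. A sketch of that, assuming the encoding/json, io/ioutil, os, fmt, and log packages are imported; writeIntermediateFiles is my own helper name:

// writeIntermediateFiles persists each partition to mr-<mapJobNumber>-<i>
// and returns the file names so they can be reported to the coordinator.
func writeIntermediateFiles(mapJobNumber int, partitionedKva [][]KeyValue) []string {
	files := make([]string, len(partitionedKva))
	for i, kvs := range partitionedKva {
		tmp, err := ioutil.TempFile(".", "mr-tmp-*")
		if err != nil {
			log.Fatal(err)
		}
		enc := json.NewEncoder(tmp)
		for _, kv := range kvs {
			if err := enc.Encode(&kv); err != nil {
				log.Fatal(err)
			}
		}
		tmp.Close()
		files[i] = fmt.Sprintf("mr-%d-%d", mapJobNumber, i)
		os.Rename(tmp.Name(), files[i]) // rename is atomic, so readers never see partial data
	}
	return files
}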

Reduce job

When a worker receives a reduce job, the job includes all the intermediate files that need to be handled by this reducer. The worker first opens all the intermediate files one by one to build an array of key/value pairs, kva.

Usually the intermediate key/value pairs require sorting, so we can use the provided function to sort kva. The sorted kva looks like this:

{"Key": "A", "Value": "1"}
{"Key": "A", "Value": "1"}
{"Key": "About", "Value": "1"}

The reducef function takes a key and the values for that key. In the word-count program, the worker needs to iterate over the sorted kva to find all the values under the same key, something like this:

reducef("A", ["1", "1"]) => output A     2
reducef("About", ["1"]) => output About 1

Every time reducef outputs a result, we can persist it into a temp file. Once all keys are reduced, we rename the temp file to the final output file mr-out-x, where x is the reducer number. The reason we don’t write directly to the final output file is that the worker can crash at any time; writing to a temp file first prevents the reducer from producing a corrupted output file.
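The provided mrsequential.go contains essentially this grouping loop, which can be reused here. Below is a sketch under my own assumptions: kva is the sorted slice from above, reduceNumber is this reducer’s number, and the io/ioutil, os, fmt, and log packages are imported:

// Group the sorted pairs by key, reduce each group, and write "key value" lines.
tmp, err := ioutil.TempFile(".", "mr-out-tmp-*")
if err != nil {
	log.Fatal(err)
}
i := 0
for i < len(kva) {
	// find the range [i, j) of pairs that share the same key
	j := i + 1
	for j < len(kva) && kva[j].Key == kva[i].Key {
		j++
	}
	values := []string{}
	for k := i; k < j; k++ {
		values = append(values, kva[k].Value)
	}
	output := reducef(kva[i].Key, values)
	fmt.Fprintf(tmp, "%v %v\n", kva[i].Key, output)
	i = j
}
tmp.Close()
os.Rename(tmp.Name(), fmt.Sprintf("mr-out-%d", reduceNumber)) // publish atomically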

The coordinator

There are two things left for coordinator:

  1. Handle worker crash
  2. Exit when all jobs are done.

To exit, the coordinator should implement the Done function properly. This is very straightforward: return true only when every entry in reduceStatus is completed.
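A sketch, reusing the status constants from earlier (the framework calls Done() on the coordinator periodically to decide when to exit):

func (c *Coordinator) Done() bool {
	c.mu.Lock()
	defer c.mu.Unlock()
	for _, status := range c.reduceStatus {
		if status != Completed {
			return false
		}
	}
	return true
}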

Workers can crash at any time. If a worker crashes, the coordinator needs to move the job assigned to that worker back to the NotStarted state.

To implement this, we need to track the time each job was assigned to a worker, and the coordinator spawns a ticker to check whether any running job has timed out.
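A sketch of that background checker. I’m assuming two extra fields not shown in the struct earlier, mapStartTime map[string]time.Time and reduceStartTime map[int]time.Time, recorded when a job is handed out, plus the time package; the lab suggests waiting about ten seconds before assuming a worker has died:

// checkTimeouts runs in its own goroutine and returns timed-out jobs to the pool.
// mapStartTime / reduceStartTime are extra fields I record when assigning a job.
func (c *Coordinator) checkTimeouts() {
	ticker := time.NewTicker(time.Second)
	for range ticker.C {
		c.mu.Lock()
		now := time.Now()
		for file, status := range c.mapStatus {
			if status == Running && now.Sub(c.mapStartTime[file]) > 10*time.Second {
				c.mapStatus[file] = NotStarted // let another worker pick it up
			}
		}
		for r, status := range c.reduceStatus {
			if status == Running && now.Sub(c.reduceStartTime[r]) > 10*time.Second {
				c.reduceStatus[r] = NotStarted
			}
		}
		c.mu.Unlock()
	}
}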

At this point, if we run bash test-mr.sh, we should see all tests pass.

Conclusion

Thanks for reading this far. I know this was tedious and very likely not fun. Looking back at Lab 1 MapReduce now that I have finished it, it's not extremely hard. The key is to understand the core idea of MapReduce:

  • What does map function expect as input?
  • What does intermediate output look like?
  • What does reduce function expect as input?
  • What does the final output look like?

Once you figure these out, you should be able to write your own code without any problem.

Finally, here is my rough and dirty implementation of lab1: https://github.com/QingpingMeng/mit6.824/tree/lab1
