MapReduce & Hadoop API revised

Iván de Prado
Iván’s blog
Published in
5 min readFeb 13, 2012

Nowadays, Hadoop has become the key technology behind what has come to be known as “Big Data”. It has certainly worked hard to earn this position. It is mature technology that has been used successfully in countless projects. But now, with experience behind us, it is time to take stock of the foundations upon which it is based, particularly its interface. This article discusses some of the weaknesses of both MapReduce and Hadoop, which we, at Datasalt, shall attempt to resolve with an open-source project that we will soon be releasing.

MapReduce

MapReduce is the distributed computing paradigm implemented by Hadoop. Its interface is based on implementing two functions, Map and Reduce, which process key/value data pairs. The files issued by the Map function are grouped by key and are received as a single group in the Reduce function.

Experience has shown us that the setup proposed by MapReduce for data processing creates difficulties for a series of issues that are quite common to any Big Data project. Let’s take a look at a few of these.

Compound records

Key/value files are sufficient for implementing the typical WordCount, for example, since only two types of data per file are needed: a string for the word and an integer for the counter.

However, the vast majority of cases call for more than 2 fields per record and this does not fit in with the key/value format. To solve this, compound records must be created. There are two ways to do this in Hadoop:

  1. By implementing your own compound data type by implementing Writable. This solution is usually highly complex and time-consuming so it is not feasible in a project with many types of data.
  2. By using a serialization library such as Thrift or Avro.

This latter solution is less efficient, but in general it is preferable to the first option. And even so, it is insufficient. You usually end up creating numerous different records for a single piece of data. This is necessary to adapt the record to the key or the value.

In short, the key/value model is too restrictive and therefore hinders development. A more flexible option is needed.

Sorting

The MapReduce model does not define the posibility of receiving the values sorted in some way in each call to the Reducer function. However, Hadoop enables us to sort in this manner by correctly combining:

  • A key that contains the values for secondary sorting
  • A specific Partitioner
  • A specific group comparison function.
Secondary Sorting in Hadoop

In other words, a mess. Unfortunately, however, this is one of the most common patterns used in programming with Hadoop: you want the value records to reach the reducer in a certain order. For example, this is the case when you want to analyze the events occurring in a one-hour window and you want to analyze them in the order of occurrence, so you need the events to be sorted in order.

One alternative would be to sort them in memory in the Reduce function itself. This would only work if the records received in the Reducer fit into the memory. But weren’t we talking about creating scalable systems?

Getting the records sorted at the Reducer shouldn’t be so difficult.

Joins

Doing joins between two data sets with different schema is almost essential when programming with MapReduce. For example, if you want to calculate the average expense per age group, you need to do a join between People and Sales datasets.

To do this in Hadoop, you would have to create two map functions, one for the People dataset and another one for the Sales dataset. Both map functions have to emit the person ID, which is present in both People and Sales, as the key, but what do they issue in the value? Hadoop has to receive the same type of data in the value, regardless of where it comes from. So we would have to create a new kind of record that joins the Person and the Sale: the same type of data is sometimes a Person, and other times, a Sale. Furthermore, some sorting is needed so that the Person record always reaches the Reduce function before the multiple Sale records done by this Person do (since this is a 1-n join). I won’t go further into detail, but it is easy to see that this is really complicated, keeping in mind how common the pattern is. There are other tricks for doing this, but none of them are satisfactory.

There must be a simpler way of doing a Reduce join and other calculations with heterogeneous data sources.

Hadoop

Instantiation vs. serialization for the processing logic

In Hadoop, the processing logic moves to where the data are. The mechanism that Hadoop uses for this task is to send the Java class that contains the logic to all the nodes, and to create a new instance for each task. For example, the Map and Reduce function to be applied are configured like this:

[code lang=”java”]
job.setMapperClass(MyMap.class);
job.setReducerClass(MyReduce.class);
[/code]

This method works well in most cases, but makes it hard to create modular Map and Reduce functions. The problem is that, with the current mechanism, the only way of transmitting status or configuration to your Map and Reduce functions is by using the Configuration object, which is a key/value map with Strings.

We feel that it is simpler and more robust if the logic is sent by serialization of previously created instances. This would allow us to do things like the following:

[code lang=”java”]
job.setMapperClass(new RegexpFilter(“[a-z]*@[a-z]*”));
[/code]

A lot more elegant and simpler than using the Configuration to send the regular expression.

Multiple output files

Each Hadoop Job usually receives several input files and generates several output files. Hadoop’s default API is ready to issue results in a single file. To be able to issue several files, the MultipleOutputs class should be used. This is sufficient in most cases. But it is not quite easy and it does not fit perfectly.

At any rate, we believe there must be native support for the option of issuing several files, making the job easier.

Alternatives

The alternative interface for Hadoop provided by Avro can be used (see our post on this topic). However, it does not solve all the problems discussed in this article.

There are also higher level tools such as Cascading, Hive and Pig which attempt to solve these problems. However, as higher level tools, they do not allow us to take advantage of all Hadoop’s capacities.

Conclusion

In sum, we need an alternative, low level Hadoop API, which enables all kinds of optimizations, while at the same time simplifying the development for the most common patterns. At Datasalt, we are working on this, and will be releasing the project for the community shortly.

--

--