MultiCombiner

LiveRamp
LiveRamp Engineering
Feb 12, 2014

We’re happy to announce that we’ve added a new tool built on top of Cascading, called MultiCombiner, to our open source project cascading_ext. MultiCombiner lets you run arbitrarily many aggregation operations, each with its own distinct grouping fields, over a single stream of tuples using only a single reduce step. Internally, MultiCombiner uses combiners to ensure these aggregations are performed efficiently and without unnecessary I/O.

Combiners are a useful tool for optimizing Hadoop jobs by reducing the number of records that need to be shuffled, which decreases sorting time, disk I/O, and network traffic on your cluster. Combiners work by partially reducing records that share the same key during the map phase and emitting the partial result rather than the original records, in essence doing some of the reducer’s work ahead of time. This decreases the number of records generated by each map task, shrinking the amount of data that needs to be shuffled and often yielding a significant reduction in time spent on I/O. Because each combiner only has access to a subset of the records for each key, in a random order, combiners are only useful for operations that are both commutative and associative, such as summing or counting. For a quick introduction, check out this combiner tutorial.
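As a quick illustration (a minimal sketch in plain Hadoop, not part of cascading_ext, with class and field names of our own invention), a summing reducer can be reused directly as a combiner:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A summing reducer: safe to double as a combiner because addition is
// commutative and associative, so partial sums can be merged in any order.
public class SumReducer extends Reducer<Text, LongWritable, Text, LongWritable> {
  private final LongWritable result = new LongWritable();

  @Override
  protected void reduce(Text key, Iterable<LongWritable> values, Context context)
      throws IOException, InterruptedException {
    long sum = 0;
    for (LongWritable value : values) {
      sum += value.get();
    }
    result.set(sum);
    context.write(key, result);
  }
}

// Wiring: the same class serves as both the combiner and the reducer.
// job.setCombinerClass(SumReducer.class);
// job.setReducerClass(SumReducer.class);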

Combiners are supported by Cascading through AggregateBy and the Functor and Aggregator interfaces, which implement the combiner and reducer logic respectively. Cascading’s combiner implementation provides a few advantages over Hadoop’s native handling, including methods for composing aggregation operations that use the same grouping fields and an implementation that allows combining at the end of the reduce step as well as at the end of the map.
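For example (a sketch using Cascading’s stock SumBy and CountBy assemblies; the pipe and field names are ours), two aggregations that share the same grouping fields can be composed into a single AggregateBy, and therefore a single reduce step:

import cascading.pipe.Pipe;
import cascading.pipe.assembly.AggregateBy;
import cascading.pipe.assembly.CountBy;
import cascading.pipe.assembly.SumBy;
import cascading.tuple.Fields;

// Both aggregations group on "user_id", so they can share one GroupBy.
Pipe events = new Pipe("purchase_events");
Pipe statsByUser = new AggregateBy(
    events,
    new Fields("user_id"),
    new SumBy(new Fields("item_amount"), new Fields("total_items")),
    new CountBy(new Fields("purchase_count")));

Note that this composition only works because both aggregations group on the same fields, which is exactly the limitation described below.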

What both Cascading’s and Hadoop’s implementations lack is an efficient way to run many combiner-backed aggregation operations over a single stream using different grouping fields. In both Hadoop and Cascading, such a computation requires a full read and shuffle of the data set for each distinct set of grouping fields. This can make it prohibitively expensive to add new stats or aggregates to a workflow. We wanted the flexibility to add new stats over our datasets without significantly increasing computation time, and so we built MultiCombiner.

An Example Usage

Suppose that it is your job to compute stats for a large retailer. The retailer stores records of purchase events in an HDFS file with the following format:

user_id, item_id, item_amount, timestamp

You are required to compute:

1) The total number of items purchased by each user
2) The total number of items of each kind sold
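For example, given the (made-up) records below, aggregate 1 would report 4 items for user u_1001, and aggregate 2 would report 5 units of item i_42:

u_1001, i_42, 3, 1392163200
u_1001, i_77, 1, 1392166800
u_2002, i_42, 2, 1392170400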

Using the features built into Cascading, we might build the flow this way:

Pipe purchaseEvents = new Pipe("purchase_events");

Pipe countByUser = new SumBy(
    purchaseEvents,
    new Fields("user_id"),
    new Fields("item_amount"),
    new Fields("total_items_by_user"));

Pipe countByItem = new SumBy(
    purchaseEvents,
    new Fields("item_id"),
    new Fields("item_amount"),
    new Fields("total_items_by_item"));

Because each SumBy contains a GroupBy, this assembly will spawn 2 entirely separate Hadoop jobs, each running over the same data set. While the combiners used will mitigate the damage somewhat, your task is still reading, shuffling, and writing the same data twice. Using MultiCombiner, we can combine these two aggregation operations into a single Hadoop job, significantly reducing the total amount of I/O we have to do.

Pipe purchaseEvents = new Pipe("purchase_events");

CombinerDefinition countByUserDef = new CombinerDefinitionBuilder()
    .setGroupFields(new Fields("user_id"))
    .setInputFields(new Fields("item_amount"))
    .setOutputFields(new Fields("total_items_by_user"))
    .setExactAggregator(new SumExactAggregator(1))
    .get();

CombinerDefinition countByItemDef = new CombinerDefinitionBuilder()
    .setGroupFields(new Fields("item_id"))
    .setInputFields(new Fields("item_amount"))
    .setOutputFields(new Fields("total_items_by_item"))
    .setExactAggregator(new SumExactAggregator(1))
    .get();

MultiCombiner combiner = MultiCombiner.assembly(purchaseEvents, countByItemDef, countByUserDef);

Pipe countsByUser = combiner.getTailsByName().get(countByUserDef.getName());

The tails of the MultiCombiner assembly give access to the results of each definition separately. The tail pipes have the same names as the definitions, so you can use the definition names as keys into the map returned by the getTailsByName method. Alternatively, there’s MultiCombiner.singleTailedAssembly, which will emit the results in a single stream.
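For completeness (a sketch; we’re assuming singleTailedAssembly mirrors the signature of MultiCombiner.assembly shown above), fetching the second tail and using the single-stream alternative look like this:

Pipe countsByItem = combiner.getTailsByName().get(countByItemDef.getName());

// Alternative: one tail pipe carrying every definition's results.
Pipe allCounts = MultiCombiner.singleTailedAssembly(purchaseEvents, countByItemDef, countByUserDef);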

Things to Watch Out For

The ability to run arbitrary aggregators over a single stream is pretty useful, but there’s no magic going on in the background. Each aggregator used in a MultiCombiner emits roughly the same number of tuples as if it were being used on its own. Because the sorting involved in the shuffle is an n·log(n) operation, shuffling the output of many aggregators all at once is less efficient than shuffling their outputs separately: sorting one batch of k·n tuples costs roughly k·n·log(k·n), while k separate sorts of n tuples cost only k·n·log(n). This is usually not an issue because of the time saved by reading the data set only once, but it may matter if the number of tuples being shuffled is much larger than the data being read from disk. Additionally, each aggregator must keep its own in-memory map for its combiner, and the added memory pressure can make combining less efficient when many aggregators run at once.

Conclusion

The time we’ve saved by using this tool in our own flows has been enormous. We originally used MultiCombiner to compute 4 different aggregates over a 6.5 terabyte dataset. We’ve since increased the number of aggregates to 8, and have seen only an approximately 10% increase in the job’s running time. Compare this to the 100% increase we would have expected from doubling the number of aggregates, and it’s easy to see how MultiCombiner can improve the performance of complex data aggregation workflows.
