Re-implement Melbourne Shuffle on Apache Hadoop

Jianan Lu
Princeton Systems Course
12 min read · Jan 27, 2020

Jianan Lu and Susan Tan, COS 518, Fall 2019

1. Introduction

The migration of large dataset storage and processing onto the cloud poses many challenges in ensuring security and privacy for data owners. Current approaches use data encryption and trusted execution environments to provide strong isolation for local data processing. VC3 [1] is the first such system that runs distributed MapReduce computations while keeping all local mapper and reducer computations secure inside SGX processors. However, as recent work by Ohrimenko et al. [2] shows, these techniques are not sufficient, because the shared nature of today's cloud infrastructure allows both the cloud provider and other tenants to observe side information about one's memory, storage and network usage. They found that the volume of intermediate traffic between Map and Reduce nodes can leak significant information about the sensitive dataset on which operations are performed, even if the traffic is encrypted. To remedy this problem, they proposed a few solutions, one of which uses the Melbourne shuffle [3] to hide any correlation between the intermediate traffic (e.g. the observable I/O between mappers and reducers) and the input dataset, with reasonable performance overhead.

In this project, we re-implement the Melbourne shuffle on the latest release of Apache Hadoop 2 [5] (Hadoop 2.10.0). We wanted to see how much effort it takes to integrate the Melbourne shuffle into Hadoop's current MapReduce framework. In addition, we wanted to better understand how this feature impacts the performance of the system (e.g. time spent in each phase of a MapReduce job, time to completion, memory usage and so on).

2. MapReduce in Hadoop

2.1. Hadoop Overview

Apache Hadoop is a scalable and reliable framework that allows distributed processing of large datasets. As shown in Figure 1, it has three major components:

  • YARN: Yet Another Resource Negotiator, which manages computation resources, including CPUs and memory, for the applications running above it.
  • HDFS: the Hadoop Distributed File System, which stores data persistently across the nodes of a Hadoop cluster. It uses data replication to provide fault tolerance and high reliability.
  • MapReduce: an application framework that consists of a Map phase and a Reduce phase. During the Map phase, the input data is divided into smaller splits, and each split is processed by a map function running inside a container on a single node, in parallel with the others. The Reduce phase aggregates the output of the Map phase and invokes the reduce function to generate the final output.

The YARN infrastructure and HDFS are completely decoupled: one provides resource management while the other provides data storage. It is worth noting that MapReduce is one of many possible application frameworks that can run on top of them.

Figure 1: High-Level Overview of Hadoop Architecture

2.2. The MapReduce Paradigm

We now describe the execution of a MapReduce job in Hadoop in more detail. A MapReduce client first submits a job to the Resource Manager controlled by YARN. In the meantime, the client also copies the necessary resource files from the local file system to the Hadoop Distributed File System (HDFS). The Resource Manager monitors the job's progress and periodically notifies the client of the current status (map progress, reduce progress, or any exceptions encountered). The Resource Manager then chooses a Node Manager with available resources and requests a container for the MRAppMaster. The MRAppMaster executes the map and reduce tasks defined by the client in the job configuration. It first negotiates with the Resource Manager for the most suitable nodes (those holding most of the resources required for the job), grabs resources such as input splits from HDFS, and then starts map and reduce tasks on different slave nodes (node Y and node Z in Figure 2).

Figure 2: Execution of a MapReduce Job in Hadoop

When a map task is initialized, a MapTask context is created and later passed into a Mapper class. A MapReduce client overrides the map() function of the Mapper class, and optionally overrides the setup() and cleanup() methods to set up and clean up local variables. At the end of the map() function, the client calls the context.write() method, which writes intermediate key-value pairs into local files on the node where the computation takes place. This is done by two threads running in parallel inside the MapTask class. First, each intermediate key-value pair enters a circular buffer. A spill thread runs concurrently, and as soon as the records collected in the circular buffer reach a threshold (80% by default), the in-memory records are written into local files, partitioned according to the hash code of the key. The number of output files is determined by the number of reducers, which is either specified by the user or, if not specified, determined by the framework based on the workload. Each intermediate output file corresponds to one partition. In other words, the process guarantees that key-value pairs with the same key are assigned to the same partition and therefore go to the same reducer.
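For readers unfamiliar with this API, the following minimal word-count-style Mapper (stock Hadoop usage, not our modified code) shows where setup(), map(), cleanup() and context.write() fit:

```java
import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// A typical Mapper: the framework calls setup() once, then map() for every
// input record, then cleanup(); context.write() emits intermediate pairs.
public class WordCountMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  private static final IntWritable ONE = new IntWritable(1);
  private final Text word = new Text();

  @Override
  protected void setup(Context context) {
    // Initialize per-task state here (e.g. read values from the job configuration).
  }

  @Override
  protected void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    StringTokenizer itr = new StringTokenizer(value.toString());
    while (itr.hasMoreTokens()) {
      word.set(itr.nextToken());
      context.write(word, ONE);   // goes into the circular buffer described above
    }
  }

  @Override
  protected void cleanup(Context context) {
    // Release any per-task resources here.
  }
}
```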

Figure 3: Current Hadoop Implementation For a Map Task

A reduce task starts by fetching and merging intermediate output files from all map tasks, either locally or over the network if they reside on different physical nodes. For instance, Reduce 1 reads all of the "file 1" partitions in Figure 3 and generates a merged, sorted input file. A ReduceContextImpl object takes this input file and binds to a Reducer class. It then iterates through the merged input. For every unique key encountered, it passes that key and an iterable over the values under that key to the reduce() function for further processing. The Reducer class returns when there are no more unique keys to process.
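Correspondingly, a minimal Reducer looks like the sketch below; reduce() is invoked once per unique key with an iterable over that key's values:

```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// A typical Reducer: reduce() is called once per unique key with an Iterable
// over all values that were assigned to that key's partition.
public class WordCountReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {

  private final IntWritable result = new IntWritable();

  @Override
  protected void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int sum = 0;
    for (IntWritable v : values) {
      sum += v.get();
    }
    result.set(sum);
    context.write(key, result);   // final output written to HDFS
  }
}
```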

Figure 4: Current Hadoop Implementation For a Reduce Task

3. Problems

3.1. Threat Model

Our threat model is consistent with [2]. In particular, we assume a passive attacker who can observe the volume of any communication (e.g. network traffic and disk reads/writes) between mappers, reducers and the storage unit.

3.2. The Problems

As Figure 2 illustrates, a MapReduce job can consist of multiple map and reduce tasks, each residing in its own container, possibly on a different physical node. Before each reducer starts, it reads the intermediate output files that correspond to its partition from all mappers. However, this intermediate traffic can leak valuable information about the underlying key-value pairs.

  1. Issue 1: The partition assignment of key-value pairs is very predictable. Hadoop currently uses the hashCode of the key to decide which partition a record should go to (see Figure 3 and the sketch after this list). If the attacker has some knowledge about the input dataset in advance (e.g. that the keys are integers), they can precisely predict which keys each reducer will process if those keys appear in the input dataset.
  2. Issue 2: The number of intermediate key-value pairs transferred between mappers and reducers reveals key popularity for each partition, making it possible to guess what a key and its corresponding value are, especially with the aid of publicly available information such as word frequency, disease prevalence and so on.
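To make Issue 1 concrete, the sketch below is essentially what Hadoop's default HashPartitioner does: the partition is a pure, public function of the key, so an attacker who knows the key space can predict the assignment.

```java
import org.apache.hadoop.mapreduce.Partitioner;

// Essentially the logic of Hadoop's default HashPartitioner: the partition is
// a deterministic, public function of the key alone, so anyone who can guess
// the key space can predict which reducer receives which keys.
public class DefaultLikePartitioner<K, V> extends Partitioner<K, V> {
  @Override
  public int getPartition(K key, V value, int numReduceTasks) {
    return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
  }
}
```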

4. Melbourne Shuffle for MapReduce

To tackle the above two problems, Ohrimenko et al. [2] use the Melbourne shuffle to make the intermediate traffic appear data-independent. In this post, we will not explain the detailed algorithm and proof of the Melbourne shuffle [3]. Instead, we want to give some intuition for why it is better suited to MapReduce than other oblivious sorting algorithms, and a brief summary of the three building blocks that implement it.

Assuming the input data is split into batches, the Melbourne shuffle guarantees that applying the shuffle locally within each batch results in a global oblivious shuffle of the original input data. This is advantageous because, instead of adding a global shuffling layer between the Map and Reduce phases, which can incur a lot of performance overhead, every map task can perform the shuffle locally right before writing intermediate records to the local disk.

The Melbourne shuffle algorithm deploys three core techniques: pseudo-random permutation, padding with dummy records, and probabilistic encryption.

4.1. Keyed Pseudo-Random Permutation

A pseudo-random permutation takes a secret key k and updates the location of every record in a dataset D. The resulting sequence is indistinguishable from a truly random permutation over D to anyone who does not know the secret key. In Hadoop MapReduce, the user can provide a secret permutation key to the framework to randomize the partition assignment. Thus, the partition a key-value pair goes to is completely independent of the value of the key, as long as the permutation key remains unknown to the attacker.

4.2. Padding with Dummy Records

To hide the exact number of records in each intermediate output file, a straightforward approach is to pad each file up to a maximum number of records, max. All the attacker will see is that every reducer (or partition) receives the same number of key-value pairs from each map task, so he or she learns little about key popularity.

4.3 Probabilistic Encryption

Hiding the content of intermediate output files is important because otherwise the attacker can learn everything (e.g. the keys being processed and the final result) by reading the plaintext. Probabilistic encryption ensures that the ciphertext of the same record is different each time it is encrypted. As a result, given an encrypted record, the attacker cannot tell whether it corresponds to a previously seen record or a new one, nor whether it is a valid record or a dummy.
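We did not implement this piece ourselves (as noted below, Hadoop already supports intermediate-data encryption), but for intuition, a minimal Java sketch of probabilistic encryption using AES-GCM with a fresh random IV per record might look like this (the helper class name is ours):

```java
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

// Minimal sketch of probabilistic encryption: a fresh random IV per record
// makes two encryptions of the same plaintext produce different ciphertexts.
public final class RecordEncryptor {
  private static final SecureRandom RNG = new SecureRandom();

  public static byte[] encrypt(SecretKey key, byte[] record) throws Exception {
    byte[] iv = new byte[12];                       // 96-bit nonce, standard for GCM
    RNG.nextBytes(iv);
    Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
    cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(128, iv));
    byte[] ciphertext = cipher.doFinal(record);
    // Prepend the IV so the decryptor can recover it.
    byte[] out = new byte[iv.length + ciphertext.length];
    System.arraycopy(iv, 0, out, 0, iv.length);
    System.arraycopy(ciphertext, 0, out, iv.length, ciphertext.length);
    return out;
  }
}
```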

5. Our Implementation

We added ~300 lines of code in Java to incorporate the Melbourne shuffle feature into the source code of Hadoop 2.10.0 [5]. This section covers major changes in the codebase.

Each MapReduce job starts with a job configuration. We added methods for user programs to provide a 4-byte integer as the secret permutation key. If set, this key is passed into every map task and enables the Melbourne shuffle functionality.
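As a usage sketch (the property name below is hypothetical and only for illustration; our patch may name it differently), a client job would enable the shuffle roughly like this:

```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;

public class ShuffleKeyConfig {
  public static Job configure() throws IOException {
    Configuration conf = new Configuration();
    Job job = Job.getInstance(conf, "word count with Melbourne shuffle");
    // Hypothetical property name, shown for illustration only; the actual name
    // in our patch may differ. Any 4-byte integer serves as the secret key.
    job.getConfiguration().setInt("mapreduce.job.melbourne.shuffle.key", 0x5EED1234);
    return job;
  }
}
```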

Before entering the circular buffer (shown in Figure 3), intermediate key-value pairs first get their partition numbers assigned in the MapTask class. We updated this logic to use our own PRP partitioner when the Melbourne shuffle is enabled. We used the Goldreich-Goldwasser-Micali (GGM) construction [4] to build the PRP: Random objects in Java serve as a length-doubling pseudo-random number generator (PRNG), and the user-provided secret key serves as the initial seed. We then converted the hashCode of the intermediate record's key into a 32-bit input array and fed it to the PRP. The output of the PRP, modulo the total number of partitions, becomes the partition for the record. Because the PRP is deterministic, records with the same key always go to the same partition and reducer, which preserves correctness.
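The stand-alone sketch below illustrates the idea (class and method names are ours, not the actual Hadoop patch): walk a GGM tree of depth 32 driven by the bits of the key's hashCode, using java.util.Random as the length-doubling PRNG, and take the leaf value modulo the number of partitions.

```java
import java.util.Random;

// Simplified sketch of the GGM-style construction described above: descend a
// binary tree of depth 32, choosing the left or right half of each
// length-doubling PRNG output according to the next bit of the key's hashCode.
public final class GgmPartitioner {

  // One length-doubling step: expand a seed into two child seeds.
  private static long[] doubleSeed(long seed) {
    Random prng = new Random(seed);
    return new long[] { prng.nextLong(), prng.nextLong() };
  }

  /** Maps a record key to a partition, deterministically for a fixed secret key. */
  public static int getPartition(Object key, int secretKey, int numPartitions) {
    long seed = secretKey;                   // user-provided 4-byte secret as the root
    int bits = key.hashCode();
    for (int i = 31; i >= 0; i--) {          // 32 PRNG invocations per record
      long[] children = doubleSeed(seed);
      seed = ((bits >>> i) & 1) == 0 ? children[0] : children[1];
    }
    // Same key -> same hashCode -> same leaf -> same partition (correctness).
    return (int) ((seed & Long.MAX_VALUE) % numPartitions);
  }
}
```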

Ohrimenko's paper [3] pads every partition of every map task up to max records. However, this max value depends heavily on the dataset itself, and we found it difficult to determine a good value without a key-popularity analysis of the dataset beforehand. Instead, we relaxed this requirement: for every map task, we set max to the size of that task's largest partition. That is, output files of the same map task always have the same size, though they may differ in size from other map tasks' output files. This relaxation does not help the attacker, because every reducer still receives the same number of records from each map task. In the MapTask class, we added an ArrayList to keep track of the size of each partition. At the end of the map task, we compute max and pad every partition up to max records. To distinguish valid records from dummies, we attached a validity flag to every intermediate record: instead of storing values directly, we store <value, validity_flag> pairs in the intermediate output files. Finally, we updated the iterator methods in ReduceContextImpl (shown in Figure 4) to skip dummy records. Currently, our modification to ReduceContextImpl contains a logic flaw that sometimes outputs incorrect values for some keys, which can affect the correctness of MapReduce jobs. Although we believe this should not greatly impact the performance of reduce tasks, the Reduce-phase times measured in our experiments should be taken with a grain of salt.
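A simplified, stand-alone sketch of the padding step follows (the real logic lives inside MapTask; the Record and DummyPadder names here are ours):

```java
import java.util.List;

// Simplified sketch of our padding step: after all real records of a map task
// are collected, every partition is padded up to the size of that task's
// largest partition with dummy records carrying a false validity flag.
public final class DummyPadder {

  public static final class Record {
    final String value;
    final boolean valid;           // false => dummy, skipped by the reducer
    Record(String value, boolean valid) { this.value = value; this.valid = valid; }
  }

  /** Pads each partition (one list per partition) up to the largest partition's size. */
  public static void pad(List<List<Record>> partitions) {
    int max = 0;
    for (List<Record> p : partitions) {
      max = Math.max(max, p.size());
    }
    for (List<Record> p : partitions) {
      while (p.size() < max) {
        p.add(new Record("", false));   // dummy record: empty value, invalid flag
      }
    }
  }
}
```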

Hadoop 2.10.0 already supports encryption of intermediate data through properties in the MapReduce configuration, so we did not implement this part.

Our code is available at https://github.com/amberlu/hadoop-2.10.0-src-mapreduce.

Figure 5: Total Time Comparison Between 10 and 50 reducer tasks

6. Evaluation

We set up our experiments on the ns cluster with 62 nodes, each with 16 2.6 GHz Intel Xeon CPU cores. To run Hadoop on a traditional high-performance cluster, we used the MyHadoop framework [6]. MyHadoop allows users to provision and deploy Hadoop clusters within the batch-scheduling environment of such systems (the Slurm workload manager, in the case of the ns cluster) with minimal expertise required.

In all of our experiments, we disabled both the combiner and intermediate data encryption. The combiner stage would complicate the number and size of the dummy records added, and we wanted to understand the performance overhead of the Melbourne shuffle separately from that of data encryption.

We tested our implementation with the word count example provided by the MapReduce framework. Word count counts the number of times each word appears in the input file and writes each word's occurrence count to an output file. In our experiments, we used the MovieLens 20M (ml-20m) data set as the input to word count. Ml-20m contains five data sets; Table 1 summarizes their sizes.

Table 1: MovieLens Dataset Summary

We ran our experiments with 10 reduce tasks and with 50 reduce tasks. To reduce resource contention between tasks, we assigned an additional 5–10 nodes until performance saturated (i.e. there were enough nodes for each worker). The results (Figure 5) show that there is a performance degradation due to our secure implementation. To diagnose where the overhead comes from, Figure 6 shows that most of it comes from the Map stage and is caused by the pseudo-random permutation: for every intermediate record, the PRP is invoked once and, with our GGM construction, the PRNG is invoked 32 times. For future work, an optimized algorithm could reduce this overhead. One approach is to pre-build the tree-structured PRP when the MapTask is created and save it as a class variable; future calls to the PRP would then only traverse the tree and return the value at the corresponding leaf node, without extra computation. Figure 6 also shows that for some datasets (links, genome-tags and ratings) we see a performance gain in reducer execution time. This indicates that our technique evenly distributes hot keys among the workers and therefore has minimal effect on the Reduce phase.

Figure 6: Map & Reduce Time comparison between baseline and Melbourne Hadoop

Figure 7 reports the total output size from the mappers. On average, padding added roughly 50% more records to the local files. The number of dummy records depends on how evenly the partitions are divided. For future work, a better partition function could be developed to reduce the amount of dummy data written. Another possible improvement is a flexible padding threshold to avoid this overhead.

Figure 7: Total Map Output Comparison between baseline and Melbourne Hadoop

References

[1] Schuster, Felix, Manuel Costa, Cedric Fournet, Christos Gkantsidis, Marcus Peinado, Gloria Mainar-Ruiz, and Mark Russinovich. “VC3: Trustworthy Data Analytics in the Cloud Using SGX.” In 2015 IEEE Symposium on Security and Privacy, 38–54. San Jose, CA: IEEE, 2015. https://doi.org/10.1109/SP.2015.10.

[2] Ohrimenko, Olga, Manuel Costa, Cédric Fournet, Christos Gkantsidis, Markulf Kohlweiss, and Divya Sharma. “Observing and Preventing Leakage in MapReduce.” In Proceedings of the 22nd ACM SIGSAC Conference on Computer and Communications Security — CCS ’15, 1570–81. Denver, Colorado, USA: ACM Press, 2015. https://doi.org/10.1145/2810103.2813695.

[3] Ohrimenko, Olga, Michael T. Goodrich, Roberto Tamassia, and Eli Upfal. "The Melbourne Shuffle: Improving Oblivious Storage in the Cloud." https://arxiv.org/pdf/1402.5524.pdf.

[4] Goldreich, Oded, Shafi Goldwasser, and Silvio Micali. "How to Construct Random Functions." Journal of the ACM, Vol. 33, No. 4, October 1986, pp. 792–807. https://dl.acm.org/doi/10.1145/6490.6503.

[5] Apache Software Foundation. Hadoop. http://wiki.apache.org/hadoop/, accessed 01/07/20.

[6] Krishnan, Sriram, Mahidhar Tatineni, and Chaitanya Baru. “MyHadoop — Hadoop-on-Demand on Traditional HPC Resources,” n.d., 8.
