Designing and scaling a PetaByte Scale System — Part 2

Tarun Kumar
Published in Airtel Digital · 5 min read · Jul 17, 2020

Challenges Faced while using Object Storage with MapReduce:

Some key differences between an object storage file system (S3) and the Hadoop file system (HDFS) are:

  1. S3 has inconsistent directory listings. When you list a directory, you may not see all the files that have been created in it.
  2. In S3 object storage, rename is implemented as a copy followed by a delete, so there is actual data movement. In HDFS, rename is an O(1) operation involving only a metadata change.

As a result,

  • Files you created may not be returned by a list API call.
  • Deleted files may still show up in listings, and a failed delete leaves the store in an inconsistent state.
  • Rename takes time proportional to the amount of data copied (see the sketch below).
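The rename difference is easy to see through Hadoop's FileSystem API. Below is a minimal sketch; the NameNode address, bucket name, and paths are placeholders, not values from our setup:

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class RenameCostDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();

        // On HDFS, rename is a single NameNode metadata operation: O(1),
        // regardless of how much data sits under the source directory.
        FileSystem hdfs = FileSystem.get(new URI("hdfs://namenode:8020"), conf);
        hdfs.rename(new Path("/warehouse/_temporary/attempt_001"),
                    new Path("/warehouse/output"));

        // On an S3-compatible store, the same call is emulated client-side as
        // copy-every-object-then-delete, so it takes time (and bandwidth)
        // proportional to the bytes under the source prefix.
        FileSystem s3 = FileSystem.get(new URI("s3a://my-bucket"), conf);
        s3.rename(new Path("/warehouse/_temporary/attempt_001"),
                  new Path("/warehouse/output"));
    }
}
```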

A. Classic Hadoop Commit Problem:

From the Hadoop website:

In HDFS and other filesystems, directory rename() is the mechanism used to commit the work of tasks and jobs. Tasks write data to task attempt directories under the directory _temporary underneath the final destination directory. When a task is committed, these files are renamed to the destination directory (v2 algorithm) or a job attempt directory under _temporary (v1 algorithm). When a job is committed, for the v2 algorithm the _SUCCESS file is created, and the _temporary deleted. For the v1 algorithm, when a job is committed, all the tasks committed under the job attempt directory will have their output renamed into the destination directory. The v2 algorithm recovers from failure by deleting the destination directory and restarting the job. The v1 algorithm recovers from failure by discovering all committed tasks whose output is in the job attempt directory, and only rerunning all uncommitted tasks
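For reference, which of these two algorithms a job uses is selected through a standard Hadoop property; a minimal sketch (the rest of the job setup is assumed):

```java
import org.apache.hadoop.conf.Configuration;

public class CommitAlgorithmVersion {
    // Pin the classic FileOutputCommitter algorithm: 1 = v1 (task output renamed
    // into a job attempt directory, then into the destination at job commit),
    // 2 = v2 (task output renamed straight into the destination at task commit).
    public static Configuration useV1(Configuration conf) {
        conf.setInt("mapreduce.fileoutputcommitter.algorithm.version", 1);
        return conf;
    }

    public static void main(String[] args) {
        Configuration conf = useV1(new Configuration());
        System.out.println(conf.get("mapreduce.fileoutputcommitter.algorithm.version"));
    }
}
```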

Neither of these algorithms works with an S3 object storage interface, due to the two differences mentioned above.

Commit is a very complex problem, and Hadoop 3.1 provides S3A committers with different sets of features. You can read the full text here to grasp the problem and how they solved it:
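For readers already on Hadoop 3.1+, the built-in S3A committers are enabled purely through configuration. A hedged sketch of the stock setup from the Hadoop documentation (this is not our custom committer), choosing the staging "directory" committer:

```java
import org.apache.hadoop.conf.Configuration;

public class S3ACommitterConfig {
    public static Configuration withDirectoryCommitter(Configuration conf) {
        // Route committer creation for s3a:// output paths through the S3A factory.
        conf.set("mapreduce.outputcommitter.factory.scheme.s3a",
                 "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory");
        // Pick the "directory" (staging) committer; "partitioned" and "magic"
        // are the other built-in choices.
        conf.set("fs.s3a.committer.name", "directory");
        return conf;
    }
}
```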

We took inspiration from these committers and implemented our own committer, which serves our purpose and also works with older Hadoop versions.

In brief, here is what we do. We have made some modifications on top of the Hadoop S3A committer:

  1. Our committer works with older Hadoop versions as well.
  2. Each task writes its output to a local directory. We have written cleanup logic for failed tasks; people usually run out of local disk space when many tasks fail, and the Hadoop committer has no such cleanup logic.
  3. When a task commit is initiated, the committer sends multiple multipart PUT requests to object storage. We have added support for sending these PUT requests in parallel, which is useful when an executor runs with only a few cores.
  4. Every PUT request we make is recorded in a file in HDFS, which stores the metadata associated with that PUT.
  5. The standard FileOutputCommitter v1 is used to manage the commit/abort of these files. That is, it copies only the lists of files to commit from successful tasks into a temporary job commit directory.
  6. When the job is finally committed by the driver, the committer reads the pending file list of every committed task from HDFS and completes those PUT requests by sending complete-multipart-upload requests to object storage (see the sketch after this list). The Hadoop committer does not consider the failure scenario where somebody kills the driver in between: some files become visible and the output directory is left in an inconsistent state. We have handled this in our commit protocol. Whenever a job succeeds we write a success flag; if the next job run does not see the success marker, it checks and cleans up the old job's files in S3 object storage using the metadata of the previous run.
  7. To handle the inconsistent directory view, we have written a listing layer on top of S3. Every successfully written file is recorded here, and all subsequent jobs in the pipeline use this layer for file listing.
  8. We have also implemented connection caching at the executor level, so a single connection is created per executor.
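A hedged sketch of the job-commit step in point 6, using the AWS SDK v1 multipart-upload API; PendingUpload and the writeSuccessMarker callback are illustrative stand-ins for our internal metadata format and marker file, not actual code from our committer:

```java
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.model.CompleteMultipartUploadRequest;
import com.amazonaws.services.s3.model.PartETag;
import java.util.List;

public class JobCommitSketch {

    // Illustrative model of the per-task metadata persisted in HDFS.
    public static class PendingUpload {
        final String bucket;
        final String key;
        final String uploadId;
        final List<PartETag> parts;

        PendingUpload(String bucket, String key, String uploadId, List<PartETag> parts) {
            this.bucket = bucket;
            this.key = key;
            this.uploadId = uploadId;
            this.parts = parts;
        }
    }

    public static void commitJob(AmazonS3 s3, List<PendingUpload> pending,
                                 Runnable writeSuccessMarker) {
        for (PendingUpload u : pending) {
            // Completing the multipart upload is what finally makes each object
            // visible; until this call nothing shows up under the output prefix.
            s3.completeMultipartUpload(new CompleteMultipartUploadRequest(
                    u.bucket, u.key, u.uploadId, u.parts));
        }
        // Publish the success marker only after every pending upload completes.
        // If the next run does not find the marker, it cleans up this attempt's
        // leftovers using the persisted metadata.
        writeSuccessMarker.run();
    }
}
```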

The essence is that we do not use the rename operation anywhere. This saves a lot of data copying inside the storage layer and also provides a consistent view of the data.

B. Random Access Issue While Reading ORC Files:

We noticed that the network traffic to a Spark executor was many times larger than the amount of input data the executor requested. We used Wireshark to capture the S3 GET requests and observed that the metadata requests to ORC files were carrying more data than was asked for.

Let's understand the read process of an ORC file:

An ORC file contains groups of row data called stripes, along with auxiliary information in a file footer. At the end of the file a postscript holds compression parameters and the size of the compressed footer.

We first read the postscript and then the file footer, which is a few KB in size. From the footer we get the stripe information, which is also a few KB; we then read the stripe footers and, in the end, the row data.
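A minimal sketch of this metadata-first access pattern with the standard ORC reader API (the s3a path is a placeholder):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.orc.OrcFile;
import org.apache.orc.Reader;
import org.apache.orc.StripeInformation;

public class OrcMetadataRead {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Opening the reader touches only the postscript and file footer
        // (a few KB at the tail of the file), not the row data.
        Reader reader = OrcFile.createReader(
                new Path("s3a://my-bucket/warehouse/events/part-00000.orc"),
                OrcFile.readerOptions(conf));
        for (StripeInformation stripe : reader.getStripes()) {
            // Stripe offsets and lengths come from the footer; row data is read
            // only when a RecordReader is created over a stripe.
            System.out.println("stripe at " + stripe.getOffset()
                    + ", data length " + stripe.getDataLength());
        }
    }
}
```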

Now, when the ORC reader asks for the file postscript, it requests 16 KB of data. But the S3 reader is a streaming reader, which pulls in additional data because of various system buffers. We cannot simply close the stream after reading the data we need, since closing the stream closes the connection, and creating a new connection is a very heavy process; we have to drain the data left in the stream before processing the next request. So although S3 recommends large buffer sizes, in the range of megabytes, we observed that for ORC reads this had performance implications. We adjusted the buffer size after many trials, which cut network traffic in half and sped up query performance by about 2x for simple reads and 5x-10x for filter and filter push-down cases.
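We made this change inside our own reader layer, but for readers on stock Hadoop S3A the analogous knobs look like the sketch below; the values are illustrative, not the buffer sizes we finally settled on:

```java
import org.apache.hadoop.conf.Configuration;

public class OrcReadTuning {
    public static Configuration tune(Configuration conf) {
        // Read ahead only a small window per request instead of buffering
        // megabytes that mostly get drained and thrown away.
        conf.set("fs.s3a.readahead.range", "64K");
        // Optimize the input stream for seek-heavy (random) access, the pattern
        // ORC metadata and stripe reads produce, instead of whole-file streaming.
        conf.set("fs.s3a.experimental.input.fadvise", "random");
        return conf;
    }
}
```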

C. S3 Connection Issue:

Our object storage had an upper limit of 3600 connections for optimum performance. Although more connections could be created, doing so resulted in performance degradation. We therefore cached the S3 file system connection wherever possible.
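A hedged sketch of what connection caching looks like with stock Hadoop/S3A settings (the cap of 64 is illustrative; our exact numbers and caching layer differ):

```java
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;

public class CachedS3Connections {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // Upper bound on the S3A HTTP connection pool per filesystem instance.
        conf.setInt("fs.s3a.connection.maximum", 64);
        // Keep filesystem-instance caching on (the default), so repeated
        // FileSystem.get() calls inside one executor reuse the same client
        // and its pooled connections instead of opening new ones.
        conf.setBoolean("fs.s3a.impl.disable.cache", false);

        FileSystem fs1 = FileSystem.get(new URI("s3a://my-bucket"), conf);
        FileSystem fs2 = FileSystem.get(new URI("s3a://my-bucket"), conf);
        // With caching enabled, both calls return the same cached instance.
        System.out.println("same cached instance: " + (fs1 == fs2));
    }
}
```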
