Improving efficiency and reducing runtime using S3 read optimization

Pinterest Engineering
Aug 12

Bhalchandra Pandit | Software Engineer

Overview

We describe a novel approach we took to improving S3 read throughput and how we used it to improve the efficiency of our production jobs. The results have been very encouraging. A standalone benchmark showed a 12x improvement in S3 read throughput (from 21 MB/s to 269 MB/s). Increased throughput allowed our production jobs to finish sooner. As a result, we saw a 22% reduction in vcore-hours, a 23% reduction in memory-hours, and a similar reduction in the run time of a typical production job. Although we are happy with the results, we are exploring additional enhancements, which are briefly described at the end of this post.

Motivation

We process petabytes of data stored on Amazon S3 every day. If we inspect the relevant metrics of our MapReduce/Cascading/Scalding jobs, one thing stands out: slower than expected mapper speed. In most cases, the observed mapper speed is around 5–7 MB/sec. That is more than an order of magnitude slower than the observed throughput of commands such as aws s3 cp, where speeds of 200+ MB/sec are common (observed on a c5.4xlarge instance in EC2). If we can increase the speed at which our jobs read data, our jobs will finish sooner and save us considerable time and money in the process. Given that processing is costly, these savings can add up quickly to a substantial amount.

S3 read optimization

The Problem: Throughput bottleneck in S3A

If we inspect the implementation of S3AInputStream, it is easy to notice the following potential areas of improvement:

  1. Single threaded reads: Data is read synchronously on a single thread which results in jobs spending most of the time waiting for data to be read over the network.
  2. Multiple unnecessary reopens: The S3 input stream is not seekable. A split has to be closed and reopened repeatedly each time one performs a seek or encounters a read error. The larger the split, the greater the chance of it happening. Each such reopening further slows down the overall throughput.

The Solution: Improving read throughput

Architecture

Figure 1: Components of a prefetching+caching S3 reader

Our approach to addressing the above-mentioned drawbacks includes the following:

  1. We treat a split as being made up of fixed-size blocks. The block size defaults to 8 MB but is configurable.
  2. Each block is read asynchronously into memory before it can be accessed by a caller. The size of the prefetch cache (in terms of number of blocks) is configurable.
  3. A caller can only access a block that has already been prefetched into memory. That decouples the caller from network flakiness and allows us to add a retry layer that increases overall resiliency.
  4. Each time we encounter a seek outside of the current block, we cache the prefetched blocks in the local file system.
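As a rough illustration of points 1 to 3 above, here is a minimal Python sketch of a reader that splits a byte range into fixed-size blocks, prefetches them on background threads, and serves reads only from blocks that are already in memory. It is not the S3E code: the class name PrefetchingReader, the fetch_range callable (a stand-in for a ranged S3 read), and the prefetch_blocks and retries parameters are all assumptions made for this example.

```python
import math
from concurrent.futures import ThreadPoolExecutor

BLOCK_SIZE = 8 * 1024 * 1024  # 8 MB default block size mentioned in the text


def block_range(block_index, split_length, block_size=BLOCK_SIZE):
    """Return (start, end) byte offsets of one block; end is exclusive."""
    start = block_index * block_size
    return start, min(start + block_size, split_length)


class PrefetchingReader:
    """Hypothetical reader that serves data only from prefetched blocks."""

    def __init__(self, fetch_range, split_length, block_size=BLOCK_SIZE,
                 prefetch_blocks=4, retries=3):
        self._fetch_range = fetch_range  # assumed: callable(start, end) -> bytes
        self._length = split_length
        self._block_size = block_size
        self._prefetch_blocks = prefetch_blocks
        self._retries = max(1, retries)
        self._num_blocks = math.ceil(split_length / block_size)
        self._pool = ThreadPoolExecutor(max_workers=prefetch_blocks)
        self._pending = {}  # block index -> Future resolving to bytes
        self._pos = 0

    def _fetch_with_retry(self, index):
        # Retry layer: the caller never talks to the network directly.
        start, end = block_range(index, self._length, self._block_size)
        last_error = None
        for _ in range(self._retries):
            try:
                return self._fetch_range(start, end)
            except IOError as err:
                last_error = err
        raise last_error

    def _schedule(self, first):
        # Start asynchronous reads for the current block and the next few.
        last = min(first + self._prefetch_blocks, self._num_blocks)
        for index in range(first, last):
            if index not in self._pending:
                self._pending[index] = self._pool.submit(
                    self._fetch_with_retry, index)

    def read(self, size):
        """Read up to size bytes; may return fewer at a block boundary."""
        if size <= 0 or self._pos >= self._length:
            return b""
        index = self._pos // self._block_size
        self._schedule(index)
        # Forget blocks that lie entirely behind the read position.
        for old in [i for i in self._pending if i < index]:
            del self._pending[old]
        block = self._pending[index].result()  # waits only if not yet fetched
        offset = self._pos - index * self._block_size
        chunk = block[offset:offset + size]
        self._pos += len(chunk)
        return chunk

    def close(self):
        self._pool.shutdown(wait=True)
```

A caller would loop on read() until it returns an empty bytes object. The sketch covers forward reads only; seeking and the local file cache from point 4 are left out to keep it short.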

We further enhanced the implementation to make it a mostly lock-free producer-consumer interaction. This enhancement improves read throughput from 21 MB/sec to 269 MB/sec as measured by a standalone benchmark (see details below in Figure 2).

Sequential reads

Any data consumer that processes data sequentially (for example, a mapper) greatly benefits from this approach. While a mapper is processing the data it has already retrieved, the data next in sequence is being prefetched asynchronously. Most of the time, the next block has already been prefetched by the time the mapper is ready for it. As a result, the mapper spends more time doing useful work and less time waiting for data, which effectively increases CPU utilization.
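To make the overlap concrete, the following is a deliberately idealized timing model, not a measurement. It assumes constant per-block fetch and processing times and a single background fetcher. The function names and the numbers in the usage note are hypothetical.

```python
def serial_seconds(num_blocks, fetch_s, process_s):
    """Without prefetching: every block is fetched, then processed."""
    return num_blocks * (fetch_s + process_s)


def pipelined_seconds(num_blocks, fetch_s, process_s):
    """With prefetching: fetching block i+1 overlaps processing block i.

    Idealized two-stage pipeline: the first fetch cannot be hidden, the
    last processing step cannot be hidden, and every step in between
    costs the slower of the two stages.
    """
    if num_blocks == 0:
        return 0
    return fetch_s + (num_blocks - 1) * max(fetch_s, process_s) + process_s
```

For example, with 10 blocks, 2 seconds to fetch a block, and 3 seconds to process it, serial_seconds gives 50 while pipelined_seconds gives 32. In this model the mapper waits for data only on the first block whenever processing is the slower stage.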

More efficient Parquet reads

Parquet files require non-sequential access as dictated by their on-disk format. Our initial implementation did not use a local cache. Each time there was a seek outside of the current block, we had to discard any prefetched data. That resulted in worse performance compared to the stock reader when it came to reading from Parquet files.

We observed significant improvement in the read throughput for Parquet files once we introduced the local caching of prefetched data. Currently, our implementation increases Parquet file reading throughput by 5x compared to the stock reader.
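The idea of keeping prefetched blocks on local disk instead of discarding them on a seek can be sketched as below. This is only an illustration under our own assumptions: LocalBlockCache, CachingBlockSource, and the fetch_block callable (a stand-in for reading one block from S3) are hypothetical names, and the real implementation may organize its cache differently.

```python
import os
import tempfile


class LocalBlockCache:
    """Hypothetical on-disk cache of blocks, keyed by block index."""

    def __init__(self):
        self._tmp = tempfile.TemporaryDirectory(prefix="block-cache-")
        self._paths = {}

    def put(self, index, data):
        if index in self._paths:
            return  # already cached on disk
        path = os.path.join(self._tmp.name, f"block-{index}.bin")
        with open(path, "wb") as f:
            f.write(data)
        self._paths[index] = path

    def get(self, index):
        path = self._paths.get(index)
        if path is None:
            return None
        with open(path, "rb") as f:
            return f.read()

    def close(self):
        self._paths.clear()
        self._tmp.cleanup()


class CachingBlockSource:
    """Serve a block from memory, then local disk, then the remote store."""

    def __init__(self, fetch_block, cache):
        self._fetch_block = fetch_block  # assumed: callable(index) -> bytes
        self._cache = cache
        self._memory = {}      # blocks currently held in memory
        self._current = None   # index of the block being read
        self.remote_fetches = 0

    def block(self, index):
        if self._current is not None and index != self._current:
            # Seek outside the current block: spill in-memory blocks to the
            # local file system instead of throwing them away.
            for i, data in self._memory.items():
                self._cache.put(i, data)
            self._memory.clear()
        self._current = index
        if index in self._memory:
            return self._memory[index]
        data = self._cache.get(index)
        if data is None:
            data = self._fetch_block(index)
            self.remote_fetches += 1
        self._memory[index] = data
        return data
```

With an access pattern that jumps between the end of a file and its beginning, as a Parquet reader does when it goes from the footer to column data and back, each block is fetched remotely once and later served from local disk.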

Improvement in production jobs

Improved read throughput leads to a number of efficiency improvements in production jobs.

Reduced job runtime

The overall runtime of a job is reduced because mappers spend less time waiting for data and finish sooner.

Potentially reduced number of mappers

If mappers take sufficiently less time to finish, we are able to reduce the number of mappers by increasing the split size. Such reduction in the number of mappers leads to reduced CPU wastage associated with fixed overhead of each mapper. More importantly, it can be done without increasing the run time of a job.
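A small back-of-the-envelope calculation shows why this works. All numbers below (input size, split sizes, throughputs, and the per-mapper overhead) are hypothetical and chosen only to keep the arithmetic simple. The sketch also assumes one mapper per split.

```python
MIB = 1024 * 1024


def mapper_count(input_bytes, split_bytes):
    """Number of mappers when each mapper handles one split."""
    return -(-input_bytes // split_bytes)  # ceiling division on integers


def mapper_read_seconds(split_bytes, throughput_bytes_per_s):
    """Time one mapper spends reading its split at a given throughput."""
    return split_bytes / throughput_bytes_per_s


def total_fixed_overhead_seconds(input_bytes, split_bytes, overhead_s):
    """Total fixed cost (startup, teardown, and so on) across all mappers."""
    return mapper_count(input_bytes, split_bytes) * overhead_s


INPUT_BYTES = 1024 * 1024 * MIB  # 1 TiB of input (hypothetical)

# Before: 128 MiB splits read at 8 MiB/s. After: 256 MiB splits at 16 MiB/s.
mappers_before = mapper_count(INPUT_BYTES, 128 * MIB)
mappers_after = mapper_count(INPUT_BYTES, 256 * MIB)
read_before = mapper_read_seconds(128 * MIB, 8 * MIB)
read_after = mapper_read_seconds(256 * MIB, 16 * MIB)
```

In this example, doubling the split size halves the number of mappers, and therefore halves the total fixed overhead, while each mapper still spends the same time reading because throughput doubled as well.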

Improved CPU utilization

The overall CPU utilization increases because the mappers are doing the same work in less time.

Results

For now, our implementation (S3E) is in a separate git repository to allow faster iterations over enhancements. We will eventually contribute it back to the community by merging it back into S3A.

Standalone benchmark

Figure 2 shows the results of a benchmark that compares reading from S3 using S3A versus using S3E.
Figure 2: Throughput of S3A vs S3E

In each case, we read a 3.5 GB S3 file sequentially and wrote it locally to a temp file. The local write simulates the IO overlap that takes place during a mapper operation. The benchmark was run on a c5.9xlarge instance in EC2. We measured the total time taken to read the file and computed the effective throughput of each method.
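The measurement described above can be approximated with a harness like the one below. It is a sketch, not the benchmark we ran: measure_throughput is a hypothetical helper, it accepts any object with a read(size) method in place of an S3 stream, and it counts a megabyte as 1024 x 1024 bytes.

```python
import tempfile
import time


def measure_throughput(stream, chunk_size=1024 * 1024):
    """Read a stream to the end, copy it to a temp file, and time it.

    Returns (total_bytes, elapsed_seconds, megabytes_per_second).
    """
    total = 0
    start = time.perf_counter()
    with tempfile.TemporaryFile() as sink:
        while True:
            chunk = stream.read(chunk_size)
            if not chunk:
                break
            sink.write(chunk)  # local write stands in for mapper-side IO
            total += len(chunk)
    elapsed = time.perf_counter() - start
    if elapsed > 0:
        mb_per_s = total / (1024 * 1024) / elapsed
    else:
        mb_per_s = float("inf")
    return total, elapsed, mb_per_s
```

Running the same function against two stream implementations that read the same object gives directly comparable throughput figures.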

Production run

We tested many large production jobs with the S3E implementation. Those jobs typically use tens of thousands of vcores per run. In Figure 3, we present a summary comparison of the metrics obtained with and without S3E enabled.

Measuring resource savings

We use the following method to compute resource savings resulting from this optimization.

resource savings
  vcore reduction = job vcores without S3E - job vcores with S3E
  memory reduction = job memory without S3E - job memory with S3E

vcores
  job vcores = mapper vcores + reducer vcores
  mapper vcores = number of mappers x average duration of mappers
  reducer vcores = number of reducers x average duration of reducers

memory
  job memory = mapper memory + reducer memory
  mapper memory = number of mappers x average duration of mappers x average physical memory per mapper
  reducer memory = number of reducers x …
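The method above translates into a few lines of Python. The JobStats class and its field names are hypothetical. The reducer memory formula is cut off in the text above, so the code assumes it mirrors the mapper memory formula. As in the formulas, each task is treated as using one vcore.

```python
from dataclasses import dataclass


@dataclass
class JobStats:
    """Hypothetical per-job metrics; durations in hours, memory in GB."""
    mappers: int
    avg_mapper_duration: float
    avg_mapper_memory: float
    reducers: int
    avg_reducer_duration: float
    avg_reducer_memory: float

    def vcores(self):
        # job vcores = mapper vcores + reducer vcores (in vcore-hours)
        mapper_vcores = self.mappers * self.avg_mapper_duration
        reducer_vcores = self.reducers * self.avg_reducer_duration
        return mapper_vcores + reducer_vcores

    def memory(self):
        # job memory = mapper memory + reducer memory (in GB-hours)
        mapper_memory = (self.mappers * self.avg_mapper_duration
                         * self.avg_mapper_memory)
        # Assumption: same shape as the mapper formula.
        reducer_memory = (self.reducers * self.avg_reducer_duration
                          * self.avg_reducer_memory)
        return mapper_memory + reducer_memory


def savings(without_s3e, with_s3e):
    """Return (vcore reduction, memory reduction) between two runs."""
    return (without_s3e.vcores() - with_s3e.vcores(),
            without_s3e.memory() - with_s3e.memory())
```

As a made-up example, take a job with 1000 mappers at 4 GB each and 100 reducers at 8 GB each, with reducers averaging 1 hour. If S3E cuts the average mapper duration from 2.0 to 1.5 hours, savings reports a reduction of 500 vcore-hours and 2000 GB-hours.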

Observed results

Figure 3 illustrates how the increased throughput allowed each mapper to finish sooner resulting in a significant reduction in VCPU days and GB days.
Figure 3: Comparison of MapReduce job resource consumption

Given the variation in the workload characteristics across production jobs, we saw vcore reduction anywhere between 6% and 45% across 30 of our most expensive jobs. The average saving was a 16% reduction in vcore days.

One thing that is attractive about our approach is that it can be enabled for a job without requiring any change to a job’s code.

Future direction

At present, we have added the enhanced implementation to a separate git repository. In the future, we would likely update the existing S3A implementation and contribute back to the community.

We are in the process of rolling out this optimization across a number of our clusters. We will publish the results in a future blog post.

Given that the core implementation of the S3E input stream does not depend on any Hadoop code, we can use it in any other system where large amounts of S3 data are accessed. Currently we are using this optimization to target MapReduce, Cascading, and Scalding jobs. However, we have also seen very encouraging results with Spark and Spark SQL in our preliminary evaluation.

The current implementation can use further tuning to improve its efficiency. It is also worth exploring if we can use past execution data to automatically tune the block size and the prefetch cache size used for each job.

To learn more about engineering at Pinterest, check out the rest of our Engineering Blog, and visit our Pinterest Labs site. To view and apply to open opportunities, visit our Careers page.