Improving efficiency and reducing runtime using S3 read optimization
Bhalchandra Pandit | Software Engineer
We describe a novel approach we took to improving S3 read throughput and how we used it to improve the efficiency of our production jobs. The results have been very encouraging. A standalone benchmark showed a 12x improvement in S3 read throughput (from 21 MB/s to 269 MB/s). Increased throughput allowed our production jobs to finish sooner. As a result, we saw 22% reduction in vcore-hours, 23% reduction in memory-hours, and similar reduction in run time of a typical production job. Although we are happy with the results, we are exploring additional enhancements in the future. They are briefly described at the end of this blog.
We process petabytes of data stored on Amazon S3 every day. If we inspect the relevant metrics of our MapReduce/Cascading/Scalding jobs, one thing stands out: slower than expected mapper speed. In most cases, the observed mapper speed is around 5–7 MB/sec. That speed is orders of magnitude slower compared to the observed throughput of commands such as aws s3 cp, where speeds of around 200+ MB/sec are common (observed on a c5.4xlarge instance in EC2). If we can increase the speed at which our jobs read data, our jobs will finish sooner and save us considerable time and money in the process. Given that processing is costly, these savings can add up quickly to a substantial amount.
S3 read optimization
The Problem: Throughput bottleneck in S3A
If we inspect implementation of the S3AInputStream, it is easy to notice the following potential areas of improvement:
- Single threaded reads: Data is read synchronously on a single thread which results in jobs spending most of the time waiting for data to be read over the network.
- Multiple unnecessary reopens: The S3 input stream is not seekable. A split has to be closed and reopened repeatedly each time one performs a seek or encounters a read error. The larger the split, the greater the chance of it happening. Each such reopening further slows down the overall throughput.
The Solution: Improving read throughput
Our approach to addressing the above-mentioned drawbacks includes the following:
- We treat a split to be made up of fixed sized blocks. The size defaults to 8 MB but is configurable.
- Each block is read asynchronously into memory before it can be accessed by a caller. The size of the prefetch cache (in terms of number of blocks) is configurable.
- A caller can only access a block that has already been prefetched into memory. That delinks a client from network flakiness and allows us to have an additional retry layer to increase the overall resiliency.
- Each time we encounter a seek outside of the current block, we cache the prefetched blocks in the local file system.
We further enhanced the implementation to make it a mostly lock-free producer-consumer interaction. This enhancement improves read throughput from 20 MB/sec to 269 MB/sec as measured by a standalone benchmark (see details below in Figure 2).
Any data consumer that processes data sequentially (for example, a mapper) greatly benefits from this approach. While a mapper is processing currently retrieved data, data next in sequence is being prefetched asynchronously. Most of the time, data has already been pre-fetched by the time the mapper is ready for the next block. That results in a mapper spending more time doing useful work and less time waiting for data, thereby effectively increasing CPU utilization.
More efficient Parquet reads
Parquet files require non-sequential access as dictated by their on-disk format. Our initial implementation did not use a local cache. Each time there was a seek outside of the current block, we had to discard any prefetched data. That resulted in worse performance compared to the stock reader when it came to reading from Parquet files.
We observed significant improvement in the read throughput for Parquet files once we introduced the local caching of prefetched data. Currently, our implementation increases Parquet file reading throughput by 5x compared to the stock reader.
Improvement in production jobs
Improved read throughput leads to a number of efficiency improvements in production jobs.
Reduced job runtime
The overall runtime of a job is reduced because mappers spend less time waiting for data and finish sooner.
Potentially reduced number of mappers
If mappers take sufficiently less time to finish, we are able to reduce the number of mappers by increasing the split size. Such reduction in the number of mappers leads to reduced CPU wastage associated with fixed overhead of each mapper. More importantly, it can be done without increasing the run time of a job.
Improved CPU utilization
The overall CPU utilization increases because the mappers are doing the same work in less time.
For now, our implementation (S3E) is in a separate git repository to allow faster iterations over enhancements. We will eventually contribute it back to the community by merging it back into S3A.
In each case, we read a 3.5 GB S3 file sequentially and wrote it locally to a temp file. The latter part is used to simulate IO overlap that takes place during a mapper operation. The benchmark was run on a c5.9xlarge instance in EC2. We measured the total time taken to read the file and compute the effective throughput of each method.
We tested many large production jobs with the S3E implementation. Those jobs typically use tens of thousands of vcores per run. In Figure 3, we present a summary of comparison between metrics obtained with and without S3E enabled.
Measuring resource savings
We use the following method to compute resource savings resulting from this optimization.
Given the variation in the workload characteristics across production jobs, we saw vcore reduction anywhere between 6% and 45% across 30 of our most expensive jobs. The average saving was a 16% reduction in vcore days.
One thing that is attractive about our approach is that it can be enabled for a job without requiring any change to a job’s code.
At present, we have added the enhanced implementation to a separate git repository. In the future, we would likely update the existing S3A implementation and contribute back to the community.
We are in the process of rolling out this optimization across a number of our clusters. We will publish the results in a future blog.
Given that the core implementation of S3E input stream does not depend on any Hadoop code, we can use it in any other system where large amounts of S3 data is accessed. Currently we are using this optimization to target MapReduce, Cascading, and Scalding jobs. However, we have also seen very encouraging results with Spark and Spark SQL in our preliminary evaluation.
The current implementation can use further tuning to improve its efficiency. It is also worth exploring if we can use past execution data to automatically tune the block size and the prefetch cache size used for each job.