Lightspeed S3 uploads

Aurélien Didier · Published in Inside Doctrine · Dec 15, 2021

S3, one of the first building blocks of Amazon Web Services, is an object storage service. Handling hundreds of trillions of objects and even more concurrent requests, it is a key service for distributed object storage.

At Doctrine, we process a large variety of data collections several times a day, ranging from raw unstructured documents to database archives, from a few KB to several MB or GB. Among all these tasks, we’ll focus on one that did not perform well and show how we managed to get a 100x improvement in processing time.

Context

Initial setup

We orchestrate all our tasks with Apache Airflow (https://airflow.apache.org/). The important thing to understand for this article is that Airflow schedules and executes dozens of tasks for us.

The particular process we want to share here deals with an archive of JSON files. Within Airflow, the pipeline is defined as follows:

  1. download the zip archive and store it on S3
  2. download the zip from S3, extract all files and upload them to another bucket on S3
  3. process the JSON collection

(1) and (2) are separate so that we keep a backup and a known state of the archive.
(2) is the task that’s going to be interesting here.

Following the basic S3 operators Airflow has, we’ve come up with an operator that:

  1. using the S3 hook, downloads the archive
  2. unzips all the files with python’s ZipFile to temporary folders
  3. using the S3 hook, uploads all the extracted files

All of that seems pretty straightforward, and it is actually a good first version: we rely on Airflow hooks, we have a clear separation between sub-tasks, and all of it is easily tested.

Problem

Small archives were perfectly fine, but as they grew in size, the process became too slow. What used to be a fast-enough task became a bottleneck. Right now the archive contains ~400k files.

Measurements

In fact, we process this kind of archive on a daily basis, and at some point the collection grew enough to make the task last about 40 hours! It still works, but it means that each day we get the data from two days before…

Our unzip-to-s3 task, completing in 40 hours

Our first, naive reaction was to give more resources to the process, but we weren’t even using the capacity we already had.

On a seven-core machine (actually 8 minus 1 reserved), we would only use 0.2 CPU at a time and a constant 4 GB of memory just for file handles. That’s where we started looking for an optimisation.

Key factors

Looking at those charts and the Airflow hook implementation, here are the key factors we’ve been looking at:

  1. Disk: do we really need it?
  • we’re using only temporary folders and files, so there’s no need for persistence of any kind
  • each function / service we use expects a file stream, so we could provide an in-memory object instead

  2. CPU: we’re obviously doing sequential operations
  • we don’t have any shared information or state between the files being uploaded, so we should be able to safely distribute the upload over several processes or threads
  • S3 is obviously large enough to absorb several files at a time
  • files within the archive range from a few KB to a few MB, so they are small enough for us to handle several at a time on a single machine
  • downloading and unzipping are pretty fast at this point; the bottleneck is definitely the upload of all the extracted files

Revamping

Remove the disk usage

As we said, we really don’t need to use the disk for such an operation. Two operations are currently using it:

  • the downloaded archive file
  • all the extracted files

Handle a file download in memory

Here is a glimpse of how we started:
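In spirit, it looked like this minimal sketch; the function signature and bucket names are illustrative, not our actual operator:

    import os
    import tempfile
    import zipfile

    from airflow.providers.amazon.aws.hooks.s3 import S3Hook


    def unzip_to_s3(src_bucket: str, src_key: str, dst_bucket: str) -> None:
        hook = S3Hook()
        # 1. Download the archive to a temporary file on disk.
        with tempfile.NamedTemporaryFile(suffix=".zip") as archive:
            hook.get_conn().download_fileobj(
                Bucket=src_bucket, Key=src_key, Fileobj=archive
            )
            archive.seek(0)
            # 2. Extract everything to a temporary folder.
            with tempfile.TemporaryDirectory() as extract_dir:
                with zipfile.ZipFile(archive) as zf:
                    zf.extractall(extract_dir)
                # 3. Upload the extracted files one by one (flat archive assumed).
                for name in os.listdir(extract_dir):
                    hook.load_file(
                        filename=os.path.join(extract_dir, name),
                        key=name,
                        bucket_name=dst_bucket,
                        replace=True,
                    )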

We’re handling one temporary file, the archive we’re downloading, and then all the extracted files go to a temporary folder.

Here we can see the actual Airflow S3 hook at work, with two key methods: download_fileobj (reached through the hook’s underlying boto3 client) and load_file.

Since we’re using fileobj-based methods, we can actually provide seekable in-memory files instead of disk files, with changes like these:
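Here is a sketch of the in-memory version, under the same illustrative naming:

    import io
    import zipfile

    import boto3


    def unzip_to_s3_in_memory(src_bucket: str, src_key: str, dst_bucket: str) -> None:
        s3 = boto3.resource("s3")
        # Download the archive into a seekable in-memory buffer.
        buffer = io.BytesIO()
        s3.Object(src_bucket, src_key).download_fileobj(buffer)
        with zipfile.ZipFile(buffer) as zf:
            for name in zf.namelist():
                # zf.open() streams each member lazily; nothing touches the disk.
                with zf.open(name) as member:
                    s3.Object(dst_bucket, name).upload_fileobj(member)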

When it comes to downloading in memory, prefer S3 resources over the client (hook). On the other end, we can upload the content from memory instead of from a disk file.

In fact, there’s nothing really new here, we’re just cutting some extra steps. The code is significantly shorter and more readable. In addition, using only memory does not mean you’ll necessarily use more of it: reads from the BytesIO buffer happen on demand, and ZipFile can stream its members the same way, so we’re using considerably less memory (see more in Performance → Resources).

Concurrent uploads

Unfortunately, the native S3 hook in Airflow, an actual S3 client, is not entirely thread-safe (it is for some actions, hence the “not entirely”). That’s where a magic tool from boto3 comes in: S3Transfer. This package ships natively with boto3, so everybody should be able to use it right away.

In short, here is what S3Transfer brings to the game:

  • a thread-safe manager, TransferManager, to which we provide an S3 client
  • an extensive configuration object for the manager: TransferConfig

Here is an illustration of how simple it is to use:
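Something along these lines, with an illustrative thread count:

    import boto3
    from s3transfer.manager import TransferConfig, TransferManager

    N_THREADS = 40  # illustrative value, to be tuned against your files and machine

    client = boto3.client("s3")
    config = TransferConfig(
        max_request_concurrency=N_THREADS,
        max_submission_concurrency=N_THREADS,
    )
    transfer_manager = TransferManager(client, config)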

That’s pretty straightforward. For the sake of the example, I’ve set up as many concurrent calls as threads I want to use. Among all the things we can configure, max_submission_concurrency is the number of concurrent submission calls for uploads. But we should take some measurements before setting this value, as it depends on the size of the files we’re dealing with.

Then, back to the previous example, where we removed the disk usage:
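Here’s how the two pieces could combine, again as a sketch with illustrative names:

    import io
    import zipfile
    from multiprocessing.pool import ThreadPool

    import boto3
    from s3transfer.manager import TransferConfig, TransferManager

    N_THREADS = 40  # sized against the concurrency requested in the config


    def unzip_to_s3_concurrent(src_bucket: str, src_key: str, dst_bucket: str) -> None:
        s3 = boto3.resource("s3")
        buffer = io.BytesIO()
        s3.Object(src_bucket, src_key).download_fileobj(buffer)

        config = TransferConfig(max_submission_concurrency=N_THREADS)
        manager = TransferManager(boto3.client("s3"), config)

        with zipfile.ZipFile(buffer) as zf:
            # Pure function wrapping the upload of a single archive member.
            def upload(name: str) -> None:
                manager.upload(zf.open(name), dst_bucket, name).result()

            # A pool of threads sized against the manager's concurrency,
            # fed lazily with the archive's file names.
            with ThreadPool(N_THREADS) as pool:
                for _ in pool.imap_unordered(upload, zf.namelist()):
                    pass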

Here is what we’re doing here:

  • define a pure (stateless) function that wraps the upload using the TransferManager instance
  • setup a pool of threads, sized against the concurrency we’ve requested in the TransferManager configuration
  • distribute the function calls over the whole list of files from the archive

Key factors

  • we’re distributing over threads and not processes, since we’re opening a single archive, i.e. a shared in-memory object. Handling several archives could be done through multiprocessing, since archives can safely be distributed over several processes, or even over several machines, as the handling of any archive is independent from the others.
  • we’re providing the concurrency factor. This needs a proper initial evaluation, since it depends on the size of the files we’re uploading and the actual machine we’re running on. With small files, as in our case, we can safely use a lot of threads.
  • we’re leveraging lazy loading in the thread pool with imap_unordered (it would have worked with imap as well) and by providing zipfile.namelist(). imap is to iterators (lazy loading) what map is to lists, hence its usage here.

Performance

Our latest satisfying test led us to use 40 threads for the task. This is not the final configuration, as there’s still some room for more resources.

Time

So far we’ve reached a processing time a little bit above 24 minutes:

The two improvements described above are illustrated in that chart:

We can see 2 successive improvements

First a drop from 40 hours to 8 hours when we stopped using the disk, then a drop from 8 hours to 24 minutes when we started distributing over threads, with a similar archive file each time. In fact, the archive we receive keeps growing.

Resources

So, we roughly gained 5x by dropping the disk usage, and then another 20x by distributing over 40 threads. That means each individual upload seems to take about twice as long as with the original client, but we can run a lot more of them at a time.

Resource usage after optimisation

As we started describing above, using the memory more does not mean using more memory. In fact, we have easier access to lazy-loading-capable methods, and we’re actually saving some. From a constant 4 GB usage, we have dropped to less than 900 MB. That’s an unexpected yet interesting ~4x improvement.

Regarding the CPU usage, we’ve significantly increased it, but it’s still pretty low compared to the machine’s resources. We’re using about 2 cores at a time, which is approximately 10x the usage of the sequential version, but we upload 100x faster (from 40 hours to 24 minutes), so per unit of CPU we effectively go 10x faster! The trade-off is far more profitable.

It’s a satisfying setting but there’s still room for improvement.

It’s not just about the upload

More things to put in parallel

We have talked about the upload, and there are obviously equivalent optimisations for the other operations, such as the download.

Download a large quantity of files

Let’s start with the symmetric operation: handling a large quantity of files, but this time downloading them from S3.

When setting the transfer manager configuration, we can submit a max_request_concurrency value to allow that many concurrent operations between S3 and our machine. It defaults to 10, so we could already set up threads for up to 10 parallel transfers, which would lead, in optimal conditions, to a download 10x faster than with the regular client.
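As a sketch, assuming hypothetical bucket and key names:

    import boto3
    from s3transfer.manager import TransferConfig, TransferManager

    client = boto3.client("s3")
    config = TransferConfig(max_request_concurrency=10)  # the default
    manager = TransferManager(client, config)

    keys = ["a.json", "b.json", "c.json"]  # hypothetical keys
    # Each call returns a future immediately; transfers run concurrently.
    futures = [manager.download("my-bucket", key, key) for key in keys]
    for future in futures:
        future.result()  # wait for completion and surface any error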

Download large files

This one is pretty cool! Whenever we have to handle large files, we can tell the TransferManager to download parts of them concurrently. It doesn’t change anything for us in the end: the process still results in a single file.

With a regular client:

The time needed to download a file grows as the file size grows

With a TransferManager:

The time needed to download a file can be cut by the number of threads provided

That’s rather useless for small files, but improvements are staggering for files that are a few GB.
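Here is what such a configuration could look like; the threshold, chunk size and concurrency values are illustrative and worth benchmarking:

    import boto3
    from s3transfer.manager import TransferConfig, TransferManager

    client = boto3.client("s3")
    # Fetch any object larger than 8 MB as 8 MB parts, with up to
    # 20 parts in flight at a time.
    config = TransferConfig(
        multipart_threshold=8 * 1024 * 1024,
        multipart_chunksize=8 * 1024 * 1024,
        max_request_concurrency=20,
    )
    manager = TransferManager(client, config)
    # Parts are reassembled transparently into a single local file.
    manager.download("my-bucket", "big-archive.zip", "/tmp/big-archive.zip").result()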

In fact, there are even more settings that can be provided for the download than for the upload in S3Transfer!

What about the AWS CLI?

We’ve focused on boto3 and a programmatic approach, but there’s still the CLI option. The CLI is undeniably fast, and this is no surprise: it actually uses a TransferManager as well, see the code at https://github.com/aws/aws-cli/blob/develop/awscli/customizations/s3/s3handler.py

It is perfectly suited for operations such as copies (command cp) or file moves (command mv), since they involve only native operations. The concurrency is also automatically configured depending on the quantity of files. There are still some hard limits, which can be tweaked through the CLI’s S3 configuration (for instance aws configure set default.s3.max_concurrent_requests 20). The CLI really focuses on the download; for an upload boost, I’d recommend diving into S3Transfer.

Conclusion

It’s been an exciting trip into the design and implementation of boto3 and the CLI. A deep dive into the library helps a lot for anyone facing performance bottlenecks with S3. There are lots of impressive capabilities.

Break your previous design and look for 10x improvements instead of focusing on linear refinements that will give you a few tens of percent in performance.

I hope this helps!
