How Rue La La Bulk Loads into DynamoDB

How we scale DynamoDB for bulk writes

Stephen Harrison
Rue Gilt Groupe Tech Blog
10 min readMar 25, 2018

--

The main story

This article is about one aspect of a larger project we did at Rue La La. See the following for the big picture.

Background

Our Data Science API usage pattern is 100% read-dominated 23+ hours a day, and write dominated for short windows of about 20 mins, triggered by Databricks Apache Spark jobs writing data to S3. We bulk load our data in one big batch, usually in the wee hours. Then there are no writes at all until the next time the data science completes and we start the bulk load all over again. This is usually daily, but can be more often for up-to-date recommendations for changing flash sales.

The upshot

We can use DynamoDB autoscaling for reads. While the read traffic patterns are far from smooth because Rue La La is a flash sale site, changes in capacity typically occur over periods of minutes in the lead up to our usual sale times like 11am and 3pm. This is the target use case for DynamoDB auto-scaling. This is in contrast to bulk writes, where we’re either loading as fast as we can or not at all. This is most definitely not the target use case for DynamoDB auto-scaling. So for bulk writes we must scale DynamoDB capacity ourselves.

We stream DynamoDB writes from the data science jobs’ output in S3 with a small footprint custom importer running in AWS Batch, which runs Docker images. This importer adjusts provisioned write capacity directly before and after the import. We show you the code and configuration you’ll need to pull this off.

The problem

DynamoDB does not offer integrated bulk load directly. The recommended approach is to create an AWS Data Pipeline to import from S3. We found deal-breaker issues with this approach, including that Data Pipeline and EMR are finicky to set up, lag behind Spark updates, and do not provide sufficient—or often any—logging for key failures.

The example Data Pipeline scenarios read files from S3 and write the corresponding DynamoDB items. Apache Spark distributes the Dataset in memory across EC2 instances in a cluster. The cluster is large when the data is large. This makes perfect sense when you’re playing to Spark’s strengths by operating on the data. However, when writing to DynamoDB we only need a few items at a time to batch writes efficiently. Reading a whole Dataset into large-scale distributed memory, for example, is wasteful.

Import tasks must also map the input schema to the exact format of a DynamoDB Item. We do not assume Rue Data Science jobs know anything about how data is stored and delivered. In fact, we deliberately align the output format of Data Science tasks to the input schema, usually from our Snowflake Data Warehouse. This removes surprises with naming of output columns.

AWS Glue is promising, but does not directly support DynamoDB as an endpoint as of this writing.

First we tried DynamoDB auto-scaling

Writing data at scale to DynamoDB must be done with care to be correct and cost effective. We started by setting the provisioned capacity high in the Airflow tasks or scheduled Databricks notebooks for each API data import (25,000+ writes per second) until the import was complete. Then setting it back down to 1 when the importers completed. We sometimes forgot, which cost money, sometimes a lot, for write capacity we weren’t consuming. And sometimes the initial capacity increase failed, which caused the imports to take too long or just fail.

We started by writing CloudWatch alarms on write throttling to modulate capacity. We had some success with this approach. Then Amazon announced DynamoDB autoscaling. It works for some important use cases where capacity demands increase gradually, but not for others like all-or-nothing bulk load.

There’s also an unpleasant side-effect you need to know about that was a deal-breaker for our use case. More on that shortly.

We had to pay particular attention to the following:

  • The console UI does not expose two important properties of the autoscaling we found valuable: ScaleInCooldown and ScaleOutCooldown, which default to 0. These settings limit the rate of successive events for scale in and out, requesting a pause of at least the given number of seconds between changes. In order to give these parameters non-zero values you must use the SDK, the AWS CLI, or CloudFormation
  • Even with the cool-down parameters set to limit the frequency of capacity changes, we had little or no control over the magnitude of changes. Consequently, capacity was reduced to minimum even when there was just a pause in imports. This can happen, for example, because all data API importers share bounded compute clusters in AWS Batch and are competing for resources. We’re trying to limit the maximum size of these clusters since unchecked they’re a significant expense.
  • You’re only allowed four capacity decreases per table per day. So a transient decrease counts as one of these. And before you know it, you’re out of available decreases so must remember to decrease manually at the next window, currently after an additional two hours.
  • But perhaps the worst surprise of all was that auto scaling only gets triggered when there’s traffic. So after we autoscale up with bulk imports we stay at the expensive high capacity when we abruptly stop importing and our traffic goes to zero. We have a nice solution to this issue, which we’ll publish shortly.

We ran a custom importer for each file in the dataset

Our original design sent S3 events for each part... file in the Data Science export to the batch importer as follows.

We started with an AWS Batch Job per part file

This has the advantage that we could set arbitrary DynamoDB capacity and AWS Batch cluster size to maximize throughput. But we quickly found this apparent benefit was outweighed by the lack of sufficiently fine grained control over pipeline throughput. Consequently, some part... files were stalled waiting for AWS Batch cluster capacity, while others were running into DynamoDB write throttling constraints. Overall throughput suffered, data skew between related datasets was evident, and AWS Batch cluster sizes were often large and mostly dormant.

The final version imports the whole dataset at once

It seems obvious to us now, but this is the model that allows the greatest throughput with the fewest AWS resources. The code is a bit subtle in places. We’ll show you why and how we address the nuances.

We only send ans3:ObjectCreated event for a single sentinel file in each dataset. We picked the _committed... file written by Spark, which conveniently contains the manifest of data files in the dataset and is written at the end of the Spark export when they’re all available in S3 and stable.

One import AWS Batch Job per dataset is where we ended up

This single importer has to be a bit smarter now. First, it must unpack or discover the part... files to be imported. Next, it must set sufficient DynamoDB write capacity before any data is imported and wait for the provisioning to succeed. (Actually, even before that it must wait for any pending scaling on the table to complete or the request will fail. We ran into this during testing quite often.) But when it’s underway, the importer has full control over parallelism and throughput. We found it invaluable to expose a number of tuning parameters through environment variables in the importer as follows:

  1. JOB_CPUS and JOB_MEMORY tell the Lambda how to configure and launch the Dicker container for the AWS Batch job
  2. THREAD_COUNT configures the concurrency of the importer
  3. MAX_WRITE_CAPACITY and MIN_WRITE_CAPACITY define the DynamoDB target write capacity we need during and after the import respectively. (Target capacity is a key insight we learned early on. See details below why it’s ok not to even attempt to change capacity in some circumstances.)

You need a special InputStream for long-running S3 reads

It’s easy to open an S3ObjectInputStream to a file in S3. But AWS S3 API reads are HTTP requests and those time out if you wait too long to complete a read, typically a few minutes. All our data science import jobs have long running reads.

So just opening an S3 file using the provided Java InputStream implementation is impractical, because all these files are large and reads for successive byte blocks could be infrequent while we wait for DynamoDB write capacity to spin up or other blocking DynamoDB operations. So you have to either read the whole file at once (impractical because of limited memory) or take a streaming approach.

We wrote a custom InputStream that reads sequential smaller byte blocks from S3, typically 1Mb. The implementation uses the underlying S3ObjectInputStream to access data in S3 without the need to keep a connection open for more than about 100ms. We read bytes from the S3 file, convert to lines, deserialize each JSON structure, convert to DynamoDB Items, and write the results to DynamoDB. This sounds complicated, but it’s a pretty straightforward data pipeline. We made it nicely threaded and non-blocking, because why not.

See the source for S3InputStream at the end of this story. (And if anyone can point us to a suitable abstract class that provides the common boilerplate, please let us know and we’ll fix it with pleaure. We should probably have made our own come to think of it.)

Should I use DynamoDBMapper?

No. Well maybe yes, if you can guarantee low volume and fixed columns. But definitely no for all other cases. DynamoDBMapper is positioned as a higher level interface for DynamoDB CRUD. It uses Java class annotations. Here are its pros and cons.

The DynamoDBMapper API will take a JavaIterable of instances of classes annotated with @DynamoDBDocument, and write micro-batches of 25 using the low level DynamoDB API. It handles capacity-related retries via a callback where you provide the retry interval, probably using Fibonacci back-off.

You might think that by passing an Iterable we’d be safe assuming the micro-batches were streaming. Unfortunately, DynamoDBMapper dereferences the entireIterable to create all the micro-batches before writing the first one. So for data of any significant size, you’ll find your whole Iterable gets converted to an in-memoryList and you may run out of memory.

Since the @DynamoDBDocument annotation is applied to concrete Java classes, the bean properties are mapped to DynamoDB columns. You can map properties to columns with different names with optional binding names in the annotation, but you can’t (easily) use dynamic columns.

Modulating DynamoDB write capacity effectively

Out initial attempts at implementing effective scaling didn’t work because capacity control in Airflow our job scheduler, extrinsic to the importer. As it turns out, we were way too focused on separation of concerns. The issues we ran into were mainly concerned with synchronizing the timing of operations between Airflow and the importer. We could not guarantee proper synchronization between Airflow and the import tasks, so we did not always complete the corresponding capacity decreases, leading to AWS billing surprises. So we put the write capacity provisioning control in the importer itself. It works without issue now.

We have a pair of methods increaseWriteCapacity() and decreaseWriteCapacity() defined as follows.

It’s important that the common method, setWriteCapacity() must eliminate spurious requests to change capacity as follows

  1. If the capacity is already at least high (or low) enough, then ignore the request because honoring it will have the opposite effect.
  2. If the capacity is already equal to the requested capacity, ignore that too, because same-value provisioning API requests throw an exception in the SDK. However, if you’re just changing the write capacity, for example, you have to copy the current read capacity and pass that in the request. You have to set both at the same time.

Here’s the code for this

Changing DynamoDB provisioned capacity needs care

We use the provided predicate passed to setWriteCapacity() to filter out requests that would either fail or inadvertently change capacity in the opposite direction.

Simple and efficient multithreading in the importer

As we already noted, the importer 100% I/O bound: it’s always waiting for reads of byte blocks from S3 or writes of items to DynamoDB to complete.

Rather than add global throttling in the importer across threads, we did sometime much simpler. We matched the DynamoDB write capacity to the observed total throughput in the importer for a single thread, taking the speed we can read S3 into consideration. Then all we had to do was configure the number of threads to achieve optimum throughput.

We create a thread pool, Executors.newFixedThreadPool(IMPORT_THREADS), and schedule CompletableFutures in this bounded pool with

Then create all the CompletableFutures, one for each file

Finally, wait for them all to finish and log a nice celebration with

Apologies if this was old hat to you, but we found too many examples of threading made unnecessarily complex because they didn’t take advantage of some of the excellent support already available to you in the standard Java libraries.

We typically provision 10,000+ WCUs for a single DynamoDB table and match that to 4 threads in the importer consuming and converting the data in S3 files.

In summary

The Data Science team at Rue La La turned to DynamoDB for storing medium to large data backing APIs.

Bulk loading data into DynamoDB must be done with care. So must reading large files from S3. So must matching throughput and parallelism in the importer. The provided tools have some issues that required work-arounds.

We hope you have been able to see how we approached each of these issues. If you have questions, or have a better way to accomplish the same thing please let us know.

Appendix A: Source for our bulletproof S3InputStream

If you’re reading large S3 files, then this Java class is for you. We don’t time out and we don’t fill up memory.

--

--