Tracking capacity usage using a cache

Recently, I had to download close to 30 000 000 rows from AWS DynamoDB to my local machine so I could run some analysis on them. The download itself is relatively easy with the scan operation, but scanning has a cost. As with most AWS services, read capacity is something we generally provision ahead of time and need to stay within. I won’t go into the details of scan and cost calculation, as they are already documented here: http://docs.aws.amazon.com/amazondynamodb/latest/developerguide/QueryAndScan.html. It suffices to know that each operation can report the capacity it consumed.

The download portion of the code is rather simple. Here’s an example:

Map<String, AttributeValue> lastProcessedKey = null;

do {

    ScanRequest request = new ScanRequest()
        .withTableName(...)
        .withAttributesToGet(...)
        // ask DynamoDB to report the capacity consumed by this call
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    if (null != lastProcessedKey) {
        // resume the scan where the previous page ended
        request.setExclusiveStartKey(lastProcessedKey);
    }

    // sleep a bit if needed since we don't want to use up too much read capacity
    throttle();
    ScanResult result = dynamo.scan(request);

    //TODO: implement code to handle the results

    // null once the last page has been read
    lastProcessedKey = result.getLastEvaluatedKey();

} while (null != lastProcessedKey);

The throttle() method pauses the loop for a short time between requests. This matters because it keeps us from making too many calls and consuming too much read capacity. Otherwise, DynamoDB itself will start throttling our requests and may throw exceptions.

At first glance, throttle could do something very simple, such as sleeping for 1000 ms. But that can be inefficient, as we may be able to execute more calls between sleeps. In other words, what if we could throttle dynamically?

As it turns out, we can calculate how long the thread should sleep from the total capacity consumed over the last second. We choose a 1-second window because DynamoDB measures capacity usage per second. So we need some way to keep track of usage, but only for the last second. If we used a plain counter, we would somehow have to decrement it once a second had passed, and by exactly the amount that was recorded a second earlier. This sounds like a rolling window. But how can we implement one easily, without writing our own code to keep track of time or buckets?

This is where using Google’s Guava library comes in really handy, specifically its Cache implementations. Here’s the code to track the last second using a cache.

import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;

import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.RemovalListener;
import com.google.common.cache.RemovalNotification;

public class CapacityUsage {

    // monotonically increasing key so every recorded consumption gets its own cache entry
    private final AtomicLong counter = new AtomicLong(0L);
    // sum of the values currently held in the cache
    private final AtomicInteger consumedSoFar = new AtomicInteger(0);

    private final int limit;
    private final Cache<Long, Integer> consumptionTracking;

    // subtracts an entry's value from the running total when the entry is evicted
    private final RemovalListener<Long, Integer> removalListener = new RemovalListener<Long, Integer>() {

        @Override
        public void onRemoval(RemovalNotification<Long, Integer> notification) {
            consumedSoFar.addAndGet(-1 * notification.getValue());
        }

    };

    public CapacityUsage() {
        this(30, 1L, TimeUnit.SECONDS);
    }

    public CapacityUsage(int limit, long expireAfterDuration, TimeUnit units) {
        this.limit = limit;

        this.consumptionTracking = CacheBuilder.newBuilder()
            .expireAfterWrite(expireAfterDuration, units)
            .removalListener(removalListener)
            .build();
    }

    public void recordConsumption(int consumed) {
        this.consumedSoFar.addAndGet(consumed);
        consumptionTracking.put(counter.getAndIncrement(), consumed);
    }

    public boolean isLimitBreached() {
        // Guava evicts expired entries lazily, so run pending evictions before reading the total
        consumptionTracking.cleanUp();
        return consumedSoFar.get() > limit;
    }

    public int getAvailableCapacity() {
        // same as above: make sure expired entries have been subtracted first
        consumptionTracking.cleanUp();
        return Math.max(0, limit - consumedSoFar.get());
    }

    public int getLimit() {
        return limit;
    }

}

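As you can see, the Cache is built to expire entries after a given duration and time unit. Each call to recordConsumption(…) increments consumedSoFar, and each expired entry decrements it again through the removal listener. One caveat: Guava evicts expired entries lazily, during cache maintenance (for example on a later write or an explicit cleanUp() call), so the decrement can lag slightly behind the actual expiry time.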
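For comparison, here is a minimal sketch of the same rolling-window bookkeeping written by hand with only the JDK. It is not the approach used in this post; it just makes visible what the cache and its removal listener are doing. The class name RollingWindowUsage and the injected clock are hypothetical, introduced only for this illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.LongSupplier;

/** Hypothetical hand-rolled stand-in for CapacityUsage; illustration only. */
class RollingWindowUsage {

    /** One recorded consumption and the time (in ms) at which it was recorded. */
    private static final class Entry {
        final long timestampMillis;
        final int consumed;

        Entry(long timestampMillis, int consumed) {
            this.timestampMillis = timestampMillis;
            this.consumed = consumed;
        }
    }

    private final Deque<Entry> entries = new ArrayDeque<>();
    private final int limit;
    private final long windowMillis;
    private final LongSupplier clock; // assumption: injected so the window can be tested
    private int consumedSoFar = 0;

    RollingWindowUsage(int limit, long windowMillis, LongSupplier clock) {
        this.limit = limit;
        this.windowMillis = windowMillis;
        this.clock = clock;
    }

    synchronized void recordConsumption(int consumed) {
        evictExpired();
        entries.addLast(new Entry(clock.getAsLong(), consumed));
        consumedSoFar += consumed;
    }

    synchronized int getAvailableCapacity() {
        evictExpired();
        return Math.max(0, limit - consumedSoFar);
    }

    /** Drops entries older than the window and subtracts them from the running total. */
    private void evictExpired() {
        long cutoff = clock.getAsLong() - windowMillis;
        while (!entries.isEmpty() && entries.peekFirst().timestampMillis <= cutoff) {
            consumedSoFar -= entries.removeFirst().consumed;
        }
    }
}
```

In real use the clock would be something like System::currentTimeMillis. The Guava version saves us from writing the timestamp and eviction logic ourselves, which is the whole point of using the cache.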

Here’s the download code that records the consumption.

Map<String, AttributeValue> lastProcessedKey = null;

do {

    ScanRequest request = new ScanRequest()
        .withTableName(...)
        .withAttributesToGet(...)
        // ask DynamoDB to report the capacity consumed by this call
        .withReturnConsumedCapacity(ReturnConsumedCapacity.TOTAL);

    if (null != lastProcessedKey) {
        // resume the scan where the previous page ended
        request.setExclusiveStartKey(lastProcessedKey);
    }

    // sleep a bit if needed since we don't want to use up too much read capacity
    throttle();
    ScanResult result = dynamo.scan(request);

    // record usage here!
    this.capacityUsage.recordConsumption(result.getConsumedCapacity().getCapacityUnits().intValue());

    // TODO: implement code to handle the results

    // null once the last page has been read
    lastProcessedKey = result.getLastEvaluatedKey();

} while (null != lastProcessedKey);

Given that, we can have the throttle method look like so:

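private void throttle() {
    try {

        int available = capacityUsage.getAvailableCapacity();
        // divide as doubles; int / int would truncate the ratio to 0 or 1
        double usedFraction = 1.0 - ((double) available / capacityUsage.getLimit());
        Thread.sleep((long) (1250.0 * usedFraction));

    } catch (InterruptedException e) {
        // restore the interrupt flag so callers can still see the interruption
        Thread.currentThread().interrupt();
        log.warn("Failed to throttle", e);
    }
}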

I’ve set the maximum sleep time to 1.25 seconds to leave some buffer. As you can see, the sleep time scales with the fraction of the limit already consumed: the less capacity is available, the longer the sleep. With a capacity limit of 50, I was able to keep our ReadCapacity usage hovering around 50 the whole time.
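To make the sleep-time arithmetic concrete, here is a small sketch that pulls the formula into a pure function and evaluates it for a few values. The class and method names (ThrottleMath, sleepMillis, sleepMillisIntDivision) are hypothetical and exist only for this illustration. It also shows why the ratio has to be computed in floating point: in Java, dividing two ints truncates the result.

```java
/** Hypothetical helper illustrating the sleep-time formula; illustration only. */
class ThrottleMath {

    // Longest sleep in milliseconds, matching the 1.25 seconds used in throttle().
    static final double MAX_SLEEP_MILLIS = 1250.0;

    /** Sleep time grows linearly as the available capacity shrinks. */
    static long sleepMillis(int available, int limit) {
        double usedFraction = 1.0 - ((double) available / limit);
        return (long) (MAX_SLEEP_MILLIS * usedFraction);
    }

    /** Same formula with int division: the ratio truncates to 0 unless available == limit. */
    static long sleepMillisIntDivision(int available, int limit) {
        return (long) (MAX_SLEEP_MILLIS * (1.0 - (available / limit)));
    }

    public static void main(String[] args) {
        System.out.println(sleepMillis(50, 50));            // 0: nothing consumed, no sleep
        System.out.println(sleepMillis(25, 50));            // 625: half consumed, half the max sleep
        System.out.println(sleepMillis(0, 50));             // 1250: limit reached, full sleep
        System.out.println(sleepMillisIntDivision(25, 50)); // 1250: truncation loses the ratio
    }
}
```

With the int-division variant, the loop would sleep the full 1.25 seconds whenever any capacity at all had been consumed, which defeats the purpose of throttling dynamically.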
