Fast Critical Path with Async Queueing

[Cross-posted from http://coderstocks.blogspot.com/2015/02/fast-critical-path-with-async-queueing_18.html]

We were recently working on a large scale data migration project, moving hundreds of millions of records from one set of systems to another set of systems in an organized and very specific fashion. The overall system is comprised of several components including a scheduler, an exporter, an importer and an API to keep track of the state of each item.

The scheduler is responsible for pulling records from multiple sources and scheduling them for migration in accordance with our current strategy. The strategy might be something like, schedule records fairly using source-X and source-Y. Another example would be, prefer records from source-Z, but if there are none available, then fairly schedule records using source-X and source-Y. The sources range from SQL queries against multiple databases to SQS queues.

The scheduler also has “back pressure” so that it does not schedule too many records and get far ahead of the downstream systems by feeding too much data. It checks with the downstream system to see how much that system has left to process before attempting to feed more data. This allows us to be very nimble and switch strategies and sources as changing requirements dictate.

The initial implementation can be thought of as an infinite loop, where each iteration of the loop requests a specific number of records from each of the sources and then passes them along to the target system for processing. This is the critical path of the scheduler. If anything blocks this loop for too long, downstream systems may suffer as they run out of data to process and we are no longer at peak throughput.

This worked incredibly well initially. Eventually, some of those sources began to slow down. Some sources that were backed by SQL queries reached a point where we were not able to tune any more performance out of the queries. This slowness caused the scheduler to bottleneck the entire migration system.

We found out that the slow queries could be effectively sped up, in terms of records selected per second, by selecting large result sets since this would spread the query overhead over many more records, giving us a much higher effective throughput. The problem was, this could not be done in the critical path of the scheduler’s main loop since it would bottleneck the other sources and therefore the entire system each time a large-result-query ran.

In order to exploit the “larger result set” concept, we created a decorator class to wrap some of our existing LimitedSource interface. The LimitedSource interface declares one method,getNextBatch(int batchSize) which returns up to the requested number of items from the source. The decorator class, which we named QueueingLimitedSource, has a constructor with 3 parameters:

public QueueingLimitedSource(LimitedSource<T> wrapped, Integer capacity, Integer replenishThreshold) {
this.wrapped = checkNotNull(wrapped, "LimitedSource cannot be null");
this.capacity = checkNotNull(capacity, "QueueSize cannot be null");
this.replenishThreshold = checkNotNull(replenishThreshold, "ReplenishThreshold cannot be null");
checkArgument(capacity > replenishThreshold, "Capacity must be larger than ReplenishThreshold");
checkArgument(replenishThreshold >= 0, "ReplenishThreshold must be greater than or equal to zero");
    // Initialize the default task executor    ThreadPoolTaskExecutor defaultExecutor = new ThreadPoolTaskExecutor();
defaultExecutor.initialize();
this.executor = defaultExecutor;
}

The first parameter is the existing LimitedSource implementation, which returns new records to process. The second parameter is the desired capacity of the QueueingLimitedSource’s internal queue. The third parameter is the threshold at which the internal queue should be replenished.

Each time the scheduler asks a QueueingLimitedSource for records, theQueueingLimitedSource attempts to pull the requested number of items from it’s internal queue. Then it checks to see if the number of items in it’s queue is less than the replenish threshold. If it is, and the source is not already in the process of replenishing it’s internal queue, the source asynchronously replenishes it’s queue.

public List<T> getNextBatch(int batchSize) {
List<T> chunk = pullQueuedChunk(batchSize);
checkQueueSize();
return chunk;
}
private List<T> pullQueuedChunk(int resultSize) {
return dequeItems(resultSize);
}
private void checkQueueSize() {
int queueSize = queue.size();
LOG.info("Checking queue size: queue={} size={} capacity={} threshold={}",
queueName, queueSize, capacity, replenishThreshold);
if (queueSize <= replenishThreshold) {
replenish();
}
}
private void replenish() {
if (replenishing.compareAndSet(false, true)) {
executor.execute(replenishRunnable);
} else {
LOG.info("Skipping replenish of queue={} because it is already in progress", queueName);
}
}
private Runnable replenishRunnable = new Runnable() {
public void run() {
String replenishMessage = "replenish: status={} queue={} size={} capacity={} threshold={}";
LOG.info(replenishMessage, "started", queueName, queue.size(), capacity, replenishThreshold);
int itemsAskedFor = remainingCapacity();
        try {
List<T> items = wrapped.getNextBatch(itemsAskedFor);
queue.addAll(items);
LOG.info(replenishMessage, "completed", queueName, queue.size(), capacity, replenishThreshold);
} catch (Exception e) {
LOG.error("Error replenishing: queue=" + queueName, e);
throw e;
} finally {
if (!replenishing.compareAndSet(true, false)) {
LOG.warn("Replenishing indicator was out of sync for queue={}", queueName);
}
}
}
};

Sounds great, right? There’s a little bit more to it. Let’s think through an example run. Let’s say the capacity of the queue is 20 (in our case it’s actually in the tens or hundreds of thousands) and the threshold is 10. On the first run, the queue is loaded up with items 1–20. When the queue depletes to 10 items (items 10–20 remaining), the replenish job will run. Let’s assume it’s a SQL query and it now picks up items 10–20. Hmm, we already have items 10–20 in our queue, so the queue will be filled with 10–20 (the original items), then 10–20 (the items from the replenish). Obviously this is not what we want since we don’t need to process duplicates. For our case, there is no negative impact if duplicates are processed, but it is advantageous if we do not process any duplicates since processing the same item more than once is redundant. If we can avoid processing duplicates, that will lead to higher overall throughput of the entire migration system.

Our first thought was to use some Set implementation to represent the internal queue in an effort to avoid duplicate items. The problem with that approach is that we lose insertion order. Losing the FIFO nature of a queue could potentially lead to an individual item effectively being stuck in the internal queue indefinitely since there is no guarantee about the order the items will be dequeued in. Therefore, using a FIFO data structure was our first requirement. Another requirement we identified was that the addition of a duplicate item should not push that item to the end of the internal queue, since that could possibly result in an item constantly being pushed back to the end of the queue each time it is replenished.

The final solution was to build a ConcurrentSetQueue which implements the Queue interface and is composed of a ConcurrentHashSet and a ConcurrentLinkedQueue. This solution allowed us to trade space (memory usage) for performance since we can use the internal set for detecting duplicates and also checking the size of the structure all while effortlessly maintaining the FIFO structure.

With the ConcurrentSetQueue and the QueueingLimitedSource, our scheduler’s loop is no longer blocked by slow sources. It continuously iterates and picks up data from eachQueueingLimitedSource as records become available in each queue.

Before implementing this change, our throughput was very choppy. There were valleys as we waited for scheduler queries to finish, followed by peaks as the scheduler pushed the data along and then began waiting for queries again.

After the change was implemented, our throughput graphs tightened up and smoothed out significantly as we were continually pushing a fairly consistent number of records to the downstream systems.

As we were planning this change, we also looked at the ConcurrentSkipListSet as an alternative to implementing the ConcurrentSetQueue. This structure sorts elements by natural sort order or a given Comparator. In order for this to be FIFO, we would have to sort them by the time at which they were added to the structure. This would imply a wrapper that stored insertion time and implemented the Comparator. The wrapper would need to wrap each item as it was added and then be stripped from the item as it was extracted. That, in addition to the warning about the size() operation not being constant time guided us away from this solution.

After we implemented this solution, we discovered the LinkedHashSet, which would have been a decent alternative to implementing the ConcurrentSetQueue. The downside to this structure is that it does not implement the Queue interface so there is no poll() method to retrieve the next item. The next best thing is the iterator, however it will throw ConcurrentModificationExceptions if the structure changes (via replenish) while items are being extracted. Had we gone this route, our solution likely would have involved some sort of wrapper around the LinkedHashSet which, when retrieving items, cloned the internal set, extracted the necessary number of items and then removed the extracted items from the authoritative set. We also would have needed to make it concurrent using Collections.synchronizedSet().