A Brief History of Priority Queues at CMB

David O'Steen
Coffee Meets Bagel Engineering
6 min read · May 19, 2017

We run a lot of asynchronous tasks at Coffee Meets Bagel, and as such, we need some sort of queuing system(s) to manage them. Two years ago, when I started working for the company, our primary task management system was Celery. Today, we have largely migrated our tasks off of Celery and use either Amazon Simple Queue Service (SQS) or our in-house implementation of a priority queue built primarily with Redis. This post will mostly focus on the history and implementation details of this in-house queue. Let’s call it the CMB-PQ.

History

A common usage pattern at CMB (and, I’m sure, at many other organizations) is putting a profile ID in a queue and having a worker process dequeue the profile ID and perform some operation on it. In some contexts, we find it helpful to have highly granular prioritization within the queue. One such use case is the queue and worker process we use to update people’s recommended matches. Given this business / engineering requirement, the first version of our PQ was born.

CMB chose Redis, which offers guarantees of atomicity thanks to its single-threaded implementation. As an in-memory datastore, it is also very fast. Conveniently, it already has a priority queue data type built in via sorted sets, which solves our granular prioritization problem. It would certainly be possible to simply leverage this data type as-is, but in some contexts at CMB there is an engineering requirement that no two workers can process the same item in parallel.
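
What that looks like in practice: enqueue is a single ZADD with the priority as the score, and popping the front item can be done atomically in a short Lua script. The sketch below is illustrative (the key name is made up), but it shows the basic shape.

```lua
-- Minimal sketch: pop the front (lowest-score, i.e. highest-priority) item.
-- KEYS[1] = the sorted set backing the queue, e.g. "pq:recommendations"
local top = redis.call('ZRANGE', KEYS[1], 0, 0)
if #top == 0 then
  return nil                          -- queue is empty
end
redis.call('ZREM', KEYS[1], top[1])   -- take it out so no other worker sees it
return top[1]
```

The trouble with a bare pop like this is that it keeps two workers off the same item only by deleting the item outright, so a worker that dies mid-task silently drops it. That tension is what the lock keys described below are for.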

Thus, the first version of the CMB-PQ consisted of two components: a sorted set in Redis, and a lock key for each item in the queue that is ‘in progress’. We implemented it by registering Lua scripts on our Redis server (we only had one at the time) for each of the core operations. Redis executes each script atomically, so we could safely build our priority queue operations out of multiple Redis commands.

The supported operations in our initial implementation were enqueue, peek, and remove. We didn’t name the peek operation a dequeue because it doesn’t actually remove the item from the queue. The high-level implementation of the peek operation was (a sketch follows the list):

  1. Starting from the first (highest priority) item in the sorted set, loop through the set to find the first item that doesn’t have an associated lock key.
  2. Once an item is found, set a lock key for that item with a timed expiration, and return the item.
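
In registered-script form, that might look roughly like the sketch below. This is not our production script; the lock-key prefix and TTL argument are illustrative.

```lua
-- Hypothetical V1 peek: scan from the front of the queue, skip locked items,
-- lock the first free one, and return it.
-- KEYS[1] = queue sorted set
-- ARGV[1] = lock TTL in seconds, ARGV[2] = lock key prefix (e.g. "pq:lock:")
local i = 0
while true do
  local entry = redis.call('ZRANGE', KEYS[1], i, i)
  if #entry == 0 then
    return nil                        -- reached the end; nothing available
  end
  local item = entry[1]
  -- SET ... EX ... NX only succeeds if no other worker holds the lock, and the
  -- expiration is what gives us automatic retry if a worker dies.
  if redis.call('SET', ARGV[2] .. item, '1', 'EX', ARGV[1], 'NX') then
    return item
  end
  i = i + 1                           -- item is locked; try the next one
end
```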

The worker process is then responsible for calling a separate remove script, which relinquishes the lock key and removes the item from the sorted set; the enqueue operation is a simple matter of adding the item to the sorted set (both are sketched after the list below). In that state, the PQ served a few business / engineering functions:

  • Granular prioritization
  • Guarantee of no two workers processing the same item in parallel
  • Automatic retry (an expired lock key has the effect of relinquishing the item back to the pool of workers)
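
For completeness, the other two V1 scripts are short. These are rough sketches with the same illustrative key names:

```lua
-- Hypothetical V1 enqueue script: one atomic ZADD, with the priority as the score.
-- KEYS[1] = queue sorted set; ARGV[1] = priority, ARGV[2] = item
redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
return 1
```

```lua
-- Hypothetical V1 remove script: drop the finished item and release its lock.
-- KEYS[1] = queue sorted set; ARGV[1] = item, ARGV[2] = lock key prefix
redis.call('ZREM', KEYS[1], ARGV[1])
redis.call('DEL', ARGV[2] .. ARGV[1])
return 1
```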

Fast forward a bit, and a new business need surfaced: we wanted the ability to dynamically schedule a large number of tasks per user. After evaluating existing technologies, we came to the conclusion that leveraging our PQ system was a viable option.

Version 2 of the CMB-PQ added optional scheduling. In this version, the priority for a scheduled queue is the timestamp at which we want the task for a given item to execute. A second peek operation was defined specifically for this use case:

  1. Check the timestamp of the first item in the queue. If it is greater than now, return nothing.
  2. Otherwise, follow the same steps as the V1 peek.

Since the queue is ordered, we know that if the first item in the queue is scheduled for the future, all the other items are as well. It was a fairly lightweight solution to the problem that didn’t require disrupting existing PQ systems.
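
The scheduling check amounts to a small guard at the top of the peek script, something along these lines (a sketch; the current time is passed in by the caller rather than read inside the script):

```lua
-- Hypothetical scheduled-peek guard. Scores in a scheduled queue are unix timestamps.
-- KEYS[1] = queue sorted set; ARGV[1] = current unix time (supplied by the caller)
local first = redis.call('ZRANGE', KEYS[1], 0, 0, 'WITHSCORES')
if #first == 0 or tonumber(first[2]) > tonumber(ARGV[1]) then
  return nil   -- empty, or the earliest task is still in the future
end
-- ...otherwise fall through to the same scan-and-lock logic as the V1 peek
```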

Fast forward again, and we noticed that Redis latency was getting spiky. Uh oh. Monitoring activity revealed a troubling pattern: we were scanning through hundreds of locked keys in our busiest PQ, and because Redis is single-threaded, those scans were blocking our entire production system. At smaller scale, this had never surfaced as a problem because we were running a low number of worker processes; as we scaled them up, we had effectively made the peek operation more and more expensive.

Thus, Version 3. After a lot of thought about a more sensible design that still met our business and engineering requirements, we decided to get rid of the locking mechanism and instead use two queues: a main sorted set, where we originally enqueue profiles, and a retry sorted set, to which items are moved after a worker process picks them up. The priority in the retry queue is always a timestamp and represents the time at which an item should be retried. As part of the definition of the queue, we added a hard-coded retry time to our Lua scripts, chosen to accommodate our longest-running processes.

The implementation of peek became the following (a sketch in Lua follows the list):

  1. Check the priority of the item at the front of the retry sorted set. If its timestamp is less than now, push the item back into the retry set with timestamp now + retry_seconds and return the item.
  2. If no item was returned in step 1, check the item at the front of the main sorted set. If this is a scheduled PQ and its priority is greater than now, return nothing.
  3. Otherwise, remove the item from the main sorted set, add it to the retry sorted set with priority now + retry_seconds, and return the item.
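
As a registered script, that logic might look roughly like this. It is a sketch, not our actual code: the key names, the scheduled-queue flag, and the 300-second retry value are all illustrative.

```lua
-- Hypothetical V3 peek: no lock keys, no scanning.
-- KEYS[1] = main sorted set, KEYS[2] = retry sorted set
-- ARGV[1] = current unix time (supplied by the caller),
-- ARGV[2] = '1' if this is a scheduled queue
local RETRY_SECONDS = 300             -- hard-coded in V3; value is illustrative
local now = tonumber(ARGV[1])

-- 1. An item in the retry set whose timestamp has passed is due again:
--    push its retry time forward and hand it out.
local retry = redis.call('ZRANGE', KEYS[2], 0, 0, 'WITHSCORES')
if #retry > 0 and tonumber(retry[2]) <= now then
  redis.call('ZADD', KEYS[2], now + RETRY_SECONDS, retry[1])
  return retry[1]
end

-- 2. Otherwise look at the front of the main set, honoring the schedule if any.
local main = redis.call('ZRANGE', KEYS[1], 0, 0, 'WITHSCORES')
if #main == 0 or (ARGV[2] == '1' and tonumber(main[2]) > now) then
  return nil
end

-- 3. Move the item from the main set into the retry set and return it.
redis.call('ZREM', KEYS[1], main[1])
redis.call('ZADD', KEYS[2], now + RETRY_SECONDS, main[1])
return main[1]
```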

In this implementation, every peek operation now performed a constant number of Redis operations instead of scanning past locked keys. The improvement in our Redis server’s performance was obvious once we deployed the new implementation.

Fast forward again. We had done a lot of work to optimize the throughput of our heaviest worker process, and we finally got to a point where we were no longer continually running a backlog on that queue. Alas, we then started getting reports from customer service that some people were receiving matches outside their preferences. Uh oh.

Our perpetual backlog had been masking a flaw in the design of V3: we were no longer meeting the engineering requirement that an item (in this case, a profile ID) should not be run on two workers at the same time. Our enqueue operation blindly added items to the main sorted set, regardless of whether they were already in the retry set. We could have changed it to check the retry set first and skip enqueueing anything already being run, but that seemed undesirable from a business-requirement perspective. What if a user changed their profile preferences twice in quick succession? We still want to respect the latest change, so effectively throwing out a request was a non-starter.

And we arrive now at Version 4. In addition to the two sorted sets, we added a backlog hash. We also made retry_seconds customizable per instantiation of our queue class. The peek operation remained unchanged, but the enqueue operation became the following (sketched below):

  1. Check if the item is in the retry set. If so, add it to the backlog hash, with the item as the key and the priority as the value.
  2. Otherwise, add the item to the main set.
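
A sketch of that script, using ZSCORE on the retry set as the “is this item currently checked out?” test (key layout is illustrative):

```lua
-- Hypothetical V4 enqueue.
-- KEYS[1] = main sorted set, KEYS[2] = retry sorted set, KEYS[3] = backlog hash
-- ARGV[1] = priority, ARGV[2] = item
if redis.call('ZSCORE', KEYS[2], ARGV[2]) then
  -- The item is currently checked out by a worker: remember this request in
  -- the backlog (keeping only the latest priority) instead of re-queuing it.
  redis.call('HSET', KEYS[3], ARGV[2], ARGV[1])
else
  redis.call('ZADD', KEYS[1], ARGV[1], ARGV[2])
end
return 1
```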

And the remove operation became the following (also sketched below):

  1. Remove the item from the retry and main sets.
  2. Check the backlog hash for the item. If it exists, add it to the main set with the priority we saved in the value of the hash.
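
And a matching sketch of remove, which replays anything parked in the backlog:

```lua
-- Hypothetical V4 remove.
-- KEYS[1] = main sorted set, KEYS[2] = retry sorted set, KEYS[3] = backlog hash
-- ARGV[1] = item
redis.call('ZREM', KEYS[1], ARGV[1])
redis.call('ZREM', KEYS[2], ARGV[1])

-- If the item was enqueued again while it was checked out, re-queue it now
-- with the priority we stashed in the backlog hash.
local backlog_priority = redis.call('HGET', KEYS[3], ARGV[1])
if backlog_priority then
  redis.call('ZADD', KEYS[1], backlog_priority, ARGV[1])
  redis.call('HDEL', KEYS[3], ARGV[1])
end
return 1
```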

Problem solved: we have restored the requirement that the same item cannot be run on two workers in parallel. Well, sort of. If you’ve been following closely, you might have noticed that if a worker takes longer than retry_seconds to complete an item, another worker process can still pick it up. It’s a known limitation, but we’re not engineering around it at the moment. We have noted it in the documentation and advised setting a generous retry_seconds if parallel processing is a concern.

Implementation

Our current implementations of each of the PQ operations follow the V4 design above (note: we changed the name from peek to check_out, as we felt that was a fairer representation of mutating the state of the item in the queue).
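
To pull the pieces together, here is a condensed sketch of what the current check_out script looks like in spirit. It is an approximation rather than our exact production code: the key names are illustrative, and the current time, retry_seconds, and a scheduled-queue flag are assumed to be passed in by the caller.

```lua
-- Hypothetical current check_out: the V3/V4 peek with a configurable retry.
-- KEYS[1] = main sorted set, KEYS[2] = retry sorted set
-- ARGV[1] = current unix time, ARGV[2] = retry_seconds,
-- ARGV[3] = '1' if this is a scheduled queue
local now = tonumber(ARGV[1])
local retry_at = now + tonumber(ARGV[2])

-- Re-lease and return any item in the retry set whose lease has expired.
local retry = redis.call('ZRANGE', KEYS[2], 0, 0, 'WITHSCORES')
if #retry > 0 and tonumber(retry[2]) <= now then
  redis.call('ZADD', KEYS[2], retry_at, retry[1])
  return retry[1]
end

-- Otherwise take the front of the main set, honoring the schedule if any.
local main = redis.call('ZRANGE', KEYS[1], 0, 0, 'WITHSCORES')
if #main == 0 or (ARGV[3] == '1' and tonumber(main[2]) > now) then
  return nil
end

-- Check the item out: move it from the main set to the retry set.
redis.call('ZREM', KEYS[1], main[1])
redis.call('ZADD', KEYS[2], retry_at, main[1])
return main[1]
```

The enqueue and remove scripts are as sketched in the V4 discussion above, with the backlog hash catching requests that arrive while an item is checked out.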

Takeaways

Our CMB-PQ is not perfectly suited for every problem, but it currently solves several of our business needs in an efficient, reliable fashion. Things I have learned from working on it:

  • Document the business and engineering requirements of any given system in detail. That way, if I or others need to update the system, we can do so with more context about how a change might affect existing dependencies.
  • Occasionally revisit and refactor systems that have undergone multiple iterations of change, even if it is painful. Others consuming the mechanism as part of their work will be able to do so more efficiently and with less risk of error.
  • Be vigilant about alerting and error logging. This is an infinite-retry system, and without alerting on queue length and error logging, you risk blocking entire classes of asynchronous workers.
