How Meetup scales notification queue consumers

On a normal day, Meetup attempts to send anywhere from eight to ten million notifications (via email or push) reminding Meetup members an event is happening that night, a friend messaged them on the platform, or a new group was created. Notifications and their reliability are a key part of Meetup being Meetup.

One of my first projects as a Site Reliability Engineer (SRE) was to look into a longstanding production issue — our notifications queue would get backed up, and many notifications would be sent late, or yikes, not at all.

Our notifications system, “Switchboard”, sends messages via Simple Queue Service (SQS) on AWS. A group of tasks then consumes messages from the queues on SQS and sends them out to various notification channels, like APNS, GCM, and SendGrid. The core issue was that sometimes we wouldn’t autoscale the consumers correctly to handle spikes in notifications — for instance, large batches of notifications coming from cron jobs — coming through the system.

But first, some definitions to know before continuing

Feel free to skip this section if you’re familiar with AWS SQS, ECS, queues, and autoscaling, or to refer to it as you read and encounter an unfamiliar term.

  • Message: some data that can be passed around between processes
  • Queue: a data structure with network access that receives messages and stores them in order, which facilitates passing messages between processes
  • Consumer: a process that reads and processes messages from a queue
  • Alert: an automatic mechanism to contact an on-call engineer when a critical business metric crosses a threshold at which it is considered harmful or potentially harmful to the business
  • Resolution: when a critical business metric that has alerted goes back to an acceptable level
  • Queue backup: an influx of messages to the queue that outpaces consumers’ ability to read them, to the point where it triggers an alert
  • SQS: Simple Queue Service, an AWS product that offers serverless managed queues (but not consumers)
  • Container: a packaged unit of software and its dependencies, ready to be executed in an isolated environment
  • Containerizing: the act of packaging a process into a container
  • Task: an instance of a containerized process (such as a queue consumer)
  • Service: a group of identical parallel tasks
  • Scale out: start and add tasks to a service
  • Scale in: stop and remove tasks from a service
  • Overprovisioned: running too many tasks for relatively low load
  • Underprovisioned: not running enough tasks for relatively high load
  • Autoscaling: scaling a service out or in based on a metric and threshold(s) at a regular interval
  • Autoscaling event: a particular occurrence of scaling a service in or out based on the position of a metric in relation to its threshold at a specific point in time
  • Scaling metric: a metric that’s used in conjunction with a scaling threshold to trigger an autoscaling event
  • Scaling threshold: a minimum or maximum number for a scaling metric to trigger an autoscaling event
  • Cooldown period: the minimum time to wait between autoscaling events
  • Step size: how many tasks to scale out or in
  • Flapping: scaling out and in repeatedly quickly in an unstable manner
  • EC2: Elastic Compute Cloud, an AWS product that offers servers
  • ECS: EC2 Container Service, an AWS product that schedules tasks on EC2 servers, groups tasks by service, and exposes facets for defining service autoscaling rules
  • Latency: undesirable time between when a message is expected to be processed and when it is processed

First pass: ApproximateAgeOfOldestMessage

So we dug into our autoscaling rules. We found that we were scaling on a metric called ApproximateAgeOfOldestMessage, which pretty much does what it says on the tin: it tells you how old the oldest message in the queue is. The thinking behind this, presumably, was that when a lot of messages become backed up in the queue, there will be an old one, and our system should scale out more consumers. What’s more is that it’s one of the default metrics published by SQS, so we got it for free.

fig. 1: ApproximateAgeOfOldestMessage, the initial scaling metric
fig. 2: Running tasks, split by consumer type

Unfortunately, this caused some issues. First, sometimes one message would get stuck in a queue, and it would be the only message left, causing us to scale out consumers needlessly and waste money. We would only scale in when the age of the oldest message dropped to zero (fig. 1, 2). Second, we could be scaling out too slowly to handle sudden large spikes in notification load, but we’d never know it, because the only visible effect would be the same: one old message. In short, it wasn’t the metric we were looking for (fig. 3).

fig. 3: Obi-Wan Kenobi telling you to look elsewhere

Second pass: ApproximateNumberOfMessagesVisible

So we moved on. Next, we tried scaling on ApproximateNumberOfMessagesVisible, the number of messages backed up in the queue. It was another built-in metric coming from SQS, so all we had to change was the metric and threshold we scaled on in the scaling policy. We changed it to scale out when there were 50 messages backed up in the queue, and scale in when there were 0. This would allow us to scale appropriately in response to spikes in notifications, while not scaling out needlessly when one message became stuck in the queue. This was the metric we were looking for, and it brought the usage of our ECS cluster way down (fig 4, 5). Hooray!

fig. 4: Total running tasks, split by consumer type. Note the sharp drop in consumers.
fig. 5: Running tasks, split by consumer type. Note the sharp drop in consumers.

That was all fine and well, but we were still experiencing many lost and/or late notifications. Sometimes, load would increase more quickly than we could handle it (fig 6). The queue would still become backed up (fig 7), and we would scale out to handle the queue only after it became backed up (fig 8). This resulted in messages being sent with increased latency in the event of increases in load (fig 9). We tried a few things: scaling out more quickly, scaling in more slowly, adjusting the cooldown period between scaling, adjusting the step size, and adjusting the scaling threshold. None of it seemed to make a difference.

fig. 6: Load steadily ramping up to a plateau
fig. 7: Number of messages in the queue during a high load event
fig. 8: Consumer tasks scaling out too late and scaling in too early
fig 9: Unsteady processing and latency

What’s more is that we were missing a lot of scaling data at the bottom of the graph. Whether or not we were experiencing sustained load (fig 10), we scaled in when there weren’t any messages left in the queue (fig 11). Sometimes, we scaled in too early, which would cause the queue to back up again almost immediately (fig 11), resulting in flapping (fig 12).

fig 10: An extreme load event with a large plateau in the middle
fig 11: messages backing up in the queue and not getting fully drained
fig. 12: queue consumers not scaling appropriately to handle extreme or sustained load
fig 13: unsteady processing and latency

When 0 messages were backed up in the queue, we had no way to anticipate when a spike in load would cause the queue to get backed up before it actually happened. We needed to see the part of the data that happened below 0 — which is impossible when you’re talking about an amount of something (fig 14). But we couldn’t tell how effectively the backup was handled by autoscaling. Were we overprovisioned by a lot? Were we provisioned by just the right amount? Were we slightly underprovisioned? It was impossible to tell. Regardless, we were scaling in as though we were overprocessing.

fig 14: Is it alright to scale in? We just don’t know.

Third time’s the charm: Lambdas and Calculus

We needed a way to see that data and act on it. Luckily, as my manager Will Weaver reminded me, calculus exists. The derivative, or rate over time, of ApproximateNumberOfMessagesVisible (fig 15, 16) would tell us how many messages were coming into the queue relative to how many the consumers were processing.

fig 15: ApproximateNumberOfMessagesVisible
fig 16: derivative of ApproximateNumberOfMessagesVisible

But since we didn’t have that data below 0, we needed another way to approximate the derivative. What would tell us the rate of messages coming in vs. going out? Turns out SQS publishes those two metrics. They’re somewhat confusingly named: NumberOfMessagesSent refers to messages sent to the queue, i.e. by a publisher, and NumberOfMessagesReceived refers to messages received from the queue, i.e. by a consumer. It’s important to remember here that the queue doesn’t do anything on its own. It’s essentially a data structure with network access, with other parts of your system pushing and popping messages to and from it.

When we divide NumberOfMessagesSent (fig. 17) by NumberOfMessagesReceived (fig. 18), we get something better than the derivative of ApproximateNumberOfMessagesVisible. We get an accurate representation of the load our consumers are experiencing relative to their capacity, which is exactly what we want in an autoscaling metric. If load:capacity is significantly greater than 1, we scale out. If it’s approximately 1, we do nothing. If it’s significantly less than 1, we scale in (fig. 19, 20). This last case represents that data we were missing when ApproximateNumberOfMessagesVisible would have gone below 0.

We needed to write a Lambda to take these metrics, divide them, and push them to a new metric that we could then connect to a CloudWatch Alarm that would autoscale the consumers. One nice thing about this approach is we can run a Lambda more often than CloudWatch metrics supposedly come in — every 1 minute vs. every 5 minutes, respectively. We can also poll the CloudWatch API more often than metrics show up on a dashboard and get more up-to-date metrics that way. This means we have access to all this data before ApproximateNumberOfMessagesVisible backs up too badly (fig 21). We can also add more stepAdjustments — stratified autoscaling thresholds— for higher ratios to handle more and more extreme spikes in load (fig 19). So in the event of a sudden spike in messages, we can anticipate it and scale to handle the load before it becomes a problem, ensuring more reliable delivery of these messages.

fig. 17: NumberOfMessagesSent — incoming messages to the queue
fig. 18: NumberOfMessagesReceived — messages picked up for processing — follows the incoming curve closely
fig. 19: MessageProcessingRatio: the ratio of NumberOfMessagesReceived to NumberOfMessagesSent
fig. 20: Running Tasks — following the load curve more closely than before
fig. 21: ApproximateNumberOfMessagesVisible — showing short-lived queue backups

What are the tradeoffs of this approach? First, we have to own and operate our own CloudWatch Metric publisher. This includes dealing with issues such as the Metric’s data inexplicably disconnecting from the Alarm every so often. But there’s not much to operate, since this is a small serverless function. This was also before the days of CloudWatch Metric Math, which as of this writing doesn’t support connecting to CloudWatch Alarms. In a reality where that was possible, Metric Math would be a much better solution to this problem, since it would take away much of the operational overhead.

Another thing that happened was that while this fixed the specific issue of not being able to process a queue fast enough, it exposed other issues in our architecture and design now that we were scaling up many more consumers to handle big spikes in load: we ran out of keys to represent in-flight messages when obtaining locks to avoid sending the same notification more than once. But that’s another story!

We also ended up using far more consumer tasks than we previously did, since our load:capacity was much higher than we initially thought. As a result, we quickly ran out of MemoryReservation in the ECS cluster on which Switchboard runs. This caused not only Switchboard to intermittently fail, but also other services running on that cluster. Scaling out that cluster to handle the increase in tasks solved that issue.

Conclusion

We started out with an autoscaling mechanism for Switchboard that was built with good intentions. Through trial and error, we eventually found a metric that fulfilled those intentions, with the added benefit of forcing us to strengthen other aspects of Switchboard’s architecture and infrastructure. While there’s still much work to be done, Switchboard is a more reliable and observable system today than it was before.