SQS Consumer Design: Achieving High Scalability While Managing Concurrency in Go

Recently we’ve had to redesign our async queuing system. After researching several options, we went with AWS Simple Queue Service (SQS) and Simple Notification Service (SNS). With a combination of these two services, we were able to replicate and improve on our previous (and failing) RabbitMQ set-up. The great thing about the AWS offering is that these services are built from the ground up for microservices. They abstract away the vast majority of the complexity of a complicated async messaging system, from handling horizontal scaling via the message waiting period to Dead Letter Queues and a multitude of other features, right out of the box.

As per AWS’s suggestions on scaling, they recommend scaling horizontally across multiple services as well as within a single service by utilizing multiple threads. In this blog post, we will focus on the best way to retrieve and handle messages from SQS in Go within a single service, using multiple goroutines.

Throughout this example, we will be dealing with an SQS queue holding 20,000 messages. We will measure three different techniques for managing concurrent workers in Go, as well as a synchronous control test. For each, we will count the total number of messages consumed over the course of 3 minutes and take the average to get messages consumed per minute (mpm).

All examples use the AWS SDK for Go to long-poll the SQS server. Each received message is processed by a basic handler that makes an HTTP request to another microservice before returning. Upon completion, we delete the message from the queue, which is how SQS expresses that a message has been consumed.
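The helpers used throughout these snippets (receiveSQSMessages, newMessage, h and delete) aren’t shown in the examples themselves. Below is a minimal sketch of what they might look like with the aws-sdk-go v1 SQS client. The consumer struct fields, the wrapper type, and the handler URL are assumptions for illustration, not the production implementation; note also that the control example below calls receiveSQSMessages with just the queue URL, and the batch-size parameter is only introduced with the concurrent examples.

package main

import (
    "net/http"

    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sqs"
)

// svc is a shared SQS client; in a real service it would likely hang off the consumer struct.
var svc = sqs.New(session.Must(session.NewSession()))

// consumer holds the queue URL and the worker accounting used in the later examples.
type consumer struct {
    QueueURL    string
    workerPool  int   // maximum number of concurrent workers
    workerCount int32 // current number of in-flight workers (updated atomically)
}

// message is a thin wrapper around the raw SQS message.
type message struct {
    *sqs.Message
}

func newMessage(m *sqs.Message) *message {
    return &message{m}
}

// receiveSQSMessages long-polls the queue for up to maxMessages messages.
func receiveSQSMessages(queueURL string, maxMessages int) (*sqs.ReceiveMessageOutput, error) {
    return svc.ReceiveMessage(&sqs.ReceiveMessageInput{
        QueueUrl:            aws.String(queueURL),
        MaxNumberOfMessages: aws.Int64(int64(maxMessages)),
        WaitTimeSeconds:     aws.Int64(20), // long polling
    })
}

// h is the basic handler: it calls another microservice over HTTP.
func h(m *message) error {
    resp, err := http.Get("http://other-service.internal/handle") // placeholder URL
    if err != nil {
        return err
    }
    return resp.Body.Close()
}

// delete removes a consumed message from the queue.
func (c *consumer) delete(m *message) error {
    _, err := svc.DeleteMessage(&sqs.DeleteMessageInput{
        QueueUrl:      aws.String(c.QueueURL),
        ReceiptHandle: m.ReceiptHandle,
    })
    return err
}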

The Control

In this example, we establish our control: a synchronous consumer.

func (c *consumer) Consume() {
    for {
        output, err := receiveSQSMessages(c.QueueURL)
        if err != nil {
            // log error
            continue
        }

        for _, m := range output.Messages {
            msg := newMessage(m)
            if err := h(msg); err != nil {
                // log error
                continue
            }

            c.delete(msg) // MESSAGE CONSUMED
        }
    }
}

With a synchronous consumer, we get an average throughput of 160 mpm.

Basic Concurrency

Let’s add some goroutines. While goroutines are cheap, we still want to control them to some extent. Allowing a million goroutines to spawn over time would probably cause our program to run out of memory and run slower than our control. In this example, we will use a simple atomic counter to track and cap the total number of workers allowed. We use the atomic package because it is safe to use concurrently. That said, reaching for atomic operations for this kind of coordination is generally not recommended, as they tend to behave differently than you might expect.

In the following examples, we will also allow SQS to batch up to 10 messages per request.

func (c *consumer) Consume() {
    c.workerPool = 50 // should be configurable
    c.workerCount = 0
    maxMessages := 10

    for {
        if atomic.LoadInt32(&c.workerCount) < int32(c.workerPool) {
            output, err := receiveSQSMessages(c.QueueURL, maxMessages)
            if err != nil {
                // log error
                continue
            }

            for _, m := range output.Messages {
                atomic.AddInt32(&c.workerCount, 1)
                go c.run(newMessage(m))
            }
        }
    }
}

func (c *consumer) run(m *message) error {
    defer atomic.AddInt32(&c.workerCount, -1)

    if err := h(m); err != nil {
        return err
    }

    return c.delete(m) // MESSAGE CONSUMED
}

With a basic concurrent consumer, we get an average throughput of 2,700 mpm. That is a huge improvement.

Interestingly enough, despite the fact that we allowed 50 goroutines to run at once, only 2 responses’ worth of messages (20 goroutines in total) were ever running at one time. This is because the messages were processed faster than the long polling could retrieve new ones from SQS. We have now found the HTTP request to SQS to be the primary bottleneck.

Worker Pools

Worker pools are the idiomatic Go way to manage goroutines in an application. They are more efficient in both memory and speed, and they distribute workloads evenly across all primed workers. In most cases, this would be the final solution (hint: it’s not).

func (c *consumer) Consume() {
    maxMessages := 10
    jobs := make(chan *message)
    for w := 1; w <= c.workerPool; w++ {
        go c.worker(w, jobs)
    }

    for {
        output, err := receiveSQSMessages(c.QueueURL, maxMessages)
        if err != nil {
            // log error
            continue
        }

        for _, m := range output.Messages {
            jobs <- newMessage(m)
        }
    }
}

// worker is an always-on concurrent worker that picks up tasks as they arrive on the jobs channel
func (c *consumer) worker(id int, messages <-chan *message) {
    for m := range messages {
        if err := h(m); err != nil {
            // log error
            continue
        }

        c.delete(m) // MESSAGE CONSUMED
    }
}

First, we create a pool of workers (50 in this case). These workers listen for jobs. Ranging over the messages channel is a blocking operation, so a worker stays dormant until a message is available. If you needed to handle errors or different message handlers in other ways, you could use a select inside a for loop to handle the various cases in the same way.
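For example, here is a minimal sketch of that select-based variant; the selectWorker name and the quit channel are illustrative assumptions added so the worker can also be shut down cleanly:

func (c *consumer) selectWorker(id int, messages <-chan *message, quit <-chan struct{}) {
    for {
        select {
        case m, ok := <-messages:
            if !ok {
                return // jobs channel closed, stop the worker
            }
            if err := h(m); err != nil {
                // log error
                continue
            }
            c.delete(m) // MESSAGE CONSUMED
        case <-quit:
            return // shutdown signal received
        }
    }
}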

Additionally, dispatching a message onto the jobs channel in the consumer is a blocking operation: it only proceeds when a worker is available. This ensures that the consumer will not keep retrieving messages when there are no available workers.

The great thing about this approach is that it achieves something close to round-robin distribution amongst the workers and is much faster in most cases.

With a worker pool consumer, we get an average throughput of 2,766 mpm. That is not a huge improvement over the previous example. While the code is more idiomatic and reliable, we still have the same single point of failure and bottleneck: the long-polling request to AWS is simply not fast enough.

Final Solution

The next logical step is to remove the bottleneck by moving the HTTP request itself into the worker. We depart from the worker pool concept in that we no longer need a dispatcher (i.e. a system that sends jobs to the worker pool via channels). We just need a set of goroutines that each indefinitely query the SQS server for messages and handle them on their own.

func (c *consumer) Consume() {
    for w := 1; w <= c.workerPool; w++ {
        go c.worker(w)
    }
}

func (c *consumer) worker(id int) {
    maxMessages := 10

    for {
        output, err := receiveSQSMessages(c.QueueURL, maxMessages)
        if err != nil {
            // log error
            continue
        }

        var wg sync.WaitGroup
        for _, m := range output.Messages {
            wg.Add(1)
            go func(m *message) {
                defer wg.Done()
                if err := h(m); err != nil {
                    // log error
                    return
                }

                c.delete(m) // MESSAGE CONSUMED
            }(newMessage(m))
        }

        wg.Wait()
    }
}

At 6,750 mpm, we have achieved roughly 2.5 times the throughput of the previous example and a more than 40-fold increase over the synchronous control.

In this example, we create a configurable number of workers, each responsible for its own long polling and message handling. When a batch of messages is received, a WaitGroup is used to synchronize an additional set of goroutines that handle the message payloads before the worker polls again.

While this solution deviates from the elegance of the worker pool, it removes the bottleneck as well as the single point of failure. The end result is a dramatically faster consumer, one that accomplishes on a single instance what would previously have taken 50 instances.
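To round out the example, here is a minimal sketch of how the final consumer might be started in a service, assuming the consumer struct and helpers sketched earlier live in the same package; the queue URL and worker count are placeholders:

package main

import (
    "os"
    "os/signal"
    "syscall"
)

func main() {
    c := &consumer{
        QueueURL:   "https://sqs.us-east-1.amazonaws.com/123456789012/example-queue", // placeholder
        workerPool: 50,
    }

    // Spawns the long-polling workers and returns immediately.
    c.Consume()

    // Block until the process is asked to shut down.
    sig := make(chan os.Signal, 1)
    signal.Notify(sig, syscall.SIGINT, syscall.SIGTERM)
    <-sig
}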

Have another solution? Please leave a comment.