Work queue with Go and RabbitMQ

We want to build a work queue system: a queue to which we push new tasks, and workers that actively monitor the queue and process tasks as they arrive. A work queue is ideal for background jobs that take time, longer than a typical HTTP request should. A common example I use to describe work queues: if your app handles user-uploaded photos, creates several versions of each (thumbnails, different sizes) and shares them on different social media sites, how would you handle all of that? Resizing the photos and uploading them to other sites will definitely take time. Should you do this inside your HTTP handler? You should not. Instead, when a photo is uploaded, store it somewhere convenient and pass the details to background workers, which then process the photo and upload it wherever needed.

We need a messaging system

To accomplish this, we need a system that can act as a queue of messages. We want a distributed work queue system, where the workers and the work producers live on different servers. There can be many sources of work and many workers processing it. To build such a distributed system, we need a centralized message queue / message broker: a system to which we can pass messages and which delivers those messages to our workers.

There are several tools that can help us with this: Redis, Kafka, RabbitMQ, ZeroMQ, IronMQ, AWS SQS and plenty of others. Redis is very popular and I personally use it a lot with Celery / Bull. But RabbitMQ is a better choice in some respects: it is a dedicated message broker with built-in message acknowledgements and flexible routing, both of which come up later in this post. In our example, we’re going to use RabbitMQ with Go to create a very simple work queue system.

Understanding RabbitMQ concepts

There are several concepts we should keep in mind while working with RabbitMQ.

Producers & Consumers: A producer creates new messages / work; a consumer consumes them. In our example, when a file upload happens, the HTTP handler produces a message for our workers to consume. The HTTP handler / web app is the producer; the background workers are the consumers.

Exchanges and Queues: As the names suggest, both are involved in message handling. Exchanges receive messages from producers and route them to queues. Consumers consume messages from the queues. RabbitMQ comes with very powerful message routing features, so we can customize in many ways how messages are delivered to different queues.

A command line example

In this post, we don’t want to build a full-fledged web app with file uploads; we want to keep it short. So we will do everything on the command line. We will build a command line publisher tool, which will publish/produce messages, and a consumer, which will consume them. We will then run multiple consumer instances in parallel to show how we can scale this system by adding more workers.

We are going to build a very sophisticated (?!?) calculator that can take two numbers and output their sum on the standard output. And we need to make it web scale, so we need to use Go and RabbitMQ.

Initial setup

Before we can start building our million-dollar calculator, we need RabbitMQ installed somewhere we can connect to. For us developers, what better place than localhost? Let’s install RabbitMQ on our local machine and get it running.

The RabbitMQ installation process varies from platform to platform. On a MacBook, I installed it using Homebrew. On a Linux distro, it’s probably available from the package manager. For Windows, installer packages should be available.

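Once RabbitMQ is installed, we need the Go AMQP package. I am using Go Modules to install it; you can also go get it directly or use another dependency management tool. (The streadway/amqp repository has since been archived; the RabbitMQ team now maintains a largely API-compatible fork at github.com/rabbitmq/amqp091-go.)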

go get github.com/streadway/amqp

Building a consumer

We created a consumer directory, inside which we will build our consumer app. The consumer app needs to connect to RabbitMQ, declare the queue it wants to listen to and then start consuming messages.

Before we begin, we’re going to write an error handling function. It will help us get through Go’s tedious error handling.

The function is simple: if the error is not nil, it prints a message along with the error details and quits.
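A minimal sketch of such a helper follows. The name handleError is hypothetical, since the original gist is not reproduced here. It relies on log.Fatalf, which logs the message and then calls os.Exit(1).

```go
package main

import "log"

// handleError is a hypothetical helper name. If err is non-nil, it logs msg
// together with the error details and exits (log.Fatalf calls os.Exit(1)).
func handleError(err error, msg string) {
	if err != nil {
		log.Fatalf("%s: %s", msg, err)
	}
}

func main() {
	// A nil error is ignored, so execution simply continues.
	handleError(nil, "Nothing to report")
	log.Println("Still running after a nil error")
}
```

Each fallible call can then be followed by a single handleError line instead of a three-line if block.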

Now let’s start setting up a connection to RabbitMQ.

We’re going to try to connect to RabbitMQ and quit if that fails. The connection URL is stored in a top-level shared.go file, with the value “amqp://guest:guest@localhost:5672/”.

If the connection succeeds, we need to open a channel. Do not confuse it with Go’s channels; RabbitMQ has its own concept of channels. A connection is a TCP connection from the client to the server, and it is not cheap to create. A channel is a lightweight virtual connection multiplexed over that TCP connection, and channels are cheap. We should keep the number of connections to a minimum while opening as many channels as we need on top of them.

We can now start talking to RabbitMQ. We need to tell the server about the queue we’re interested in.

Next, we declare a queue named add (RabbitMQ creates it if it does not exist yet). If you’re curious what the function arguments are, take a look at the QueueDeclare documentation in the amqp package.

By default, RabbitMQ delivers messages to consumers in round-robin fashion, distributing them equally among all workers regardless of how busy each one is. If some tasks take much longer than others, one worker can end up with a pile of accumulated tasks while another barely breaks a sweat: one worker is always busy, the other mostly idle. To avoid this, we ask RabbitMQ not to deliver a new message to a worker until it has acknowledged the previous one, by setting the prefetch count to 1. The documentation for the Qos function explains this in more detail.

Let’s go ahead and start consuming messages.

Each argument is described in the Consume documentation of the amqp package. This time, we get back a Go channel of deliveries, which we can range over to receive messages.

We want to send messages as JSON. To represent a task for the add operation, we define a type, again in our shared.go file.

We can now range over messageChannel, decode each message body into an AddTask instance and sum Number1 and Number2 to get the result.
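The per-message logic can be pulled out into a plain function, which makes it testable without a broker. The name processAddTask is hypothetical, and the article’s gist may keep this logic inline in the loop.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

type AddTask struct {
	Number1 int
	Number2 int
}

// processAddTask (hypothetical) decodes a message body into an AddTask and
// returns the task together with the sum of its two numbers.
func processAddTask(body []byte) (AddTask, int, error) {
	var task AddTask
	if err := json.Unmarshal(body, &task); err != nil {
		return AddTask{}, 0, fmt.Errorf("decoding JSON: %w", err)
	}
	return task, task.Number1 + task.Number2, nil
}

func main() {
	body := []byte(`{"Number1":221,"Number2":345}`)
	task, result, err := processAddTask(body)
	if err != nil {
		log.Fatal(err)
	}
	log.Printf("Result of %d + %d is : %d", task.Number1, task.Number2, result)
}
```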

We run this loop in a goroutine launched with go func(), so it runs in the background. We therefore need a way to keep our main CLI (worker) in the foreground from reaching its end and exiting. We can create a channel and wait to receive from it; since nothing ever sends on it, main keeps waiting indefinitely.
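The sketch below demonstrates the same pattern using only the standard library. A string channel stands in for RabbitMQ’s delivery channel, and startWorker is a hypothetical name. Unlike a channel that nobody ever sends on, done is closed when the input closes. That lets this demo terminate, and in a real consumer it would let the process exit instead of hanging when RabbitMQ closes the delivery channel.

```go
package main

import "log"

// startWorker processes msgs in a background goroutine and returns a
// channel that is closed once msgs is closed and fully drained.
func startWorker(msgs <-chan string, processed *[]string) <-chan struct{} {
	done := make(chan struct{})
	go func() {
		defer close(done) // signal main when there is nothing left to consume
		for m := range msgs {
			log.Printf("Processing %s", m)
			*processed = append(*processed, m)
		}
	}()
	return done
}

func main() {
	msgs := make(chan string)
	var processed []string
	done := startWorker(msgs, &processed)

	msgs <- "task-1"
	msgs <- "task-2"
	close(msgs) // a live delivery channel stays open, so the wait below never ends there

	// main blocks here until the worker goroutine finishes.
	<-done
	log.Printf("Processed %d messages", len(processed))
}
```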

Meanwhile, in the goroutine, we range over the messages, process each message body, handle errors and finally acknowledge the message. When calling Consume, we set autoAck to false, so we have to acknowledge each message manually once we have processed it. If a message is never acknowledged and the worker loses its connection, RabbitMQ requeues that message and redelivers it to another worker. This lets us gracefully retry messages even when a worker crashes.

It is also very important to remember to acknowledge messages manually when auto-ack is off. Otherwise, RabbitMQ won’t delete them (unacknowledged messages are not done yet), they will pile up in RabbitMQ’s memory and cause mayhem. We don’t want that to happen.

This concludes our consumer. If we run go build inside the consumer directory and then run ./consumer, it should start (provided we have done everything correctly).

➜  consumer git:(master) ✗ ./consumer
2019/02/23 20:54:55 Consumer ready, PID: 36361

The complete consumer looks like this:

Building a Publisher / Producer

Now let’s create a producer that generates random numbers and sends them to the add queue.

The publisher needs the same connection, channel and queue declaration code, so we can skip those parts and jump straight to the interesting bits.

You may be wondering why we declare the queue in both the consumer and the publisher. It’s because we don’t know which one will start first, so each makes sure the queue exists before it starts consuming / publishing. Declaring a queue that already exists with the same arguments is a no-op, so doing it in both places is harmless.

We’ve already seen the AddTask type. Let’s generate two random numbers, create an instance and encode it as JSON, ready for publishing.

Let’s publish it:

The code so far looks like this:

If we build this code and run ./publisher, it queues a task. If one or more consumers are running, we can see the results.

➜  publisher git:(master) ✗ ./publisher
2019/02/23 21:09:59 AddTask: 221+345

And in the consumer window:

➜  consumer git:(master) ✗ ./consumer
2019/02/23 20:54:55 Consumer ready, PID: 36361
2019/02/23 21:09:59 Received a message: {"Number1":221,"Number2":345}
2019/02/23 21:09:59 Result of 221 + 345 is : 566
2019/02/23 21:09:59 Acknowledged message

Empty Exchange Name and Queues

We publish messages to exchanges and consume them from queues. In our publisher, we didn’t specify an exchange name. When the exchange name is an empty string, the message goes to RabbitMQ’s default exchange, which delivers it directly to the queue whose name matches the routing key we passed (here, add).

More Advanced Usages

In our use case, we’ve used a very simple named queue, but RabbitMQ exchanges can do a lot more. There are several exchange types: a fanout exchange delivers the same message to every queue bound to it, and a topic exchange routes messages to queues by matching patterns against the routing key. With exchanges and queues, we can set up intelligent, configurable message routing to serve complex use cases and build advanced distributed systems.


The complete code is available on GitHub.