Asynchronous Queueing in Redis with Akka

Orji samuel
Published in The Startup
6 min read · Jun 6, 2019

This post assumes a basic knowledge of Scala and Akka.

I was working on a project that involved sending SMS messages through a third-party API. I needed to check whether each message had been sent so I could inform the user of the service of its status.

But what happens when the message doesn’t go through, or the third-party API is down? Surely I am not going to make the user of this service wait while the message is retried a number of times until it finally goes through. Isn’t there a way to queue the message elsewhere and have another service fetch these messages asynchronously from the queue and retry, without me having to monitor it?

This gave me a lot of trouble, as I couldn’t find any resource that covered queueing to Redis and having a background service retry sending the message. Eventually I came up with a solution that worked for me.

I figured I would try to send the message first and, if that fails, push it asynchronously into Redis. I would also configure a Worker Actor that fetches the messages from Redis and retries them at a later time that I configure.

I built a small project showing how to implement this. We start by adding our dependencies to build.sbt, among them the libraries for connecting to our Redis instance and for serializing JSON.

The flow of the project is really straightforward. We need a Scheduler (Actor), whose job is to fetch from Redis at configured intervals and send the result to a Worker (Actor), which then acts on the result [no pun intended :)]. Since all of this is asynchronous, the user of the messaging service can go about their work without worrying about how the queue is handled, provided the service has been properly configured.

First, we define our Redis instance.
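As a minimal sketch of what such an instance needs to expose (not the project's actual code), assume only two operations: push an element onto a named queue and pop one off. The names `MessageQueue` and `InMemoryQueue` are hypothetical, and the in-memory version merely stands in for a real client that would issue Redis list commands such as LPUSH and RPOP.

```scala
import java.util.concurrent.ConcurrentLinkedQueue
import scala.collection.concurrent.TrieMap
import scala.concurrent.Future

// Hypothetical interface: the two operations the rest of the flow relies on.
trait MessageQueue {
  def enqueue(queue: String, element: String): Future[Unit]
  def dequeue(queue: String): Future[Option[String]]
}

// In-memory stand-in for Redis (an assumption for illustration only).
// A real implementation would delegate to a Redis client library.
class InMemoryQueue extends MessageQueue {
  private val queues = TrieMap.empty[String, ConcurrentLinkedQueue[String]]

  def enqueue(queue: String, element: String): Future[Unit] = {
    // Create the named queue on first use, then append to its tail.
    queues.getOrElseUpdate(queue, new ConcurrentLinkedQueue[String]()).add(element)
    Future.successful(())
  }

  def dequeue(queue: String): Future[Option[String]] =
    // poll() returns null on an empty queue, which Option(...) turns into None.
    Future.successful(queues.get(queue).flatMap(q => Option(q.poll())))
}
```

Returning a `Future` from both methods keeps the callers asynchronous, so swapping in a non-blocking Redis client later should not change their shape.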

Now that we have our Redis instance, let us create our Worker Actor. I decided to make it a polymorphic actor, in case the base actor’s functionality needs to be extended later. For the sake of this tutorial, though, I just created a simple Worker Actor from its companion object, as I always love to :).

So our Worker Actor simply takes a function as an argument and applies it to each string element dequeued from Redis.
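One way this could look with classic Akka actors is sketched below. It is an illustration under assumptions rather than the project's code: `BaseWorker`, `handleElement` and `Worker.props` are hypothetical names, while `specificReceive` mirrors the hook the base actor is described as using.

```scala
import akka.actor.{Actor, Props}

// Base actor: shared behaviour lives here, subclasses only handle queue elements.
abstract class BaseWorker extends Actor {
  // Hook for messages every worker shares; empty by default in this sketch.
  def specificReceive: Receive = PartialFunction.empty

  // Subclasses describe what to do with an element dequeued from Redis.
  def handleElement: Receive

  // Shared messages are tried first, then the subclass behaviour.
  final def receive: Receive = specificReceive orElse handleElement
}

// Simple worker: applies the supplied function to every dequeued string.
class Worker(f: String => Unit) extends BaseWorker {
  def handleElement: Receive = {
    case element: String => f(element)
  }
}

object Worker {
  // Created through the companion object, which keeps construction in one place.
  def props(f: String => Unit): Props = Props(new Worker(f))
}
```

A worker that just prints each element could then be started with `system.actorOf(Worker.props(println))`, assuming an `ActorSystem` named `system`.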

In my project, I subscribed to a heartbeat message from the Redis Actor, amongst other messages, to ensure that my Redis server was alive and, if not, to publish an error and send an alert to the relevant services. The heartbeat message was handled by the base actor in specificReceive, so the child actors only have to handle the result from the Redis queue and nothing else. This is one major reason why my worker actor is polymorphic.

Now we need our Scheduler, or Listener (Actor), which connects the Redis instance and the worker actor: it fetches from the Redis queue and sends each message to the Worker Actor to take the appropriate action.

You may well have multiple queues, hence the need for a scheduler per queue: this gives you full control over what to do with each queue and how frequently to query Redis for its data. It is also essential that the scheduler creates the worker actor, so we never end up with an idle worker actor that has no scheduler.

I added the maxNumDeq argument so we can control how many elements we dequeue at once before we schedule the next fetch from Redis. This helps prevent overloading the worker when it is performing a resource-intensive task.

One other thing: I used Props as the type for the worker in the scheduler because one may want to use a pool of worker actors instead of a single actor, and a Props can represent both a pool of actors and a single actor.
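The scheduler described above might be sketched as follows. This is a simplified illustration, not the project's code: `QueueScheduler` and `Poll` are hypothetical names, and `dequeue` is a synchronous function standing in for an asynchronous Redis pop (it returns `None` when the queue is empty). Only `maxNumDeq` and the use of `Props` for the worker come from the text.

```scala
import akka.actor.{Actor, ActorRef, Props}
import scala.concurrent.duration.FiniteDuration

object QueueScheduler {
  // Internal tick message (hypothetical name).
  private case object Poll

  def props(dequeue: () => Option[String], worker: Props,
            interval: FiniteDuration, maxNumDeq: Int): Props =
    Props(new QueueScheduler(dequeue, worker, interval, maxNumDeq))
}

class QueueScheduler(dequeue: () => Option[String], workerProps: Props,
                     interval: FiniteDuration, maxNumDeq: Int) extends Actor {
  import QueueScheduler.Poll
  import context.dispatcher // execution context used by the Akka scheduler

  // The scheduler creates its own worker, so a worker never exists without one.
  // Because this is a Props, it can describe a single actor or a router pool.
  private val worker: ActorRef = context.actorOf(workerProps, "worker")

  override def preStart(): Unit = scheduleNext()

  // Ask the Akka scheduler to send Poll to this actor after `interval`.
  private def scheduleNext(): Unit =
    context.system.scheduler.scheduleOnce(interval, self, Poll)

  def receive: Receive = {
    case Poll =>
      // Call dequeue at most maxNumDeq times, stopping early on an empty queue.
      Iterator.continually(dequeue())
        .take(maxNumDeq)
        .takeWhile(_.isDefined)
        .flatMap(_.toList)
        .foreach(worker ! _)
      scheduleNext() // reschedule only after this batch has been handed over
  }
}
```

Rescheduling after each batch, rather than on a fixed timer, means a slow fetch cannot cause polls to pile up.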

Now that our Redis instance, worker actor and scheduler are all set up, the next question is how to create the schedulers (there may be multiple queues to listen to). I opted for a Queue Manager (Actor), which for the sake of this tutorial simply handles the creation of schedulers. In a production environment, this manager could also set up a supervision strategy, watch its child actors (the schedulers) or contain some other form of parent logic.

Just as the scheduler creates the worker actor, the queue manager creates the scheduler. I prefer this kind of hierarchy, as it helps me a lot when debugging.
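A minimal sketch of such a manager, assuming it receives a message that carries the scheduler's Props and replies with the reference of the child it created. The names `QueueManager` and `CreateScheduler` are hypothetical, and supervision and watching are left out on purpose.

```scala
import akka.actor.{Actor, ActorRef, Props}

object QueueManager {
  // Hypothetical message: start a scheduler with the given name and Props.
  final case class CreateScheduler(name: String, schedulerProps: Props)

  def props: Props = Props(new QueueManager)
}

class QueueManager extends Actor {
  import QueueManager.CreateScheduler

  def receive: Receive = {
    case CreateScheduler(name, schedulerProps) =>
      // Creating the scheduler as a child gives the hierarchy
      // manager -> scheduler -> worker, visible in every actor path.
      val scheduler: ActorRef = context.actorOf(schedulerProps, name)
      sender() ! scheduler
  }
}
```

Because each scheduler is a named child, its path ends in the manager's name followed by the scheduler's name, which is part of what makes this layout convenient when debugging.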

Now we have a way of creating a scheduler, a Redis instance and a worker actor. What’s left is connecting the pieces together. For the sake of this tutorial, I wrote a mock Messaging Service that purposely fails when sending messages. When a message is first sent, retry is set to true, so it’s queued when it fails the first time. The field is later set to false so that the messaging service doesn’t enqueue again; from then on it is the Worker’s responsibility to decide what to do with the queued messages, whether to retry or perform some other task.
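The behaviour of the mock service can be sketched roughly as below. This is an illustration under assumptions: `MockMessagingService`, `Status`, `Sent`, `Failed` and the `to`/`body` fields are hypothetical, the `enqueue` function stands in for the push to Redis, and JSON serialization is left out. `SendMessage`, `numRetry` and the `retry` flag follow the description above.

```scala
// Message to deliver; numRetry bounds how many times it may be enqueued.
final case class SendMessage(to: String, body: String, numRetry: Int)

sealed trait Status
case object Sent extends Status
case object Failed extends Status

// Mock service that always fails, so every first attempt ends up in the queue.
class MockMessagingService(enqueue: SendMessage => Unit) {
  def send(msg: SendMessage, retry: Boolean): Status = {
    val status: Status = Failed // simulate a broken third-party API
    // Only the first attempt (retry = true) enqueues. When the worker
    // re-sends with retry = false, deciding what happens next is its job.
    if (status == Failed && retry) enqueue(msg)
    status
  }
}
```

Keeping the enqueue decision behind a single flag means the same `send` method serves both the first attempt and the worker's retries.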

Now, to test how this works, let’s run our app.

I first create my Redis instance and Messaging Service, and then send messages to the messaging service, which enqueues them to Redis (because I configured the messages to fail). I then create my worker function, which acts on whatever is dequeued from Redis. This function gives the user a lot of flexibility: they may want to swap gateways, publish an error to a dashboard, or do anything else for that matter.

The worker function is really simple. It takes the element and converts it to a case class, then tries to send the message through the Messaging Service with the enqueue argument set to false, so that the messaging service doesn’t enqueue it again. When it receives the status of the message from the messaging service, it checks whether it should enqueue again using the handleQueuedElement function, which reduces the number of times the message may be enqueued by 1. This ensures that the message is enqueued no more than numRetry times (numRetry is supplied when creating the SendMessage object to control how many times we should try enqueueing).
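The retry bookkeeping could be sketched like this. The signature of `handleQueuedElement` is an assumption, as are `RetryPolicy`, `process`, the `to`/`body` fields and the three function parameters; decoding the JSON element into the case class is left out. The sketch also assumes the first enqueue counts toward numRetry, so a message is re-enqueued only while numRetry is greater than 1.

```scala
final case class SendMessage(to: String, body: String, numRetry: Int)

object RetryPolicy {
  // Returns the message to enqueue again with one fewer retry left,
  // or None once the retries are used up.
  def handleQueuedElement(msg: SendMessage): Option[SendMessage] =
    if (msg.numRetry > 1) Some(msg.copy(numRetry = msg.numRetry - 1)) else None

  // One pass of the worker function for an already-decoded message:
  // send returns true on success; on failure the message is either
  // enqueued again or handed to giveUp (e.g. to log and alert).
  def process(msg: SendMessage,
              send: SendMessage => Boolean,
              enqueue: SendMessage => Unit,
              giveUp: SendMessage => Unit): Unit =
    if (!send(msg)) {
      handleQueuedElement(msg) match {
        case Some(next) => enqueue(next)
        case None       => giveUp(msg)
      }
    }
}
```

Under that assumption, a message created with numRetry = 5 that keeps failing sits in the queue five times in total before `giveUp` is called.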

I then created my scheduler, which fetches data every 10 seconds, and passed the result to the queueManager.

In my project, instead of just printing to the console when the element couldn’t be enqueued anymore after trying the message 5 times, I logged an error and alerted an email service that sends a mail to the registered system admins so they can take proper action.

You will notice a lot of println statements all over the code. This was done to help readers better understand what is going on. In production this shouldn’t be so; we can modify the code to log errors or write to a database instead.

I sincerely hope this post helps solve queueing problems in your projects, as it did for me. I have tried to separate concerns properly here, to give the developer more flexibility in how the services are modeled and connected. Each of the services can be modified to suit a specific need, but at the end of the day, what’s important is that we understand how to solve our problem :).

The Code for this project can be found here →
