Spring custom AWS SQS listener with parallel processing

Message-driven architecture, as the name says, uses messages to integrate distributed applications. It is important to create a system that processes messages quickly, using all available resources, while at the same time not letting an endless stream of messages overwhelm it. This tutorial is about a highly configurable, custom queue listener that allows processing messages in batches.

Ivan Polovyi
Javarevisited
6 min read · Sep 25, 2022


The use case:

There is a distributed system that handles customers' purchase transactions.

The challenge:

We got the task of creating a microservice that will listen to an SQS queue full of messages with information about customers' purchase transactions. Upon receiving a message, the microservice will call an API to calculate loyalty points and then save the purchase transaction to a SQL database.

By requirement, the microservice has to be able to handle a large volume of messages. The idea is to receive a specified number of messages from the queue at a defined frequency, process them in parallel, and then call the database once to save them all.

By controlling the volume of messages the service receives, and the frequency with which it receives them, we won't overwhelm the loyalty API or the database.

We will call the external API no more often than it can handle per period, and by saving a chunk of messages to the database at once we won't need to call a save operation for every single message.

To summarise: every n seconds, our microservice has to fetch n messages from the queue, process them in parallel, and save them to the database in a single operation.

As we will use Spring to build the microservice, we could use Spring Cloud AWS. It is very easy to build an SQS listener using this library: with little to no configuration, one can create a listener method just by annotating it with the @SqsListener annotation.

But this library is limited. The maximum number of messages that can be fetched at once can't be more than 10. And although it allows parallel processing, it doesn't allow saving the messages to the database in one batch, because it receives the messages, creates multiple parallel threads, and each thread calls the listener method passing a single message as an argument.

The solution:

The solution is to build a custom SQS listener using the AWS SDK. First, we have to create a bean for the Amazon SQS client. It is simple, and you can check it yourself in the project; the link to the project is below.

Then I created a method that listens to the SQS queue. To make it poll for messages periodically, I've annotated this method with Spring's @Scheduled annotation.

This is a very easy-to-use annotation. We just need to annotate the method and configure the rate at which it will be called. It is highly configurable and supports externalizing the configuration to a property file, which is exactly what I need. In this example, the method will run every 3 seconds, but this configuration can be changed in the application file.
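A minimal sketch of such a scheduled polling method (the property name `sqs.listener.rate-ms` and the class name are my assumptions for illustration, not the project's actual code):

```java
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
public class PurchaseSqsListener {

    // fixedRateString lets the rate be externalized to a property file;
    // here it defaults to 3000 ms when the property is not set
    // (the property name sqs.listener.rate-ms is an assumption)
    @Scheduled(fixedRateString = "${sqs.listener.rate-ms:3000}")
    public void poll() {
        // receive a batch of messages, process them, persist, then delete
    }
}
```

Note that @Scheduled methods only fire if a configuration class is annotated with @EnableScheduling.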

To poll messages from the queue we have to create a request using the AWS SDK. On the request object we set the queue URL, the number of messages to poll at once, and how long each poll will wait.

The last property allows the listener to wait for messages for a specified time. Let's say the listener polls for messages and at that moment the queue is empty, or only one message is available; the listener then waits for more messages to arrive so it can receive more messages in a single poll.

Then we call a method on the Amazon SQS client to receive messages, passing the created request. This call returns a list of messages.
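With the AWS SDK for Java v1 this might look as follows (variable and class names are illustrative; note that SQS itself caps a single ReceiveMessage call at 10 messages, so fetching a larger batch means calling it in a loop):

```java
import com.amazonaws.services.sqs.AmazonSQS;
import com.amazonaws.services.sqs.model.Message;
import com.amazonaws.services.sqs.model.ReceiveMessageRequest;
import java.util.ArrayList;
import java.util.List;

public class SqsPoller {

    private final AmazonSQS sqs;
    private final String queueUrl;

    public SqsPoller(AmazonSQS sqs, String queueUrl) {
        this.sqs = sqs;
        this.queueUrl = queueUrl;
    }

    /** Accumulates up to batchSize messages; SQS returns at most 10 per call. */
    public List<Message> receiveBatch(int batchSize, int waitSeconds) {
        List<Message> batch = new ArrayList<>();
        while (batch.size() < batchSize) {
            ReceiveMessageRequest request = new ReceiveMessageRequest(queueUrl)
                    .withMaxNumberOfMessages(Math.min(10, batchSize - batch.size()))
                    .withWaitTimeSeconds(waitSeconds); // long polling
            List<Message> received = sqs.receiveMessage(request).getMessages();
            if (received.isEmpty()) {
                break; // queue drained for now
            }
            batch.addAll(received);
        }
        return batch;
    }
}
```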

Next, we can configure whether the messages will be processed in parallel or sequentially. This decision is based on an externalized boolean property: if it is false, the messages are processed sequentially; if it is true, in parallel.
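The choice between sequential and parallel processing comes down to which stream you open over the received batch. A simplified, self-contained sketch (the flag and handler names are illustrative):

```java
import java.util.List;
import java.util.Objects;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class BatchProcessor<T, R> {

    private final boolean parallel;          // externalized boolean property
    private final Function<T, R> handler;    // per-message processing step

    public BatchProcessor(boolean parallel, Function<T, R> handler) {
        this.parallel = parallel;
        this.handler = handler;
    }

    /** Processes the batch sequentially or in parallel depending on the flag. */
    public List<R> process(List<T> messages) {
        Stream<T> stream = parallel ? messages.parallelStream() : messages.stream();
        return stream.map(handler)
                .filter(Objects::nonNull) // drop messages that failed to process
                .collect(Collectors.toList());
    }
}
```

Since `collect` keeps encounter order even for parallel streams, the results come back in the same order as the input batch.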

Parallel streams use the Fork/Join common pool. This pool is shared by the entire application process. More about the Fork/Join framework can be read here:

The message is received in JSON format, so it is first converted from a JSON string to an entity object. Then we call the external API to create loyalty points. There is a small artificial delay in this API, just to simulate calculation time. This delay will show how much faster parallel processing can be compared to sequential processing.

When a message is processed successfully, it is “marked” as processed by adding a message attribute of the same name and setting its value to true. If any error or exception happens during processing, the message won't have this attribute and the entity object will be null.
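A simplified sketch of that per-message step, using a stand-in result pair instead of the real SDK message and entity types (all names here are illustrative):

```java
import java.util.function.Function;

public class MessageHandler {

    /** Stand-in for the real (processed flag, parsed entity) pair. */
    public record Result<E>(boolean processed, E entity) { }

    /**
     * Parses and processes one message body; on any exception the result
     * is unmarked and carries a null entity, so it can be skipped later.
     */
    public static <E> Result<E> handle(String body, Function<String, E> parseAndProcess) {
        try {
            E entity = parseAndProcess.apply(body); // JSON parse + loyalty API call
            return new Result<>(true, entity);      // "mark" as processed
        } catch (RuntimeException e) {
            return new Result<>(false, null);       // failed: no mark, null entity
        }
    }
}
```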

After that, we collect the entity objects and save them to the database all at once. In the last step, we remove the successfully processed messages from the queue one by one. The messages that failed with an exception are “returned” to the queue.

After publishing this blog, @Connor Butch wrote a comment asking why I didn't use the batch delete method to remove all messages from the queue at once. Honestly, I didn't pay much attention to it when I was reading the SDK documentation. I agree with him that it is more performant, so I've refactored the code.
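One detail worth noting if you do the same: a DeleteMessageBatch request accepts at most 10 entries, so the successful messages have to be chunked first. A self-contained sketch of that chunking (the SDK call itself is only indicated in a comment):

```java
import java.util.ArrayList;
import java.util.List;

public class BatchDeleteHelper {

    /** Splits a list into consecutive chunks of at most `size` elements. */
    public static <T> List<List<T>> chunk(List<T> items, int size) {
        List<List<T>> chunks = new ArrayList<>();
        for (int i = 0; i < items.size(); i += size) {
            chunks.add(items.subList(i, Math.min(i + size, items.size())));
        }
        return chunks;
    }

    // Each chunk of messages then becomes one batch request, e.g.:
    //   sqs.deleteMessageBatch(new DeleteMessageBatchRequest(queueUrl, entries));
    // where each entry is a DeleteMessageBatchRequestEntry(id, receiptHandle).
}
```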

When I say “returned”, I mean that the message won't be deleted from the queue and, after some time, it will become visible in the queue again, depending on the queue's visibility timeout configuration. We can then try to process the message later or implement the DLQ pattern. More about this can be found below:

The code is below:

The complete code can be found here:

To be able to easily run this project locally with a large number of messages in the queue, I've added a util class that generates a file with messages. To create the SQS queue locally, LocalStack is used, and Docker Compose creates the LocalStack container. From the directory where the docker-compose file is located, execute the command:
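The original embed with the exact command isn't reproduced here; with a standard Compose setup it would typically be:

```shell
# start the LocalStack container in the background
# (assumes a standard docker-compose.yml in the current directory;
#  newer Docker versions use `docker compose` instead of `docker-compose`)
docker-compose up -d
```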

On initialization, LocalStack will create the SQS queue and populate it with messages from the previously generated file. How it works is explained here:

The app has a GET API that returns all purchase transactions from the database. By querying this API, one can check the saved messages.

Short demo video:

Conclusion

This tutorial is about the creation of a highly configurable, robust SQS listener. For a simple case, I suggest you use Spring Cloud AWS; but if you have a case where you want to control most of the configuration, build a custom listener. Thank you for reading! Please like and follow. Feel free to write in the comment section or on my LinkedIn account if you have any questions, suggestions, or better ideas.

