Practical queueing using SQL (Part 2): Do it simply using Spring Boot and JPA

Vjeran Marčinko
Published in Agency04
May 30, 2019

Part 1: Rationale and general design
Part 2: Do it simply using Spring Boot and JPA
Part 3: Clustering and parallelism

Enough talking; let’s see how to implement the queueing mechanism described in the previous post. We are going to do it in Java, and for this little demo we are picking a combination of Spring Boot, JPA and Gradle, since they make for a fairly quick setup.

I am not a fan of premature abstraction, since it frequently gets in the way more than it helps. Still, just for the sake of easier explanation, I will abstract the whole queueing mechanism into a separate package (or two), and the rest of the app acts simply as a user of such a “queueing library”. This way one can easily see what is application-specific and what is general queueing code.

We are going to demonstrate this queueing mechanism on a dummy SMS-sending application, but let’s first introduce the queueing code.

The code and build/run instructions are available at the end of this post.

Queueing library

The whole “library” is located under the com.ag04.jpaqueue package.

Since the entities that are to be processed in a queueing fashion are application-specific, we are going to extract only their queueing state as a JPA Embeddable object (some code omitted for brevity):
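The embeddable’s code is not reproduced in this text. As a rough idea only, here is a minimal plain-Java sketch of the state it might hold, assuming fields that mirror the columns queried at the end of this post (status, next_attempt_time, attempt_count, last_attempt_time, last_error_attempt_message). Apart from scheduleNextAttempt, the method names and the Status values are hypothetical, and the JPA annotations (@Embeddable on the class, column mappings on the fields) are omitted so the sketch compiles on its own. The key idea is that a non-null nextAttemptTime means “pending”.

```java
import java.time.LocalDateTime;

// Plain-Java sketch of a queueing-state "embeddable" (JPA annotations omitted).
class QueueingState {

    // Hypothetical status values; only ERROR appears in this post's SQL examples.
    enum Status { NOT_ATTEMPTED, SUCCESS, ERROR }

    private Status status = Status.NOT_ATTEMPTED;
    private LocalDateTime nextAttemptTime;   // null = not pending
    private int attemptCount;
    private LocalDateTime lastAttemptTime;
    private String lastErrorAttemptMessage;

    // Pushing into the queue: a non-null time marks the item as pending.
    // A time in the future gives delayed/scheduled processing.
    public void scheduleNextAttempt(LocalDateTime nextAttemptTime) {
        if (nextAttemptTime == null) {
            throw new IllegalArgumentException("nextAttemptTime must not be null");
        }
        this.nextAttemptTime = nextAttemptTime;
    }

    // Hypothetical: called by the consumer after a successful attempt.
    public void registerAttemptSuccess(LocalDateTime attemptTime) {
        this.attemptCount++;
        this.lastAttemptTime = attemptTime;
        this.status = Status.SUCCESS;
        this.nextAttemptTime = null;          // no longer pending
    }

    // Hypothetical: called after a failed attempt; retryTime == null means "give up".
    public void registerAttemptFailure(LocalDateTime attemptTime, String errorMessage,
                                       LocalDateTime retryTime) {
        this.attemptCount++;
        this.lastAttemptTime = attemptTime;
        this.status = Status.ERROR;
        this.lastErrorAttemptMessage = errorMessage;
        this.nextAttemptTime = retryTime;
    }

    public boolean isPending() { return nextAttemptTime != null; }
    public Status getStatus() { return status; }
    public LocalDateTime getNextAttemptTime() { return nextAttemptTime; }
    public int getAttemptCount() { return attemptCount; }
    public LocalDateTime getLastAttemptTime() { return lastAttemptTime; }
    public String getLastErrorAttemptMessage() { return lastErrorAttemptMessage; }
}
```

Under this sketch, delayed processing is just a future timestamp, e.g. state.scheduleNextAttempt(LocalDateTime.now().plusMinutes(5)), and a failure with no further retry leaves nextAttemptTime null, which takes the item out of the pending set.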

Queue producer

As you can guess, you just embed this QueueingState object into the entity of your choice to prepare it for queueing. There is also one important public method in the QueueingState class, the one you have to call when pushing the item entity into the queue:

public void scheduleNextAttempt(LocalDateTime nextAttemptTime);

So prior to saving the queued entity, just make sure to call this method with the desired processing time, thus populating the next_attempt_time column. Usually this is the current time, meaning the item will get processed as soon as possible, but it can also be in the future, which effectively gives you delayed/scheduled processing.

So, pushing to the queue would typically look something like this:

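SomeEntity item = new SomeEntity(....); // any entity that embeds QueueingState
item.getQueueingState().scheduleNextAttempt(LocalDateTime.now()); // due immediately
save(item); // persist it, e.g. via a Spring Data JPA repository's save()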

Queue consumer

Now comes the most complex part: the consumer code. It is located in a single class, com.ag04.jpaqueue.QueueConsumer.

Looking at its constructor, you can see it is configured with an item batch size and a polling period (in seconds). The constructor also requires a few other dependencies:

  • QueueConsumerModule: application-specific logic required for item consumption
  • RetryPolicy: strategy for handling retries in case of processing failures; there are a couple of implementations available in the com.ag04.jpaqueue.retry package
  • PlatformTransactionManager: the Spring-provided bean for managing transactions, which we need in order to open some internal transactions programmatically (this is very rare in the Spring world nowadays; the usual approach would be to use @Transactional with an additional bean class, but that would require splitting the code into more classes, so I considered the current approach simpler for this demo)

Arguably, the most interesting one is QueueConsumerModule, which looks like this:

So, these methods should be implemented by the concrete application in order to provide:

  • the IDs of a limited number of pending items whose next attempt time is before a given time value
  • the QueueingState instance for a specified item entity (sometimes there can be multiple QueueingState embeddables within the same entity, when the same entity takes part in multiple queued processings); the return value is optional because the item might not be present in the DB for some external reason
  • the processing logic for a specific item entity, which returns the item’s QueueingState in case of success; the return value is again optional, for the same reason described above
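As a hedged sketch, the three responsibilities listed above could map onto a generic interface like the following. findItemIdsWhereQueueingNextAttemptTimeIsBefore(time, limit) and processItem(id) are names used later in this post; getQueueingState(id), the ID type parameter, and the reduced QueueingState stand-in are assumptions. The in-memory implementation exists only to make the example runnable; the real module queries the database.

```java
import java.time.LocalDateTime;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.stream.Collectors;

// Minimal stand-in for the QueueingState embeddable (only what this sketch needs).
class QueueingState {
    LocalDateTime nextAttemptTime;   // null = not pending
}

// Hypothetical shape of the application-specific consumer module.
interface QueueConsumerModule<ID> {
    // IDs of at most `limit` pending items whose next attempt time is before `time`.
    List<ID> findItemIdsWhereQueueingNextAttemptTimeIsBefore(LocalDateTime time, int limit);

    // Queueing state of the item; empty if the item is no longer in the DB.
    Optional<QueueingState> getQueueingState(ID itemId);

    // Processes the item; returns its queueing state on success, empty if the item is gone.
    Optional<QueueingState> processItem(ID itemId);
}

// Illustrative in-memory implementation (the real one would query the database).
class InMemoryModule implements QueueConsumerModule<Long> {
    final Map<Long, QueueingState> items = new HashMap<>();
    final List<Long> processed = new ArrayList<>();

    @Override
    public List<Long> findItemIdsWhereQueueingNextAttemptTimeIsBefore(LocalDateTime time, int limit) {
        return items.entrySet().stream()
                .filter(e -> e.getValue().nextAttemptTime != null
                        && e.getValue().nextAttemptTime.isBefore(time))
                .sorted(Map.Entry.comparingByKey())   // deterministic order for the demo
                .limit(limit)
                .map(Map.Entry::getKey)
                .collect(Collectors.toList());
    }

    @Override
    public Optional<QueueingState> getQueueingState(Long itemId) {
        return Optional.ofNullable(items.get(itemId));
    }

    @Override
    public Optional<QueueingState> processItem(Long itemId) {
        QueueingState state = items.get(itemId);
        if (state == null) {
            return Optional.empty();   // item vanished for some external reason
        }
        processed.add(itemId);         // "processing" is just recording the ID here
        return Optional.of(state);
    }
}
```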

Understandably, we need a scheduler to execute the processing job periodically, and, as explained in the previous post, we want to ensure there are never parallel executions of the processing task. So we will simply use a single-thread executor and schedule the processing task upon application start:
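A minimal sketch of such scheduling with plain java.util.concurrent might look like this. A single-thread ScheduledExecutorService means two processing runs can never overlap, and scheduleWithFixedDelay waits the full polling period after each run finishes. The PollingJob class and its start()/stop() methods are hypothetical; in a Spring bean they would typically be annotated with @PostConstruct and @PreDestroy. One gotcha worth guarding against: if an exception escapes a periodic task, the executor silently suppresses all later runs, hence the catch-all wrapper.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

// Hypothetical wrapper around a periodic, never-overlapping processing job.
class PollingJob {
    private final Runnable processingTask;
    private final int pollingPeriodInSecs;
    private ScheduledExecutorService executor;

    PollingJob(Runnable processingTask, int pollingPeriodInSecs) {
        this.processingTask = processingTask;
        this.pollingPeriodInSecs = pollingPeriodInSecs;
    }

    // Spring equivalent: @PostConstruct. First run immediately, then after each delay.
    public synchronized void start() {
        executor = Executors.newSingleThreadScheduledExecutor();
        executor.scheduleWithFixedDelay(this::runSafely, 0, pollingPeriodInSecs, TimeUnit.SECONDS);
    }

    // Spring equivalent: @PreDestroy. Cancels future runs and waits for the current one.
    public synchronized void stop() {
        if (executor == null) {
            return;
        }
        executor.shutdown();
        try {
            if (!executor.awaitTermination(30, TimeUnit.SECONDS)) {
                executor.shutdownNow();
            }
        } catch (InterruptedException e) {
            executor.shutdownNow();
            Thread.currentThread().interrupt();
        }
        executor = null;
    }

    // An exception escaping a periodic task would cancel all future runs,
    // so catch everything here and keep the schedule alive.
    private void runSafely() {
        try {
            processingTask.run();
        } catch (Exception e) {
            System.err.println("Processing run failed: " + e);
        }
    }
}
```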

The consuming logic is in this method:

So you can see how the consumer first polls for pending items and then processes each of them within its own separate transaction. If an error is caught during processing, we handle it in a new transaction. This separate transaction for error handling is required because, in some cases, JPA’s PersistenceContext cannot be reused for error handling if the exception has already rolled back some nested transaction in the processing logic.
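The flow just described could be sketched in plain Java roughly as follows; this is not the original method. Transactions are hidden behind a hypothetical TxRunner interface (with Spring, it could be backed by TransactionTemplate on top of the PlatformTransactionManager), and the module is reduced to a hypothetical ConsumerModule whose registerFailure(id, error) stands in for consulting the RetryPolicy and updating QueueingState. The point is the structure: poll a batch of IDs, process each one in its own transaction, and record a failure in a fresh transaction instead of the one that just failed.

```java
import java.time.LocalDateTime;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical: runs work inside a new transaction (Spring: TransactionTemplate#execute).
interface TxRunner {
    <T> T inNewTransaction(Supplier<T> work);
}

// Hypothetical, simplified module: poll IDs, process one, record a failure.
interface ConsumerModule {
    List<Long> findItemIdsWhereQueueingNextAttemptTimeIsBefore(LocalDateTime time, int limit);
    void processItem(Long id);
    void registerFailure(Long id, Exception error);   // e.g. ask RetryPolicy, schedule retry
}

class SimpleQueueConsumer {
    private final ConsumerModule module;
    private final TxRunner tx;
    private final int itemsPollSize;

    SimpleQueueConsumer(ConsumerModule module, TxRunner tx, int itemsPollSize) {
        this.module = module;
        this.tx = tx;
        this.itemsPollSize = itemsPollSize;
    }

    // One polling run; returns the number of items processed successfully.
    int processPendingItems() {
        List<Long> ids = module.findItemIdsWhereQueueingNextAttemptTimeIsBefore(
                LocalDateTime.now(), itemsPollSize);
        int succeeded = 0;
        for (Long id : ids) {
            try {
                // Each item gets its own transaction, so one failure doesn't affect the others.
                tx.inNewTransaction(() -> { module.processItem(id); return null; });
                succeeded++;
            } catch (Exception e) {
                // The failed transaction is discarded; bookkeeping runs in a fresh one.
                tx.inNewTransaction(() -> { module.registerFailure(id, e); return null; });
            }
        }
        return succeeded;
    }
}
```

With a real transaction manager, the catch block runs after the item’s transaction has already been rolled back, which is why the failure bookkeeping gets a transaction of its own.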

The consumer starts its processing job in its initialization method, which in Spring is usually annotated with @PostConstruct, and stops it in its destroy method, annotated with @PreDestroy. We will see in the next post that we need to change this to make our app cluster-friendly.

Demo app — SMS sending

OK, let’s get down to business. Now we are going to set up a simple Spring Boot app that enqueues a number of SMS messages and “sends” them using the queueing library described above. The application code is located under the com.ag04.smsqueueing package(s), and the starter class is com.ag04.smsqueueing.SmsQueueingApplication (annotated with @SpringBootApplication).

By SMS “sending” I mean just logging the SMS content to the console, but this dummy sender also introduces a short delay to simulate a real-world case, and triggers occasional exceptions to show how error handling works (actually, errors will be triggered quite often). The sender implementation is located in the com.ag04.smsqueueing.sender.SmsSenderImpl class.
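A dummy sender in that spirit might look like the sketch below. It is an illustration, not SmsSenderImpl itself: the SmsSender interface shape, the send(from, to, text) signature, and the injected delay and failure probability are assumptions. Passing in a seeded Random makes the simulated failures reproducible.

```java
import java.util.Random;

// Hypothetical sender contract; the real interface may differ.
interface SmsSender {
    void send(String fromAddress, String toAddress, String text);
}

// Dummy sender: sleeps briefly, fails at random, otherwise just logs the SMS.
class DummySmsSender implements SmsSender {
    private final Random random;
    private final double failureProbability;   // 0.0 = never fail, 1.0 = always fail
    private final long delayMillis;

    DummySmsSender(Random random, double failureProbability, long delayMillis) {
        this.random = random;
        this.failureProbability = failureProbability;
        this.delayMillis = delayMillis;
    }

    @Override
    public void send(String fromAddress, String toAddress, String text) {
        try {
            Thread.sleep(delayMillis);   // simulate network latency
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while sending", e);
        }
        if (random.nextDouble() < failureProbability) {
            throw new IllegalStateException("Simulated failure sending SMS to " + toAddress);
        }
        System.out.println("Sending SMS from " + fromAddress + " to " + toAddress + ": " + text);
    }
}
```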

Queued entity — SMS message

Our queued item entity is the SmsMessage class, stored in the SMS_MESSAGE table in the database.

As you can see, this entity has only a few specific fields (fromAddress, toAddress, text…) and also includes the QueueingState embeddable described previously. Note that we also used JPA’s @Index annotation to index the “next_attempt_time” column, which is very important for efficient polling.

SMS production

Upon start, the application enqueues a number of SMS messages. This happens in the com.ag04.smsqueueing.MainApplicationRunner#run method.

SMS consumption

We instantiate QueueConsumer within the com.ag04.smsqueueing.SmsQueueingApplication#smsSendingQueueConsumer method:

You can see how we configured this instance to work with a batch size of 100 and a polling period of 10 seconds. Also, the RetryPolicy we configured will try to process an SMS message at most 3 times (LimitedRetryPolicy), and each retry will be delayed from the previous one by 1 minute (FixedDelayRetryPolicy). ExponentialDelayRetryPolicy is another interesting RetryPolicy in the package, which delays each subsequent retry by an exponentially increasing duration.
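To make that configuration concrete, here is a minimal sketch of how such policies could compose. The RetryPolicy interface (one method returning the next attempt time, or empty to give up), the constructors, and the doubling factor used for the exponential variant are assumptions modelled on the class names in the text, not the actual com.ag04.jpaqueue.retry API.

```java
import java.time.Duration;
import java.time.LocalDateTime;
import java.util.Optional;

// Hypothetical contract: given the attempts made so far and the last attempt time,
// return when to try next, or empty to stop retrying.
interface RetryPolicy {
    Optional<LocalDateTime> calculateNextAttemptTime(LocalDateTime lastAttemptTime, int attemptCount);
}

// Always retry after the same delay.
class FixedDelayRetryPolicy implements RetryPolicy {
    private final Duration delay;

    FixedDelayRetryPolicy(Duration delay) { this.delay = delay; }

    @Override
    public Optional<LocalDateTime> calculateNextAttemptTime(LocalDateTime last, int attemptCount) {
        return Optional.of(last.plus(delay));
    }
}

// delay = initialDelay * 2^(attemptCount - 1): 1x, 2x, 4x, ...
class ExponentialDelayRetryPolicy implements RetryPolicy {
    private final Duration initialDelay;

    ExponentialDelayRetryPolicy(Duration initialDelay) { this.initialDelay = initialDelay; }

    @Override
    public Optional<LocalDateTime> calculateNextAttemptTime(LocalDateTime last, int attemptCount) {
        long factor = 1L << Math.max(0, Math.min(attemptCount - 1, 30));   // capped shift
        return Optional.of(last.plus(initialDelay.multipliedBy(factor)));
    }
}

// Decorator: gives up once maxAttempts attempts have been made.
class LimitedRetryPolicy implements RetryPolicy {
    private final int maxAttempts;
    private final RetryPolicy delegate;

    LimitedRetryPolicy(int maxAttempts, RetryPolicy delegate) {
        this.maxAttempts = maxAttempts;
        this.delegate = delegate;
    }

    @Override
    public Optional<LocalDateTime> calculateNextAttemptTime(LocalDateTime last, int attemptCount) {
        if (attemptCount >= maxAttempts) {
            return Optional.empty();
        }
        return delegate.calculateNextAttemptTime(last, attemptCount);
    }
}
```

Under these assumptions, the demo’s policy (at most 3 attempts, 1 minute apart) would be new LimitedRetryPolicy(3, new FixedDelayRetryPolicy(Duration.ofMinutes(1))); the consumer would call it after each failure and store the result as the next attempt time, with an empty result meaning no further retries.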

Our implementation of QueueConsumerModule is SmsSendingQueueConsumerModule, which uses a Spring Data JPA repository to access SmsMessage in the database. You will notice that the implementation of processItem(id) is basically just a call to SmsSender. Our findItemIdsWhereQueueingNextAttemptTimeIsBefore(time, limit), however, doesn’t use SmsMessageRepository, because Spring Data JPA currently doesn’t offer parameterized result limiting together with a simple projection (ID selection), so we ended up using plain JPA via EntityManager.

Building and running

The project code can be downloaded from Bitbucket at:

https://bitbucket.org/ag04/smsqueueing/src/single-threaded-consumer/

You can download the sources (the single-threaded-consumer branch) using the following git command:

git clone -b single-threaded-consumer https://bitbucket.org/ag04/smsqueueing.git

Gradle is the build tool of choice, and as you can see in build.gradle, there are only 3 dependencies required for this demo project:

dependencies {
    implementation 'org.springframework.boot:spring-boot-starter'
    implementation 'org.springframework.boot:spring-boot-starter-data-jpa'
    runtimeOnly 'org.postgresql:postgresql'
}

To build the project, just execute the following command:

./gradlew clean build

Now we have built the app package (build/libs/smsqueueing-1.0.jar), but prior to running it, we need to make sure we have some place to store our data.

Database setup

We decided to use a PostgreSQL database, but any other SQL database would work too (after swapping the JDBC driver dependency and connection settings), since we don’t use any DB-specific features here.

You have to create the appropriate database (“smsqueueing”) and user (also “smsqueueing”). For PostgreSQL, this psql script can come in handy:

shell> psql postgres
CREATE USER smsqueueing WITH
    LOGIN
    CONNECTION LIMIT -1
    PASSWORD 'smsqueueing';
CREATE DATABASE smsqueueing
    WITH
    OWNER = smsqueueing
    ENCODING = 'UTF8'
    CONNECTION LIMIT = -1;

The DB connection is configured the standard Spring Boot way, in application.properties:

spring.datasource.url=jdbc:postgresql://localhost:5432/smsqueueing
spring.datasource.username=smsqueueing
spring.datasource.password=smsqueueing

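If not already present, all necessary database objects (tables, sequences etc.) get created upon application start, so you don’t have to worry about them. This relies on Hibernate schema generation being enabled (spring.jpa.hibernate.ddl-auto, e.g. update), which Spring Boot does not turn on by default for a non-embedded database such as PostgreSQL.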

Running

Finally, we can run the app via:

java -jar ./build/libs/smsqueueing-1.0.jar

Of course, if you are going to start it from your favorite IDE, just import it as a Gradle project and start it via the main application class, com.ag04.smsqueueing.SmsQueueingApplication.

When you start it, you will first see log lines about SMS production, something like:

On each periodic execution of the consumption task, when it polls some pending items, it logs their count:

After that, it logs individual item processing lines, such as these for successful processing:

And something like this for an unsuccessful one (these will be frequent in our demo):

Notice the last line, which reports the next attempt time for which the retry has been scheduled (if any, since it may have been the last attempt).

A quick look at the database (SMS_MESSAGE table) during app runtime shows all relevant fields:

Of course, you can try restarting the service, which will add a new bunch of SMS messages to our queue; naturally, any old ones still in pending state will continue to get processed according to their scheduled attempt time.

Now, with this extremely simple queueing setup, you can do all sorts of things, such as the following, and much, much more:

  • count pending items:
    SELECT COUNT(*) FROM sms_message WHERE next_attempt_time IS NOT NULL
  • count pending items aggregated by a domain-specific field (e.g. fromAddress):
    SELECT from_address, COUNT(*) FROM sms_message WHERE next_attempt_time IS NOT NULL GROUP BY from_address
  • re-schedule all failed items that have reached the retry limit, to execute as soon as possible:
    UPDATE sms_message SET next_attempt_time = CURRENT_TIMESTAMP WHERE status = 'ERROR' AND next_attempt_time IS NULL
  • remove pending items that failed too many times (say 50):
    DELETE FROM sms_message WHERE next_attempt_time IS NOT NULL AND attempt_count > 50
  • remove some specific invalid item that ended up in the queue due to a bug:
    DELETE FROM sms_message WHERE ID = 34254642991
  • display failure descriptions for items whose last processing attempt was after some specified time:
    SELECT last_error_attempt_message FROM sms_message WHERE status = 'ERROR' AND last_attempt_time > to_timestamp('20190523', 'YYYYMMDD')

… and anything else that comes to your mind.

So tell me: how easy would it be to implement all those features with a setup built around some popular queueing library or server?

To be continued …

And that’s it. In our next post we are going to see how to ensure that only a single consumption job is executed in a clustered environment, and how to add parallelism to improve throughput.
