Filtering messages in Azure Service Bus Queue — AzureServiceBusSieve

Abhineet Chaudhary
Feb 8, 2022

Problem Statement

Messages in a Service Bus queue are handled in a FIFO (First In, First Out) fashion: when a new message arrives in the queue, it is handed to one of the active consumers (queues deliver each message to a single competing consumer rather than a copy to every consumer), and once that consumer settles it, the message is removed from the queue. Thus, an interested consumer that isn't active when the message is produced cannot get the message it is looking for once it is instantiated, because another consumer may already have taken it. The ideal solution would be to let consumers filter messages based on their read requirements. This may appear to be a trivial issue at first, but in a microservice architecture based on the principles of edge computing, where each service is instantiated at runtime, it can become a deal-breaker.

One possible solution to this challenge is to use Azure Service Bus Topics, which follow a publisher-subscriber model. Each individual subscriber then needs to be configured with its own filters. Although Service Bus allows up to 2,000 subscriptions for a single topic entity, that may not be enough in enterprise-grade environments. Also, messages in a topic can expire (time-to-live) and risk being dead-lettered if they are not consumed within the stipulated time frame.

Proposed Solution

This approach filters Azure Service Bus queue messages based on various attributes of the ServiceBusReceivedMessage class, avoiding the pitfalls of Azure Service Bus Topics described above. It calls the receiveMessages() function in PEEK_LOCK receive mode, so a message can be read and checked against the filtering criteria without being removed from the queue. The algorithm marks a peeked message as complete only if it passes through the filtering sieve, hence the project's name :).

Algorithm
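
As a rough illustration of the peek-lock-and-filter idea described in the proposed solution, here is a minimal, self-contained sketch. It is a simulation over an in-memory queue and does not call the Azure SDK. The names PeekLockSieveSketch, Message and sieve are hypothetical, and abandoning (re-queuing) a non-matching message is an assumption about how rejected messages are handled, not something taken from the project.

```java
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.function.Predicate;

// Simulation only: mimics peek-lock settlement on an in-memory queue.
final class PeekLockSieveSketch {

    // Hypothetical stand-in for a received message: an id plus a subject to filter on.
    static final class Message {
        final String id;
        final String subject;

        Message(String id, String subject) {
            this.id = id;
            this.subject = subject;
        }
    }

    // Looks at every message currently in the queue exactly once.
    // Matching messages are "completed" (removed and returned);
    // the rest are "abandoned" (put back, so they stay available).
    static List<Message> sieve(Deque<Message> queue, Predicate<Message> filter) {
        List<Message> accepted = new ArrayList<>();
        int pending = queue.size();
        for (int i = 0; i < pending; i++) {
            Message locked = queue.pollFirst();   // simulated lock: taken, not yet settled
            if (filter.test(locked)) {
                accepted.add(locked);             // complete: leaves the queue for good
            } else {
                queue.addLast(locked);            // abandon (assumed): back on the queue
            }
        }
        return accepted;
    }
}
```

Calling sieve(queue, m -> m.subject.equals("orders")) on a queue holding two "orders" messages and one "billing" message would return the two "orders" messages and leave the "billing" message on the queue for another consumer.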

How to use

You can have a look at the GitHub repository and adapt it to your use case.

The AzureServiceBusQueueSieve class is set up through a configuration class, AzureServiceBusQueueSieveConfiguration, which takes the following parameters:

connectionString (String): the Azure Service Bus queue connection string

queueName (String): the Azure Service Bus queue name

filter (String): the filtering criteria (defined below)

timeoutInMilliseconds (Integer): how long, in milliseconds, the sieve stays subscribed to the Service Bus read API
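
To make the role of timeoutInMilliseconds more concrete, the following sketch shows one way a timeout can bound a filtered receive loop. It is an illustration using an in-memory BlockingQueue, not the project's implementation. The class and method names are hypothetical, and returning non-matching messages to the queue when the loop ends is an assumption.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Predicate;

// Illustration only: a time-bounded, filtered receive over an in-memory queue.
final class TimedReceiveSketch {

    // Keeps polling until timeoutInMilliseconds has elapsed. Messages matching
    // the filter are returned; the others go back on the queue afterwards.
    static List<String> receiveFiltered(BlockingQueue<String> queue,
                                        Predicate<String> filter,
                                        long timeoutInMilliseconds) {
        long deadline = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(timeoutInMilliseconds);
        List<String> accepted = new ArrayList<>();
        List<String> rejected = new ArrayList<>();
        try {
            long remaining;
            while ((remaining = deadline - System.nanoTime()) > 0) {
                // Wait for the next message, but never past the deadline.
                String message = queue.poll(remaining, TimeUnit.NANOSECONDS);
                if (message == null) {
                    break;                        // deadline reached with nothing left to read
                }
                if (filter.test(message)) {
                    accepted.add(message);
                } else {
                    rejected.add(message);
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();   // preserve the interrupt for the caller
        }
        queue.addAll(rejected);                   // assumed: non-matching messages stay available
        return accepted;
    }
}
```

With a timeout of 60 * 1000, the loop would keep listening for up to a minute, which matches the "subscribe for 60 seconds" reading of the parameter used later in this post.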

Filtering Criteria

Using the configuration object, you can subscribe to the receiveFilteredMessages() method to receive a Flux of ServiceBusReceivedMessage instances, as shown below:

String connectionString = "<YOUR-CONNECTION-STRING>";
String queueName = "<YOUR-QUEUE-NAME>";
String filter = "<ONE-OF-THE-FILTERS>";
int timeoutInMilliseconds = 60 * 1000; //e.g. subscribe for 60 seconds
AzureServiceBusQueueSieveConfiguration azureServiceBusQueueSieveConfiguration =
        new AzureServiceBusQueueSieveConfiguration(connectionString, queueName, timeoutInMilliseconds, filter);
AzureServiceBusQueueSieve azureServiceBusQueueSieve =
        new AzureServiceBusQueueSieve(azureServiceBusQueueSieveConfiguration);
azureServiceBusQueueSieve.receiveFilteredMessages(azureServiceBusQueueSieveConfiguration)
        .subscribe(message -> {
            // Enter your code here
            System.out.println("message received by client-" + message.getBody().toString());
        });

Scope

The implementation intends to provide an interface allowing filter-based subscription in a primarily producer-consumer mode of operation. The application can be further expanded to other queuing services like Apache Kafka or AWS SQS.
