Utility API to Process Messages from any DLQ to Anypoint MQ

Rohit Singh
Another Integration Blog
5 min readDec 4, 2023

--

In this blog, I will create a general utility API that can be utilized to process the messages from any Dead Letter Queue (DLQ) and publish them to Anypoint Messaging Queue (Anypoint MQ).

What is DLQ?
The dead-letter queue (or undelivered message queue) is the queue to which messages are sent if they cannot be routed to their correct destination.

What is Anypoint MQ?
Anypoint MQ is a multi-tenant, cloud messaging service that enables customers to perform advanced asynchronous messaging scenarios between their applications.

Use Case
We want to develop a general utility API that can be called by the customer when traffic is low to process the DLQ messages and then push it to a specific Anypoint MQ for further processing of messages.

Solution
We will call the utility API endpoint and provide the DLQ name in the query parameters. We will use a REST API to consume messages from DLQ and then provide acknowledgment (ACK) to the DLQ. Once the ACK has been provided, then the consumed message will be stored in an object store. We will maintain the DLQ-to-MQ mapping in configuration properties so that messages can be pushed to the required Anypoint MQ only. To publish messages to Anypoint MQ, we retrieve the messages from the object store, push the message to Anypoint MQ, and then remove that message’s key from the object store.

Error Handling
We will be handling two different error scenarios:
1. If the current access token has expired, then we will attempt to obtain a new access token.
2. If any error occurs while publishing a message to Anypoint MQ, then we will send the message again to the DLQ and remove that key from the object store to prevent duplicate messages in the object store.

As a demonstration, we have created two queues in the Anypoint MQ platform: utility-poc-mq and utility-dlq.

DLQ and MQ
DLQ and MQ

In Anypoint Studio, we have created a single endpoint which anyone may invoke to process DLQ messages.

The flow for the public endpoint to invoke processing of DLQ messages
The /dlqPoc endpoint

Next, we invoke the access token endpoint of the Anypoint MQ REST API to get the access token, and then store the obtained token into a variable. The REST API call as a cUrl command would resemble the following:

curl — location ‘https://anypoint.mulesoft.com/accounts/oauth2/token' \
— header ‘Content-Type: application/json’ \
— data ‘{
“client_id”: “********************”,
“client_secret”: “*************************”,
“grant_type”: “client_credentials”
}’

We are again invoking a REST API, but this time we do it within a For Each scope, so that we can consume every one of the messages from the DLQ. To avoid an infinite recursion, we set a flag to false when there are no further messages to consume and an HTTP status code of 204 is returned from the REST API.
When messages are returned back from the REST API, a For Each scope is used to store each returned message in the object store (messageId: payload) and then an ACK is sent back to the DLQ via Anypoint MQ’s REST API.

curl — location ‘https://mq-us-east-1.anypoint.mulesoft.com/api/v1/organizations/{orgID}/environments/{envID}/destinations/utility-dlq/messages?pollingTime=20000&batchSize=10&lockTtl=10000' \
— header ‘Authorization: bearer 2ce4651b-0eb4–434c-9725-*********’
curl — location — request DELETE ‘https://mq-us-east-1.anypoint.mulesoft.com/api/v1/organizations/{orgID}/environments/{envID}/destinations/utility-dlq/messages/{msgID}' \
— header ‘Authorization: bearer 348a743a-6ed6–4535-bf59–**********’ \
— header ‘Accept: application/json’ \
— header ‘Content-Type: application/json’ \
— data ‘{
“messageId”: “044ad5e6–4f2c-46e5–9*************”,
“lockId”: “AQEBysDnFRm4hfhCinYbLnmI0bGtkABv1KPqpSZEeaV3pgtqjxdZghMx2Rr+7MnH0J6kZburbKO+UyCMNZzUtJP30Bibto0GKlaE2GjCk3RbWpOaOZvzhX7/3L6xDrc+pMOmSEXIjVZDoG4ETJ7eOPeH6i8m73PP/cUB9TOWCeYSCi2jVl3Hw83rVaus6y5Hpe4/NLii2Xt4CLHNw8w6xP6kAZWsKRP+wpZCyxswmN3Hqoj6XiM41rNeSyieBMWq4aUd2End9Rm2CBmj/QyCsI1ReX8e+L3GZNz558*****************************************************”
}’
The subflow for retrieving messages from the DLQ

Once the DLQ messages have been consumed and stored in the object store, we retrieve all the key/value pairs from the object store and push each message payload to a specific MQ by invoking a flow reference to publish-messages-to-mq-subflow. In this subflow, we use a For Each scope to publish a message for each key/value pair in the object store to Anypoint MQ, so long as the payload is not empty. After each message is published, its key is removed key from the object store via the Remove operation of the Object Store module. If any error occurs, the message payload is published back to the DLQ instead, and its key is removed from the object store.

The subflow for publishing messages to the MQ

We maintain DLQ -> Anypoint MQ mappings in a configuration file to specify the destination Anypoint MQ a particular DLQ should publish its undeliverable messages to.

A DLQ-to-MQ mapping in the configuration property file resembles the following:

mappingDlqToMq:
utility-dlq: "utility-poc-mq"
event-dlq: "event-queue"

Before publishing a message from a DLQ, we can dynamically select the destination MQ Name.

Using the DLQ-to-MQ mapping to dynamically specify the destination MQ

Conclusion

It is a good practice to create a handy, general utility API that can be called anytime to process any application DLQ messages and publish them to their respective Anypoint MQ. It empowers non-technical personnel with the ability to trigger a simple endpoint to reprocess the DLQ messages at a convenient time, such as when traffic is low on the system.

--

--