Using ActiveMQ with Node.js, STOMP, and HTTP

ActiveMQ is written in Java and has a full-fledged JMS client, but its STOMP support and RESTful API let us interface with the messaging queue quite nicely using Node.js.

With Node, STOMP is a great choice of protocol for subscribing to queues, while HTTP is ideal for sending messages. HTTP is particularly useful for messages scheduled for delivery at a set time, because the response we receive from the broker upon scheduling a message is rich with information.

Let’s start with the basics. Using Russell Haering and easternbloc’s very handy node-stomp-client, we can create a class in Node.js that allows us to subscribe to the ActiveMQ queue of our choice. Here’s how this might look:

'use strict';
var Stomp = require('stomp-client');

var MessageConsumer = function MessageConsumer(){};

MessageConsumer.prototype.init = function init(){
  var stompClient = new Stomp('127.0.0.1', 61613, 'user', 'pw');
  stompClient.connect(function(sessionId){
    stompClient.subscribe('/queue/queue1', function(body, headers){
      // This callback is invoked whenever the client receives a message.
    });
  });
};

module.exports = new MessageConsumer();

Just call the init() method on the exported object to subscribe to a queue. (It’s also worth mentioning here that the way we refer to queues in STOMP is different from how we would do so with JMS: with STOMP, we need to prepend the string ‘/queue/’ to the name of the queue itself.)
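As a small illustration of that naming rule, here is a minimal sketch of a helper that adds the prefix for you. The function name toStompDestination is hypothetical and not part of stomp-client; it only assumes the ‘/queue/’ and ‘/topic/’ prefixes used in this post.

```javascript
'use strict';

// Hypothetical helper (not part of stomp-client): maps a plain ActiveMQ
// destination name to the prefixed form that STOMP expects.
function toStompDestination(name, type) {
  var kind = type || 'queue';
  if (kind !== 'queue' && kind !== 'topic') {
    throw new Error('Unsupported destination type: ' + kind);
  }
  var prefix = '/' + kind + '/';
  // Leave names that already carry the prefix untouched.
  return name.indexOf(prefix) === 0 ? name : prefix + name;
}

console.log(toStompDestination('queue1')); // prints "/queue/queue1"
console.log(toStompDestination('ActiveMQ.Scheduler.Management', 'topic'));
// prints "/topic/ActiveMQ.Scheduler.Management"
```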

Posting a message with STOMP is easy. Let’s create a class that can be used to send a message below:

'use strict';
var Stomp = require('stomp-client');

var MessageProducer = function MessageProducer(){
  this._stompClient = null;
};

MessageProducer.prototype.init = function init(){
  this._stompClient = new Stomp('127.0.0.1', 61613, 'user', 'pw');
  this._stompClient.connect(function(sessionId){
    console.log('STOMP client connected.');
  });
};

MessageProducer.prototype.sendMessage = function sendMessage(messageToPublish){
  this._stompClient.publish('/queue/queue1', messageToPublish);
};

module.exports = new MessageProducer();

You might notice a major limitation of the approach for publishing messages used above: there’s no way to verify that the broker received the message. STOMP 1.2 does give us something called a ‘receipt’ frame (here’s a Node.js client that implements it). A receipt helps with timing when a follow-up step is contingent upon the broker having successfully received the message (say, for instance, you want to write a record of that message to a database somewhere), but it doesn’t send back any detailed data about the message itself.
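To make the receipt mechanism a little more concrete, here is a rough sketch of what it looks like on the wire according to the STOMP 1.2 specification: the client adds a receipt header to its SEND frame, and the broker answers with a RECEIPT frame whose receipt-id header echoes that value. The two function names are hypothetical and this is not the API of any particular client; header escaping is left out to keep the sketch short.

```javascript
'use strict';

// Serialize a STOMP SEND frame that asks the broker for a receipt.
// receiptId is any string that is unique on this connection.
// Note: header values are not escaped here, so avoid ':' and newlines in them.
function buildSendFrame(destination, body, receiptId) {
  return [
    'SEND',
    'destination:' + destination,
    'receipt:' + receiptId,
    'content-type:text/plain',
    'content-length:' + Buffer.byteLength(body),
    '',   // blank line separates headers from the body
    body
  ].join('\n') + '\0'; // every frame ends with a NUL byte
}

// Return the receipt-id carried by a RECEIPT frame, or null for any other frame.
function parseReceiptId(frame) {
  var lines = frame.replace(/\0$/, '').split('\n');
  if (lines[0] !== 'RECEIPT') { return null; }
  for (var i = 1; i < lines.length && lines[i] !== ''; i++) {
    var sep = lines[i].indexOf(':');
    if (lines[i].slice(0, sep) === 'receipt-id') {
      return lines[i].slice(sep + 1);
    }
  }
  return null;
}

console.log(parseReceiptId('RECEIPT\nreceipt-id:msg-1\n\n\0')); // prints "msg-1"
```

Notice that the RECEIPT frame carries nothing but the identifier we chose ourselves, which is exactly why it cannot tell us anything about the message as the broker sees it.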

Imagine a scenario where you schedule a message for future delivery, but need some mechanism that would allow you to cancel delivery of that message (ActiveMQ does have a nifty scheduler, which you can read about here). The problem is that you can only delete individual messages using the internal message ID that ActiveMQ’s message scheduler generates, and STOMP gives you no way to retrieve this ID immediately after scheduling a message.

It’s still possible to delete a scheduled message using STOMP. Here’s what you’d need to do (there are quite a few hoops to jump through, but what I’m about to do is the same basic method described here but with an external Node.js client as opposed to JMS):

  • First, make sure to send along some other kind of identifying information in the headers of the message we publish. For example, you can use ActiveMQ’s correlation-id property and give it a UUID that can function as the primary key for the record of the scheduled message we save to a database. To accomplish this, you can add a method like the following to your MessageProducer class (note that AMQ_SCHEDULED_DELAY is a delay in milliseconds, not a timestamp):
MessageProducer.prototype.sendScheduledMessage = function sendScheduledMessage(messageToPublish, delay, correlationId){
  this._stompClient.publish('/queue/queue1', messageToPublish, {
    AMQ_SCHEDULED_DELAY : delay,
    'correlation-id' : correlationId
  });
};
  • Once ActiveMQ has a few scheduled messages queued up for timed delivery, send a message that initiates a “browse” request. To do this, you’ll need to use your MessageProducer class’s internal STOMP client to publish a message to the ActiveMQ.Scheduler.Management topic (so the first argument you’ll pass to stompClient.publish is this string: “/topic/ActiveMQ.Scheduler.Management”). The message you send to initiate a browse must include the following headers (“BROWSE” is the string value of the ScheduledMessage.AMQ_SCHEDULER_ACTION_BROWSE constant in ActiveMQ’s Java API):
{
  "reply-to": "/queue/browsedMessages",
  AMQ_SCHEDULER_ACTION: "BROWSE"
}

(Note: the queue ‘browsedMessages’ can be called anything — you’re just creating a queue that will receive copies of any message currently in ActiveMQ’s scheduler.)

  • Next, create another STOMP client and subscribe it to this ‘browsedMessages’ queue. It will consume all of the messages that end up in the browsedMessages queue. To find the copy of the message you want to delete, inspect the headers of each message for the correlation-id you sent along when the message was scheduled. Once you have a match, take its corresponding ‘message-id’ property (you’ll also find this in the response headers) and send it along as the value for “scheduledJobId” in the headers of a new message to ActiveMQ, which will be used to delete the desired message from the scheduler. Here’s what the headers for this subsequent request will look like:
{
  AMQ_SCHEDULER_ACTION : "REMOVE",
  "scheduledJobId" : scheduledJobId
}
  • The example code below extends your MessageProducer class with methods that will allow you to pull this off:
MessageProducer.prototype._sendDeleteMessageRequest = function _sendDeleteMessageRequest(scheduledJobId){
  this._stompClient.publish('/topic/ActiveMQ.Scheduler.Management', null, {
    AMQ_SCHEDULER_ACTION : 'REMOVE',
    scheduledJobId : scheduledJobId
  });
};
MessageProducer.prototype._sendBrowseRequest = function _sendBrowseRequest(){
  this._stompClient.publish('/topic/ActiveMQ.Scheduler.Management', null, {
    'reply-to' : '/queue/browsedMessages',
    AMQ_SCHEDULER_ACTION : 'BROWSE'
  });
};
MessageProducer.prototype.deleteMessage = function deleteMessage(correlationId){
  var browserConsumer = new Stomp('127.0.0.1', 61613, 'user', 'pw');
  var self = this;
  // Note: this promise never settles if no browsed message matches.
  return new Promise(function(resolve, reject){
    browserConsumer.connect(function(sessionId){
      // Subscribe to the reply queue before asking for the browse.
      browserConsumer.subscribe('/queue/browsedMessages', function(body, headers){
        if (headers['correlation-id'] === correlationId){
          browserConsumer.disconnect(function(){
            // Resolve with the ID that the scheduler knows this message by.
            return resolve(headers['message-id']);
          });
        }
      });
      process.nextTick(function(){
        self._sendBrowseRequest();
      });
    });
  }).then(function(scheduledJobId){
    self._sendDeleteMessageRequest(scheduledJobId);
  });
};
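
Two details in the steps above are easy to get wrong, so here is a minimal sketch of them in isolation. It assumes that AMQ_SCHEDULED_DELAY is a delay in milliseconds (so a target delivery time has to be converted relative to the current time) and that the browsed copy exposes the ID under ‘message-id’ as described above. The helper names buildScheduleHeaders and matchBrowsedMessage are hypothetical, and crypto.randomUUID requires a reasonably recent version of Node.

```javascript
'use strict';
var crypto = require('crypto');

// Hypothetical helper: build the headers for a message that should be
// delivered at `deliverAt`. ActiveMQ expects a delay in milliseconds,
// so the target time is converted relative to `now`.
function buildScheduleHeaders(deliverAt, now, correlationId) {
  var delay = deliverAt.getTime() - now.getTime();
  if (delay <= 0) {
    throw new Error('deliverAt must be in the future');
  }
  return {
    AMQ_SCHEDULED_DELAY: delay,
    // Fall back to a fresh UUID when the caller does not supply an ID.
    'correlation-id': correlationId || crypto.randomUUID()
  };
}

// Hypothetical helper: given the headers of one browsed message, return the
// value to send as scheduledJobId, or null if this is not the message we want.
function matchBrowsedMessage(headers, correlationId) {
  return headers['correlation-id'] === correlationId ? headers['message-id'] : null;
}

// Schedule for one minute from now and keep the correlation ID for later.
var exampleHeaders = buildScheduleHeaders(new Date(Date.now() + 60000), new Date());
console.log(exampleHeaders);
```

The object returned by buildScheduleHeaders can be passed as the third argument to publish, and the correlation-id it contains is the value you would store in your database.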

If this whole process struck you as a huge, suboptimal pain in the ass, don’t worry — there’s hope.


Scheduling and deleting with HTTP & REST

Here’s a really easy, promisified way to use HTTP to schedule a message, assuming you have ActiveMQ’s REST API enabled:

'use strict';
var request = require('request');
var Promise = require('bluebird');

var MessageScheduler = function MessageScheduler(){};

MessageScheduler.prototype.scheduleMessage = function scheduleMessage(messageToSend, delay){
  var promisifiedPostRequest = Promise.promisify(request.post);
  return promisifiedPostRequest(
    'http://localhost:8080/demo/message/queue1?type=queue',
    {
      form : {
        body : JSON.stringify(messageToSend),
        // Delay before delivery, in milliseconds.
        AMQ_SCHEDULED_DELAY : delay
      }
    })
    .then(function(response){
      // ActiveMQ reports the ID of the newly scheduled message in this header.
      return response.headers.messageid;
    });
};

module.exports = new MessageScheduler();

See that bit in the .then() block? This is where we can get access to the message ID that ActiveMQ assigns to the scheduled message which, incidentally, has the same value as the message’s scheduledJobId.

That’s the beauty of HTTP: it gives us access to this very important property in the response of the request, which means we don’t have to issue a ‘browse’ request to dig it up later.
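
Since everything downstream depends on that one header, it may be worth guarding the lookup. The following is a minimal sketch of such a guard, assuming the response object has the statusCode and headers properties of a standard Node.js HTTP response and that the ID arrives in the messageid header as shown above. The function name extractScheduledJobId is hypothetical.

```javascript
'use strict';

// Hypothetical guard: return the scheduled message's ID from the HTTP
// response, or throw if the request failed or the header is missing.
function extractScheduledJobId(response) {
  if (response.statusCode < 200 || response.statusCode >= 300) {
    throw new Error('Scheduling failed with HTTP status ' + response.statusCode);
  }
  var id = response.headers && response.headers.messageid;
  if (!id) {
    throw new Error('Response carried no messageid header');
  }
  return id;
}

// Example with a stand-in response object rather than a live broker.
console.log(extractScheduledJobId({
  statusCode: 200,
  headers: { messageid: 'ID:example-1' }
})); // prints "ID:example-1"
```

Throwing inside a .then() callback rejects the promise, so a function like this could be passed directly to .then() in place of the inline callback.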

Now that we have this value, using it to delete the scheduled message it identifies is easy:

'use strict';
var Promise = require('bluebird');
var request = require('request');

var MessageDeleter = function MessageDeleter(scheduledJobId){
  var promisifiedPostRequest = Promise.promisify(request.post);
  return promisifiedPostRequest(
    'http://localhost:8080/demo/message/ActiveMQ.Scheduler.Management?type=topic',
    {
      form : {
        AMQ_SCHEDULER_ACTION : 'REMOVE',
        scheduledJobId : scheduledJobId
      }
    }).then(function(response){
      // Do any post-deletion processing you need here.
      return response;
    });
};

module.exports = MessageDeleter;

With REST, that’s all there is to it.

The only major limitation to deleting messages (and this is true for both STOMP and HTTP) is that ActiveMQ does not confirm that the message was in fact deleted, only that the request to delete it was received and processed. The message targeted for deletion may even have been delivered to a consumer before the delete request reached the broker.

Regardless, this gets us a whole lot closer to having a reliable, quick, Node-based means of interfacing with ActiveMQ’s message scheduling capabilities.