Moving Your JMS App from ActiveMQ to Apache Pulsar

DataStax
Building Real-World, Real-Time AI
15 min read · Apr 21, 2023

By Bill McLane

Image: Pixabay

When building mission-critical applications such as financial services systems, transportation and logistics control systems, or supply chain management systems, it’s becoming increasingly important to modularize components within those systems and implement a distributed microservices architecture. At the foundation of any distributed microservices architecture there has to be an efficient and reliable communications infrastructure between components. Without it, applications are left to struggle with how to expose data between components. This is where message queues and data streaming come into play.

Message queues are a common tool used to enable communication between disparate application components, and have been at the foundation of messaging technologies from the very beginning. While there are many approaches to message queueing, most have a message broker that sits between the application components and acts as the queue manager for distribution of the messages to given components.

In Java, there’s a messaging specification called Java Message Service (now renamed as Jakarta Message Service, or JMS) that defines the API interaction to asynchronously send and receive messages between Java components. A commonly used open source implementation of JMS is ActiveMQ; it enables legacy applications with JMS and queuing functionality.

ActiveMQ has worked well for smaller scale deployments, but if your JMS applications handle millions of messages between your system components, you need modern tools and infrastructure capable of dealing with real-time data streaming with consistent latency at the highest throughput. DataStax Starlight for JMS and DataStax Astra Streaming are the best choice for meeting this criterion.

Leveraging Apache Pulsar, Starlight for JMS is an open-source JMS implementation that enables systems to run either on-premises or in a cloud environment using the high-performance Pulsar distributed messaging and streaming architecture. Astra Streaming is a highly scalable SaaS offering that provides robust data storage, event streaming, message queuing, and publisher-subscriber messaging features, all in one. Together, Starlight for JMS and Astra Streaming let you upgrade your existing JMS applications to modern, highly performant, highly scalable infrastructure that can handle all of your JMS workloads and give applications next-generation stream processing functionality.

The challenge of modernization exists across every industry. There’s a pressing need to provide application infrastructure with a seamless path to leverage existing implementations while also providing availability of new technologies. Take for example the demands of a banking system that needs to onboard thousands of new branches and clients due to a recent acquisition. Or a supply chain management system that needs to integrate with a legacy JMS system for visibility into a significant portion of the workload. With Starlight for JMS and Astra Streaming, these functions of integration, scalability and performance are provided to your existing JMS infrastructure with no compromise or changes to your existing JMS implementation. Let’s explore how this can be done.

How to make the move

This article teaches you how to upgrade an existing JMS app that uses ActiveMQ as a message queue by dropping in Starlight for JMS on Astra Streaming. In this example, you’ll create a Spring app with a simple function that sends a detailed car order message to test the app.

What you need

To complete this tutorial, you need the following:

  • A basic understanding of the Java programming language.
  • Spring knowledge is a plus, though not necessary.
  • JDK (see the official download and installation instructions)
  • Maven (see the official download and installation instructions)
  • ActiveMQ (see the official download and installation instructions)
  • An Astra Streaming account
  • Quick start instructions linked below in the section “Replacing the App with Starlight for JMS on Astra Streaming”
  • Knowledge of message queuing in system design
  • A code editor

All examples for this tutorial are shown based on a Unix-based system like Linux or MacOS. You can also view the final project code at any time.

Building an app to use ActiveMQ JMS

You begin by creating a JMS ActiveMQ app, running it, and then replacing it with Starlight for JMS on Astra Streaming.

Start by opening a terminal window and creating a folder called activemqdemo. This will be your working directory, and we will refer to this top-level directory as $ACTIVEMQDEMO going forward. You can create the working directory and set the ACTIVEMQDEMO environment variable to it using the following terminal commands:

mkdir activemqdemo
cd activemqdemo
export ACTIVEMQDEMO=`pwd`

Next, create a file called pom.xml with a text/code editor and add the code below. It contains the configurations and dependencies needed for the app:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.7.3</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.example</groupId>
<artifactId>activemqdemo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>activemqdemo</name>
<description>Demo project for using JMS ActiveMQ and replacing it with DataStax Starlight for JMS on Astra Streaming</description>
<properties>
<java.version>17</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-assembly-plugin</artifactId>
<version>3.3.0</version>
<configuration>
<descriptorRefs>
<descriptorRef>jar-with-dependencies</descriptorRef>
</descriptorRefs>
<archive>
<manifest>
<mainClass>com.example.activemqdemo.ActivemqdemoApplication</mainClass>
</manifest>
</archive>
</configuration>
<executions>
<execution>
<id>assemble-all</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>

Code segment 1: pom.xml file

Next, create a src directory where you add the Java code and the resources files:

mkdir -p $ACTIVEMQDEMO/src/main
mkdir $ACTIVEMQDEMO/src/main/java
mkdir $ACTIVEMQDEMO/src/main/resources

Now, create the root package (com.example.activemqdemo):

mkdir -p $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo

Create three more packages for the ActiveMQ configuration (mqconfig), message helpers (messagehelpers), and model classes (models):

mkdir $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo/mqconfig
mkdir $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo/messagehelpers
mkdir $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo/models

Adding the Model Classes — CarOrder.java and CustomerOrder.java

These are plain old Java object classes to help organize your data. In the $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo/models directory, create two new files: CarOrder.java and CustomerOrder.java.

Add the code below to the CarOrder.java file:

package com.example.activemqdemo.models;
public class CarOrder {
private String carMake;
private Integer passengerCapacity;
private Integer noOfUnitsOrdered;
public CarOrder(String carMake, Integer passengerCapacity, Integer noOfUnitsOrdered) {
this.carMake = carMake;
this.passengerCapacity = passengerCapacity;
this.noOfUnitsOrdered = noOfUnitsOrdered;
}
public String getCarMake() {
return carMake;
}
public void setCarMake(String carMake) {
this.carMake = carMake;
}
public Integer getPassengerCapacity() {
return passengerCapacity;
}
public void setPassengerCapacity(Integer passengerCapacity) {
this.passengerCapacity = passengerCapacity;
}
public Integer getNoOfUnitsOrdered() {
return noOfUnitsOrdered;
}
public void setNoOfUnitsOrdered(Integer noOfUnitsOrdered) {
this.noOfUnitsOrdered = noOfUnitsOrdered;
}
}

Code segment 2: CarOrder.java file

Then, add the code below to CustomerOrder.java:

package com.example.activemqdemo.models;
public class CustomerOrder {
private Integer orderId;
private String customerName;
private String countryOfOrigin;
public CustomerOrder(Integer orderId, String customerName, String countryOfOrigin) {
this.orderId = orderId;
this.customerName = customerName;
this.countryOfOrigin = countryOfOrigin;
}
public Integer getOrderId() {
return orderId;
}
public void setOrderId(Integer orderId) {
this.orderId = orderId;
}
public String getCustomerName() {
return customerName;
}
public void setCustomerName(String customerName) {
this.customerName = customerName;
}
public String getCountryOfOrigin() {
return countryOfOrigin;
}
public void setCountryOfOrigin(String countryOfOrigin) {
this.countryOfOrigin = countryOfOrigin;

Code segment 3: CustomerOrder.java file

Adding the Configuration Class — MQConfigHelper.java

You will now add the necessary credentials for connecting to ActiveMQ. This section walks through the code piece by piece; the full code block for the MQConfigHelper.java file is provided at the end of the section.

First we are going to create a new file called MQConfigHelper.java in the $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo/mqconfig directory:

package com.example.activemqdemo.mqconfig;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import javax.jms.JMSException;
import java.util.HashMap;
import java.util.Map;

Add the @EnableJms and @Configuration annotations before the class declaration, as shown below:

@Configuration
@EnableJms
public class MQConfigHelper {

Next, set ActiveMQ broker connection credentials using constants for the URL (BROKER_URL), username (BROKER_USERNAME), and password (BROKER_PASSWORD). ActiveMQ’s default TCP port is 61616. You use TCP since you’ll interact with the transport layer to pass the messages:

final String BROKER_URL = "tcp://localhost:61616";
//Enter your ActiveMQ password and username
final String BROKER_USERNAME = "";
final String BROKER_PASSWORD = "";
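
Hard-coded credentials are fine for a local demo, but outside of one you would typically read them from the environment. The sketch below shows one way to do that with the JDK alone. The environment variable names (ACTIVEMQ_BROKER_URL, ACTIVEMQ_USERNAME, ACTIVEMQ_PASSWORD) and the BrokerSettingsSketch class are assumptions made for illustration; they are not part of the tutorial project or of ActiveMQ.

```java
import java.util.Map;

// Illustrative helper (not part of the tutorial project): resolves broker
// settings from environment variables, falling back to the demo defaults.
final class BrokerSettingsSketch {

    // Returns the value stored under `name`, or `fallback` when it is missing or blank.
    static String setting(Map<String, String> env, String name, String fallback) {
        String value = env.get(name);
        return (value == null || value.trim().isEmpty()) ? fallback : value;
    }

    public static void main(String[] args) {
        Map<String, String> env = System.getenv();
        // The variable names below are hypothetical; choose whatever suits your deployment.
        String brokerUrl = setting(env, "ACTIVEMQ_BROKER_URL", "tcp://localhost:61616");
        String username = setting(env, "ACTIVEMQ_USERNAME", "");
        String password = setting(env, "ACTIVEMQ_PASSWORD", "");
        System.out.println("Broker URL: " + brokerUrl);
        System.out.println("Username set: " + !username.isEmpty());
        System.out.println("Password set: " + !password.isEmpty());
    }
}
```

The resolved values could then be assigned to BROKER_URL, BROKER_USERNAME, and BROKER_PASSWORD instead of string literals.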

Now, create a method called activemqConnectionFactory where you set the credentials. The setTrustAllPackages method instructs ActiveMQ to trust classes in all packages in the application. The method returns the ActiveMQ Connection Factory (“factory”) instance. Note the @Bean annotation, which sets the method to produce a Bean:

@Bean
public ActiveMQConnectionFactory activemqConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setTrustAllPackages(true);
factory.setBrokerURL(BROKER_URL);
factory.setUserName(BROKER_USERNAME);
factory.setPassword(BROKER_PASSWORD);
return factory;
}

To convert Java objects to JMS messages, you need a converter. Add a method using the following code, which converts the Java objects to text messages:

@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
return converter;
}

Last, use the JmsTemplate class to bring together the settings you’ve added and create a JMS ActiveMQ connection. You use the setDeliveryPersistent method to instruct ActiveMQ to persist a message in case of failure.

@Bean
public JmsTemplate templateConfig() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(activemqConnectionFactory());
jmsTemplate.setMessageConverter(messageConverter());
jmsTemplate.setDeliveryPersistent(true);
return jmsTemplate;
}

The full code block for MQConfigHelper.java is here:

package com.example.activemqdemo.mqconfig;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.jms.annotation.EnableJms;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.jms.support.converter.MappingJackson2MessageConverter;
import org.springframework.jms.support.converter.MessageConverter;
import org.springframework.jms.support.converter.MessageType;
import javax.jms.JMSException;
import java.util.HashMap;
import java.util.Map;
@Configuration
@EnableJms
public class MQConfigHelper {
final String BROKER_URL = "tcp://localhost:61616";
//Enter your ActiveMQ password and username
final String BROKER_USERNAME = "";
final String BROKER_PASSWORD = "";
@Bean
public ActiveMQConnectionFactory activemqConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setTrustAllPackages(true);
factory.setBrokerURL(BROKER_URL);
factory.setUserName(BROKER_USERNAME);
factory.setPassword(BROKER_PASSWORD);
return factory;
}
@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
return converter;
}
@Bean
public JmsTemplate templateConfig() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(activemqConnectionFactory());
jmsTemplate.setMessageConverter(messageConverter());
jmsTemplate.setDeliveryPersistent(true);
return jmsTemplate;
}
}

Code segment 4: MQConfigHelper.java file

Adding the message producers and consumers

In JMS, producers are the objects that create messages and initiate the transmission. Consumers are the objects that receive and process these messages. You now need to add your producers and consumers.

In the $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo/messagehelpers directory, create a new Java file called MessageSender.java. This is the producer. Add this code to it:

package com.example.activemqdemo.messagehelpers;
import com.example.activemqdemo.models.CarOrder;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jms.core.JmsTemplate;
import org.springframework.stereotype.Component;
import javax.jms.TextMessage;
@Component
public class MessageSender {
String queueName = "carQueue";
@Autowired
JmsTemplate jmsTemplate;
public void sendToQueue() {
try {
CarOrder carOrder = new CarOrder("Mercedes GLE", 7, 74);
processMessageForSending(carOrder);
} catch (Exception e) {
System.out.println(e);
}
}
public void processMessageForSending(Object message) throws JsonProcessingException {
String jsonObj = new ObjectMapper().writer().withDefaultPrettyPrinter().writeValueAsString(message);
jmsTemplate.send(queueName, messageCreator -> {
TextMessage txtMessage = messageCreator.createTextMessage();
txtMessage.setText(jsonObj);
return txtMessage;
});
}
}

Code segment 5: MessageSender.java file

The sendToQueue method sends a carOrder object to a queue called carQueue using the helper method processMessageForSending. The helper method first converts the Java object to a JSON string, then uses JmsTemplate to send it. The second parameter in the JmsTemplate send method is a lambda expression that creates a TextMessage from the JSON string.
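
To make the payload concrete, here is a JDK-only sketch of the JSON text that ends up in the TextMessage. It is a hand-rolled stand-in, not the Jackson call used in the tutorial, and it assumes Jackson's default behavior of deriving property names from the getters (carMake, passengerCapacity, noOfUnitsOrdered). The real output is pretty-printed, so whitespace and possibly property order will differ. The CarOrderJsonSketch class is hypothetical.

```java
// Illustrative stand-in for the ObjectMapper call in MessageSender:
// builds a compact JSON string for one car order without any libraries.
final class CarOrderJsonSketch {

    // Escapes backslashes and double quotes so the value is safe inside a JSON string.
    static String escape(String value) {
        return value.replace("\\", "\\\\").replace("\"", "\\\"");
    }

    // Property names mirror the CarOrder getters from the models package.
    static String toJson(String carMake, int passengerCapacity, int noOfUnitsOrdered) {
        return "{\"carMake\":\"" + escape(carMake) + "\","
                + "\"passengerCapacity\":" + passengerCapacity + ","
                + "\"noOfUnitsOrdered\":" + noOfUnitsOrdered + "}";
    }

    public static void main(String[] args) {
        // Same values that sendToQueue uses for its demo order.
        System.out.println(toJson("Mercedes GLE", 7, 74));
    }
}
```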

For the consumer, add the code below in a file called MessageConsumer.java that resides in the same folder as the producer:

package com.example.activemqdemo.messagehelpers;
import org.springframework.jms.annotation.JmsListener;
import org.springframework.stereotype.Component;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.TextMessage;
@Component
public class MessageConsumer {
@JmsListener(destination = "carQueue")
public void queueMessageReceiver(Message receivedMessage) {
try {
processMessageReceived(receivedMessage);
} catch (Exception e) {
System.out.println(e);
}
}
public void processMessageReceived(Message messageArg) throws JMSException {
System.out.println("\n\nActiveMQ Message as received: " + messageArg);
if (messageArg instanceof TextMessage) {
TextMessage textMessage = (TextMessage) messageArg;
String processedMessage = textMessage.getText();
System.out.print("-----------------------------------------");
System.out.println("\n\nMessage after processing: " + processedMessage);
}
}
}

Code segment 6: MessageConsumer.java file

The destination argument in the @JmsListener annotation informs the consumer from which queue it will receive its message. The processMessageReceived method is a helper method to the queueMessageReceiver method. You use it to extract the text message from the verbose message returned by ActiveMQ and display it.

Running the app

In the src/main/java/com/example/activemqdemo directory, create a new file called ActivemqdemoApplication.java. This is the entry point for the application. Place the code below in that ActivemqdemoApplication.java file:

package com.example.activemqdemo;
import com.example.activemqdemo.messagehelpers.MessageSender;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms
public class ActivemqdemoApplication {
public static void main(String[] args) throws JsonProcessingException {
ConfigurableApplicationContext appContext = SpringApplication.run(ActivemqdemoApplication.class, args);
MessageSender messageSender = appContext.getBean(MessageSender.class);
messageSender.sendToQueue();
}
}

Code segment 7: ActivemqdemoApplication.java file

It first creates a running app context. You get the producer bean from the context and call the sendToQueue method.

Start by compiling your activemqdemo app using the commands below:

cd $ACTIVEMQDEMO
mvn clean install -DskipTests

Validate that everything built properly; you should see BUILD SUCCESS at the end of the Maven output.

Next we need to start the ActiveMQ broker; if you installed ActiveMQ earlier you can start the default broker in a separate terminal window with the command below:

activemq start

Finally we will run our activemqdemo application with the command below in the same window we compiled our code in:

java -jar target/activemqdemo-0.0.1-SNAPSHOT.jar

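If everything runs successfully, the console prints the raw ActiveMQ message after "ActiveMQ Message as received:", followed by a "Message after processing:" line containing the car order as JSON.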

Replacing the app with Starlight for JMS on Astra Streaming

Follow the Astra Streaming QuickStart instructions to start using Astra Streaming and get the necessary credentials.

To use Starlight in your app, add its dependency (pulsar-jms-all) to the pom.xml file. If you downloaded the final project code, this dependency is already present:

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-activemq</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.datatype</groupId>
<artifactId>jackson-datatype-jsr310</artifactId>
<version>2.13.3</version>
</dependency>
<dependency>
<groupId>com.datastax.oss</groupId>
<artifactId>pulsar-jms-all</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.30</version>
</dependency>
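<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
</dependencies>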

Code segment update 1: pom.xml file

The next step is to add the credentials to connect to Pulsar just as you did for ActiveMQ. We are going to do this in the MQConfigHelper class by adding a method called pulsarConnectionFactory:

cd $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo/mqconfig

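Open the MQConfigHelper.java file and add the token constant and the pulsarConnectionFactory method shown below. You will also need to add an import for com.datastax.oss.pulsar.jms.PulsarConnectionFactory at the top of the file: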

public class MQConfigHelper {
final String BROKER_URL = "tcp://localhost:61616";
//Enter your ActiveMQ password and username
final String BROKER_USERNAME = "";
final String BROKER_PASSWORD = "";
final String token = "<YOUR_ASTRA_STREAMING_TOKEN>";


@Bean
public ActiveMQConnectionFactory activemqConnectionFactory() {
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setTrustAllPackages(true);
factory.setBrokerURL(BROKER_URL);
factory.setUserName(BROKER_USERNAME);
factory.setPassword(BROKER_PASSWORD);
return factory;
}


public PulsarConnectionFactory pulsarConnectionFactory() throws JMSException {
Map<String, Object> configuration = new HashMap<>();
configuration.put("webServiceUrl", "https://pulsar-gcp-useast1.api.streaming.datastax.com");
configuration.put("brokerServiceUrl", "pulsar+ssl://pulsar-gcp-useast1.streaming.datastax.com:6651");
configuration.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
configuration.put("authParams", token);
PulsarConnectionFactory factory = new PulsarConnectionFactory(configuration);
return factory;
}



@Bean
public MessageConverter messageConverter() {
MappingJackson2MessageConverter converter = new MappingJackson2MessageConverter();
converter.setTargetType(MessageType.TEXT);
return converter;
}


@Bean
public JmsTemplate templateConfig() {
JmsTemplate jmsTemplate = new JmsTemplate();
jmsTemplate.setConnectionFactory(activemqConnectionFactory());
jmsTemplate.setMessageConverter(messageConverter());
jmsTemplate.setDeliveryPersistent(true);
return jmsTemplate;
}


}

Code segment update 4: MQConfigHelper.java file
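
A forgotten placeholder token is an easy mistake at this step, so it can help to fail fast before any connection is attempted. The following JDK-only sketch builds the same four-entry configuration map as pulsarConnectionFactory and rejects a blank or placeholder token. The PulsarConfigSketch class and its check (treating any value starting with "<" as a placeholder) are assumptions for illustration, not part of Starlight for JMS.

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative helper (not part of the tutorial project): assembles the
// configuration map and rejects an obviously unset token.
final class PulsarConfigSketch {

    static Map<String, Object> build(String webServiceUrl, String brokerServiceUrl, String token) {
        // Assumption: a token that is blank or still looks like "<...>" was never filled in.
        if (token == null || token.trim().isEmpty() || token.startsWith("<")) {
            throw new IllegalArgumentException("Set a real Astra Streaming token before connecting");
        }
        Map<String, Object> configuration = new HashMap<>();
        configuration.put("webServiceUrl", webServiceUrl);
        configuration.put("brokerServiceUrl", brokerServiceUrl);
        configuration.put("authPlugin", "org.apache.pulsar.client.impl.auth.AuthenticationToken");
        configuration.put("authParams", token);
        return configuration;
    }

    public static void main(String[] args) {
        Map<String, Object> configuration = build(
                "https://pulsar-gcp-useast1.api.streaming.datastax.com",
                "pulsar+ssl://pulsar-gcp-useast1.streaming.datastax.com:6651",
                "example-token"); // stand-in value, not a real token
        System.out.println("Configuration keys: " + configuration.keySet());
    }
}
```

The returned map could be passed to the PulsarConnectionFactory constructor in place of the map built inline.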

Adding the DataStax helper class

Next, create a new file called DatastaxHelper.java in the $ACTIVEMQDEMO/src/main/java/com/example/activemqdemo/messagehelpers directory and add the code below to it:

package com.example.activemqdemo.messagehelpers;
import com.datastax.oss.pulsar.jms.PulsarConnectionFactory;
import com.example.activemqdemo.models.CarOrder;
import com.example.activemqdemo.mqconfig.MQConfigHelper;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.stereotype.Component;
import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.Queue;
@Component
public class DatastaxHelper {
MQConfigHelper mqConfigHelper = new MQConfigHelper();
PulsarConnectionFactory factory = mqConfigHelper.pulsarConnectionFactory();
public DatastaxHelper() throws JMSException {
}
public void astraSendAndReceive() {
try (JMSContext context = factory.createContext()) {
CarOrder carOrder = new CarOrder("Mercedes GLE", 7, 20);
String jsonObj = new ObjectMapper().writer().withDefaultPrettyPrinter().writeValueAsString(carOrder);
Queue queue = context.createQueue("<your-topic>");
context.createConsumer(queue).setMessageListener(message -> {
try {
System.out.print("\n-----------------------------------");
System.out.println("\n\nReceived: " + message.getBody(String.class));
System.out.println("\n-----------------------------------\n");
} catch (Exception err) {
err.printStackTrace();
}
});
context.createProducer().send(queue, jsonObj);
} catch (JsonProcessingException e) {
throw new RuntimeException(e);
}
}
}

Code segment 8: DatastaxHelper.java file

This code starts by creating a config helper class object (mqConfigHelper) from where it accesses the Pulsar connection. The constructor throws a JMSException since it needs to make a JMS connection to Pulsar. After that, a method called astraSendAndReceive first creates a JMSContext from the Pulsar connection. After creating a JSON object, it creates a queue using the JMSContext:

Queue queue = context.createQueue("<your-topic>");

For example:

Queue queue = context.createQueue("persistent://my-stream-jms-wpm2023/my-namespace/my-topic");

You will need to replace the <your-topic> placeholder in the code with your topic’s full name, which you’ll find in the Astra Streaming Portal under the “namespaces and topics” section by clicking on the namespace you created in the quick start.
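
A full Pulsar topic name has the form persistent://tenant/namespace/topic, as in the example above. If you prefer to assemble it from its parts rather than paste the whole string, a small JDK-only helper along these lines would do. The TopicNameSketch class is hypothetical and only illustrates the naming scheme.

```java
// Illustrative helper (not part of the tutorial project): joins tenant,
// namespace, and topic into a full persistent topic name.
final class TopicNameSketch {

    static String fullTopicName(String tenant, String namespace, String topic) {
        for (String part : new String[] {tenant, namespace, topic}) {
            // Each part must be present and must not contain a path separator.
            if (part == null || part.trim().isEmpty() || part.contains("/")) {
                throw new IllegalArgumentException("Invalid topic name part: " + part);
            }
        }
        return "persistent://" + tenant + "/" + namespace + "/" + topic;
    }

    public static void main(String[] args) {
        // Values taken from the example in this article; yours will differ.
        System.out.println(fullTopicName("my-stream-jms-wpm2023", "my-namespace", "my-topic"));
    }
}
```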

Updating ActivemqdemoApplication.java to use Astra Streaming

The next block of code creates a consumer using the createConsumer method while passing in the queue created as a parameter. Unlike what you did previously, you don’t set an annotation for the JMS message listener. You only call the setMessageListener method and pass in the message. Here, you use a lambda expression to extract the message instead of the verbose one returned by Pulsar.
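
One thing to keep in mind with a listener callback is that delivery happens asynchronously, so a context opened in a try-with-resources block may be closed before the message arrives. A common way to guard against that is to wait on a CountDownLatch until the listener has run. The sketch below shows the pattern with JDK classes only: an in-memory queue and a dispatcher thread stand in for the broker and the JMS provider, and the ListenerLatchSketch class is hypothetical.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Illustrative only: simulates asynchronous delivery to a listener and
// waits for it before releasing resources.
final class ListenerLatchSketch {

    // Sends one payload and returns what the listener received, or null on timeout.
    static String sendAndAwait(String payload, long timeoutMillis) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(); // stand-in for the broker queue
        CountDownLatch delivered = new CountDownLatch(1);
        AtomicReference<String> received = new AtomicReference<>();

        // Plays the role of the lambda passed to setMessageListener.
        Consumer<String> listener = message -> {
            received.set(message);
            delivered.countDown();
        };

        // Plays the role of the provider thread that invokes the listener.
        ExecutorService dispatcher = Executors.newSingleThreadExecutor();
        dispatcher.submit(() -> {
            try {
                listener.accept(queue.take()); // blocks until a message is available
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        try {
            queue.put(payload); // the "send"
            delivered.await(timeoutMillis, TimeUnit.MILLISECONDS); // wait for the listener
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            dispatcher.shutdownNow(); // the "close", only after waiting
        }
        return received.get();
    }

    public static void main(String[] args) {
        System.out.println("Received: " + sendAndAwait("{\"carMake\":\"Mercedes GLE\"}", 2000));
    }
}
```

In astraSendAndReceive, the equivalent would be to count down a latch inside the message listener and await it after the send call, before the try block ends.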

For the entry point class ActivemqdemoApplication, add the import for DatastaxHelper and replace the body of the main method with the code below. Notice that the first three lines, which use the ActiveMQ MessageSender, are commented out:

package com.example.activemqdemo;
import com.example.activemqdemo.messagehelpers.MessageSender;
import com.example.activemqdemo.messagehelpers.DatastaxHelper;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.ConfigurableApplicationContext;
import org.springframework.jms.annotation.EnableJms;
@SpringBootApplication
@EnableJms
public class ActivemqdemoApplication {
public static void main(String[] args) throws JsonProcessingException {
// ConfigurableApplicationContext appContext = SpringApplication.run(ActivemqdemoApplication.class, args);
// MessageSender messageSender = appContext.getBean(MessageSender.class);
// messageSender.sendToQueue();
ConfigurableApplicationContext appContext = SpringApplication.run(ActivemqdemoApplication.class, args);
DatastaxHelper dComponent = appContext.getBean(DatastaxHelper.class);
dComponent.astraSendAndReceive();
}
}

Code segment update 7: ActivemqdemoApplication.java file

Once all the updates have been made, all that’s left is to recompile the code:

cd $ACTIVEMQDEMO
mvn clean install -DskipTests

After Maven outputs that our project has been successfully built, run the application again using:

java -jar target/activemqdemo-0.0.1-SNAPSHOT.jar

You should see the car order JSON printed after "Received:" between two dashed separator lines.

If you want to clean up the code, you can now remove the Spring Boot ActiveMQ dependency, because you no longer need it when using Starlight with Astra Streaming. The MessageSender and MessageConsumer classes in the messagehelpers package are also no longer needed, because you don’t need two separate classes to create a producer and a consumer. Lastly, the configuration helper class can be reduced to the pulsarConnectionFactory method only.

Benefits of Starlight for JMS

There are many benefits to moving your JMS applications to Starlight for JMS.

  • Increased performance: Starlight can support up to 1 million messages per second with a consistently low mean latency of under ten milliseconds.
  • Full JMS compliance and backward compatibility: Starlight is fully TCK certified for JMS 1.0 and 2.0 compliance and includes all the TCK tests as part of the open source packages.
  • Horizontally scalable: Starlight implements JMS natively on top of Pulsar, which allows you to scale your existing messaging infrastructure horizontally without affecting other components. The primary/backup models inherent in standard JMS implementations are replaced by Pulsar’s decoupled, distributed architecture for massive scalability.
  • Less code: As this tutorial demonstrated, Starlight requires far less code than a solution like JMS ActiveMQ, reducing complexity and technical debt.
  • Modern authentication mechanism: Astra Streaming provides native support for JWT as well as integrations with single sign-on solutions for more tightly coupled authentication management.

Starlight for JMS is just one component of Starlight, which provides a common framework for leveraging Pulsar as the centralized data distribution platform for JMS, AMQP, Kafka, and Pulsar based operations without compromising on what applications have been built to use. There are many other benefits that Starlight offers including reduced cost of ownership, message replay functionality, and native integration across solutions like JMS, Kafka and AMQP. For more information, please see the Starlight for JMS documentation.

Wrapping up

This brief walkthrough showed you how easy it is to replace your JMS ActiveMQ application with DataStax Starlight for JMS. It started by creating a simple Spring app with a JMS ActiveMQ integration. It used a configuration class to set the variables needed to connect to ActiveMQ and two components for the producer and the consumer. Later, it walked through replacing ActiveMQ with Starlight for JMS on Astra Streaming.

This tutorial showed you that with Starlight for JMS you can gain some significant efficiencies, reducing code and complexity without compromising on your application’s use of the JMS specification.

As with ActiveMQ, you used a config class and a component class. However, since Starlight for JMS provides native producers and consumers built in, you only had to use a single component class.

In this tutorial we looked at using Starlight for JMS as a native function of Astra Streaming, DataStax’s fully managed cloud-based service for messaging and streaming operations. However, Starlight is not limited to Astra Streaming. DataStax also provides Starlight and Pulsar as a self-hosted/on-premises solution, allowing you to upgrade not only your cloud native infrastructure but also your on-premises/self-hosted infrastructure to leverage the power of Pulsar and streamline your messaging and streaming operations into a common infrastructure. From JMS to Kafka-based applications, Astra Streaming can help you level up your data distribution layer for data in motion. Whether you are using JMS, Kafka, AMQP, or native Pulsar, Astra Streaming gives you the ability to connect and distribute data across your organization with ease.
