A High-Speed Data Ingestion Microservice in Java Using MQTT, AMQP, and STOMP

Juarez Junior
Published in Oracle Developers · 11 min read · Oct 31, 2023

Introduction

In a couple of previous blog posts, I’ve introduced Java developers to the Reactive Streams Ingestion (RSI) library.

Part 1 introduced the Java Library for Reactive Stream Ingestion (RSI) and its API. Part 2 explored another streaming scenario with RSI and the Oracle Autonomous Transaction Processing (ATP) Database — a fully automated database service optimized to run concurrent transactional workloads.

We’ve also explored the use of Java virtual threads in combination with the ReactiveStreamsIngestion component. That’s especially interesting now that virtual threads are a final feature in JDK 21.

This blog post combines the RSI library, virtual threads, and several messaging protocols (AMQP, MQTT and STOMP) with ActiveMQ to create a high-speed data ingestion microservice in Java.

So without further ado, let’s get started!

Prerequisites

A quick introduction to MOM and ActiveMQ

A MOM (Message-Oriented Middleware) is a message broker component that allows applications to communicate and exchange data in the form of messages.

This involves passing data between applications over a communication channel that carries self-contained units of information (messages).

In a MOM-based communication environment, messages are sent and received asynchronously. ActiveMQ is an example of a MOM broker. ActiveMQ is perfect for our example mainly because it implements all the protocols used in our microservice.

This blog post won’t explain MOM or ActiveMQ in depth. ActiveMQ currently comes in two flavours, so if you want to explore it in detail, I recommend checking the official documentation for both ActiveMQ “Classic” and ActiveMQ Artemis.

ActiveMQ

It’s interesting to highlight that with ActiveMQ you can integrate your multi-platform applications using the ubiquitous AMQP protocol, exchange messages between your web applications using STOMP over WebSockets, and manage your IoT devices using MQTT. So, ActiveMQ offers the power and flexibility to support the vast majority of messaging-related use cases.

It’s beyond the scope of this tutorial to provide an extensive explanation of the protocols we’ve used in our implementation, so please check the official documentation for each of them — AMQP, MQTT and STOMP.

A refresher on the Reactive Streams Ingestion Library (RSI)

Beyond my previous blog posts about the RSI library as cited in the introduction section above, let’s just have a quick refresher on RSI.

The RSI library provides a service for use cases in which a large number of clients ingest massive volumes of data without blocking while they wait for a synchronous response from the database.

Using RSI requires the following jars in the classpath:

  • rsi.jar
  • ojdbc11.jar
  • ucp.jar
  • ons.jar

You can get these from Maven Central or download them from the OTN download page. Nevertheless, the code sample already has all the dependencies included in the respective Maven POM file as required.

The Java Message Service (JMS) API

From a Java standpoint, you can use the JMS (Java Message Service) API to integrate with ActiveMQ in a seamless way.

In a nutshell, the Java Message Service API is an enterprise Java messaging standard that allows applications to create, send, receive, and read messages using reliable, asynchronous, loosely coupled communication.

The key concepts and components related to JMS are listed below:

  • Message: The fundamental unit of communication in JMS. It can contain various data types, including text, binary data, or serialized Java objects.
  • Producer: Producers create messages and send them to a specific JMS destination, which can be a queue or topic.
  • Consumer: Consumers subscribe to a queue or topic and consume messages when available.
  • ConnectionFactory: A JMS resource used to establish a connection to a JMS message broker.
  • Session: A session represents a single-threaded context for producing or consuming JMS messages, typically including support for transactions and message acknowledgement.
  • MessageListener: MessageListeners are registered with a consumer to process incoming messages asynchronously.

JMS also supports two messaging models: point-to-point and publish-subscribe.

  • Point-to-point: a one-to-one messaging model where a JMS message is sent from one producer to a queue and then consumed by a single consumer.
  • Publish-subscribe: a one-to-many messaging model where a producer sends a message to a topic, and all consumers subscribed to that topic consume the JMS message.
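
To make the difference between the two models concrete, here is a minimal in-memory sketch. It is a toy model and not the JMS API: the DemoQueue and DemoTopic classes are hypothetical names I made up for illustration, and a real broker adds persistence, acknowledgements and concurrency on top of this idea.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.function.Consumer;

/** Toy in-memory model of the two messaging styles; this is NOT the JMS API. */
class MessagingModelsSketch {

    /** Point-to-point: each message is taken by exactly one consumer. */
    static class DemoQueue {
        private final Queue<String> messages = new ArrayDeque<>();

        void send(String message) {
            messages.add(message);
        }

        /** Returns the next message, or null when the queue is empty. */
        String receive() {
            return messages.poll();
        }
    }

    /** Publish-subscribe: every subscriber gets its own copy of each message. */
    static class DemoTopic {
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        void subscribe(Consumer<String> subscriber) {
            subscribers.add(subscriber);
        }

        void publish(String message) {
            subscribers.forEach(s -> s.accept(message));
        }
    }

    public static void main(String[] args) {
        DemoQueue queue = new DemoQueue();
        queue.send("order-1");
        System.out.println("consumer A got: " + queue.receive());
        System.out.println("consumer B got: " + queue.receive()); // null, already consumed

        DemoTopic topic = new DemoTopic();
        List<String> inboxA = new ArrayList<>();
        List<String> inboxB = new ArrayList<>();
        topic.subscribe(inboxA::add);
        topic.subscribe(inboxB::add);
        topic.publish("price-update");
        System.out.println(inboxA + " " + inboxB); // both inboxes hold the message
    }
}
```

With the queue, the second consumer finds nothing because the first one already took the message; with the topic, both subscribers see the same message.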

This blog post uses the publish-subscribe model with a JMS Topic as represented below.

If you need a more comprehensive introduction to messaging with Java, it may be interesting for you to have a look at some technical resources related to Jakarta Messaging as it is implemented by the different Jakarta EE releases and under the Eclipse Enterprise for Java (EE4J) umbrella.

JMS works with several message brokers like Apache ActiveMQ, our choice for this blog post, allowing you to choose the best fit for your requirements.

Configuring and starting up ActiveMQ

ActiveMQ is available from the Apache ActiveMQ website. Click on the link to the current release and you will find the tarballs and the zip files. Once you have downloaded and expanded these, you’re ready to go.

This blog post uses Windows. After downloading the zip file and extracting its contents to your preferred directory, you can start ActiveMQ as shown below.

cd C:\apache-activemq-5.17.4\bin
activemq start
ActiveMQ started on Windows

Now, ActiveMQ is ready to begin handling some messages. Let’s proceed to the steps to create the sample application and stream the records into the Oracle Autonomous Database!

A solution comprising the Reactive Streams Ingestion Library and ActiveMQ

Our project and the associated code sample have the following directory structure:

General project overview

As shown in the project structure above, the sample code we use in this blog post has three listener classes which implement the RSI logic for each protocol, namely AMQP, MQTT and STOMP.

Before we dive into the code snippets, use your Oracle Database instance with the following DDL script to create the table:

CREATE TABLE RETAILER (
  rank int,
  msr int,
  retailer varchar(255),
  name varchar(255),
  city varchar(255),
  phone varchar(255),
  terminal_type varchar(255),
  weeks_active int,
  instant_sales_amt varchar(255),
  online_sales_amt varchar(255),
  total_sales_amt varchar(255)
);
COMMIT;

Creating the JMS message consumers (listeners) for AMQP, MQTT and STOMP

As with any messaging-based application, you will need to create a JMS message receiver (listener) that handles the JMS messages sent by a JMS message producer. As our first example, the listener for AMQP is rsi.example.amqp.Listener.

The code snippet below creates an AMQP connection that connects to ActiveMQ using the given username, password, hostname, and port number. Just to mention, port number 5672 is the default port where ActiveMQ is listening over the AMQP protocol when it starts up.

For the MOM component, ActiveMQ, a properties file (mq.properties) holds the configuration of these default ports, among other details.

mq.properties file
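
As a rough illustration of how such a properties file can feed the connection URI, here is a minimal sketch that uses only the JDK. The property keys (activemq.host, activemq.amqp.port and so on) and the URL schemes are assumptions for this example, not the actual keys used by the sample project. The fallback ports are the ActiveMQ defaults for AMQP (5672), MQTT (1883) and STOMP (61613).

```java
import java.io.IOException;
import java.io.StringReader;
import java.util.Properties;

class BrokerUriSketch {

    /** Builds "scheme://host:port", falling back to the given default port. */
    static String brokerUri(Properties props, String scheme, String portKey, int defaultPort) {
        String host = props.getProperty("activemq.host", "localhost");
        String port = props.getProperty(portKey, Integer.toString(defaultPort));
        return scheme + "://" + host + ":" + port;
    }

    public static void main(String[] args) throws IOException {
        // Inline stand-in for an mq.properties file; the key names are hypothetical.
        Properties props = new Properties();
        props.load(new StringReader("activemq.host=localhost\nactivemq.amqp.port=5672\n"));

        System.out.println(brokerUri(props, "amqp", "activemq.amqp.port", 5672));
        // The MQTT and STOMP ports are not set above, so the defaults are used.
        System.out.println(brokerUri(props, "tcp", "activemq.mqtt.port", 1883));
        System.out.println(brokerUri(props, "tcp", "activemq.stomp.port", 61613));
    }
}
```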

This listener also creates a JMS Topic subscriber to consume messages the producer sends to the JMS Topic.

The code excerpt below is provided for your reference; refer to the entire code sample for the complete AMQP listener.

public static void main(String[] args) throws Exception {

    // Set up the ActiveMQ connection and consumer
    String connectionURI = MqConfig.AMQP_URL_SCHEME + MqConfig.ACTIVEMQ_HOST + ":" + MqConfig.ACTIVEMQ_AMQP_PORT;
    JmsConnectionFactory factory = new JmsConnectionFactory(connectionURI);

    Connection connection = factory.createConnection(MqConfig.ACTIVEMQ_USER, MqConfig.ACTIVEMQ_PASSWORD);
    connection.start();
    Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);

    // Subscribe to the topic that the producer publishes to
    Destination destination = session.createTopic(MqConfig.ACTIVEMQ_TOPIC_NAME);

    MessageConsumer consumer = session.createConsumer(destination);

    ...

}

We also need to configure the main RSI component with the required Oracle Database connection details. A configuration file, configuration.properties, supports that.

Provide the connection details below to reflect your target Oracle Database instance.

Oracle DB connection details — RSI component

In this example, we use the built-in PushPublisher. It’s a simple publisher that pushes the items it receives to the RSI service.

After attaching the rsi.subscriber() to the pushPublisher, RSI can start receiving messages.

// Start up RSI
ReactiveStreamsIngestion rsi = RSI_SERVICE.start();
PushPublisher<Retailer> pushPublisher = ReactiveStreamsIngestion.pushPublisher();
pushPublisher.subscribe(rsi.subscriber());
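
If the publisher/subscriber wiring above is new to you, the sketch below shows the same shape with JDK classes only. It uses java.util.concurrent.SubmissionPublisher as a stand-in for the RSI PushPublisher and a hypothetical CollectingSubscriber in place of rsi.subscriber(), so treat it as an analogy under those assumptions rather than as RSI's actual implementation.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class FlowSketch {

    /** Collects every item it receives and signals when the stream ends. */
    static class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> received = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(String item) {
            received.add(item);
            subscription.request(1); // ask for the next item
        }

        @Override
        public void onError(Throwable error) {
            done.countDown();
        }

        @Override
        public void onComplete() {
            done.countDown();
        }
    }

    /** Publishes the items, closes the publisher, and returns what the subscriber saw. */
    static List<String> publishAll(List<String> items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            items.forEach(publisher::submit); // the caller does not wait for processing
        } // close() signals onComplete once the buffered items are delivered
        try {
            if (!subscriber.done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("subscriber did not complete in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return subscriber.received;
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of("r1", "r2", "r3")));
    }
}
```

The point to notice is that the code calling submit() hands the item over and moves on, while the subscriber consumes items on another thread at its own pace.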

When a message is received, the listener checks its body. If the body equals “SHUTDOWN”, the listener closes the connection and stops RSI; otherwise, it converts the body to an OracleJsonObject, wraps it in a Retailer, and hands it to the pushPublisher.


...

while (true) {
    Message msg = consumer.receive();
    if (msg instanceof TextMessage) {
        String body = ((TextMessage) msg).getText();

        if (body.trim().equals("SHUTDOWN")) {
            long diff = System.currentTimeMillis() - start;
            System.out.println(String.format("Received %d in %.2f seconds", count, (1.0 * diff / 1000.0)));
            connection.close();

            // Close RSI and the worker threads
            pushPublisher.close();
            RSI_SERVICE.stop();

            try {
                // Brief pause before the JVM exits
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            // Exit with status 0, since this is a normal, requested shutdown
            System.exit(0);

        } else {
            // Create an OracleJsonObject from the incoming message
            OracleJsonObject jsonObject = JSON_FACTORY
                    .createJsonTextValue(new ByteArrayInputStream(body.getBytes())).asJsonObject();

            // Push the data
            pushPublisher.accept(new Retailer(jsonObject));

            if (count == 1) {
                // Start timing
                start = System.currentTimeMillis();
            } else if (count % 1000 == 0) {
                // Report progress every 1000 messages
                System.out.println(String.format("Received %d messages.", count));
            }
            count++;
        }

    } else {
        System.out.println("Unexpected message type: " + msg.getClass());
    }
}

...
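
The loop above follows a common pattern: keep receiving until a sentinel message arrives. Here is a stripped-down sketch of that pattern with a JDK BlockingQueue standing in for the JMS consumer. The class and method names are hypothetical, and the counting is simplified compared to the listener.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class SentinelLoopSketch {

    static final String SHUTDOWN = "SHUTDOWN";

    /** Drains the queue until the sentinel arrives and returns how many payloads were handled. */
    static int consumeUntilShutdown(BlockingQueue<String> queue) {
        int count = 0;
        try {
            while (true) {
                String body = queue.take(); // blocks, much like consumer.receive()
                if (SHUTDOWN.equals(body.trim())) {
                    return count; // stop without counting the sentinel itself
                }
                count++; // a real listener would push the record to RSI here
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return count;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("{\"rank\": 1}");
        queue.add("{\"rank\": 2}");
        queue.add("SHUTDOWN");
        System.out.println("Received " + consumeUntilShutdown(queue) + " messages");
    }
}
```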

Listeners for the MQTT and STOMP protocols are implemented along the same lines.

We also need to map our Java entity to the target database table, and the Retailer.java class serves this purpose.

It is a POJO that defines the record to be streamed. The constructor accepts the OracleJsonObject type and converts the variable to a plain Java object. Interestingly, it also works with the jakarta.json.* counterparts if that’s a better option for you, so please check more details about such compatibility here.

The @StreamEntity annotation indicates that the class maps to a database table named “retailer”, and the @StreamField annotation maps fields to the columns of that table. Note that you can always specify your own table and column names using the optional StreamEntity.tableName() and StreamField.columnName() elements.

...

@StreamEntity(tableName = "retailer")
public class Retailer {

    /**
     * @param jsonObject jsonObject which is converted from the JSON payload
     */
    public Retailer(OracleJsonObject jsonObject) {
        Stream
                .of(this.getClass().getDeclaredFields())
                .filter(df -> (df.getAnnotation(StreamField.class) != null))
                .forEach(f -> {
                    f.setAccessible(true);

                    // Look up the JSON value whose key matches the field name
                    String fieldName = f.getName();
                    OracleJsonValue jsonValue = jsonObject.get(fieldName);
                    OracleJsonValue.OracleJsonType type = jsonValue.getOracleJsonType();

                    try {
                        switch (type) {
                        case DECIMAL:
                            f.setInt(this, jsonValue.asJsonDecimal().intValue());
                            break;
                        case STRING:
                            f.set(this, jsonValue.asJsonString().getString());
                            break;
                        default:
                            throw new IllegalArgumentException("unknown type");
                        }
                    } catch (IllegalAccessException ex) {
                        ex.printStackTrace();
                    }
                });
    }

    @StreamField
    public int rank;

    ...
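
To show the general idea behind annotation-driven mapping, here is a small self-contained sketch. The @DemoEntity and @DemoField annotations are hypothetical stand-ins that I define in the example itself; they are not the RSI annotations. The rule of falling back to the class or field name when no explicit name is given is an assumption of this sketch.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Field;
import java.util.Map;
import java.util.TreeMap;

class MappingSketch {

    /** Hypothetical stand-in for an entity annotation with an optional table name. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.TYPE)
    @interface DemoEntity {
        String tableName() default "";
    }

    /** Hypothetical stand-in for a field annotation with an optional column name. */
    @Retention(RetentionPolicy.RUNTIME)
    @Target(ElementType.FIELD)
    @interface DemoField {
        String columnName() default "";
    }

    @DemoEntity(tableName = "retailer")
    static class DemoRetailer {
        @DemoField
        public int rank;

        @DemoField(columnName = "retailer_name")
        public String name;

        public String notMapped; // no annotation, so the mapper skips it
    }

    /** Resolves the table name, falling back to the simple class name. */
    static String tableFor(Class<?> type) {
        DemoEntity entity = type.getAnnotation(DemoEntity.class);
        if (entity == null) {
            throw new IllegalArgumentException(type + " is not annotated with @DemoEntity");
        }
        return entity.tableName().isEmpty() ? type.getSimpleName() : entity.tableName();
    }

    /** Maps each annotated field name to its column name, falling back to the field name. */
    static Map<String, String> columnsFor(Class<?> type) {
        Map<String, String> columns = new TreeMap<>();
        for (Field field : type.getDeclaredFields()) {
            DemoField mapped = field.getAnnotation(DemoField.class);
            if (mapped != null) {
                String column = mapped.columnName().isEmpty() ? field.getName() : mapped.columnName();
                columns.put(field.getName(), column);
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        System.out.println(tableFor(DemoRetailer.class));
        System.out.println(columnsFor(DemoRetailer.class));
    }
}
```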

Last, the RSIService.java class implements the service.

Again, the class is just a simple POJO that defines methods to configure, start and stop the RSI service.

We are using the method newVirtualThreadPerTaskExecutor() from JDK 21 to create an ExecutorService that uses Java Virtual Threads.

/**
 * A class that builds RSI service.
 */
public final class RSIService {

    /** ExecutorService that uses virtual threads (final in JDK 21) **/
    private ExecutorService workers;
    private ReactiveStreamsIngestion rsi;

    /**
     * Start RSI
     *
     * @return {@link oracle.rsi.ReactiveStreamsIngestion} object
     */
    public ReactiveStreamsIngestion start() {
        if (rsi != null) {
            return rsi;
        }

        workers = Executors.newVirtualThreadPerTaskExecutor();
        rsi = ReactiveStreamsIngestion.builder().url(DatabaseConfig.getJdbcConnectionUrl()) // "jdbc:oracle:thin:@host:port/DB"
                .username(DatabaseConfig.USER).password(DatabaseConfig.PASSWORD).schema(DatabaseConfig.SCHEMA)
                .entity(Retailer.class).executor(workers)
                // .bufferRows(1000)
                .bufferInterval(Duration.ofSeconds(5)).build();

        return rsi;
    }

    /**
     * Stop RSI
     */
    public void stop() {
        if (rsi != null) {
            rsi.close();
        }

        if (workers != null) {
            workers.shutdown();
        }
    }

}
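
If you haven’t used a virtual-thread executor yet, the sketch below shows it in isolation. It requires JDK 21 and uses only JDK classes. The VirtualThreadSketch class and the 10 ms sleep that stands in for a blocking call are assumptions made for this illustration and are unrelated to the RSI internals.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

class VirtualThreadSketch {

    /** Runs taskCount blocking tasks, one virtual thread each, and sums their results. */
    static int runTasks(int taskCount) {
        // The executor is AutoCloseable: close() waits for the submitted tasks to finish.
        try (ExecutorService workers = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Integer>> futures = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                final int id = i;
                futures.add(workers.submit(() -> {
                    Thread.sleep(10); // stands in for blocking I/O such as a database call
                    return id;
                }));
            }
            int sum = 0;
            for (Future<Integer> future : futures) {
                sum += future.get();
            }
            return sum;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    public static void main(String[] args) {
        // 0 + 1 + ... + 49
        System.out.println("Sum of task ids: " + runTasks(50));
    }
}
```

Each submitted task gets its own virtual thread, so a task that blocks does not tie up a platform thread while it waits.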

Running the sample application

Run the following command to start a listener of your choice depending on the target protocol — AMQP, MQTT or STOMP.

The screenshot below shows the execution of our AMQP listener.

rsi.example.amqp.Listener

Now you can send messages to the ActiveMQ topic to test the application!

To support that, the class MessageProducer.java implements a JMS message producer (client) that uses Java Virtual Threads to send 50 sample messages to the ActiveMQ Topic.

public class MessageProducer {

    private static final OracleJsonFactory JSON_FACTORY = new OracleJsonFactory();
    private static final ResteasyClient client = (ResteasyClient) ClientBuilder.newClient();

    private static final ResteasyWebTarget target = client
            .target(MqConfig.HTTP_URL_SCHEME + MqConfig.ACTIVEMQ_HOST + MqConfig.COLON + MqConfig.ACTIVEMQ_CLIENT_PORT
                    + MqConfig.ACTIVEMQ_CLIENT_URI + MqConfig.QUESTION_MARK + MqConfig.ACTIVEMQ_TOPIC_PARAM);

    public static void main(String[] args) {

        // NOTE: the same constant is passed for both constructor arguments. If they are
        // meant to be the user name and the password, this only works when both are identical.
        target.register(new AuthHeadersRequestFilter(MqConfig.ACTIVEMQ_HTTP_BASIC_PASSWORD,
                MqConfig.ACTIVEMQ_HTTP_BASIC_PASSWORD));
        Instant start = Instant.now();

        // Start 50 virtual threads, each sending one message
        var threads = IntStream.range(0, 50).mapToObj(i -> Thread.startVirtualThread(() -> {
            sendMessage();
            System.out.println("Message #: " + (i));

        })).toList();

        // Wait for all senders to finish
        for (var thread : threads) {
            try {
                thread.join();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }

        Instant finish = Instant.now();
        long timeElapsed = Duration.between(start, finish).getSeconds();
        System.out.println("Elapsed: " + timeElapsed);
        client.close();

    }

    private static void sendMessage() {

        Response response = null;
        try {

            String body = "{\"rank\": 1,\r\n" + "\"msr\": 217,\r\n" + "\"retailer\": \"100224\",\r\n"
                    + "\"name\": \"Freddys One Stop\",\r\n" + "\"city\": \"Roland\",\r\n"
                    + "\"phone\": \"(918) 503-6288\",\r\n" + "\"terminal_type\": \"Extrema\",\r\n"
                    + "\"weeks_active\": 37,\r\n" + "\"instant_sales_amt\": \"$318,600.00 \",\r\n"
                    + "\"online_sales_amt\": \"$509,803.00 \",\r\n" + "\"total_sales_amt\": \"$828,403.00 \"}";

            OracleJsonObject jsonObject = JSON_FACTORY.createJsonTextValue(new ByteArrayInputStream(body.getBytes()))
                    .asJsonObject();

            Retailer retailer = new Retailer(jsonObject);

            response = target.request().post(Entity.json(retailer));
            System.out.println(response.getStatusInfo());
        } finally {
            // The response is still null if the request failed before it was assigned
            if (response != null) {
                response.close();
            }
        }

    }

}

The JMS message producer uses ActiveMQ’s REST API to send the data as JSON to the target topic. To enable that, we’ve implemented the jakarta.ws.rs.client.ClientRequestFilter interface as provided by Jakarta EE Platform API to create a custom filter that will perform the authentication for us.

Besides, we’ve configured the credentials for ActiveMQ on the respective mq.properties file and enabled it to allow the JMS message producer to authenticate against it (check the official documentation to get the details of this step).

ActiveMQ — credentials
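
For reference, HTTP Basic authentication boils down to a single request header. The sketch below shows how that header value is typically built with JDK classes only. It is not the code of the AuthHeadersRequestFilter class from the sample, and the admin/admin credentials are placeholders for illustration.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

class BasicAuthSketch {

    /** Builds the value of the HTTP "Authorization" header for Basic authentication. */
    static String basicAuthHeader(String user, String password) {
        String credentials = user + ":" + password;
        String encoded = Base64.getEncoder()
                .encodeToString(credentials.getBytes(StandardCharsets.UTF_8));
        return "Basic " + encoded;
    }

    public static void main(String[] args) {
        // Placeholder credentials; use the ones configured for your broker.
        System.out.println("Authorization: " + basicAuthHeader("admin", "admin"));
    }
}
```

A request filter would add this value to every outgoing request under the Authorization header name.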

Just before running the JMS message producer, let us check the database table to confirm that it is empty. Run your preferred SQL tool like Oracle SQL Developer, connect to the target Oracle database, and run the query below.

SQL query — SELECT * FROM RETAILER;

Now, run the JMS message producer (client), and then run the query above once again. You will see that 50 records have been ingested with the help of RSI and recorded on the target Oracle Database table as expected.

SQL query — SELECT * FROM RETAILER;

In addition to the execution above, you can run the other Listeners for MQTT and STOMP in order to see them in action as well.

Optionally, you can also achieve the same results by creating an Apache JMeter test plan if you want.

Tear down

Once you have completed your tests, you can stop the listeners and ActiveMQ.

ActiveMQ — shutdown

Wrapping it up

That’s it! This blog post explored a more elaborate scenario with the Reactive Streams Ingestion Library, ActiveMQ and several messaging-related protocols. Check out my previous blog posts listed below to learn more about RSI:

I hope you enjoyed this blog post! Stay tuned!!!

References

The Java library for Reactive Streams Ingestion (RSI)

Develop Java applications with Oracle Database

Apache ActiveMQ

Jakarta EE

Jakarta EE 10 Platform API

AMQP — Advanced Message Queuing Protocol

MQTT — MQ Telemetry Transport

STOMP — Simple Text-Oriented Messaging Protocol

Product — Oracle Database 23c Free — Developer Release

Documentation — Oracle Database 23c Free — Developer Release

Forum — Oracle Database 23c Free — Developer Release

Developers Guide For Oracle JDBC on Maven Central

Oracle SQL Developer

Oracle Developers and Oracle OCI Free Tier

Join our Oracle Developers channel on Slack to discuss Java, JDK, JDBC, GraalVM, Microservices with Spring Boot, Helidon, Quarkus, Micronaut, Reactive Streams, Cloud, DevOps, IaC, and other topics!

Build, test, and deploy your applications on Oracle Cloud — for free! Get access to OCI Cloud Free Tier!
