Spring Cloud Stream Binder for Oracle Database TxEventQ

4 min read · Aug 28, 2024

Spring Cloud Stream is a Java framework for building event-driven microservices backed by scalable, fault-tolerant messaging systems. The Oracle Database Transactional Event Queues (TxEventQ) stream binder lets developers use Oracle’s database messaging platform within the Spring Cloud Stream ecosystem, all while keeping their data within the converged database.

The TxEventQ stream binder integration combines the power of Oracle’s database-enabled messaging system with Spring Cloud Stream’s functional, event-driven development model. With the TxEventQ stream binder, Spring applications can seamlessly produce and consume messages with Oracle Database’s high-performance, transactional messaging system, benefiting from Spring’s abstraction layer and configuration capabilities.

Key Features of TxEventQ Stream Binder

Oracle Database TxEventQ provides a high-throughput, reliable messaging platform built directly into the database.

  • Real-time messaging with multiple publishers, consumers, and topics — all with a simple functional interface.
  • High throughput on RAC: 100 billion messages per day were measured on an 8-node Oracle Real Application Clusters (RAC) database.
  • Convergence of data: Your messaging platform lives within the database.
  • Integration with Spring Cloud Stream makes it easy to use and easy to get started with.

TxEventQ Stream Binder Sample Application

In this section, we’ll walk through a sample application that demonstrates how to get started with the Spring Cloud Stream binder for Oracle Database TxEventQ.

The full sample code for this section can be found here: TxEventQ Stream Binder Sample Application

TxEventQ Database Permissions

The database user producing/consuming events from the Spring Cloud Stream binder for TxEventQ will require the following database permissions:

grant unlimited tablespace to testuser;
grant select_catalog_role to testuser;
grant execute on dbms_aq to testuser;
grant execute on dbms_aqadm to testuser;
grant execute on dbms_aqin to testuser;
grant execute on dbms_aqjms_internal to testuser;
grant execute on dbms_teqk to testuser;
grant execute on DBMS_RESOURCE_MANAGER to testuser;
grant select on sys.aq$_queue_shards to testuser;
grant select on user_queue_partition_assignment_table to testuser;

Modify the username and tablespace grant as appropriate for your application.

Starting with a basic Spring Boot application

We’ll create a basic Spring Boot application to test out the TxEventQ stream binder, which looks something like this.

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class TxEventQSampleApp {
    public static void main(String[] args) {
        SpringApplication.run(TxEventQSampleApp.class, args);
    }
}

Add the Oracle Database TxEventQ Stream Binder

In your project’s pom.xml, add the following TxEventQ binder dependency, which pulls in the other dependencies it needs, such as Oracle JDBC 11 and UCP.

<dependency>
    <groupId>com.oracle.database.spring.cloud-stream-binder</groupId>
    <artifactId>spring-cloud-stream-binder-oracle-txeventq</artifactId>
    <version>0.9.0</version> <!-- latest is 0.9.0 at time of writing -->
</dependency>

Connect TxEventQ to Oracle Database with Spring datasources

Next, configure your Spring properties to connect to Oracle Database using UCP. With YAML properties, it should look something like this:

spring:
  datasource:
    username: ${USERNAME}
    password: ${PASSWORD}
    url: ${JDBC_URL}
    driver-class-name: oracle.jdbc.OracleDriver
    type: oracle.ucp.jdbc.PoolDataSourceImpl
    oracleucp:
      initial-pool-size: 1
      min-pool-size: 1
      max-pool-size: 30
      connection-pool-name: TxEventQSample
      connection-factory-class-name: oracle.jdbc.pool.OracleDataSource

Note that TxEventQ requires no additional Spring datasource configuration beyond what is required for Oracle Database with UCP. If you’d like to read more about UCP, I suggest checking out this excellent blog by Juarez Junior: Migrating from Hikari to UCP.

Implementing Producers and Consumers

With Spring Cloud Stream, the producer/consumer code abstracts away references to TxEventQ, using pure Spring APIs to produce and consume messages.

Let’s start by implementing a producer that returns a phrase word-by-word, indicating when it has processed the whole phrase.

import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class WordSupplier implements Supplier<String> {
    private final String[] words;
    // Index of the next word to return.
    private final AtomicInteger idx = new AtomicInteger(0);
    // Set to true once the last word of the phrase has been returned.
    private final AtomicBoolean done = new AtomicBoolean(false);

    public WordSupplier(String phrase) {
        this.words = phrase.split(" ");
    }

    @Override
    public String get() {
        // Returns the current index, then advances it; after the last word,
        // marks the phrase as done and wraps back to the first word.
        int i = idx.getAndAccumulate(words.length, (x, y) -> {
            if (x < words.length - 1) {
                return x + 1;
            }
            done.set(true);
            return 0;
        });
        return words[i];
    }

    public boolean done() {
        return done.get();
    }
}
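To see how the supplier behaves before it is wired into a binding, here is a small standalone sketch that calls it directly. The WordSupplierDemo class, its take helper, and the short test phrase are made up for illustration and are not part of the sample application; the WordSupplier class is repeated only so the snippet runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

class WordSupplierDemo {
    // Hypothetical helper: calls get() n times and collects the results.
    static List<String> take(Supplier<String> supplier, int n) {
        List<String> out = new ArrayList<>();
        for (int i = 0; i < n; i++) {
            out.add(supplier.get());
        }
        return out;
    }

    public static void main(String[] args) {
        WordSupplier supplier = new WordSupplier("hello event queues");
        // First two words: the phrase is not finished yet.
        System.out.println(take(supplier, 2) + " done=" + supplier.done());
        // Third word completes the phrase, then the supplier wraps around.
        System.out.println(take(supplier, 2) + " done=" + supplier.done());
    }
}

// Same logic as the WordSupplier shown above.
class WordSupplier implements Supplier<String> {
    private final String[] words;
    private final AtomicInteger idx = new AtomicInteger(0);
    private final AtomicBoolean done = new AtomicBoolean(false);

    WordSupplier(String phrase) {
        this.words = phrase.split(" ");
    }

    @Override
    public String get() {
        int i = idx.getAndAccumulate(words.length, (x, y) -> {
            if (x < words.length - 1) {
                return x + 1;
            }
            done.set(true);
            return 0;
        });
        return words[i];
    }

    boolean done() {
        return done.get();
    }
}
```

Run on its own, this prints `[hello, event] done=false` followed by `[queues, hello] done=true`: the supplier never runs out of words, it cycles through the phrase and flips the done flag the first time it reaches the end.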

Next, let’s add Spring beans for producers and consumers, including our WordSupplier and two more simple functional interfaces.

import java.util.function.Consumer;
import java.util.function.Function;

import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class StreamConfiguration {
    private @Value("${phrase}") String phrase;

    @Bean
    public Function<String, String> toUpperCase() {
        return String::toUpperCase;
    }

    @Bean
    public Consumer<String> stdoutConsumer() {
        return s -> System.out.println("Consumed: " + s);
    }

    @Bean
    public WordSupplier wordSupplier() {
        return new WordSupplier(phrase);
    }
}

Wire up producers and consumer bindings with Spring Cloud Stream

Jumping back to the application properties, let’s configure the Spring Cloud Stream bindings.

In our binding configuration, the wordSupplier producer writes to the toUpperCase-in-0 destination, which is where the toUpperCase function reads its input, and the stdoutConsumer consumer reads from toUpperCase-out-0, which is where toUpperCase writes its output. The result of this acyclic producer-consumer configuration is that each word from the phrase is converted to uppercase and sent to stdout.

phrase: "Spring Cloud Stream simplifies event-driven microservices with powerful messaging capabilities."

spring:
  cloud:
    stream:
      bindings:
        wordSupplier-out-0:
          destination: toUpperCase-in-0
          group: t1
          producer:
            required-groups:
              - t1
        stdoutConsumer-in-0:
          destination: toUpperCase-out-0
          group: t1
    function:
      definition: wordSupplier;toUpperCase;stdoutConsumer
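As a rough mental model of what these bindings do, the following plain-Java sketch chains a supplier, a function, and a consumer in the same order, with no Spring or TxEventQ involved. The PipelineSketch class and its run and demo methods are hypothetical stand-ins for the binder, which in the real application moves each message through database-backed destinations rather than direct method calls.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

class PipelineSketch {
    // Hypothetical stand-in for the binder: pulls n values from the source,
    // passes each through the step, and hands the result to the sink.
    static <A, B> void run(Supplier<A> source, Function<A, B> step, Consumer<B> sink, int n) {
        for (int i = 0; i < n; i++) {
            sink.accept(step.apply(source.get()));
        }
    }

    // Runs one pass over the phrase and returns what the consumer saw.
    static List<String> demo(String phrase) {
        String[] words = phrase.split(" ");
        int[] next = {0};

        // Simplified stand-in for the wordSupplier bean: cycles through the words.
        Supplier<String> wordSupplier = () -> words[next[0]++ % words.length];
        Function<String, String> toUpperCase = String::toUpperCase;

        List<String> seen = new ArrayList<>();
        Consumer<String> stdoutConsumer = s -> {
            System.out.println("Consumed: " + s);
            seen.add(s);
        };

        run(wordSupplier, toUpperCase, stdoutConsumer, words.length);
        return seen;
    }

    public static void main(String[] args) {
        demo("Spring Cloud Stream");
    }
}
```

The sketch only mirrors the order of the steps. In the sample application each arrow in that chain is a TxEventQ destination, so the three beans stay decoupled and could run in separate services.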

If you’ve followed along and have an Oracle Database instance handy (see Oracle Database 23ai Free if you don’t), you should have a runnable application. After you start the application, you should see the following messages sent to stdout:

Consumed: SPRING
Consumed: CLOUD
Consumed: STREAM
Consumed: SIMPLIFIES
Consumed: EVENT-DRIVEN
Consumed: MICROSERVICES
Consumed: WITH
Consumed: POWERFUL
Consumed: MESSAGING
Consumed: CAPABILITIES.

That’s all for this intro blog! If you have questions about the TxEventQ stream binder or Oracle with Spring, don’t hesitate to reach out or leave a comment.

Written by Anders Swanson

I'm a Developer Evangelist for Oracle Database who is passionate about creating content and tools to help developers succeed.