Spring Boot + Camel + ActiveMQ

วันนี้ขอนำเสนอ การสร้าง Java Application ด้วย Spring Boots เพื่อ Consume และ Produce message กับ ActiveMQ แบบง่ายๆ นะครับ ในบทความนี้จะไม่ขอกล่าวถึงการติดตั้ง ActiveMQ นะครับ

Source Code : https://bitbucket.org/zengcode/spring-boot-camel-activemq

ตามเคยครับโปรเจคส์ของเราเป็น maven project นะครับ สร้าง pom.xml เลยครับ
จะเห็นว่ามี dependencies ที่เราต้องการใช้ในโปรเจคส์นี้คือ camel

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"
>
<modelVersion>4.0.0</modelVersion>

<groupId>org.springframework</groupId>
<artifactId>spring-boot-camel-activemq</artifactId>
<version>0.1.0</version>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.1.RELEASE</version>
</parent>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring-boot</artifactId>
<version>2.15.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-camel</artifactId>
<version>5.9.1</version>
</dependency>
</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

ต่อไป Config ค่า ActiveMQ ใน application.yml ประมาณนี้ครับ

server:
port:
8888

activemq:
url:
tcp://localhost:61616
user:
password:

สร้าง Main Application

package camel.zengcode.com;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
@EnableAsync
public class MainApplication extends AsyncConfigurerSupport {

public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
while (true){}
}

}

สร้าง Configuration Class กันครับ เป็นการสร้าง Camel Context และสร้าง Connection เพื่อติดต่อกับ ActiveMQ

package camel.zengcode.com.configuration;

import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.spring.boot.CamelContextConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.jms.ConnectionFactory;

import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;

@Configuration
public class MainConfiguration {

public static final String MY_DEFAULT_PROFILE = "myDefaultProfile";
public static final String ACTIVEMQ = "activemq";


@Bean
CamelContextConfiguration contextConfiguration(@Value("${activemq.user}") String user,
@Value("${activemq.password}") String password,
@Value("${activemq.url}") String url) {
return new CamelContextConfiguration() {

@Override
public void beforeApplicationStart(CamelContext context) {
// your custom configuration goes here
ThreadPoolProfile threadPoolProfile = new ThreadPoolProfile();
threadPoolProfile.setId(MY_DEFAULT_PROFILE);
threadPoolProfile.setPoolSize(10);
threadPoolProfile.setMaxPoolSize(15);
threadPoolProfile.setMaxQueueSize(250);
threadPoolProfile.setKeepAliveTime(25L);
threadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
context.getExecutorServiceManager().registerThreadPoolProfile(threadPoolProfile);
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
context.addComponent(ACTIVEMQ, jmsComponentAutoAcknowledge(connectionFactory));
}
};
}

}

ต่อไปหัวใจสำคัญของบทความนี้แล้วครับ คือการสร้าง Camel Router เพื่อ Produce and consume message จาก ActiveMQ กันแล้ว

package camel.zengcode.com.camel;

import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class MainRouter extends RouteBuilder {

@Autowired
private ProducerTemplate producerTemplate;

@Override
public void configure() throws Exception {


//Producer route
from("timer://test?period=5000")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String message = UUID.randomUUID().toString();
log.info("**********************************");
log.info("Send message '{}' to queue....", message);
producerTemplate.sendBody("activemq://test-queue", message);
}
});

//==========================================================================//

//Consumer queue
from("activemq://test-queue")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {

String message = exchange.getIn().getBody(String.class);
log.info("--------------------------------");
log.info("Receive message '{}' from queue.", message);
}
});

}
}

จาก Camel Router ด้านบน เราจะมี Router 2 ตัว

ตัวหนึ่งคือ Router ที่เป็น timer จะทำงานทุกๆ 5 วินาที โดยจะใช้ ตัว ProducerTemplate ของ Camel ในการส่งขข้อมูลเข้าไปในคิวที่ชื่อว่า test-queue ของ ActiveMQ

อีกตัวหนึ่งคือ Router ที่เป็น Consumer โดยจะคอยรับข้อมูลจากคิวที่ชื่อว่า test-queue ของ ActiveMQ ครับ

เมื่อ รัน Application ก็จะเห็น log ประมาณนี้นะครับ

Apache Camel 2.17.2 (CamelContext: camel-1) started in 0.400 seconds
Started MainApplication in 2.395 seconds (JVM running for 5.35)
**********************************
Send message ‘4b6ba71f-1eb9–4e81-a87f-c5f72178525c’ to queue….
— — — — — — — — — — — — — — — —
Receive message ‘4b6ba71f-1eb9–4e81-a87f-c5f72178525c’ from queue.
**********************************
Send message ‘986d128c-e5f2–430c-a23a-bc5d3901636a’ to queue….

สำหรับ Router ที่เป็น Producer สามารถเขียนได้อีกแบบแบบนี้นะครับ แล้วแต่สะดวกเลยครับ

//Producer route
from("timer://test?period=5000")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String message = UUID.randomUUID().toString();
log.info("**********************************");
log.info("Send message '{}' to queue....", message);
exchange.getIn().setBody(message);
}
}).to("activemq://test-queue");

ลองไปทำตามดูนะครับ ไม่ยากเลย ผมรัก Camel ที่สุดในโลก

Chiwa Kantawong (Pea)

Written by

Backend Team Lead

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade