Spring Boot + Camel + ActiveMQ

วันนี้ขอนำเสนอ การสร้าง Java Application ด้วย Spring Boots เพื่อ Consume และ Produce message กับ ActiveMQ แบบง่ายๆ นะครับ ในบทความนี้จะไม่ขอกล่าวถึงการติดตั้ง ActiveMQ นะครับ
Source Code : https://bitbucket.org/zengcode/spring-boot-camel-activemq
ตามเคยครับโปรเจคส์ของเราเป็น maven project นะครับ สร้าง pom.xml เลยครับ
จะเห็นว่ามี dependencies ที่เราต้องการใช้ในโปรเจคส์นี้คือ camel
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>org.springframework</groupId>
<artifactId>spring-boot-camel-activemq</artifactId>
<version>0.1.0</version>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>1.5.1.RELEASE</version>
</parent>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter</artifactId>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-spring-boot</artifactId>
<version>2.15.1</version>
</dependency>
<dependency>
<groupId>org.springframework</groupId>
<artifactId>spring-jms</artifactId>
<version>3.2.8.RELEASE</version>
</dependency>
<dependency>
<groupId>org.apache.camel</groupId>
<artifactId>camel-core</artifactId>
<version>2.17.2</version>
</dependency>
<dependency>
<groupId>org.apache.activemq</groupId>
<artifactId>activemq-camel</artifactId>
<version>5.9.1</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>ต่อไป Config ค่า ActiveMQ ใน application.yml ประมาณนี้ครับ
server:
port: 8888
activemq:
url: tcp://localhost:61616
user:
password:สร้าง Main Application
package camel.zengcode.com;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.AsyncConfigurerSupport;
import org.springframework.scheduling.annotation.EnableAsync;
import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
@EnableScheduling
@EnableAsync
public class MainApplication extends AsyncConfigurerSupport {
public static void main(String[] args) {
SpringApplication.run(MainApplication.class, args);
while (true){}
}
}สร้าง Configuration Class กันครับ เป็นการสร้าง Camel Context และสร้าง Connection เพื่อติดต่อกับ ActiveMQ
package camel.zengcode.com.configuration;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.camel.CamelContext;
import org.apache.camel.ThreadPoolRejectedPolicy;
import org.apache.camel.spi.ThreadPoolProfile;
import org.apache.camel.spring.boot.CamelContextConfiguration;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.jms.ConnectionFactory;
import static org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
@Configuration
public class MainConfiguration {
public static final String MY_DEFAULT_PROFILE = "myDefaultProfile";
public static final String ACTIVEMQ = "activemq";
@Bean
CamelContextConfiguration contextConfiguration(@Value("${activemq.user}") String user,
@Value("${activemq.password}") String password,
@Value("${activemq.url}") String url) {
return new CamelContextConfiguration() {
@Override
public void beforeApplicationStart(CamelContext context) {
// your custom configuration goes here
ThreadPoolProfile threadPoolProfile = new ThreadPoolProfile();
threadPoolProfile.setId(MY_DEFAULT_PROFILE);
threadPoolProfile.setPoolSize(10);
threadPoolProfile.setMaxPoolSize(15);
threadPoolProfile.setMaxQueueSize(250);
threadPoolProfile.setKeepAliveTime(25L);
threadPoolProfile.setRejectedPolicy(ThreadPoolRejectedPolicy.Abort);
context.getExecutorServiceManager().registerThreadPoolProfile(threadPoolProfile);
ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(user, password, url);
context.addComponent(ACTIVEMQ, jmsComponentAutoAcknowledge(connectionFactory));
}
};
}
}ต่อไปหัวใจสำคัญของบทความนี้แล้วครับ คือการสร้าง Camel Router เพื่อ Produce and consume message จาก ActiveMQ กันแล้ว
package camel.zengcode.com.camel;
import org.apache.camel.ConsumerTemplate;
import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.ProducerTemplate;
import org.apache.camel.builder.RouteBuilder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import java.util.UUID;
@Component
public class MainRouter extends RouteBuilder {
@Autowired
private ProducerTemplate producerTemplate;
@Override
public void configure() throws Exception {
//Producer route
from("timer://test?period=5000")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String message = UUID.randomUUID().toString();
log.info("**********************************");
log.info("Send message '{}' to queue....", message);
producerTemplate.sendBody("activemq://test-queue", message);
}
});
//==========================================================================//
//Consumer queue
from("activemq://test-queue")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String message = exchange.getIn().getBody(String.class);
log.info("--------------------------------");
log.info("Receive message '{}' from queue.", message);
}
});
}
}จาก Camel Router ด้านบน เราจะมี Router 2 ตัว
ตัวหนึ่งคือ Router ที่เป็น timer จะทำงานทุกๆ 5 วินาที โดยจะใช้ ตัว ProducerTemplate ของ Camel ในการส่งขข้อมูลเข้าไปในคิวที่ชื่อว่า test-queue ของ ActiveMQ
อีกตัวหนึ่งคือ Router ที่เป็น Consumer โดยจะคอยรับข้อมูลจากคิวที่ชื่อว่า test-queue ของ ActiveMQ ครับ
เมื่อ รัน Application ก็จะเห็น log ประมาณนี้นะครับ
Apache Camel 2.17.2 (CamelContext: camel-1) started in 0.400 seconds
Started MainApplication in 2.395 seconds (JVM running for 5.35)
**********************************
Send message ‘4b6ba71f-1eb9–4e81-a87f-c5f72178525c’ to queue….
— — — — — — — — — — — — — — — —
Receive message ‘4b6ba71f-1eb9–4e81-a87f-c5f72178525c’ from queue.
**********************************
Send message ‘986d128c-e5f2–430c-a23a-bc5d3901636a’ to queue….
สำหรับ Router ที่เป็น Producer สามารถเขียนได้อีกแบบแบบนี้นะครับ แล้วแต่สะดวกเลยครับ
//Producer route
from("timer://test?period=5000")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
String message = UUID.randomUUID().toString();
log.info("**********************************");
log.info("Send message '{}' to queue....", message);
exchange.getIn().setBody(message);
}
}).to("activemq://test-queue");ลองไปทำตามดูนะครับ ไม่ยากเลย ผมรัก Camel ที่สุดในโลก
