Spring Cloud Stream with RabbitMQ

달빛방랑
13 min readMay 27, 2020

--

https://cloud.spring.io/spring-cloud-static/spring-cloud-stream/3.0.4.RELEASE/reference/html/spring-cloud-stream.html#spring-cloud-stream-overview-application-model

Spring Cloud Stream

Spring Cloud Stream 프로젝트는 Message 기반 마이크로 서비스 어플리케이션을 만들기위한 Framework 로 메시징 시스템과 연동하여 확장성 높은 이벤트 기반 마이크로 서비스를 구축 가능하게 해줍니다.
높은 추상화를 통해 어떤 인프라인지를 알 필요 없이 어플리케이션 설정과 간단한 구현 만으로 메시징 시스템을 구현할 수 있습니다.

Main Concept

Programming 을 통한 구현을 위해서는 아래 간단한 컨셉을 일단 알고 가야 합니다.

Destination Binder : 외부 메시징 시스템 (Kafka 또는 RabbitMQ)과의 연동을 책임지는 컴포넌트로 유저가 직접 구현시 책임져야 할 많은 boiler-plate한 코드들을 제공해 줍니다.

binding : 외부 메시징 시스템과 어플리케이션에서 제작된 Producer Consumer 사이의 브릿지로서 binding name 을 통해 property에서 설정하여 사용할 수 있습니다.

Message : Destination Binder 와 통신하기 위한 정규화 데이터 구조로 POJO 객체를 그대로 사용할 수 있습니다.

위 그림에서 보이는 것처럼 외부 메시징 시스템과 연동 및 설정은 Spring Property를 통해 추상화되고 사용자는 Application 주요 로직만 신경 쓸 수 있도록 높은 추상화를 제공합니다.

RabbitMQ

각각의 특성과 용도가 달라 어느 메시징 시스템을 사용해도 상관없으며, 본문에서는 Rabbit MQ를 사용한 P2P 메시지와 Pub-Sub 구조의 메시징 시스템을 만들어 보겠습니다.

RabbitMQ Binder

RabbitMQ Binder는 Topic을 기반한 Exchange Output(Producer)과 Queue를 통한 Input(Consumer)을 제공하고 있습니다.

docker 설정

테스트를 위해 아래와 같이 docker-compose를 통해 RabbitMQ를 설정합니다.

  • docker-compose.yaml
version: '3'
services:
rabbitmq:
image: 'rabbitmq:3-management-alpine'
container_name:
rabbitmq-stream
ports:
- '5672:5672'
- '15672:15672'
environment:
RABBITMQ_ERLANG_COOKIE: "RabbitMQ-My-Cookies"
RABBITMQ_DEFAULT_USER: "admin"
RABBITMQ_DEFAULT_PASS: "admin"

docker-compose up 또는 docker-compose up -d 를 명령어를 통해 docker 를 실행시키고, 브라우저를 통해 http://localhost:15672 로 접속해 RabbitMQ상태를 모니터링 합니다.

Annotation-based vs Functional

Spring Cloud Stream 3.0부터는 spring-cloud-stream-reactive 가 합쳐지면서 Function 을 기반으로 한 Routing Function , Multiple bindings with Functions , Functions with multiple inputs/outputs , Native support for reactive programming 등이 지원되면서 Annotation 을 활용한 방식은 Legacy 방식으로 취급되고 있습니다. 하지만 꼭 Functional 방식을 사용할 필요는 없으며 필요에 따라 Spring Integration 등을 활용하는 다양한 방식을 모두 지원하고 있습니다.

여기서는 Annotation-based 방식과 Functional 방식 두 가지 예를 모두 살펴보겠습니다.

Configuration

  • pom.xml

Spring Boot 관련 디펜던시와 함께 spring-cloud-starter-stream-rabbit 을 통해 연관 디펜던시를 모두 추가 할 수 있습니다.

Annotation-based

기존의 방식을 통한 Spring Cloud Stream 을 사용할 때 Spring Integration을 사용한 다양한 방식 ( @MessagingGatewayIntegrationFlow 등) 들을 활용할 수 있습니다. 여기에서는 StreamBridge 를 사용한 Publisher와 @StreamListener 를 사용한 Consumer를 알아보도록 하겠습니다.

Publisher

  • application.yaml
spring:
profiles:
local
cloud:
stream:
bindings:
direct:
binder:
rabbit
destination: p2p-topic
broadcast:
binder:
rabbit
destination: pubsub-topic
rabbitmq:
addresses:
localhost
username: admin
password: admin

spring.rabbitmq 를 통해 RabbitMQ 관련 접속 정보를 설정해 줍니다.
Spring Cloud Stream 에서는 앞서 언급한 Binding Name 을 통해 Stream IN/OUT을 설정할 수 있습니다.
Legacy 방식에서는 spring.cloud.bindings.<Binding Name>.destination 을 통해 Binding Name 에 연관된 RabbitMQ Topic Exchange 를 설정해 줄 수 있습니다. 위 예시에서는 directbroadcast 의 두 개의 Binding을 설정하였습니다.

  • Source

ProducerChannel 을 통해 Binding Interface 를 정의하고 @EnableBinding 을 통해 적용해주면 application.yaml 을 통해 정의한 Binding Name 들과 연결할 수 있습니다.

위 예제에서는 REST 호출을 통해 외부에서 부터 시작된 데이터를 Stream 으로 전달하기 위해서 StreamBridge 를 활용하였습니다. 참고

StreamBridge.send() 를 통해 ProducerChannel 에 정의된 Binding Name 을 찾고 해당하는 spring.cloud.stream.bindings 의 Topic으로 메시지를 전달할 수 있습니다. (기본 메시지 타입은 application/json 입니다.) 참고

아직 현재까지는 directbroadcast 가 설정에 차이가 없습니다.

Consumer

  • application.yaml
spring:
profiles:
local
cloud:
stream:
bindings:
direct:
binder:
rabbit
destination: p2p-topic
group: p2p-group
durableSubscription: true
broadcast:
binder:
rabbit
destination: pubsub-topic
rabbitmq:
addresses:
localhost
username: admin
password: admin

Consumer에서도 동일하게 RabbitMQ 관련 설정을 정의해주고 Spring Cloud Stream 관련 대상 Topic도 정의해 줍니다. Topic 메시지를 한 Consumer만 처리할 수 있도록 group 을 정의해 주고 Consumer가 하나도 없을 경우 메시지를 저장하고 Consumer 접속 시 저장된 메시지를 전달하도록 durableSubscriptiontrue 로 설정해 줍니다.

  • Source

ConsumerChannel 을 통하여 Binding Interface 를 정의합니다. @Input 을 통해 Consumer를 정의할 때는 @Output 과는 달리 MessageChannel 의 하위 인터페이스인 SubscribableChannel 을 사용해야 합니다.

메시지를 처리할 때에는 @StreamListener 를 통해 Binding된 메시지를 바로 처리 할 수 있습니다.

만약 메시지를 처리해서 다른 Stream으로 바로 전달하고 싶다면 @SendTo 를 함께 사용하여 Stream 으로 바로 Publish할 수 있습니다.

PublisherConsumer 두 어플리케이션을 실행한 상태로 curl 과 같은 도구를 이용하여 아래와 같이 호출해 보면 Publisher -> RabbitMQ -> Consumer 로 메시지가 전달되는 것을 확인 할 수 있습니다.

curl -X GET 'http://localhost:8080/api/direct/hi'
curl -X GET 'http://localhost:8080/api/broadcast/hiAll'

Functional way

Functional 방식에서는 Binding Name 이 중요합니다. 어플리케이션에서 정의된 Functional Interface 명을 활용하여 다음과 같은 규칙을 통해 Input/Output 정의할 수 있습니다.

Input : <function name> + -in- + <index>
Output : <function name> + -out- + <index>

<index> 값은 input/output Binding 순번으로 Multiple Input/Output 이 아닐 경우 0 으로 설정합니다.

Functional 방식에서는 Binding Interface 를 정의할 필요 없이 @Bean 으로 등록된 Supplier<T> , Function<T, R> , Consumer<T> 를 인지하여 Function Name 과 Property에 설정된 binding Name 을 자동으로 연결해 줍니다.

Publisher

  • application.yaml
spring:
profiles:
local
cloud:
function:
definition:
direct;broadcast
stream:
bindings:
direct-out-0:
destination:
p2p-topic
broadcast-out-0:
destination:
pubsub-topic
rabbitmq:
addresses:
localhost
username: admin
password: admin

directbroadcast 두 개의 Function 을 정의 했을 때 외부 Stream 으로 메시지를 발행해야 하기 때문에 direct-out-0 과 같은 형태로 Binding Name 을 정의하게 됩니다. 그리고 정의된 Function 이 두 개 이상일 때에는 spring.cloud.function.definition 을 통해 구분자 ; 와 함께 정의된 Function Name 을 기술해 줘야 합니다.

  • Source

앞서 설정한 Binding Name 과 정의된 Supplier 를 통해 Stream 으로 메시지를 보낼 수 있으며, 위 예에서는 REST 호출을 통해 메시지를 전달하기 위해 EmitterProcessor 를 활용해야 합니다. Binding Interface 없이 아주 간단하게 구현 가능한 것을 확인할 수 있습니다.

Consumer

  • application.yaml
spring:
profiles:
local
cloud:
function:
definition:
direct;broadcast
stream:
bindings:
direct-in-0:
binder:
rabbit
destination: p2p-topic
group: p2p-group
durableSubscription: true
broadcast-in-0:
destination:
pubsub-topic
rabbitmq:
addresses:
localhost
username: admin
password: admin

Binding Name 을 제외하고는 Annotation-based 방식과 동일한 것을 확인할 수 있습니다.

  • Source

@Bean 으로 등록된 Functional Interface 를 통해 간단히 메시지를 처리하는 것을 볼 수 있습니다. 다만 Reactive Consumer 는 조금 특별합니다. Consumervoid 를 리턴하기 때문에 subscribe 를 할 수가 없어서 Consumer<Flux<?>> 대신 Function<Flux<?>, Mono<Void>> 를 사용하고, then() 을 통해 리턴합니다.

Spring Cloud Stream 을 활용하면 아주 간단한 구현 만으로도 메시지 기반 마이크로 서비스 구축이 가능하며, Spring Cloud Stream ApplicationsSpring Cloud Stream App Starters 를 통해 설정 만으로도 서비스 구현이 가능하도록 지원하고 있습니다.

관련 소스는 아래 GitHub 링크에서 확인 가능합니다.

  • Functional Way
  • Annotation-based

--

--