Spring Cloud Stream
Spring Cloud Stream
프로젝트는 Message 기반 마이크로 서비스 어플리케이션을 만들기위한 Framework
로 메시징 시스템과 연동하여 확장성 높은 이벤트 기반 마이크로 서비스를 구축 가능하게 해줍니다.
높은 추상화를 통해 어떤 인프라인지를 알 필요 없이 어플리케이션 설정과 간단한 구현 만으로 메시징 시스템을 구현할 수 있습니다.
Main Concept
Programming 을 통한 구현을 위해서는 아래 간단한 컨셉을 일단 알고 가야 합니다.
Destination Binder : 외부 메시징 시스템 (Kafka 또는 RabbitMQ)과의 연동을 책임지는 컴포넌트로 유저가 직접 구현시 책임져야 할 많은 boiler-plate한 코드들을 제공해 줍니다.
binding : 외부 메시징 시스템과 어플리케이션에서 제작된 Producer
Consumer
사이의 브릿지로서 binding name
을 통해 property에서 설정하여 사용할 수 있습니다.
Message : Destination Binder
와 통신하기 위한 정규화 데이터 구조로 POJO 객체를 그대로 사용할 수 있습니다.
위 그림에서 보이는 것처럼 외부 메시징 시스템과 연동 및 설정은 Spring Property를 통해 추상화되고 사용자는 Application 주요 로직만 신경 쓸 수 있도록 높은 추상화를 제공합니다.
RabbitMQ
각각의 특성과 용도가 달라 어느 메시징 시스템을 사용해도 상관없으며, 본문에서는 Rabbit MQ를 사용한 P2P 메시지와 Pub-Sub 구조의 메시징 시스템을 만들어 보겠습니다.
RabbitMQ Binder는 Topic을 기반한 Exchange Output(Producer)과 Queue를 통한 Input(Consumer)을 제공하고 있습니다.
docker 설정
테스트를 위해 아래와 같이 docker-compose를 통해 RabbitMQ를 설정합니다.
docker-compose.yaml
version: '3'
services:
rabbitmq:
image: 'rabbitmq:3-management-alpine'
container_name: rabbitmq-stream
ports:
- '5672:5672'
- '15672:15672'
environment:
RABBITMQ_ERLANG_COOKIE: "RabbitMQ-My-Cookies"
RABBITMQ_DEFAULT_USER: "admin"
RABBITMQ_DEFAULT_PASS: "admin"
docker-compose up
또는 docker-compose up -d
를 명령어를 통해 docker
를 실행시키고, 브라우저를 통해 http://localhost:15672
로 접속해 RabbitMQ상태를 모니터링 합니다.
Annotation-based vs Functional
Spring Cloud Stream
3.0부터는 spring-cloud-stream-reactive
가 합쳐지면서 Function
을 기반으로 한 Routing Function
, Multiple bindings with Functions
, Functions with multiple inputs/outputs
, Native support for reactive programming
등이 지원되면서 Annotation
을 활용한 방식은 Legacy 방식으로 취급되고 있습니다. 하지만 꼭 Functional
방식을 사용할 필요는 없으며 필요에 따라 Spring Integration
등을 활용하는 다양한 방식을 모두 지원하고 있습니다.
여기서는 Annotation-based
방식과 Functional
방식 두 가지 예를 모두 살펴보겠습니다.
Configuration
pom.xml
Spring Boot
관련 디펜던시와 함께 spring-cloud-starter-stream-rabbit
을 통해 연관 디펜던시를 모두 추가 할 수 있습니다.
Annotation-based
기존의 방식을 통한 Spring Cloud Stream
을 사용할 때 Spring Integration을 사용한 다양한 방식 ( @MessagingGateway
와 IntegrationFlow
등) 들을 활용할 수 있습니다. 여기에서는 StreamBridge
를 사용한 Publisher와 @StreamListener
를 사용한 Consumer를 알아보도록 하겠습니다.
Publisher
application.yaml
spring:
profiles: local
cloud:
stream:
bindings:
direct:
binder: rabbit
destination: p2p-topic
broadcast:
binder: rabbit
destination: pubsub-topic
rabbitmq:
addresses: localhost
username: admin
password: admin
spring.rabbitmq
를 통해 RabbitMQ 관련 접속 정보를 설정해 줍니다.
Spring Cloud Stream
에서는 앞서 언급한 Binding Name
을 통해 Stream IN/OUT을 설정할 수 있습니다.
Legacy 방식에서는 spring.cloud.bindings.<Binding Name>.destination
을 통해 Binding Name
에 연관된 RabbitMQ Topic Exchange
를 설정해 줄 수 있습니다. 위 예시에서는 direct
와 broadcast
의 두 개의 Binding을 설정하였습니다.
- Source
ProducerChannel
을 통해 Binding Interface
를 정의하고 @EnableBinding
을 통해 적용해주면 application.yaml
을 통해 정의한 Binding Name
들과 연결할 수 있습니다.
위 예제에서는 REST
호출을 통해 외부에서 부터 시작된 데이터를 Stream
으로 전달하기 위해서 StreamBridge
를 활용하였습니다. 참고
StreamBridge.send()
를 통해 ProducerChannel
에 정의된 Binding Name
을 찾고 해당하는 spring.cloud.stream.bindings
의 Topic으로 메시지를 전달할 수 있습니다. (기본 메시지 타입은 application/json
입니다.) 참고
아직 현재까지는
direct
와broadcast
가 설정에 차이가 없습니다.
Consumer
application.yaml
spring:
profiles: local
cloud:
stream:
bindings:
direct:
binder: rabbit
destination: p2p-topic
group: p2p-group
durableSubscription: true
broadcast:
binder: rabbit
destination: pubsub-topic
rabbitmq:
addresses: localhost
username: admin
password: admin
Consumer에서도 동일하게 RabbitMQ 관련 설정을 정의해주고 Spring Cloud Stream
관련 대상 Topic도 정의해 줍니다. Topic 메시지를 한 Consumer만 처리할 수 있도록 group
을 정의해 주고 Consumer가 하나도 없을 경우 메시지를 저장하고 Consumer 접속 시 저장된 메시지를 전달하도록 durableSubscription
을 true
로 설정해 줍니다.
- Source
ConsumerChannel
을 통하여 Binding Interface
를 정의합니다. @Input
을 통해 Consumer를 정의할 때는 @Output
과는 달리 MessageChannel
의 하위 인터페이스인 SubscribableChannel
을 사용해야 합니다.
메시지를 처리할 때에는 @StreamListener
를 통해 Binding된 메시지를 바로 처리 할 수 있습니다.
만약 메시지를 처리해서 다른 Stream
으로 바로 전달하고 싶다면 @SendTo
를 함께 사용하여 Stream
으로 바로 Publish할 수 있습니다.
Publisher
와 Consumer
두 어플리케이션을 실행한 상태로 curl
과 같은 도구를 이용하여 아래와 같이 호출해 보면 Publisher
-> RabbitMQ
-> Consumer
로 메시지가 전달되는 것을 확인 할 수 있습니다.
curl -X GET 'http://localhost:8080/api/direct/hi'
curl -X GET 'http://localhost:8080/api/broadcast/hiAll'
Functional way
Functional
방식에서는 Binding Name
이 중요합니다. 어플리케이션에서 정의된 Functional Interface
명을 활용하여 다음과 같은 규칙을 통해 Input/Output 정의할 수 있습니다.
Input : <function name> + -in- + <index>
Output : <function name> + -out- + <index>
<index>
값은 input/output Binding 순번으로 Multiple Input/Output
이 아닐 경우 0
으로 설정합니다.
Functional
방식에서는 Binding Interface
를 정의할 필요 없이 @Bean
으로 등록된 Supplier<T>
, Function<T, R>
, Consumer<T>
를 인지하여 Function Name
과 Property에 설정된 binding Name
을 자동으로 연결해 줍니다.
Publisher
application.yaml
spring:
profiles: local
cloud:
function:
definition: direct;broadcast
stream:
bindings:
direct-out-0:
destination: p2p-topic
broadcast-out-0:
destination: pubsub-topic
rabbitmq:
addresses: localhost
username: admin
password: admin
direct
와 broadcast
두 개의 Function
을 정의 했을 때 외부 Stream
으로 메시지를 발행해야 하기 때문에 direct-out-0
과 같은 형태로 Binding Name
을 정의하게 됩니다. 그리고 정의된 Function
이 두 개 이상일 때에는 spring.cloud.function.definition
을 통해 구분자 ;
와 함께 정의된 Function Name
을 기술해 줘야 합니다.
- Source
앞서 설정한 Binding Name
과 정의된 Supplier
를 통해 Stream
으로 메시지를 보낼 수 있으며, 위 예에서는 REST
호출을 통해 메시지를 전달하기 위해 EmitterProcessor
를 활용해야 합니다. Binding Interface
없이 아주 간단하게 구현 가능한 것을 확인할 수 있습니다.
Consumer
application.yaml
spring:
profiles: local
cloud:
function:
definition: direct;broadcast
stream:
bindings:
direct-in-0:
binder: rabbit
destination: p2p-topic
group: p2p-group
durableSubscription: true
broadcast-in-0:
destination: pubsub-topic
rabbitmq:
addresses: localhost
username: admin
password: admin
Binding Name
을 제외하고는 Annotation-based
방식과 동일한 것을 확인할 수 있습니다.
- Source
@Bean
으로 등록된 Functional Interface
를 통해 간단히 메시지를 처리하는 것을 볼 수 있습니다. 다만 Reactive Consumer
는 조금 특별합니다. Consumer
가 void
를 리턴하기 때문에 subscribe
를 할 수가 없어서 Consumer<Flux<?>>
대신 Function<Flux<?>, Mono<Void>>
를 사용하고, then()
을 통해 리턴합니다.
Spring Cloud Stream
을 활용하면 아주 간단한 구현 만으로도 메시지 기반 마이크로 서비스 구축이 가능하며, Spring Cloud Stream Applications
와 Spring Cloud Stream App Starters
를 통해 설정 만으로도 서비스 구현이 가능하도록 지원하고 있습니다.
관련 소스는 아래 GitHub 링크에서 확인 가능합니다.
Functional Way
Annotation-based