SSE를 활용한 다중 서버 환경에서의 실시간 알람 기능(feat.Hazelcast Topic)

SSE: Hazelcast Topic과의 효율적인 통합 방안

최예소라
PentaSecurity Labs
19 min readNov 27, 2023

--

프로젝트 개발 중에 아래와 같은 이유로 사용자에게 실시간 알람을 보내는 기능이 필요했습니다.

  • 관리자가 사용자의 상태를 변경하여 사용자를 강제 로그아웃 시켜야 하는 경우
  • 긴급 공지 내용 등을 사용자에게 알리는 경우 등

이중화된 WAS에서 인 메모리 데이터 그리드인 Hazelcast를 사용 중이었는데, 단순하게 클라이언트와의 연결정보를 Hazelcast의 세션 클러스터링 기능과 비슷하게 공유하면 될 줄 알았지만 불가능했습니다 😢
Message queue를 구축하는 것이 일반적인 방법이었지만, 고민이 많이 필요했던 상황이었습니다.

이 글에서는 다중 서버 환경에서 별도의 Message queue 시스템을 이용하지 않고(Hazelcast의 컴포넌트를 잘 이용해서) 웹브라우저에게 실시간 알람을 발송할 수 있는 기능을 소개합니다.

SSE with Hazelcast Topic

SSE(Server-Sent Events)란?

SSE는 HTTP 프로토콜에서 서버가 클라이언트에게 실시간으로 데이터를 보내는 HTML5 표준 기술입니다. 별도의 프로토콜을 사용하지 않고 HTTP 프로토콜만으로 사용할 수 있기 때문에 구현이 단순하며, 보통 푸시 알림 등 실시간 데이터 업데이트 기능으로 많이 사용됩니다.

왜 SSE를 사용할까?

실시간 통신 구현에는 다양한 방식이 있습니다. 일반적으로 polling 이나, 웹소켓이 사용됩니다.

  • polling : 클라이언트가 서버로 주기적인 http request를 보내는 방식
polling 방식 — 클라이언트가 주기적으로 서버에 request를 보내고, 서버는 응답한다.
  • 웹 소켓 : ws 프로토콜을 이용한 서버-클라이언트 양방향 통신 방식
웹소켓방식 — 한 번의 연결 설정 후 서버와 클라이언트가 서로에게 데이터를 자유롭게 보낼 수 있다.

그러나 polling 방식은 request가 계속해서 발생해 클라이언트가 많아지면 서버에 부하가 생길 수 있으며, ‘주기적으로’ 요청하는 방식은 완전한 실시간이라 할 수 없습니다.

또한 웹 소켓은 양방향 통신을 위해 도입된 프로토콜로, 서버 측 단방향 통신인 SSE보다 무겁습니다.

SSE 통신방식 — 한번의 연결로 클라이언트의 별도 요청 없이 서버는 계속해서 데이터를 전송할 수 있다.

SSE는 클라이언트가 서버를 최초 1회 ‘구독(subscribe)’만 하면, 계속해서 서버로부터 데이터를 받을 수 있습니다. 단순히 서버의 일방적인 데이터 전송만 필요하다면, HTTP 외에 별도의 프로토콜을 필요로 하지 않는 SSE가 웹 소켓 방식보다 우월한 이유입니다.

Spring MVC에서는 이 SSE 구현을 위해 SseEmitter 클래스를 제공합니다.

SseEmitter
Spring에서 ResponseBodyEmitter을 확장하여 SSE 구현을 돕는 클래스로 아래와 같은 특징을 가집니다.
1) 비동기 처리 지원
2) 다양한 데이터 형식 지원
3) 타임아웃 설정 기능 및 에러 핸들링 기능 지원

이번 프로젝트에서는 단순히 서버→클라이언트의 단방향 푸시 알림만 필요했기 때문에, SSE를 활용하기로 했습니다.

서버의 구독 API는 아래와 같습니다.

  • Controller
@GetMapping(value = "/event/subscribe", produces = MediaType.TEXT_EVENT_STREAM_VALUE)
public SseEmitter subscribe(HttpServletRequest request, HttpServletResponse response) {
String sessionId = request.getSession().getId();
return sessionEventNotificationService.subscribe(sessionService.getSession(sessionId));
}

클라이언트가 구독 API를 호출하면, 서버는 SseEmitter를 생성하고 저장합니다.
이때, 클라이언트를 특정하기 위한 식별자(Key)로 클라이언트의 세션 아이디를 사용했습니다.(MediaType은 꼭 TEXT_EVENT_STREAM_VALUE 로 설정해야 합니다.)

  • EventNotificationService —(1) SseEmitter 생성 및 구독 기능부
// 구독 메서드 
public SseEmitter subscribe(Session session) {
SseEmitter emitter = createEmitter(session);
return emitter;
}

// SseEmitter 생성 메서드
private SseEmitter createEmitter(Session session) {

// 생성자 파라미터로 타임아웃을 지정할 수 있다.
SseEmitter emitter = new SseEmitter(session.getMaxInactiveInterval().getSeconds());
String sessionId = session.getId();
sseLocalRepository.put(sessionId, emitter);

// Emitter가 타임아웃 되었을 때(지정된 시간동안 어떠한 이벤트도 전송되지 않았을 때)
emitter.onTimeout(() -> sseLocalRepository.deleteById(sessionId));

// Emitter가 정상적으로 연결이 종료되었을 때
emitter.onCompletion(() -> sseLocalRepository.deleteById(sessionId));

// 에러 발생 시
emitter.onError((t) -> {
sseLocalRepository.deleteById(sessionId);
});

return emitter;
}

// 사용자에게 알림 발송이 필요한 시점에 이 메서드를 호출한다.
public void notify(Session session, EventCode event) {
// 사용자에게 푸시 알림을 발송한다.
...
}

SseEmitter 객체를 관리하는 레파지토리는 아래와 같이 구성되어 있습니다.

  • SSELocalRepository
@Component
public class SSELocalRepository {

private final Map<String, SseEmitter> sseEmitterMap = new ConcurrentHashMap<>();

public void put(String key, SseEmitter sseEmitter) {
sseEmitterMap.put(key, sseEmitter);
}

public Optional<SseEmitter> get(String key) {
return Optional.ofNullable(sseEmitterMap.get(key));
}

public void deleteById(String key) {
sseEmitterMap.remove(key);
}
}

알림 시스템에서는 여러 클라이언트가 동시에 구독하고 데이터를 전송할 수 있으므로, 동시성을 제어하는 것이 중요합니다.
그에 맞게 thread-safe한 ConcurrentHashMap을 사용하면서, 안전하게 저장할 수 있습니다.

[Ref. ConcurrentHashMap는 어떻게 Thread-safe 한가]

메시지를 받는 클라이언트는 js의 EventSource를 통해 쉽게 SSE연결을 맺고, 데이터를 전송받을 수 있습니다.

  • messageReceiver.js
var eventSource = new EventSource("/html/api/events");
eventSource.addEventListener('message', eventListener);

var eventListener = function (e) {
const data = JSON.parse(e.data);
if (data.code == "HI") {
alert(data.msg);
} else if (data.code == "BYE") {
alert(data.msg);
location.href = "/html/login";
}
};

SSE는 위 코드처럼 구현이 매우 쉽고, 간단합니다. 그러나 다중 서버 환경에서는 고려해야 할 것이 있습니다.

예를 들어, 아래 이미지와 같이 이중화된 서버와 각 서버에 접속한 관리자와 일반 사용자가 있습니다.
이때, 관리자는 어떻게 사용자에게 실시간으로 알람을 보낼 수 있을까요?

처음에는 SseEmitter 객체를 세션과 같이 Hazelcast의 메모리 공유 객체인 IMap(map 형태의 저장 구조)에 저장하면 되지 않을까? 생각했습니다.
그러나 Hazelcast의 IMap은 당연히도 Serializable인터페이스를 구현한 객체, 즉 직렬화/역직렬화가 가능한 객체만을 저장할 수 있습니다.

IMap은 여러 JVM 사이에 데이터를 저장하고 공유하는데, 이러한 분산 데이터 구조에서는 데이터를 네트워크를 통해 전송하거나 디스크에 저장할 수 있어야 합니다.
이러한 작업을 수행하려면 데이터를 바이트 스트림으로 변환하는 과정, 즉 직렬화/역직렬화가 필요합니다.

SseEmitter는 내부적으로 HTTP 응답과 관련된 ResponseEntity와 같은 직렬화할 수 없는 객체를 참조하고 있기 때문에 서버 간에 메모리를 통해 공유하는 것이 불가능했습니다.

Hazelcast Topic

Hazelcast는 Topic이라는 컴포넌트가 있습니다.

게시/구독(pub/sub) 모델로, 각 노드(서버)들 끼리 메시지를 발송하고, 리스너를 통해 수신할 수 있는 분산 메시징 기능(Distributed Messaging System)입니다.

이 Hazelcast Topic을 이용해 위 WAS A에 접속한 관리자(Admin)가 알람을 보낼 대상(User1)의 SSE 연결이 동일한 서버에 존재하지 않는 경우, WAS B에게 메시지를 보내도록 구현할 것입니다.

User1의 SSE 연결 객체(SseEmitter)를 소유한 서버는, 메시지를 수신하면 해당 사용자에게 SSE를 통해 데이터를 전송합니다.

하나의 노드는 Topic을 통해 Hazelcast의 구성원인 모든 노드에게 메시지를 보내고, 리스너를 구현한 노드는 모두 메시지를 받을 수 있다.

Hazelcast Topic을 이용해 아래와 같이 메시지 송수신 전용 클래스를 생성했습니다.

  • HazelcastMessagingService
@RequiredArgsConstructor
public class HazelcastMessagingService<T extends Serializable> {

private final HazelcastInstance hazelcastInstance;

// 토픽 생성 및 조회
public ITopic<T> getOrCreateTopic(String topicName) {
return hazelcastInstance.getTopic(topicName);
}

// 토픽에 리스너를 등록
public void registerListener(String topicName, MessageListener<T> listener) {
ITopic<T> topic = getOrCreateTopic(topicName);
topic.addMessageListener(listener);
}

// 토픽에 메시지를 발행
public void publishMessage(String topicName, T message) {
ITopic<T> topic = getOrCreateTopic(topicName);
topic.publish(message);
}

}

Topic을 통해 메시지를 송수신 할 때에도 마찬가지로 직렬화 가능한 객체만 주고받을 수 있어 Serializable을 구현한 객체를 Generic type으로 받게 했습니다.

SseEmitter는 자체를 직렬화할 수 없어 Hazelcast를 통한 메모리 공유가 불가능했지만, 그 메타데이터인 사용자의 key와 메시지를 필드로 갖는 객체를 정의하고 Serializable을 구현하여 직렬화할 수 있게 한 후, 서버의 메시지 객체로 사용할 수 있습니다.

그리고 Generic type을 사용하는 클래스를 bean으로 사용하기 위해 FactoryBean interface를 구현했습니다.

  • HazelcastMessagingServiceFactoryBean
public class HazelcastMessagingServiceFactoryBean<T extends Serializable> implements FactoryBean<HazelcastMessagingService<T>> {

private Class<T> messageType;

@Autowired
private HazelcastInstance hazelcastInstance;

public HazelcastMessagingServiceFactoryBean(Class<T> messageType) {
this.messageType = messageType;
}

@Override
public HazelcastMessagingService<T> getObject(){
return new HazelcastMessagingService<>(hazelcastInstance);
}

@Override
public Class<?> getObjectType() {
return HazelcastMessagingService.class;
}

@Override
public boolean isSingleton() {
return true;
}
}

위와 같이 Generic Type을 활용한 FactoryBean 구현은 코드의 재사용성을 향상시킵니다. 메시지의 형식을 단순한 문자열이 아닌 객체로 주고받을 때, 하나의 타입에 얽매이지 않고 필요한 클래스 타입을 활용할 수 있게 합니다.

이번 프로젝트에서는 SSEEvent 클래스를 생성하여 각 노드들의 메시지 및 클라이언트에게 발송하는 데이터 객체로 사용했습니다.

  • SSEEvent
public class SSEEvent implements Serializable {

private String sessionId;
private EventCode code;
private String msg;

}

그리고 Hazelcast 설정 파일에 아래와 같이 빈 등록을 해줍니다.

  • HazelcastConfiguration
@Bean
public HazelcastMessagingServiceFactoryBean<SSEEvent> hazelcastMessagingServiceFactoryBean() {
return new HazelcastMessagingServiceFactoryBean<>(SSEEvent.class);
}

클라이언트에게 메시지를 발송하는 EventNotificationService 는, 빈 생성이 완료되면 @PostConstruct에 의해 초기화 과정을 거칩니다. 다른 노드로부터 받는 메시지 리스너를 설정하는 과정입니다.

  • EventNotificationService — (2)Hazelcast Topic 수신 및 발신부
@Service
@RequiredArgsConstructor
public class EventNotificationService {

private final ServiceMessageService messageService;
private final SSELocalRepository sseLocalRepository;
private final HazelcastMessagingService<SSEEvent> hazelcastMessagingService;
public static final String HZ_MESSAGE_TOPIC_NAME = "ALERT_SOMETHING";

@PostConstruct
public void init() {
// message listener : hazelcast의 다른 노드로부터 받은 메시지 listener 등록
hazelcastMessagingService.registerListener(HZ_MESSAGE_TOPIC_NAME, message -> {
SSEEvent event = message.getMessageObject();
if(isSseEmitterPresentInLocal(event.getSessionId())) {
sendToLocalClient(event); // 클라이언트에게 메시지 전송
};
});
}

// SseEmitter Map에 해당 클라이언트의 연결 객체가 존재하는지 확인
private boolean isSseEmitterPresentInLocal(String sessionId) {
return sseLocalRepository.get(sessionId).isPresent();
}

// 사용자에게 알림 발송이 필요한 시점에 이 메서드를 호출한다.
public void notify(Session session, EventCode event) {
// 대상 사용자의 연결 객체가 내 로컬 맵에 존재하는지 확인하고
// 있으면 데이터를 발송하고, 없으면 다른 노드들에게 메시지를 발송한다.
...
}

public SseEmitter subscribe(Session session) {
...
}

private SseEmitter createEmitter(Session session) {
...
}
...
}

메시지 리스너에서는, SSEEvent객체의 sessionId 필드로 현재 내 서버의 SSELocalRepository에 대상 클라이언트의 SseEmitter가 있는지 확인하고, 있으면 클라이언트에게 데이터를 전송(푸시알림)합니다.

클라이언트에게 푸시 알림을 보내고 싶은 시점에 호출할 notify 메서드를 구현합니다.

  • EventNotificationService — (3)푸시 알림 요청 및 처리부
public void notify(Session session, EventCode event) {

EventCode code = null == event ? EventCode.PING : event;
String msg = "";

switch (code) {
case HI:
msg = "안녕하세요. 적당히 바람이 시원해 기분이 너무 좋아요. 유후";
break;
case BYE:
msg = "안녕은 영원한 헤어짐은 아니겠지요, 다시 만나기 위한 약속일 거야.";
break;
}

SSEEvent SSEEvent = SSEEvent.builder().sessionId(session.getId()).code(code).msg(msg).build();

// 로컬 map에 타겟 관리자(세션)의 SseEmitter가 존재하면 바로 클라이언트에게 이벤트를 전송.
if(isSseEmitterPresentInLocal(session.getId())) {
sendToLocalClient(SSEEvent);
} else {
// 존재하지 않으면 다른 hazelcast 노드로 메시지를 전송.
hazelcastMessagingService.publishMessage(HZ_MESSAGE_TOPIC_NAME, SSEEvent);
}

}

private void sendToLocalClient(SSEEvent SSEEvent) {

String sessionId = SSEEvent.getSessionId();
SseEmitter emitter = sseLocalRepository.get(SSEEvent.getSessionId()).orElseGet(null);
if (emitter != null) {
Map<String, String> dataForClient = new HashMap<>();
dataForClient.put("code", SSEEvent.getCode().name());
dataForClient.put("msg", SSEEvent.getMsg());
emitter.send(dataForClient); // 최종적으로 클라이언트에게 발송.
}

}

notify 메서드에서는 대상 사용자가 SSELocalRepository에 있는지 확인하여 있으면 데이터를 전송하고, 없으면 다른 노드들로 메시지를 전송합니다.
그리고 리스너를 구현한 다른 노드들은 위에서 언급한 과정을 통해 사용자에게 메시지를 전송하거나, 무시합니다.

한 가지 주의할 점은, SseEmitter의 연결이 종료되었으면 서버의 Map에 담긴 자원을 꼭 정리해 memory leak을 방지해야 한다는 점입니다.

SseEmitter가 제공하는 onTimeout, onCompletion, onError 등의 콜백 메서드를 적극 활용하여 SseEmitte의 이벤트를 감지하고 map에서 해당 객체를 삭제하거나 그 외의 예외 처리를 통해서 꼭 정리가 필요합니다.

여기까지 SSE와 Hazecast Topic 컴포넌트를 활용한 다중 서버 환경에서의 실시간 알람 기능에 대해 알아보았습니다.

작은 정보지만 필요할 때 큰 도움이 될 수 있길 바랍니다. 감사합니다.😃

--

--