금융서비스 MSA 전환기- 서버 간 비동기 메시지 기반 통신 처리(3편)

김형래
finda 기술 블로그
23 min readSep 25, 2023

안녕하세요, FINDA 현금그로스 PG 자산/신용관리 PT 백엔드 개발자 김형래 입니다.

이번 글에서는 자산/신용관리 PT 에서 대량 트래픽에서 운영 서비스에 유입되는 트래픽을 원활이 처리하기 위해서 서버 간 메시지를 정의해서 비동기로 처리했던 프로젝트에 대해 알아보도록 할게요!

왜 비동기 처리를 선택했나?

이전 글에서 다뤄본 CircuitBreaker 로 대량 트래픽 처리 시, 해결되지 않는 부분이 존재했습니다. CircuitBreaker 는 온전히 트래픽 유입을 조절하는 부분에 초점이 맞추어져 있습니다. 그러나, 내부 서비스들 간의 통신에서 Latency 가 오래 걸리는 API 가 있고, Thread 를 더욱 안정적으로 사용하기 위해서는 다른 방법을 강구해 봐야 한다고 판단했습니다. 실제 FINDA APP 홈 화면에서 호출되는 API 가 위에서 이야기한 Latency 가 오래 걸리는 API 로서 이부분을 해결하고자 자산관리, 마이데이터관리 백엔드 개발자가 모여 많은 논의를 진행했습니다. 논의 결과 서버 간 비동기 메시지를 기반으로 통신을 처리하자고 합의점을 이루었습니다. 이번 작업에서 EDA(Event Driven Architecture) 기반으로 아래의 기술들을 적용해서 진행해 보기로 했습니다.

Apache Kafka : 비동기 메시지 기반 통신 처리(Produce, Consume)

Redis : SSE 통신 중에 Server Scale Out 대응 처리(Pub/Sub), Distributed Lock

SSE : 단방향 이벤트 발생 시, Server 에서 Client 로 다수의 응답 처리

위에 소개한 기술들을 왜 선택하게 되었는지 하나씩 살펴 볼게요.

Apache Kafka 는 왜 사용하나요?

위에 질문에 답을 하기 위해서는 Apache Kafka 가 무엇인지 먼저 알아봐야겠네요. Apache Kafka 는 빠르고 확장 가능한 작업을 위해 데이터 피드의 분산 스트리밍, 파이프 라이닝 및 재생을 위한 실시간 스트리밍 데이터를 처리하기 위한 목적으로 설계된 오픈 소스 분산형 게시/구독 메시징 플랫폼입니다. Kafka는 Server 클러스터 내에서 데이터 스트림을 레코드로 유지하는 방식으로 작동하는 브로커 기반 솔루션입니다. Kafka Server 는 여러 데이터 센터에 분산되어 있을 수 있으며 여러 Server 인스턴스에 걸쳐 레코드 스트림(메시지)을 토픽으로 저장하여 데이터 지속성을 제공할 수 있습니다. 토픽은 레코드 또는 메시지를 키, 값 및 타임 스탬프로 구성된 일련의 튜플, 변경 불가능한 Python 객체 시퀀스로 저장합니다. Apache Kafka는 오늘날 시장에서 가장 빠르게 성장하는 오픈 소스 메시징 솔루션 중 하나입니다. 이는 주로 분산 시스템에 우수한 로깅 메커니즘을 제공하는 아키텍처 기반 설계 패턴 때문입니다. 실시간 로그 스트리밍을 위해 특별히 제작된 Apache Kafka는 다음 사항을 필요로 하는 애플리케이션에 적합합니다.

  • 서로 다른 구성 요소 간의 안정적인 데이터 교환
  • 애플리케이션 요구 사항 변경에 따라 메시징 워크로드를 분할하는 기능
  • 데이터 처리를 위한 실시간 스트리밍
  • 데이터/메시지 재생에 대한 기본 지원

Redis 는 왜 사용하나요?

Redis는 빠른 오픈 소스 인 메모리 키 값 데이터 구조 스토어입니다. Redis는 다양한 인 메모리 데이터 구조 집합을 제공하므로 다양한 사용자 정의 애플리케이션을 손쉽게 생성할 수 있습니다. 주요 Redis 사용 사례로는 캐싱, 세션 관리, pub/sub 및 순위표를 들 수 있습니다. Redis는 현재 가장 인기 있는 키 값 스토어로서, BSD 라이선스가 있고, 최적화된 C 코드로 작성되었으며, 다양한 개발 언어를 지원합니다. Redis는 REmote DIctionary Server의 약어입니다. Redis는 속도가 빠르고 사용이 간편하여 최고의 성능이 필요한 웹, 모바일, 게임, 광고 기술 및 IoT 애플리케이션에서 널리 사용 되고 있습니다. 또한, 아래의 장점을 가지고 있습니다.

  • 놀라울 정도로 빠른 성능
  • 인 메모리 데이터 구조
  • 다양성과 사용 편의성
  • 복제 및 지속성
  • 선호하는 개발 언어 지원

SSE 는 왜 사용하나요?

Server 의 Event 를 Client 전달하는 방법의 장단점을 아래와 같이 알아 볼게요.

Polling, Long Polling, Server Sent Events and WebSockets

Polling

  • Client 가 http request를 Server 로 계속 보내면서 Event 를 전달 받는 방식
  • 계속 Request 를 보내기 때문에 Client 가 많아지면 Server 의 부담 급증
  • Http request connection을 연결 및 해제 자체가 부담되는 방식
  • 실시간 정도의 빠른 응답을 기대 어려움
  • Http Overhead 발생(정보의 신뢰성을 판단하기 위해 전송되는 헤더 정보로 데이터량, 처리시간 증가)
  • 일정하게 갱신되는 Server Data 의 경우 유용

Long Polling

  • Server 쪽에서 접속 시간을 길게 설정하는 방식
  • Client 에서 Server 로 Http request 전송
  • Server 응답 데이터가 없으면 기다리고, Event 가 존재하는 순간 Response 를 전달 후 연결 해제
  • Client 에서 다시 Http request 를 전송하여 Server 의 다음 Event 대기
  • Polling 방식보다 Server 부담은 줄고, Event 간격이 좁으면 차이 없음
  • 다수의 Client 에게 Event 가 동시 발생 시, Server 부담 증가

Web Sockets VS SSE(Server-Sent-Event)

Socket 과 SSE 에 가장 큰 차이점이라면 Socket 은 양방향으로 데이터를 주고 받을 수 있지만, SSE를 사용하게 되면 클라이언트는 데이터를 받을 수만 있게 됩니다. 아래의 표에 좀 더 상세히 차이점을 나눠보았습니다.

Web Socket VS SSE

Socket 은 최대 동시 접속 수는 Server 에 셋업에 따라 얼마나 많이 접속할 수 있는지 정해지기 때문에 딱 정해진 숫자는 없습니다. SSE 는 아래에 보면 HTTP2 를 사용하지 않을 경우에는 브라우저에서 6개로 제한하고 있습니다. 즉 Tab 을 6개 초과 시 안될 수 있습니다. 그리고 앞으로도 크롬에나 파이어폭스에서는 고치지 않을 것이라고 합니다. 하지만 HTTP2 를 사용하면 기본으로 100개를 허용합니다.

Warning Message of SSE

Network Tab 을 열어서 보면 Socket 은 계속 Server 랑 계속 handshake를 하고 있습니다. 하지만 SSE 는 한번 열고 계속 기다립니다. Socket 이 Server 에 보내는 데이터가 적지만 계속 열려 있을 경우 양방향 통신으로 데이터 사용량이 더 크게 됩니다. 자산관리 서버에서는 마이데이터 정보를 Refresh 하는 기능을 처리하기 위한 필수 요건은 비동기 처리와 단방향 통신입니다.

즉, Refresh 요청을 Client 가 보내면 Server 는 마이데이터관리 서버를 통해 데이터를 받는 대로 Client 로 보내면 됩니다. 그래서 Server 의 Event 를 Client 전달하는 방법은 SSE 가 가장 적합하다고 판단했으며, 각 서비스 별로 아래의 설계로 진행했습니다.

어떻게 설계해야 비동기 처리일까?

기존에는 Client 로부터 받은 요청에 대해서 Client 와 Server 또는 Server 와 Server 사이에서 동기적으로 처리하고 있었습니다. 그러다 보니, 대량의 트래픽이 몰리면서 유한한 자원 내에서 요청에 대한 처리량이 비 효율적으로 나타나고 있었습니다. 그래서 메시지 기반으로 서로 간의 의존성을 제거하면서 비동기 통신으로 설계했습니다. 아래의 설계를 하나씩 살펴볼게요.

Asynchronous message-based communication processing design - EDA(Event Driven Architecture)

Step1. Client 에서 자산관리(AMS API) 서버 API 요청(SSE Connection)

  • Client 에서 아래의 Refresh API 를 요청해서 SSE Connection 을 생성합니다. 여기서 중복된 API 인지 Redis 를 통해 확인하고 정보 조회를 요청합니다.
@GetMapping(value = "/loans-async", produces = MediaType.TEXT_EVENT_STREAM_VALUE, consumes = MediaType.ALL_VALUE)
@ResponseStatus(value = HttpStatus.OK)
public ResponseEntity<SseEmitter> getLmsAccountListByAsync(
@Parameter(description = "사용자 인증토큰", required = true) @RequestHeader(value = "X-Auth-Token", required = true) String authToken,
@Parameter(description = "연동 접점") @RequestParam(value = "source", required = false, defaultValue = "loanmanage") String source) {
long userId = authService.getUserId();

sseEmitterService.checkDuplicatedCallAndSaveForApiCallInfo(userId, emitterEventName);

SseEmitter sseEmitter = sseEmitterService.subscribeEvent(userId, emitterEventName,
emitterTimeout, SseApiType.LMS_LOAN_REFRESH.name());

LmsLoanRefreshAsyncRequest request = LmsLoanRefreshAsyncRequest.builder()
.authToken(authToken).userId(userId).source(source).build();
lmsQueryService.getLmsAccountListByAsync(request, LmsAccountBaseListRspDto.class);

return ResponseEntity.ok().headers(SseHeader.get()).body(sseEmitter);
}

Step2. 여신(LMS API)/수신(DMS API) 관리 서버의 비동기 메시지 기반 통신(Produce)

  • 마이데이터관리 서버(MYDATA API)로 Kafka Topic 으로 produce 하면서 Refresh 를요청합니다.
@Qualifier("reactiveByteKafkaProducerTemplate")
private final ReactiveKafkaProducerTemplate<String, byte[]> reactiveByteKafkaProducerTemplate;

public void send(String topic, byte[] message) {
reactiveByteKafkaProducerTemplate.send(topic,message).subscribe();
}

Step3. 여신(LMS API)/수신(DMS API) 관리 서버의 Cache와 실제 응답

  • 아래는 여신관리 서버의 응답처리 부분입니다. Client 에 처음에는 Cache 응답을 주고, 비동기로 Refresh 응답을 요청해서 받아오는 구조입니다. 이렇게 처리하면서 Client 는 기존 동기 API 에서 Cache 사용 유무를 파라미터로 받아서 처리하고 있었으나, 이젠 비동기 메시지 통신 처리로 인해 구분하지 않고 받아서 화면에 렌더링할 수 있게 되었습니다.
@Async
public <T> void getLmsAccountListByAsync(LmsLoanRefreshAsyncRequest request, Class<T> clazz) {
if (sseEmitterService.hasEventEmitter(request.getUserId(), emitterEventName)) {
StringBuilder uriParam = new StringBuilder();
if (!StringUtils.isEmpty(request.getAuthToken())) {
uriParam.append("?source=");
uriParam.append(request.getSource());
}

BaseResponseDto responseDto = getLmsAccountListByAsyncByApiCall(request,
BaseResponseDto.class);
LmsAccountBaseListAsyncRspDto lmsAccountBaseListAsyncRspDto = objectMapper.convertValue(
responseDto.getData(), LmsAccountBaseListAsyncRspDto.class);
responseDto = new BaseResponseDto<>(
lmsAccountBaseListAsyncRspDto.getLmsAccountBaseListRspDto());

if (lmsAccountBaseListAsyncRspDto.getLoansType().equals(LoansType.MYDATA)) {
// SSE Emitter send the cache
sseEmitterService.sendEventCacheResult(request.getUserId(), emitterEventName,
responseDto);
} else {
responseDto.setResult(new ResultInfoDto(AmsResponseCode.CODE20010));

// SSE Emitter send the cache and SSE Close
sseEmitterService.sendEventResult(request.getUserId(), emitterEventName,
responseDto);
}
}
}

Step4. 마이데이터관리 서버/Worker 서버(MyData Event Worker)

  • 마이데이터관리 서버는 마이데이터 Worker 서버와 Kafka Topic 으로 produce/consume 하면서 Refresh 를요청/응답합니다. 해당 Step 의 코드는 Step3, 6의 코드와 유사하므로 참고 하시길 바랍니다. 마이데이터관리 서버는 비동기 처리 상태 관리를 위해 Redisson 의 Distributed Lock 을 사용했고, Redis 서버의 부하가 적게 하기 위해서 Java Redis Client 를 Redi-sson 로 선정했습니다. 아래의 코드는 Distributed Lock 을 사용한 코드입니다. redissonClient.getLock() 을 통해 Lock 을 획득하고, tryLock() 을 통해 Lock 획득을 시도합니다. Lock 획득 성공 시, 임계 영역에 진입하여 비즈니스 로직을 실행하고, finally 블럭에서 unlock() 을 처리합니다.
public void hincrByLock(final String key, final String field, final int cnt) {
if (key != null) {
String lockName = key + LOCK_POST_FIX;
final RLock redissonClientLock = redissonClient.getLock(lockName);

try {
redissonClientLock.tryLock(3, 2, TimeUnit.SECONDS);
final int fieldCnt = Integer.parseInt(this.hget(key, field).toString());
if (fieldCnt <= 0) {
return;
}

this.redissonClient.getMap(key).addAndGet(field, cnt);
} catch (Exception e) {
throw new FindaErrorException(
String.valueOf(AmsResponseCode.CODE50040.getRspCode()), e);
} finally {
redissonClientLock.unlock();
}
}
}

Step5. 마이데이터관리 Worker 서버

  • MYDATA Portal 이나 외부 금융사와 통신하면서 마이데이터관리 서버에서 요청한 결과를 전송해 줍니다.

Step6. 여신/수신 관리 서버의 비동기 메시지 기반 통신(Consume)

  • 마이데이터관리 서버에서 Refresh 응답을 Kafka 로 Produce 해주면 여신/수신 관리 서버에서 Consume 해서 DB 와 Redis 에 저장합니다.
@KafkaListener(
topics = {"${kafka.topic.mydata.loan-refresh-response.consumer}"},
groupId = "${spring.kafka.consumer.group-id}",
containerFactory = "pushEntityKafkaListenerContainerFactory")
public void loanRefreshResponseListen(@Payload byte[] message, Acknowledgment acknowledgment) {
Long userId = null;
String txId = StringUtils.EMPTY;
try {
CommonMessage<RequestApisAsyncRspDto> commonMessage = lmsDataParser.parse(message, CommonMessage.class);
RequestApisAsyncRspDto requestApisAsyncRspDto = lmsDataParser.parse(objectMapper.writeValueAsString(commonMessage.getData()), RequestApisAsyncRspDto.class);
userId = requestApisAsyncRspDto.getUserId();
txId = requestApisAsyncRspDto.getTxId();
mydataRequestService.requestAsyncResponse(requestApisAsyncRspDto);
acknowledgment.acknowledge();
} catch (Exception e) {
processListenerError(userId, txId, e, acknowledgment);
}
}
  • Server Scale Out 대응을 위해서 어느 POD 에 SSE Emitter 가 있는지 알 수 없으므로 Redis 의 publish/subcribe 로 구현해서 모든 자산관리 서버 POD 에 Broadcast 합니다.
private final RedissonClient redissonClient;

public Long send(String topic, String message) {
return redissonClient.getTopic(topic).publish(message);
}

Step7. 자산관리 서버에서 Client 로 응답 전송(SSE 응답, Connection Close)

  • Refresh 응답을 Redis 의 Subscribe 를 받아서 해당 SSE Emitter 가 자산관리 서버의 In-memory 에 있는지 확인합니다. 확인 후, 존재한다면, Client 에 Refresh 응답을 주고 SSE Connection 을 종료합니다.
@PostConstruct
public void init() {
redissonClient.getTopic(redisTopicLmsToAmsLoanRefreshResponseSubscribe)
.addListener(String.class, ((channel, message) -> {
Long userId = null;
String txId = StringUtils.EMPTY;
try {
LmsLoanRefreshAsyncResponseMessage lmsLoanRefreshAsyncResponseMessage =
amsDataParser.parse(
message,
LmsLoanRefreshAsyncResponseMessage.class);

userId = lmsLoanRefreshAsyncResponseMessage.getUserId();
txId = lmsLoanRefreshAsyncResponseMessage.getTxId();
lmsQueryService.processLmsRefreshAsyncResponse(
lmsLoanRefreshAsyncResponseMessage);
} catch (Exception e) {
processListenerError(userId, txId, e);
}
}));
}

public void processLmsRefreshAsyncResponse(LmsLoanRefreshAsyncResponseMessage message) {
long userId = message.getUserId();

if (sseEmitterService.hasEventEmitter(userId, emitterEventName)) {
LmsAccountBaseListRspDto lmsAccountBaseListRspDto = message.getBody();
BaseResponseDto<LmsAccountBaseListRspDto> responseDto = new BaseResponseDto<>(
lmsAccountBaseListRspDto);
responseDto.setResult(new ResultInfoDto(AmsResponseCode.CODE20010));

// SSE Emitter send
sseEmitterService.sendEventResult(userId, emitterEventName, responseDto);
}
}
  • 위에 코드가 Redis 의 Subscribe 를 받아오는 부분입니다. 내부적으로 txId 를 전달해 주어 비동기 메시지 기반 통신 처리 시, 트래킹이 편리하도록 구성했습니다. processLmsRefreshAsyncResponse 에서 보면 SSE Emitter 가 존재할 경우, Client 에 Event 를 전송합니다. 여기서 좀 더 깊게 들어갈 볼게요. 아래의 sendEventResult 를 살펴보면 sseEmitterRepository 인 In-memory 에서 SSE Emitter 를 조회하는 부분을 보실 수 있습니다. sendEvent 내부를 들여다 보면 eventName 과 data 를 설정해주면 event 를 Client 에 전송할 수 있는 구조입니다.
public void sendEventResult(Long userId, String emitterEventName, Object response) {
String eventName = sseEmitterManager.makeEmitterEventName(emitterEventName,
String.valueOf(userId));
SseEmitter emitter = sseEmitterRepository.findEmitterByEventName(eventName);
if (!ObjectUtils.isEmpty(emitter)) {
sseEmitterManager.sendEvent(emitter, eventName, response, userId);
emitter.complete();

String redisKey = RedisPrefixHelper.genKey(emitterEventName, String.valueOf(userId));
redisCommonService.evictRedisCache(redisKey);
}
}

public void sendEvent(SseEmitter emitter, String eventName, Object data, long userId) {
try {
AmsLogUtil.writeSseEmitterResponseLog(
new SseEmitterResponseLog(emitter, eventName, data, userId));
emitter.send(SseEmitter.event()
.name(eventName)
.data(data));
} catch (Exception e) {
sseEmitterRepository.deleteByEventName(eventName);
String redisKey = RedisPrefixHelper.genKey(eventName, String.valueOf(userId));
redisCommonService.evictRedisCache(redisKey);
}
}

적용 후, 결과 분석

Client 가 Cache 와 실제 데이터를 받을 때 결정이 쉬워졌습니다. 다만, 전체적으로 서비스의 설계는 좀 더 복잡해졌습니다. 그래도 동기 API 를 호출하지 않아서 Thread 사용을 적절하게 할 수 있고, 이전 보다 트래픽을 더 많이 받을 수 있는 구조가 되었습니다.

금융서비스 MSA 전환기 1, 2, 3 을 기획하면서 많은 고민을 했고, 빠른 생산성을 요구하는 PT 내에서 기술과 비즈니스 간의 논의와 타협이 많았던 것으로 기억합니다. 추후 좀 더 재미난 글로 만나뵙겠습니다. 여기까지 긴 글을 읽어주셔서 감사합니다.

--

--