허튼짓은 그만: Kafka Streams를 활용한 실시간 이상 로그인 감지 시스템 도입하기

비정상 사용자를 모니터링하고 대응하기 위해 만든 이상 로그인 감지 시스템과 기반 기술인 카프카 스트림즈(Kafka Streams) 도입기를 소개합니다.

Heewon Chae
MUSINSA tech
31 min readMay 8, 2024

--

안녕하세요. 무신사 회원개발팀 백엔드 엔지니어 채희원입니다. 😀

회원개발팀은 무신사에서 회원가입 등 회원과 관련된 도메인을 맡고 있으며 메시지, 후기, 적립금의 도메인을 담당하는 팀입니다.

이 글에서는 비정상 사용자를 모니터링하고 대응하기 위해 만든 이상 로그인 감지 시스템과 기반 기술인 카프카 스트림즈(Kafka Streams) 도입기를 소개하겠습니다.

[그림 1] 이상 감지 로그인을 표현

도입 배경

무신사는 가파른 성장을 이룩하고 있으며 매년 사용자 수가 크게 증가하고 있습니다. 이에 따라 어뷰징, 매크로, 해킹 시도와 같이 비정상적인 사용자도 꾸준히 늘어나고 있습니다.

[그림 2] 무신사 성장 그래프

이러한 비정상 사용자를 확인하기 위한 방법으로는 WAF를 통한 트래픽 확인, 주문 정보 확인, VoC(Voice of Customer) 접수 등이 있습니다. 이런 확인 방법은 이슈가 발생한 후에야 파악할 수 있다는 문제점이 있습니다. 이에 이슈 발생 전 비정상 사용자를 모니터링하고 조치하여 피해를 막을 필요성이 대두되었습니다.

사용자는 로그인 시도 시 많은 정보를 남깁니다. IP, ID, Referrer 등의 정보 패턴을 분석한다면 로그인 단계에서 비정상적인 접속을 파악할 수 있으리라 생각했습니다. 이를 위해 실시간 모니터링과 선제적인 조치가 가능한 이상 로그인 감지 시스템을 도입하였습니다.

기술 선택

이상 로그인 감지 시스템을 개발하기 위한 기술 선택 과정과 결론에 대해 설명드리겠습니다.

요구 사항 확인

이상 로그인을 감지하기 위해서는 실시간으로 들어오는 로그인 정보를 활용하여 데이터를 집계하고 분석해야 했습니다. 그러면서도 적은 학습 비용의 기술을 활용하면서 서드파티 인프라를 추가하지 않고 문제를 해결해야 했습니다.

활용 데이터 분석

[그림 3] 로그인 이력 정보 데이터의 흐름

분석 대상인 로그인 이력 정보는 로그인 처리 후 Kafka를 통해 이벤트를 발행, 후처리로 RDB에 저장하는 프로세스를 가지고 있습니다.

로그인 이력 정보를 실시간으로 집계하기 위해서는 초당 수백 건으로 들어오는 데이터와 연관 있는 특정 기간의 데이터들을 집계하여 패턴을 분석해야 했습니다.

[그림 4] Batch를 이용한 모델

우선적으로 떠오른 것은 RDB를 조회하는 배치를 개발하는 모델이었습니다. 하지만 RDB에 직접 조회하고 집계하는 방식은 부하와 실시간 처리의 보장이 어려워 제외되었습니다. 따라서 Kafka 메시지를 활용하여 데이터를 처리하기로 결정했습니다.

기술 후보 비교

Kafka 메시지를 이용하여 집계를 위한 기술 후보로는 Flink, Spark, Kafka Streams, KsqlDB이 후보로 거론되었습니다.

1) Apache Spark

  • 분산 처리를 위한 오픈 소스 클러스터 컴퓨팅 프레임워크.
    대규모 데이터 처리, 데이터 집계, 머신 러닝 등 다양한 작업을 지원.
  • 장점
    — 메모리 내에서 데이터 처리를 수행하여 빠른 속도를 제공.
    — SQL 쿼리, 스트리밍 처리, 머신 러닝 등 다양한 기능을 제공.
    — 사용자가 데이터를 탐색하고 실험할 수 있는 대화형 쉘을 제공.
  • 단점
    — 대규모 데이터셋을 처리할 때 메모리 사용량이 높다.
    — 마이크로 배치 방식을 사용으로 인한 지연이 발생.

2) Apache Flink

  • 스트리밍 및 배치 처리를 위한 분산 데이터 처리 엔진.
    실시간 이벤트 스트리밍과 배치 프로세싱을 모두 지원.
  • 장점
    — 스트림 처리에 초점을 맞춰 실시간 처리 및 일괄 배치를 제공.
    — 이벤트 시간 기반의 처리를 지원하여 데이터의 지연과 순서를 관리.
    — 최적화된 실행 계획을 생성하여 성능을 향상.
    — 내부 상태 관리를 통해 정확한 결과를 보장.
  • 단점
    — 러닝 커브가 높다.
    — 메모리 요구량이 많아 대규모 클러스터가 필요.

3) Kafka Streams

  • Kafka 클러스터에서 데이터를 처리하고 분석하기 위한 클라이언트 라이브러리이다.
    애플리케이션에서 데이터를 읽고 쓸 수 있으며, 스트림 처리를 위한 고수준 API를 제공.
  • 장점
    — Kafka cluster와 함께 확장되므로 처리량을 쉽게 조정 가능.
    — Kafka cluster와 연동되어 데이터 손실 없이 안정적 내결함성 제공.
    — Java 라이브러리로 구현.
    — Exactly-once를 보장.
  • 단점
    — 일부 복잡한 처리 작업에는 기능이 제한적.
    — Kafka 컨슈머 그룹을 사용하여 데이터를 처리. 따라서 컨슈머 그룹의 부하나 지연이 Kafka Streams 애플리케이션의 처리 속도에 영향.

4) KsqlDB

  • Kafka 스트림 처리를 위한 오픈 소스 SQL 엔진.
    SQL 스타일의 문법을 사용하여 Kafka topic에서 stream을 query하고 처리 가능.
  • 장점
    — 코드 작성 없이 SQL query로 데이터 처리가 가능한 높은 생상성 제공.
    — 스트림 데이터를 실시간으로 처리할 수 있어 실시간 분석에 적합.
    — 외부 시스템과의 데이터 통합을 위해 내장된 Kafka Connect를 지원.
  • 단점
    — 세밀한 기능 구현이 어려워 복잡한 작업에는 적합하지 않음.
    — 스트림 처리를 위해 Kafka 클러스터와 함께 동작하므로 클러스터의 스케일링 제한에 따라 성능이 제약.

여러 논의를 거친 결과, 최종적으로 아래와 같은 이유로 Kafka Streams를 사용하기로 결정하였습니다.

1. 서드파티를 추가 사용하지 않음.
2. Java 라이브러리를 사용.
3. 복잡한 기능을 구현할 수 있음.
4. 멀티 모듈 환경에서 별도의 독립된 서비스로 관리 가능함.

Kafka Streams란

[그림 5] Kafka Streams와 Kafka Broker간의 데이터 흐름

Kafka Streams는 Kafka에서 공식적으로 지원하는 라이브러리입니다. Broker에 적재되는 메시지를 실시간으로 더 빠르고 안전하게 처리할 수 있도록 기능을 제공합니다. 또한 Topic으로 들어오는 데이터를 소비(consuming)하여 Kafka Streams에서 제공하는 처리 로직을 적용하여 집계하거나 재가공 후 다른 Topic으로 발행하거나 Kafka Connect를 이용하여 DB에 적재하는 등의 스트림 처리 기능을 제공합니다.

Kafka Strams의 특징

Kafka Streams는 JVM 기반 언어인 Java, Kotlin 등에서 사용 가능한 라이브러리를 지원합니다. Apache는 Kafka 버전에 맞춰 호환성을 제공하고 있습니다. 또한 트랜잭션과 상태 저장소를 이용하여 데이터 유실 없이 1번만 처리되는 것을 보장해 줍니다.

추상화 레벨

[그림 6] kafka Streams 추상화 레벨

이 그림은 Kafka Streams에 대한 추상화 레벨을 보여줍니다. 제일 아래의 Producer와 Consumer는 Kafka Streams가 Producer와 Consumer 위에서 동작함을 뜻합니다. 그리고 Processor API와 Streams DSL 순으로 추상화 레벨이 정해집니다.

  • Kafka에서 메시지(message)로 불렀던 데이터는 Kafka Streams 내에서는 레코드(record)라는 용어로 사용 됩니다.

1) Streams DSL
Streams DSL은 Processor API를 활용해 미리 구현해 놓은 API를 제공합니다. 다른 토픽 간의 join 등의 여러 API를 제공하며, 집계, 변환 등의 기능을 쉽게 사용할 수 있는 메서드들이 많아 대부분의 사용자에게 권장됩니다. 이러한 Streams DSL은 레코드의 흐름을 추상화한 KStream, KTable, GlobalKTable을 제공해 줍니다.

KStream은 데이터의 연속적인 스트림을 나타냅니다. 이는 데이터가 순차적으로 흘러가는 것을 의미합니다. 예를 들어, 로그 데이터나 실시간 이벤트 스트림과 같은 것을 KStream으로 처리할 수 있습니다. KStream은 변경 가능한 데이터이므로 여러 번 읽을 수 있습니다.

KTable은 key-value 쌍의 형태로 데이터를 저장하는 테이블을 나타냅니다. 각각의 키는 유일해야 하며, 키를 기반으로 데이터를 조회할 수 있습니다. 주로 데이터베이스의 테이블과 유사한 개념이며, 데이터의 변경이 이루어지면 해당 변경 사항을 갱신합니다.

GlobalKTable은 분산된 상태 저장소로서, 모든 스트림 프로세서 인스턴스에서 동일하게 볼 수 있는 테이블입니다. 일반적인 KTable과 달리 모든 인스턴스 간에 공유됩니다. 이것은 전역 상태 저장소로 사용되며, 모든 스트림 처리 애플리케이션이 동일한 상태를 공유할 때 유용합니다.

2) Processor DSL
Processor API는 Streams DSL처럼 미리 정의된 KStream, KTable, GlobalKTable과 같은 기능을 제공하지 않고, row level의 기능을 제공합니다. 이는 개발자가 더 세밀하게 로직을 개발할 수 있게 해줍니다. 하지만 필요한 기능은 매번 구현해야 하므로 코드가 길어지고 유지 보수 비용이 증가하며, 러닝 커브가 높아 새로운 사용자가 시스템에 진입하기 어려울 수 있습니다.

스트림의 변환(stateless와 stateful)

Kafka Streams에는 stateless와 stateful 두 개의 상태기반 프로세싱을 제공합니다.

1) stateless
stateless 변환은 처리를 위해 상태가 필요하지 않으며, Stream processor와 연결된 상태 저장소가 필요하지 않습니다.

2) stateful
stateful 변환은 입력을 처리하고 출력을 생성하기 위해 상태에 의존하며, 스트림 프로세서와 연결된 상태 저장소가 필요합니다. 예를 들어, 집계 작업에서는 윈도우 상태 저장소가 윈도우별 최신 집계 결과를 수집하는 데 사용됩니다. 조인 작업에서는 윈도우 상태 저장소가 정의된 윈도우 경계 내에서 지금까지 수신된 모든 레코드를 수집하는 데 사용됩니다. 또한 장애 발생 시 이슈 해결을 위해 내결함성 방식을 지원하여 안정성을 보장합니다.

상태 저장소

[그림 7] RocksDB logo

Kafka Streams에서 집계를 위한 stateful(상태 기반 처리)을 위해서는 상태 저장소가 필요합니다. Kafka Streams는 상태 기반 처리를 위해 내부적으로 RocksDB를 로컬에서 사용하여 상태 저장 기본 설정으로 제공합니다. 로컬 DB에 저장된 상태에 대한 변환 정보는 Kafka의 changelog(변경 로그) topic에 저장됩니다. 이를 통해 장애가 발생하더라도 상태 정보가 모두 안전하게 저장되기 때문에 장애 복구를 할 수 있습니다. 물론 Redis와 같은 외부 저장소를 사용할 수도 있지만, Kafka Streams는 수많은 Stream을 다뤄야 하는 특성상 외부 저장소 이용 시 네트워크 통신 지연이 성능적으로 좋지 않기에 내부 저장소 사용을 권장합니다.

내결함성

[그림 8] changelog topic을 이용한 내결함성

Kafka Streams의 상태 저장소는 내결함성을 위해 changelog를 사용합니다. Kafka Streams 애플리케이션이 시작될 때 상태 저장 노드를 감지하고 데이터가 누락되었다고 판단되면 changelog를 통해 복원합니다. 인메모리 저장소는 다시 시작할 때마다 레코드를 유지하지 않으므로 다시 시작한 후 changelog를 통해 복원이 필요합니다. changelog topic은 압축을 사용하여 각 키의 가장 오래된 레코드를 삭제하고 가장 최근 레코드만 안전하게 남깁니다. 이는 changelog의 크기가 끝없이 증가하는 것을 방지합니다.

Windowing

Kafka Streams에서는 일정 간격의 레코드들을 이용하여 집계가 필요할 때 window가 필요합니다. Kafka Streams는 Tumbling, Hopping, Session, Sliding 4개의 window를 제공하고 있습니다.

1) Tumbling window

[그림 9] tumbling window

Tumbling window는 시간 간격을 기반으로 하는 윈도우로, 고정된 크기의 겹치지 않는 윈도우를 모델링 합니다. 이 윈도우는 window의 크기라는 단일 속성으로 정의됩니다. 다시 말해, 설정한 시간마다 ‘고정된’ 윈도우를 생성하여 윈도우 내부에 존재하는 레코드들을 집계합니다.

2) Hopping window

[그림 10] Hopping window

Hopping window는 고정된 크기이지만 겹칠 수 있는 윈도우를 모델링 합니다. 윈도우의 크기와 직전 간격(hop)이라는 두 가지 속성으로 정의됩니다. 직전 간격은 윈도우가 이전 윈도우를 기준으로 얼마나 앞으로 이동하는지를 지정합니다. 예를 들어, 5분 크기와 1분 직전 간격으로 Hopping window를 구성할 수 있습니다. Hopping window는 겹칠 수 있기 때문에 데이터 레코드가 두 개 이상의 윈도우에 속할 수 있습니다.

3) Session window

[그림 11] session window

Session window는 키 기반 이벤트를 세션으로 집계하는 데 사용되며, 이를 세션화라고 합니다. 세션은 정의된 ‘inactivity gap’ 시간으로 구분됩니다. 이벤트가 세션 간격을 벗어나면 새 세션이 만들어집니다.

4) Sliding window

[그림 12] sliding window

Sliding window는 이전 윈도우들과는 다르게 조인 작업에만 사용되며 JoinWindows 클래스를 통해 지정할 수 있습니다. 시간 축을 따라 연속적으로 슬라이드 되는 고정된 크기의 윈도우를 모델링하며, 두 레코드의 타임스탬프 차이가 윈도우 크기 내에 있는 경우 동일한 윈도우에 포함된 것으로 간주합니다. Sliding window는 시간 축을 따라 이동하며 레코드가 Sliding window의 여러 스냅샷에 속할 수 있지만 레코드의 각 고유 조합은 하나의 Sliding window 스냅샷에만 나타납니다.

5) 정리

[표 1] window 종류와 설명

Kafka Streams 사용법

[그림 13] Kafka streams 내부 코드의 흐름

Kafka Streams는 3단계의 흐름을 가집니다. 먼저, Source Processor는 Kafka Streams 애플리케이션으로 정보를 흘려보내는 곳입니다. Kafka topic으로부터 데이터를 읽고 하나 이상의 Stream Processor로 전송합니다. 두 번째는 Stream Processor로 데이터 처리와 변환 로직의 적용을 담당하고 있습니다. Streams DSL에서 이 프로세서는 Kafka Streams 라이브러리가 노출한 내장 연산자들을 이용하여 정의합니다. 연산자의 예로 filter, map, join 등이 있습니다. 마지막으로 Sink Processor는 보강, 변환, 필터링 등으로 처리한 데이터를 카프카로 다시 내보내는 역할을 합니다. 이 데이터는 다른 스트림 처리 애플리케이션에서 처리되거나 Kafka Connect를 이용하여 DB에 저장할 수도 있습니다. 이러한 전체적인 흐름을 topology라고 부릅니다.

[그림 14] Sub topology

Kafka Streams의 topology는 그림과 같이 Stream Processor에서 분기 처리가 가능하며 1개의 Source Processor에서 N 개의 결과가 나올 수 있습니다. 이렇게 분기된 topology를 sub-topology라고 부릅니다.

[그림 15] kafka streams의 단일 처리 규칙

Kafka Streams에 들어온 레코드는 단일 이벤트 처리 규칙에 의해 하나의 topology에서는 하나의 레코드만 처리합니다. 그림에서 흰색 레코드가 topology 내에서 스트림 플로우를 따르고 있을 때, 노란색 레코드는 단일 이벤트 처리 규칙에 의해 처리가 불가능한 것을 볼 수 있습니다.

[그림 16] sub topology에서 복사 되는 레코드

하지만 sub-topology는 단일 이벤트 처리 규칙에 예외를 두어 분기 처리 시 레코드가 복제되며 각 sub-topology에서 처리됩니다. 이를 통해 동일한 레코드가 여러 처리 경로로 전달될 수 있습니다.

간단한 예제 코드

단어 개수를 세는 기능을 제공하는 Streams DSL을 사용한 간단한 예제입니다.

[그림 17] kafka streams 예제 코드

설정 부분은 애플리케이션 아이디(토폴로지의 이름), Kafka 서버의 호스트 주소, 그리고 serde(저장된 데이터의 직렬화, 역직렬화 구현체)에 대한 설정입니다. Source processor에서는 streams-plaintext-input 토픽에서 데이터를 가져오겠다고 선언해 줍니다. Stream processor에서는 가져온 데이터를 소문자 처리하고 split 처리한 후 각 단어를 group by하여 집계합니다. 그 후 counts-store 상태 저장소를 이용하여 데이터를 저장하고 개수를 구해줍니다. 레코드 데이터의 변환과 집계 과정이 끝난 후 Sink processor에서 streams-wordcount-output 토픽에 해당 결과를 발행합니다.

상태 저장 주기 설정

Kafka Streams는 애플리케이션이 상태를 변경할 때마다 커밋하는 것이 아니라, 설정된 간격마다 한 번에 여러 번의 상태 변경 내용을 커밋합니다. 예를 들어, 설정된 커밋 주기에 따라 상태 변경이 커밋 되기 때문에 중복 IP로 접속 패턴이 감지될 때 슬랙으로 알람 메시지에 ‘중복 접속 수’ 정보가 포함되어 발송됩니다. 만약 커밋 주기가 길게 설정되어 있다면, 순차적인 숫자가 아닌 1, 2, 4, 7과 같이 불규칙하게 증감하는 수로 알람이 발생할 수 있습니다. 따라서 실시간으로 후처리를 받고 싶다면 커밋 주기를 짧게 설정해야 합니다.

commit.interval.ms는 Kafka Streams 애플리케이션에서 상태 저장을 주기적으로 커밋 하는 주기를 지정하는 설정입니다. 이 설정은 밀리 초 단위로 지정되며 기본 값은 30000ms입니다. 짧은 커밋 간격으로 설정하면 더 높은 안정성을 제공하지만 커밋에 소요되는 시간과 리소스가 증가할 수 있습니다. 반면에 긴 커밋 간격은 상태 저장에 필요한 리소스를 줄일 수 있지만, 장애 발생 시 데이터 손실의 위험이 있을 수 있습니다.

public class ExampleTopology {

public static void main(String[] args) {
// Kafka Streams 설정
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "commit-interval-example");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// 커밋 주기 설정
props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, "30000");

// topology 생성
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> inputStream = builder.stream("input-topic");
KStream<String, String> processedStream
= inputStream.mapValues(value -> value.toUpperCase());

processedStream.to("output-topic",
Produced.with(org.apache.kafka.common.serialization.Serdes.String(),
org.apache.kafka.common.serialization.Serdes.String()));

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();
}
}

테스트 코드

Kafka streams는 TopologyTestDriver 사용하여 테스트 코드 작성이 가능합니다.

테스트를 위한 Topology 생성 예제 코드

public class WordCountTopology {
public static void buildTopology(StreamsBuilder builder) {
KStream<String, String> source = builder.stream("input-topic");
KTable<String, Long> wordCounts = source
.flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
.groupBy((key, word) -> word)
.count(Materialized.as("counts"));

wordCounts.toStream()
.to("output-topic",
Produced.with(wordCounts.keySerde(), wordCounts.valueSerde()));
}
}

테스트 코드 샘플

public class WordCountTopologyTest {
private TopologyTestDriver testDriver;
private TestInputTopic<String, String> inputTopic;
private TestOutputTopic<String, Long> outputTopic;

@BeforeEach
public void setup() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "word-count-test");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

StreamsBuilder builder = new StreamsBuilder();
WordCountTopology.buildTopology(builder);
Topology topology = builder.build();

testDriver = new TopologyTestDriver(topology, props);

inputTopic = testDriver.createInputTopic("input-topic", Serdes.String().serializer(), Serdes.String().serializer());
outputTopic = testDriver.createOutputTopic("output-topic", Serdes.String().deserializer(), Serdes.Long().deserializer());
}

@AfterEach
public void tearDown() {
testDriver.close();
}

@Test
public void testWordCount() {
inputTopic.pipeInput("key", "Hello world");
inputTopic.pipeInput("key", "Hello Kafka Streams");
inputTopic.pipeInput("key", "Goodbye world");

assertEquals(2L, outputTopic.readKeyValuesToList().get(0).value());
assertEquals(1L, outputTopic.readKeyValuesToList().get(1).value());
}
}

해당 테스트 코드는 input-topic에 메시지를 입력하고, Topology가 처리한 결과를 output-topic에서 읽어와서 예상한 결과와 비교합니다. 이를 통해 Kafka Streams 애플리케이션의 동작을 테스트할 수 있습니다.

이상 로그인 감지 시스템에 적용하기

[그림 18] 간략한 이상 로그인 감지 시스템 구조

이상 로그인 감지 시스템은 로그인 시 발생하는 로그성 정보(성공, 실패 등)를 Kafka를 이용하여 후처리로 데이터베이스에 저장하고 있습니다. 이 과정에서 Kafka 브로커로 보내진 메시지의 토픽 변경을 감지하기 위해 Kafka Streams의 Consumer 기능을 활용하고 있습니다. Kafka Streams에서는 이러한 토픽 변경을 감지하여 패턴 분석 및 후처리를 위한 데이터 가공 작업을 수행하고, 결과를 이상 로그인 감지 토픽에 발행합니다. 이에 대한 후속 조치는 Consumer가 이상 로그인 감지 토픽을 구독하면서 슬랙 메시지 발행, 이력 저장 등을 역할을 수행하고 있습니다.

[그림 19] 간단한 패턴 분석 및 처리 흐름

위 그림은 간략히 표현한 이상 로그인 감지 시스템의 데이터 처리 흐름입니다. Source processor에서는 로그인 성공과 실패 토픽의 변경을 감지합니다. Filter를 통해 조건에 따라 sub-topology로 분기됩니다. 각 분기된 Stream Processor에서는 감지할 케이스에 따라 데이터를 일차적으로 집계하기 위해 key로 그룹화하고, 감지 기간 동안의 데이터를 분석합니다. 그 후, 감지 조건에 부합하는 데이터는 Sink Processor로 보내져 이상 로그인 감지 토픽에 발행할 수 있도록 데이터를 가공합니다. 이후 구독 중인 Consumer에서는 이상 로그인 감지 토픽의 변화를 감지하고, 해당 정보를 기반으로 슬랙 메시지 발송 등의 후속 처리를 진행합니다.

이슈와 해결 방안

다음으로는 Kafka Streams를 사용하면서 겪었던 이슈와 그에 대한 해결 방법을 소개하겠습니다.

⛔️ JDK 이슈

로컬 환경에서 개발을 완료한 후 개발서버에 배포할 때 RocksDB가 실행되지 않고 서버가 기동되지 않는 문제에 직면했습니다. 원인 분석 결과 사용 중인 JDK에 이슈가 있음을 발견했습니다. RocksDB는 C++로 개발되어 JNI를 통해 실행되며, 이때 사용되는 JDK의 C 라이브러리는 musl libc와 호환되지 않고 glibc를 사용해야 합니다. 당시 사용한 JDK는 openjdk의 alpine 버전으로 경량화된 버전이었는데, 이는 musl libc를 사용하는 버전이었습니다. 해당 이슈를 해결하기 위해 경량화되지 않은 JDK를 사용하도록 변경했고, 이로써 문제가 해결되었습니다.

리파티셔닝(Repartitioning) 이슈

리파티셔닝은 두 개 이상의 토픽을 조인하거나 레코드를 집계하는 과정에서 map()과 같이 새로운 키를 사용해야 하는 경우에 발생합니다. 이때 키가 변경되면서 리파티셔닝이 발생합니다. 리파티셔닝을 처리하기 위해 repartition으로 끝나는 토픽이 각 application.id에 따라 자동으로 생성됩니다. 리파티셔닝이 발생하면 해당 토픽에 변경된 키와 레코드 정보를 저장하고 사용하게 됩니다. 따라서 빈번한 리파티셔닝은 네트워크 및 디스크 I/O 측면에서 비용이 많이 드는 작업입니다. 리파티셔닝의 주요한 원인인 키를 변경하는 map, transform, flatMap, groupBy 연산은 지양하고, 대신 mapValues, transformValues, flatMapValues, groupByKey와 같은 연산을 사용하는 것이 좋습니다.

⛔️ 과도한 메모리 사용

어느 날 슬랙으로부터 Kafka Streams 서버의 Pod 메모리 사용률이 90%에 도달했다는 알람을 받았습니다.

[그림 20] streams 메모리 사용률 알림

해당 Pod가 실행된 이후부터 메모리 사용률이 선형적으로 증가하는 것을 확인할 수 있었습니다.

[그림 21] 계속 증가하는 kafka streams 메모리 사용률

특히 전날의 이벤트로 인해 평시 대비 40% 더 많은 로그인이 발생하면서 상태 저장소의 사용률도 함께 증가하였으며, 이로 인해 메모리 사용률이 증가하는 경향을 보였습니다. 메모리를 추가하는 것은 쉬운 해결책일 수 있지만, 이는 근본적인 문제 해결책이 아닙니다. 이 문제를 해결하기 위해 RocksDB의 Metric 정보를 수집하여 모니터링하는 과정과 Window 보존 시간 수정, RocksDB가 사용하는 메모리를 제한하는 조치에 대해 설명드리겠습니다.

1) JMX를 이용한 RocksDB 메모리 모니터링
Kafka Streams는 JVM 환경에서 실행되는 만큼 기존의 JVM Metric을 수집하여 모니터링하는 여러 apm을 통해 모니터링이 가능합니다. 하지만 RocksDB는 JNI를 통해 실행되므로 native method stack 영역을 사용합니다. 이를 모니터링하기 위해서는 JMX(Java Management eXtensions)를 이용하여 Metric 정보를 수집, Promethus와 Grafna를 통해 RocksDB의 Metric 정보 모니터링이 필요합니다. JMX는 jmx_export를 사용, Kafka Streams에서 topology 설정에 metrics.recording.level을 추가해 주시면 됩니다. Grafna에서 Dashboard를 만들기 어려우시면 Kafka Streams Dashboard 템플릿을 활용하시는 걸 추천드립니다. Metric 정보에 대한 설명은 Confluent에서 제공하는 Kafka Strams Monitor Document를 참고 부탁드립니다.

[그림 22] Grafna에서 RocksDB Metric 정보를 활용한 그래프

2) Window 보존 시간 수정
Kafka Streams에서 상태 저장소의 오래된 레코드는 지정된 윈도우 보존 기간이 지나면 삭제됩니다. 문제는 기본값이 하루(24시간)이었는데, 패턴 분석에 사용되는 각 topology의 윈도우 기준 시간 대비 보존 시간이 길어 불필요한 메모리를 사용하고 있었습니다. 이를 해결하기 위해 Materialized#withRetention() 메서드를 이용하여 윈도우 기준 시간에 5분을 추가한 값을 보존 시간으로 설정해 주었습니다.

KTable<Windowed<String>, LoginSuccessDuplicationLoginCountDTO> ipAddressTable =
sourceStream.filter((key, value) -> StringUtils.isNotBlank(value.getPayload().getLoginIp()))
.mapValues(LoginSuccessDuplicationLoginCountDTO::new)
.groupBy((key, value) -> value.getLoginSuccessStreamDTO().getPayload().getLoginIp(),
Grouped.with(Serdes.String(), AbnormalLoginSerdes.loginSuccessDuplicationLoginCountDTOSerde()))
.windowedBy(TimeWindows.ofSizeAndGrace(Duration.ofMinutes(super.getWindowTime()), Duration.ZERO))
.reduce(LoginSuccessDuplicationLoginCountDTO::count,
Materialized.<String, LoginSuccessDuplicationLoginCountDTO, WindowStore<Bytes, byte[]>>as("ip")
.withKeySerde(Serdes.String()).withValueSerde(AbnormalLoginSerdes.loginSuccessDuplicationLoginCountDTOSerde())
// window 보존 시간 설정
.withRetention(Duration.ofMinutes(super.getWindowTime() + 5)));

3) RocksDB의 메모리 사용률 제한
window 보존 시간 수정을 통해 메모리 사용률을 줄일 수 있었지만, 로그인이 증가하면 언제 Out of Memory가 발생할지 모르는 위험이 있었습니다. 이에 RocksDB의 메모리 사용률을 제한하여 위험을 피하고자 하였습니다. 패턴 분석 시 만들어지는 집계 KTable 데이터는 직전 데이터만 존재하면 되므로 이전에 사용된 집계 KTable은 고려 대상이 아니었습니다. 설정 방법은 application property 파일에 spring.kafka.properties.rocksdb.retention.bytes 옵션을 추가해 주면 됩니다. 단위는 byte이며, 설정할 수치를 구하는 방법은 서버의 가용 가능한 메모리에서 Java heap 메모리를 빼는 것입니다.

마치며

2023년 7월의 어느 날, 평시 대비 300배에 가까운 로그인 시도가 있었습니다.
이상 로그인 감지 시스템 알람이 쉴 새 없이 울리며 이슈가 발생하였음을 알렸습니다. 공격자는 다수의 IP를 동원하여 몇일 걸쳐 크리덴셜 스터핑 공격을 진행하였습니다. 하지만 이상 로그인 감지 이력 정보를 통하여 정보를 분석, 여러 협력팀과 함께 공격을 막아내는 성과를 이루었습니다.

[그림 23] 크리덴셜 스터핑으로 인해 치솟은 로그인 실패 그래프
[그림 24] 크리덴셜 스터핑으로 인해 치솟은 이상 로그인 감지 그래프

다른 사례는 명의도용 보호 사례입니다. 이상 로그인 감지 시스템을 통해 동일한 패턴으로 생성된 아이디의 접근이 확인되었습니다. 해당 계정들을 분석해 보니 명의도용 의심 사례로 확인되었으며 계정 보호 조치 등을 통해 고객의 피해를 막을 수 있었습니다.

[그림 25] 패턴이 비슷한 계정 감지

이상 로그인 감지 시스템을 도입한 결과, 크리덴셜 스터핑, 어뷰징, 명의도용과 같은 비정상적인 접근을 사전에 감지하고 빠르게 대응하여 무신사를 더 안전하게 만들 수 있었습니다.

신규 기술 도입에 대한 부담감이 있었지만 주변 팀원들의 지지와 조언 덕분에 시스템을 성공적으로 개발할 수 있었습니다. 도움을 주신 팀원들에게 진심으로 감사드립니다.

Musinsa CAREER

함께할 동료를 찾습니다.

이처럼 무신사는 매년 빠르게 성장하며 새로운 문제를 마주하고, 문제를 해결하기 위해 새로운 기술을 적극적으로 도입하고 있습니다.
전국민이 사용하는 1위 패션 플랫폼 무신사에서 기술로 비즈니스를 성장시키는 경험을 함께하고 싶으시다면 아래 채용 페이지를 통해 지원해 주세요!

🚀 무신사 채용 페이지 : https://corp.musinsa.com/ko/career

Refernce

--

--

Heewon Chae
MUSINSA tech

무신사 회원개발팀에서 백엔드 엔지니어를 맡고 있습니다.