HaJin Jo
IHFB  R&D 팀블로그
17 min readFeb 1, 2024

--

ksqlDB를 이용해 실시간 이벤트 스트리밍 최적화하기

사진: UnsplashT K

들어가며

안녕하세요, 밀당 데이터 엔지니어 조하진입니다. 최근 밀당 데이터팀에서는 실시간 학습현황 대시보드를 위해 Kafka를 활용해 데이터 파이프라인을 구축하는 작업을 진행했습니다. 이 과정에서 단시간내의 많은 중복 이벤트로 인해 DB 부하를 발생했는데요, 이 부분을 ksqlDB를 도입해 해결해나가고 있습니다. 오늘 글에서는 ksqlDB 소개 및 ksql 쿼리로 중복 이벤트를 제거한 사례에 대해 공유드리려고 합니다.

Kafka란?

ksqlDB에 대해 설명하기전 먼저 Kafka에 대해 간략히 알아보겠습니다. Kafka는 실시간 데이터 스트리밍을 위한 분산형 이벤트 스트리밍 플랫폼입니다. 다양한 소스에서 데이터를 메세지 형태로 받아 Topic(큐)에 저장하고, Topic에 저장된 메세지를 읽어서 원하는 작업을 처리 할 수 있습니다. Topic으로 메세지를 보내는 역할을 Producer, Topic에서 메세지를 읽는 역할을 Consumer 라고 합니다. 유저의 실시간 이벤트를 메세지로 발송하여 Topic에 쌓으면, Topic의 메세지를 소비하며 학습현황을 업데이트하는 방식으로 kafka를 사용하고 있습니다.

ksqlDB란?

ksqlDB는 Apache Kafka를 기반으로 하는 스트림 처리 데이터베이스입니다. ksqlDB는 Kafka 플랫폼 위에서 동작하며, 실시간 데이터 스트림을 마치 데이터베이스처럼 사용 할 수 있도록 SQL과 유사한 쿼리언어를 지원합니다. SQL을 사용하여 스트림, 테이블로 데이터를 모델링하고, 다양한 SQL 구성(join, 집계, 변환, 필터링, 윈도우 설정 등)을 적용해 코드를 건드리지 않고 데이터를 조작 할 수 있게 해줍니다. 또한 ksqlDB를 외부 데이터 저장소와 통합할 수 있도록 커넥터를 정의하여, 다양한 데이터 소스 및 싱크에서 쉽게 읽고 쓸 수 있습니다. 테이블 및 스트림과 커넥터를 결합해 end-to-end streaming ETL 파이프라인을 생성 할 수 있습니다.

ksqlDB의 특징

스트림 처리

  • 스트림 처리를 통해 기존 토픽의 이벤트에서 새로운 스트림을 만들어내고 새로운 정보를 도출해낼 수 있습니다. 스트림 데이터를 마치 메모리 내 컬렉션처럼 다룰 수 있게 추상화하여 보다 직관적인 방식으로 실시간 데이터를 다룰 수 있게 해줍니다.

전통적인 데이터베이스와의 차이

  • 전통적인 데이터베이스는 쿼리가 매번 전체 결과를 반환하는 방식으로 동작하지만, 스트림 처리에서는 새로운 데이터가 있을 때마다 지속적으로 스트림과 테이블을 쿼리하여 현재상태를 업데이트합니다.

완전한 실시간 애플리케이션

  • 실제 애플리케이션을 구축하기 위해서는 스트림 처리 단일 기능만으로는 부족합니다. 앞뒤로 여러 복잡한 시스템을 통합하고, 시스템 별로 다른 보안, 확장성, 모니터링을 각각 관리해야 해서 유지보수에 많은 리소스가 들어갑니다.
출처 : https://ksqldb.io/overview.html
  • ksqlDB는 이런 문제들을 해결하고 스트림 처리 애플리케이션을 위해 특별히 제작된 데이터베이스입니다. ksqlDB 내부에서 스트림 처리를 하고, 커넥터를 통해 외부 데이터 저장소와 연결도 가능하며, SQL 쿼리로 스트림(Push query)과 현재 상태(Pull query)에 대한 결과 값을 반환받을 수 있습니다. 이렇듯 ksqlDB를 사용해 통합된 시스템으로 간단하게 실시간 애플리케이션을 구축할 수 있습니다.
출처 : https://ksqldb.io/overview.html

ksql 쿼리

ksql의 쿼리는 전통적인 데이터베이스처럼 현재 스냅샷에 대한 쿼리(Pull query), 그리고 이벤트 스트림을 읽기 위해서 새로운 데이터가 들어올 때 마다 결과를 내보내는 연속 쿼리(Push query) 두가지 종류를 지원합니다.

Pull queries

기존 RDBMS 같이 현재 저장된 데이터에 대해 즉시 실행되는 일회성 쿼리입니다. 이 쿼리는 materialized view(구체화 된 뷰), 현재 시점의 가장 최신데이터)나 테이블에서 특정 데이터를 조회할 때 사용되며 일회성이기 때문에 상태 변경 쿼리는 지원하지 않습니다. 기존 쿼리문처럼(SELECT * FROM table) 사용하면 지금 시점의 결과값이 반환됩니다.

Push queries

연속된 스트림을 반환하는 쿼리를 push query라고 합니다. 쿼리 실행 시 단순히 하나의 결과값을 반환하는게 아닌 새로운 데이터가 들어올 때마다 결과를 내보내게 됩니다. 스트림을 실시간으로 조회하기 위해서 쿼리문 마지막에 EMIT CHANGES라는 구문을 붙여줘야 합니다. 이 부분에 대해서는 아래에 나올 window 집계 쿼리 부분에서 추가적으로 설명드리겠습니다.

Stream & Table

스트림은 토픽으로 들어오는 연속적인 이벤트 기록을 모델링합니다. 토픽의 데이터를 읽기위해 구조화된 일종의 뷰(view)라고 생각할 수 있습니다. 스트림 자체는 데이터를 저장하지 않고 토픽에서 발생하는 값을 지속적으로 스트리밍합니다.

반면, 테이블은 이러한 실시간 스트림을 읽어 각각의 고유한 키에 대한 최신 상태를 가지게 됩니다. 테이블을 사용하면 join 연산과 집계 연산을 할 수 있습니다. 아래 표는 이벤트 시퀀스를 스트림과 테이블로 표현 했을 때의 차이를 나타냅니다. 스트림은 연속적인 모든 데이터를 가지고 있고, 테이블은 각 키의 최신 값만 가지고 있습니다.

Stream 생성 구문 예시

CREATE STREAM test_stream (
id INT,
title VARCHAR
) WITH (
KAFKA_TOPIC='titles',
VALUE_FORMAT='JSON',
PARTITIONS=6,
REPLICAS=2
);

스트림 생성은 기존 DDL문과 비슷하게 정의할 수 있으며, WITH 절에 추가적으로 스트림의 속성을 지정할 수 있습니다.

  • KAFKA_TOPIC : 소비할 데이터가 있는 토픽을 지정합니다.
  • VALUE_FORMAT : 소스 토픽의 데이터 직렬화 형식을 설정합니다.
  • PARTITIONS : 파티션 수를 지정합니다. 지정하지 않으면 기본 토픽의 파티션 수와 동일한 값이 설정됩니다.
  • REPLICAS : 복제본 개수를 지정합니다. 기본 1로 설정되어 있습니다.

Table 생성 구문 예시

CREATE TABLE test AS
SELECT
id, COUNT(*)
FROM test_stream
GROUP BY id;

테이블을 생성할 때는 기본 키를 지정해줘야 하며, 기본 키를 바탕으로 파티션이 할당됩니다. 위와 같이 GROUP BY 절에 id를 넣으면 id가 테이블의 기본 키가 됩니다. GROUP BY 절에 포함된 컬럼들은 Topic 전송시 메세지의 키로 사용되므로, 이들을 메세지의 value로 받으려고 한다면 AS_VALUE 함수를 사용해야 합니다.

만약 GROUP BY 절에 여러 컬럼을 포함시키려고 한다면, ‘기본 키는 하나만 가능하다’는 에러가 발생합니다. 이때 키 포맷을 JSON으로 설정해주면 여러 컬럼의 값이 JSON 형태로 직렬화되어 하나의 복합 키가 됩니다.

아래는 JSON 키와 AS_VALUE를 사용한 쿼리 예시입니다.

CREATE TABLE test WITH (FORMAT='JSON') AS
SELECT
title,
user_id,
AS_VALUE(title) "title",
AS_VALUE(user_id) "user_id",
count(*) AS "count"
FROM test_stream
GROUP BY title, user_id

위와 같이 테이블을 만들게 되면 title과 user_id가 합쳐져서 JSON 형식의 키가 되고, AS_VALUE 함수를 사용하여 나중에 Topic 메세지에서 title과 user_id를 value로 읽을 수 있게 됩니다.

Window 집계

스트림이나 테이블에서 시간경계를 window라고 하며 이 window를 이용해 집계작업을 할 수 있습니다. window는 시작시간과 종료시간이 있으며 window 함수 사용해서 테이블을 생성하면 해당 테이블에서 WINDOWSATRT, WINDOWEND 라는 컬럼명으로 window의 시작, 종료 시간을 쿼리 할 수 있습니다.

window를 정의하는 방법에는 세가지가 존재합니다.

  • Tumbling : 시간 간격을 기반으로, 고정된 크기의 윈도우가 겹치지 않고 이동하는 방식입니다. 윈도우가 겹치지 않기 때문에 윈도우 안에서 레코드는 하나만 남게 됩니다.
  • Hopping : 시간 간격을 기반으로, 고정된 크기의 윈도우를 사용하며 윈도우가 겹칠 수 있습니다. hop이라는 간격으로 이전 윈도우에 비해 얼마나 전진할건지 진행 간격을 설정합니다. 예를 들어 윈도우 크기가 5분이고 진행간격이 1분이라면 이전 윈도우와 4분씩 겹치면서 전진합니다. 겹치는 구간안에 특정 레코드가 여러번 포함 될 수 있습니다.
  • Session : 데이터 기반으로, 동적으로 크기가 조정됩니다. 레코드를 세션으로 통합하고 레코드가 존재하지 않는 비활성 공백의 크기를 설정합니다. 비활성 공백이 5초인 경우 이전 레코드에서 5초 이내로 동일한 키로 타임스탬프가 있는 각 레코드는 동일한 윈도우에 포함됩니다. 만약 새 레코드가 5초 이후에 들어온다면 그 시점으로 새로운 윈도우가 생성됩니다. 그렇기 때문에 윈도우 크기가 가변적이게 됩니다.

Push queries 부분에서 EMIT구문을 마지막에 사용해야 한다고 했던 부분 기억 나시나요? EMIT구문은 EMIT CHANGESEMIT FINAL두가지가 존재하며, 용도 별로 구분지어 사용해야 합니다.

EMIT CHANGES

  • EMIT CHANGES는 스트림 쿼리의 기본 동작으로, 결과가 변경될 때마다 실시간으로 결과를 내보냅니다.
  • 방금 알아본 window 기반 집계 쿼리에서 EMIT CHANGES를 사용하면 동일 아이디로 토픽에 새로운 값이 들어오면 새롭게 업데이트된 집계 결과가 지속적으로 반환됩니다.
  • 데이터의 각 변화를 지속적으로 추적하는 용도입니다.

EMIT FINAL

  • EMIT FINAL은 윈도우 기반의 집계 쿼리에서만 사용되며, 윈도우가 종료될 때 최종 결과만을 내보냅니다.
  • 집계 작업에서 중간 결과를 무시하고 최종 결과만 필요할 때는 EMIT FINAL을 사용해야 합니다.

window 쿼리 유즈케이스

신용카드 거래 서비스 회사에서 악의적인 고객 활동을 탐지하는 예시입니다. 짧은 시간내에 수많은 거래가 발생하는 경우 카드 소지자에게 의심스러운 활동을 알리기 위해 ksql window 집계 쿼리를 활용 할 수 있습니다. 거래 정보 트랜잭션 스트림에서 짧은 기간내에 여러번 거래된 건수를 파악하기 위해 window tumbling을 사용한 테이블을 생성합니다.

CREATE TABLE possible_anomalies WITH (
kafka_topic = 'possible_anomalies',
VALUE_AVRO_SCHEMA_FULL_NAME = 'io.ksqldb.tutorial.PossibleAnomaly'
) AS
SELECT card_number AS `card_number_key`,
as_value(card_number) AS `card_number`,
latest_by_offset(email_address) AS `email_address`,
count(*) AS `n_attempts`,
sum(amount) AS `total_amount`,
collect_list(tx_id) AS `tx_ids`,
WINDOWSTART as `start_boundary`,
WINDOWEND as `end_boundary`
FROM transactions
WINDOW TUMBLING (SIZE 30 SECONDS)
GROUP BY card_number
HAVING count(*) >= 3
EMIT CHANGES;
  • 30초 윈도우 사이즈 안에서 각 신용카드 번호 별로 3번 이상의 이벤트(거래)가 발생한 경우의 거래정보를 반환하는 쿼리입니다.
  • WINDOWSTARTWINDOWEND를 넣어서 거래가 발생한 시점의 타임스탬프를 함께 기록했습니다.
  • EMIT CHANGE를 사용했기 때문에 윈도우 사이즈인 30초 안에 이벤트가 3개를 넘어 4개, 5개 계속 발생한다면 그때마다 새로운 행이 테이블에 업데이트 될 것입니다.
  • 이처럼 이상 거래 이벤트를 탐지해서 고객에게 미리 알람을 줄 수 있습니다.

밀당에서 ksqlDB를 도입한 사례

배경

앞서 말했듯, 실시간 학습현황을 업데이트 하는 부분에서 너무 많은 중복 이벤트로 인해 데이터베이스에 부하가 생겼습니다.먼저 중복 이벤트가 발생한 배경을 설명하겠습니다. 우리가 업데이트 해야 하는 학습현황에는 학습시간이라는 항목이 있습니다. 학생이 학습을 중지하고 이탈하는 시점을 알아야 그 시점으로 학습을 위해 머문시간을 계산할 수 있습니다. 하지만 이탈시점을 파악하는 것은 어려운 문제이기 때문에, 학생이 문제를 계속 풀고 있는지 확인하는 이벤트를 사용했습니다. 하지만 이 이벤트가 주기적으로 생기다보니 단기간에 중복 이벤트가 너무 많이 발생해 부하를 일으키는 원인이 되었습니다.

해결방안

해당 이벤트는 학습 이탈 시점을 파악하기 위한 목적으로 받았기 때문에, 주기적으로 들어오는 모든 이벤트를 처리할 필요가 없었습니다. 그래서 ksqlDB의 window 집계 쿼리를 사용해 중복 이벤트 제거하기로 했습니다.

Confluent ksqlDB 사용방법

Confluent cloud의 kafka를 사용하고 있었기 때문에 ksqlDB도 confluent 환경에 구축했습니다. confluent에서는 GUI를 지원하기 때문에 편하게 스트림과 테이블을 생성하고, 쿼리로 바로 실행시켜 결과를 볼 수 있습니다.

ksqlDB 클러스터 생성

먼저 원하는 ksqlDB 사용을 원하는 클러스터로 가서 ksqlDB 항목의 Add cluster를 눌러 ksqlDB의 클러스터를 생성해줍니다. Status가 provisioning에서 Up으로 바뀌면 클러스터를 사용할 준비가 되었다는 뜻입니다. 준비가 된 클러스터를 눌러 들어가줍니다.

Stream 생성

토픽의 값을 바로 집계 쿼리를 사용한 테이블로 만드는 것은 불가하므로 먼저 stream을 생성해줘야 합니다. 토픽으로 들어오는 데이터를 구조와하여 정의해주고 Editor에 넣어 쿼리를 실행시키면 stream이 생성됩니다.

Table 생성

이제 stream을 기반으로 window 집계를 하는 table을 생성할 차례입니다. id로 GROUP BY를 했으므로 id가 메세지 key가 되는데, 우리는 컨슈머에서 value로 받아 처리하길 원하기 때문에 AS_VALUE를 사용해 id가 메세지 value로도 들어 갈 수 있게 해줍니다.

window 집계는 window tumbling을 사용했습니다. 프로덕션 환경은 이벤트가 많아 비활성 공백이 거의 존재하지 않아서 session 쿼리를 제외했고, 구간별 유니크한 이벤트를 얻기 위한 작업이기 때문에 겹치는 구간 또한 필요하지 않아서 hopping 쿼리도 제외했습니다. 겹치는 구간 없이 정해진 window size 안에서 유니크한 값을 주는 tumbling을 사용하여 이벤트의 중복을 제거했습니다. 마지막엔 EMIT FINAL문을 추가해 window가 종료될 때 최종 결과만 반환 할 수 있게합니다. 이제 작성한 쿼리를 Editor에서 실행시켜 중복이 제거된 table을 생성합니다.

컨슈머에서 토픽 설정

table이 생성되면 Tables라는 탭에서 그동안 생성한 table들을 조회 할 수 있습니다. 방금 만든 table로 들어가면 해당 테이블의 결과값을 저장하는 토픽도 같이 생성되어 있음을 알 수 있습니다. 기존에 컨슈머가 읽던 토픽을 해당 토픽으로 변경해주면 드디어 중복이 제거된 이벤트 값을 소비할 수 있게 됩니다.

효과

ksqlDB windowing 적용 전과 후로 CPU 부하가 얼마나 줄었고, 이벤트 개수가 얼마나 감소 했는지 효과를 알아볼 차례입니다. 카프카 모니터링 도구로는 datadog을 사용하고 있어서 이를 통해 이벤트 개수의 변화를 추적해보겠습니다.

적용 전 이벤트 개수

빨간 부분이 앞단의 토픽이고 초록 부분이 뒷단의 토픽입니다. 앞단의 토픽에서 값을 받아 windowing으로 중복을 처리하고 뒤의 토픽 보내는 로직으로 이루어져 있습니다. 그래서 두 토픽의 이벤트 개수 차이를 비교해보면 중복 처리가 얼마나 되었는지 알 수 있습니다. Sum 부분이 해당 기간 안에 보내진 이벤트의 총 개수 입니다. 적용 전에는 4.87M 개 이벤트를 받아서 4.84M 개를 보낸 것으로 확인이 되는데, 두 토픽간의 개수 차이가 거의 나지 않는 것을 볼 수 있습니다.

적용 후 이벤트 개수

ksqlDB 적용 후 6.75M 개 이벤트를 받아서 2.18M 개를 보낸 것을 확인 할 수 있습니다. 이벤트 개수가 약 67% 정도 감소했으니 꽤나 성공적으로 중복이 처리되었음을 알 수 있습니다.

적용 전 DB CPU

DB는 AWS RDS를 사용하고 있습니다. 모니터링으로 가서 CPU의 변화를 확인해봅니다. 적용 전에는 97%로 피크를 치면서 새벽까지도 해소가 되지 않다가 새벽 6시에 줄어드는 것을 볼 수 있습니다.

적용 후 DB CPU

적용 후 피크 시간대에도 90%를 넘지 않게 되었고 새벽 2시 정도에 보다 빠르게 해소가 되는 것으로 확인했습니다.

활용가능성

이번에 밀당에서 진행한 작업은 DB 부하를 줄이는 것이 목표였기 때문에 window 집계 쿼리에 초점을 맞춰 사용했습니다. 하지만 이외에도 다양한 유즈케이스가 있습니다.

  • 사용자 행동 분석 및 맞춤형 추천 : 실시간으로 사용자 행동 데이터를 분석해 개인화된 추천 제공
  • 알림 및 모니터링 : IT 인프라의 로그 데이터를 분석해 시스템 성능을 모니터링하고 문제를 조기에 감지
  • 실시간 대시보드 : 실시간으로 집계하고 요약한 데이터를 대시보드나 보고서로 생성
  • 통합 파이프라인 : ksqlDB와 커넥터와의 연결을 통해 ETL 작업 수행, 다른 데이터 스토어로 실시간 데이터 전송 및 처리 가능

앞으로 데이터 파이프라인 작업시 이처럼 다양한 활용사례를 참고하여, 실시간 처리의 효율성을 향상시키는 기반을 마련 할 수 있을 것입니다.

마치며

지금까지 ksqlDB와 도입 사례에 대해 살펴봤습니다. 작업을 진행하며 이벤트 처리를 위해 코드 작업을 추가하지 않아도, ksqlDB에서 쿼리언어로 중복을 제거한 테이블과 토픽을 생성할 수 있어 편리함을 느꼈습니다. ksqlDB 도입 전 리서치하는 과정에서 상세한 사례가 많지 않아서 거의 공식문서를 보면서 사용법을 익혔습니다. 이 글이 보다 구체적인 ksqlDB 사용사례를 찾는 분들께 도움이 되길 바랍니다.

데이터 팀에서 동료를 찾고 있습니다!

IHFB가 최근 급성장하여 에듀테크 유니콘이 되기 위해 새로운 시스템을 개발하고 있습니다. 데이터 파이프라인 구축부터 인프라 개선 등 기존 레거시에 존재하는 다양한 문제를 해결, 개선하고 있습니다. 새로운 도전을 즐기시는 분들과 함께 하면 교육의 혁신을 더욱 빠르고 멋지게 이룰 수 있을 것 같습니다!

밀당과 함께할 동료를 찾습니다.

참고 문서

https://docs.ksqldb.io/en/latest/, Mastering Kafka Streams and ksqlDB(Mitch Seymour)

--

--

HaJin Jo
IHFB  R&D 팀블로그
0 Followers

아이헤이트플라잉버그스 데이터 엔지니어입니다.