MRT Public Data Service 개발 — 2
안녕하세요. 마이리얼트립 데이터플랫폼팀 서재권입니다. 지난 포스팅에 이어 CDC를 이용해 가져온 서비스 DB의 데이터를 Druid에 색인 및 Api로 제공하는 Public Data Service에 대해 공유해 드리겠습니다.
Public Data Service (PDS)
마이리얼트립에서 생성되고 있는 여러 출처의 데이터를 재가공해 서비스에서 필요한 다양한 실시간 및 배치성 지표를 제공하는 서비스입니다. 이러한 서비스를 제공하기 위해서는 먼저 실시간으로 생성되는 데이터를 수집(CDC)해야 하며, 이러한 데이터를 실시간으로 집계할 수 있어야 합니다. 이러한 요구 사항을 만족하기 위해 Kafka Stream, Druid를 이용하고 있습니다.
Kafka Streams
Kafka Streams는 Kafka에서 공식적으로 제공하는 스트리밍 데이터 처리용 Java 라이브러리입니다. CDC로 들어오는 데이터를 Druid에 색인 데이터 생성을 위해 사용하고 있으며, 아래와 같은 특징이 있습니다.
- 단순 Java 라이브러리로 매우 가볍습니다. 또한 다른 스트리밍 처리 프레임워크인 spark와 다르게, Kafka 이외의 외부 의존성이 없어 도입 장벽을 크게 낮출 수 있습니다. 이로 인해 특정 프레임워크(spring)에 탑재하여 추가 기능으로 사용할 수도 있고, 또는 단순 Java 프로그램으로 실행할 수도 있어 개발자의 자유도가 높습니다.
- Kafka 공식 커뮤니티에서 주도하여 개발하고 있기 때문에 Kafka 와 버전 호환성 문제가 없고, 제공하는 모든 기능을 쉽게 사용할 수 있습니다.
- Streams DSL을 지원해 간단하게 스트리밍 데이터에 transform, filter, join과 같은 연산을 적용할 수 있습니다. 아래는 마이리얼트립 PDS 서비스에 left join을 적용한 예 입니다.
4. 로컬 디버깅 환경 및 exactly once 제공
- TopologyTestDriver를 이용해 배포 전 로직 검증을 할 수 있습니다.
- 다른 라이브러리나 프레임워크와는 다르게, kafka 데이터 처리 시 exactly once를 제공해 데이터 유실 및 중복 전송 이슈로부터 자유로울 수 있습니다. 이는 현재로써 거의 유일합니다.
아래는 confluent에서 가이드하는 kafka의 데이터 처리 방식으로, kafka connect를 이용해 kafka로 데이터를 전송 및 저장하며, kafka streams를 이용해 저장된 데이터를 변경하는 프로세스를 보여주고 있습니다. 마이리얼트립 또한 동일한 방식으로 스트리밍 데이터를 처리하고 있습니다.
Druid
대규모 데이터에 대한 실시간 탐색 및 분석을 위해 설계된 Column Oriented, Distributed Data Storage로써 대용량의 이벤트 데이터를 빠르게 색인 및 서빙을 하는 OLAP 시스템으로 아래와 같은 특징이 있습니다.
Druid Storage
- Column-oriented storage
- 각 컬럼 별로 압축 방식을 달리 저장하여 저장 용량을 줄일 수 있습니다.
- 필요한 컬럼만 읽기 때문에 I/O의 부하를 줄이고 결과를 빠르게 도출할 수 있습니다.
2. Pre-aggregate 지원
- 데이터 적재 시 선택적으로 사전 집계(roll up)가 가능합니다.
- 저장되는 데이터의 총량은 크게 줄어드나, 원본 데이터는 유실됩니다. 따라서 색인 전 요구 사항에 맞춰 해당 설정의 사용 여부를 정해야 합니다.
Druid Architecture
기능에 따라 아래 3가지 유형의 노드와 외부 시스템으로 구분됩니다.
- 노드 종류
- master node : 데이터의 가용성 및 수집 관리
- query node : 사용자의 쿼리를 실행하고 결과를 반환
- data node : 데이터 수집을 수행하며 쿼리 가능한 데이터 저장 (queryable)
2. 외부 의존성
- Deep Storage : 쿼리 가능한 데이터(segment)의 저장소로 외부 클라우드 저장소(S3, GCS) 또는 HDFS를 사용할 수 있습니다.
- Metadata storage : 메타 데이터(segment, rule) 정보가 저장됩니다.
- Zookeeper : 서비스 디스커버리 및 리더 선출에 이용
Druid가 어떻게 실시간으로 데이터를 집계하는지에 대한 자세한 내용은 link를 참조하셔서 확인해보실 수 있습니다.
PDS Service Architecture
마이리얼트립에서는 Kafka streams를 이용해 Druid 색인 데이터를 생성하고 있으며, Helm과 ArgoCD를 이용해 배포 및 운영하고 있습니다.
실제 Druid 색인 데이터를 생성하고 서비스하는 프로세스는 아래와 같습니다.
- 적재 데이터 생성
- RDBMS의 특성상 CDC로 들어오는 테이블은 서비스에 필요한 모든 정보를 가지고 있지 않습니다. 이러한 이유 때문에 실제 서비스 테이블 이외 메타 데이터를 포함하고 있는 테이블을 추가로 CDC로 가져오는 작업이 필요합니다. 마이리얼트립에서는 UPS(Union Product Service) 테이블에 해당 데이터가 존재하기 때문에 서비스 테이블 이외 UPS 테이블을 추가로 전송하고 있습니다.
- join 시 lookup용으로 사용할 UPS 데이터는 항상 kafka에 존재해야 합니다. 따라서 해당 토픽의 cleanup.policy를 compact로 설정해 운영 중입니다.
- UPS 데이터는 약 140만 건 정도로 로컬 머신에 올라가기 충분할 정도의 크기 입니다. 따라서 스트리밍 join시 Ktable 대신 GlobalKTable를 이용해 left join을 하고 있습니다.
- Debezium source connector를 사용하고 있기 때문에 아래와 같이 우리가 원하는 데이터 이외의 정보가 추가되어 전달되고 있습니다. 또한 GlobalKTable 데이터는 다른 스트림과는 다르게 한번 선언하면 데이터 변경이 불가능합니다. 따라서 아래와 같이 string converter를 이용해 key를 만들고, custom transformer를 이용해 필요한 value만 추출하고 있습니다.
2. Druid 운영 및 색인
- Druid에서 제공하는 공식 helm을 이용해 운영하고 있습니다. 환경 변수에 druid_server_tier의 설정을 추가해서 아래와 같이 2개의 historical 노드 tier를 운영하고 있습니다.
- N개의 pods로 이루어진 분산 처리 시스템 특성상 모니터링은 필수입니다. Druid는 prometheus emitter를 지원합니다. 이를 바탕으로 prometheus + grafana 스택을 이용해 jmx 지표를 수집하고 있습니다. 기본적으로 Druid docker, helm은 promethues 설정이 켜져 있지 않기 때문에 다음과 같이 docker image, helm을 수정해서 사용하고 있습니다.
- kafka stream을 이용해 전송된 avro 데이터는 아래와 같은 설정을 이용해 Druid에서 색인을 하고 있습니다. 서비스 요구 사항상 특정 이벤트를 직접 조회해야 하는 경우도 존재하기 때문에 따로 roll up은 하고 있지 않습니다.
PDS 서비스 적용 후 달라진 점
이전 포스팅에서 CDC로 들어온 데이터를 big query로 전송하는 프로세스를 구축해 분석가들이 실시간으로 분석 쿼리를 수행할 수 있게 되었습니다. 하지만 big query의 데이터를 서비스에서 바로 사용하게 되면 과도한 비용이 들게 되고, 또한 latency 문제(AWS <-> GCP)로 인해 장애가 발생할 수 있습니다. 이러한 문제점 때문에 실 서비스에는 집계 정보를 내보낼 수 없었습니다. 하지만 PDS 서비스(CDC — Kafka Stream — Druid — Rest api)를 이용해 실시간으로 들어오는 후기, 예약 데이터에 대한 집계가 가능해졌고, 1차로 도시홈* 서비스에 적용할 수 있었습니다. (*도시홈 서비스 = 각 여행지를 기준으로 상품을 탐색할 수 있도록 만든 메인 화면)
또한 내부적으로 생성되는 사용자 로그에 대한 집계 또한 가능하게 되어 추후 파트너 페이지*에 판매되는 상품에 관한 정보의 집계에도 추가할 예정입니다. (*파트너페이지 = 마이리얼트립에 상품을 등록하는 판매자(파트너)가 본인의 상품과 예약을 관리할 수 있는 페이지)