MRT Public Data Service 개발 — 1

Jaegwon Seo
How we build MyRealTrip
9 min readOct 26, 2022

안녕하세요. 마이리얼트립 데이터플랫폼팀 서재권입니다.

데이터플랫폼팀은 마이리얼트립에서 발생하는 수많은 데이터를 수집 가공해 Data Warehouse를 구축하고, 실시간으로 발생하는 다양한 데이터를 이용해 서비스에서 필요한 api를 제공합니다. 배치 Data Warehouse 구축 관련 포스팅에 이어, 이번 포스팅에서는 CDC를 이용한 실시간 데이터 분석 시스템을 구축한 경험을 공유해 드립니다.

CDC?

CDC란 Change Data Capture의 약자로, 데이터 베이스(DB)의 변경된 데이터만 추출해 전송하는 시스템입니다. 과거부터 발생한 모든 이벤트 (변경사항)를 순서대로 받아, 누적하게 되면 결국 소스 DB와 동일하기 때문에, CDC는 다양한 소스 DB의 데이터를 분석 시스템으로 쉽게 전송할 수 있게 도와줍니다. 현재 마이리얼트립에서는 가장 대중적이고 널리 사용되고 있는 Kafka Connect를 이용해 CDC 플랫폼을 구축하였으며 각 커넥터는 debeziumconfluent를 이용하였습니다.

Kafka Connect

Kafka Connect는 여러 데이터 소스와 Apache Kafka를 연결해주는 매개체로써, 코딩 없이 간단한 설정만으로 DB -> Kafka, Kafka -> DB로 데이터를 전송할 수 있도록 도와주는 플랫폼입니다.

kafka connect pipeline

Kafka Connect는 아래와 같이 5개의 요소로 이루어져 있으며 다음과 같은 특징이 있습니다.

  1. worker : connector 및 task를 실행하는 프로세스
  2. connector: Kafka Connect의 추상 객체로써 kafka에서는 인터페이스만 제공하고 있습니다. debezuim, confluent와 같은 회사들은 이 인터페이스를 구현해 confluent hub를 통해 제공하고 있습니다.
  3. task: kafka에 데이터를 전송을 담당하는 실제 구현체로써 connector의 설정에 의해 동작하게 됩니다.
worker — connect — task로 이루어진 kafka connect cluster

4. converter : bytes형식으로 데이터를 저장하는 kafka와 이 데이터를 외부 시스템에서 사용하기 위해 데이터를 변경하는 과정입니다. confluent에서는 다양한 converter를 제공하고 있으며 마이리얼트립에서는 가장 대중적인 avro converter를 사용하고 있습니다.

5. transform : 각 connector에서는 데이터의 변환 없이 정해진 포맷의 데이터를 추출 및 적재하는 역할만 하기 때문에 데이터의 변경이 필요한 경우 kafka에서 제공하는 Transformer를 사용하거나, 직접 Transformation 인터페이스를 구현해 변경할 수 있습니다.

Kafka Connect Architecture

마이리얼트립에서 사용하고 있는 Kafka Connect cluster의 기본 아키텍쳐입니다.

모든 인프라는 EKS에서 동작하고 있으며, 안정적인 운영 및 리소스 최소화를 목표로 아래와 같이 작업했습니다.

  1. Helm 차트 이용

Confluent에서는 이미 kafka eco system에 대한 helm을 제공하고 있으며, 이를 마이리얼트립 요구 사항에 맞춰서 아래와 같이 수정해 사용하고 있습니다.

  • 서비스 별 클러스터 운영 : 각 서비스 간 영향을 최소화하기 위해서 커넥터 클러스터를 분리하여 운영하고 있습니다. 이미 제공되고 있는 helm을 각 커넥터 클러스터 별로 복사하는 것은 소스 유지에 비효율적이기 때문에 아래와 같이 클러스터 별 alias를 할당하고, values.yaml 파일을 이용해서 클러스터 별 설정을 관리하고 있습니다. 이렇게 서비스가 추가될 때마다 최소한의 수정으로 신규 커넥터 클러스터를 배포할 수 있습니다.
connect cluster 추가
클러스터 별 설정

2. kowl을 이용한 클러스터 관리

Confluent helm에서 제공하는 ui 툴인 Confluent Control Center은 굉장히 많은 기능을 제공하는 훌륭한 툴이지만 ,사용상의 제약이 있어 오픈소스로 제공되는 kowl을 이용해서 커넥터 클러스터를 관리하고 있습니다. kowl 또한 cloudhut에서 제공하는 helm을 사용하고 있으며, 아래와 같이 kafka, schema registry, connect의 기능을 활성화해 사용하고 있습니다.

kowl을 이용한 connect 클러스터 관리

3. Schema registry를 이용한 스키마 관리

마이리얼트립 CDC의 1차 목표는 서비스 DB와 분석용 DB를 완전히 분리하는 것이기 때문에, 실 서비스 DB의 데이터를 bigquery로 전송하는 것을 최우선으로 하고 있습니다. 이러한 이기종 간 데이터 전송 시 스키마 관리가 정확하게 되지 않으면 소스의 뒷단까지 에러가 전파될 수 있기 때문에 schema registry를 이용해 잘못된 데이터가 들어오지 않도록 관리합니다. Optianal한 값이 추가 또는 삭제되더라도 에러가 발생하지 않도록 Compatibility Type을 Full로 설정하여 schema registry 정책을 운영하고 있습니다.

4. 커넥트 클러스터 모니터링

  • jmx exporter을 통해 생성된 매트릭은 prometheus — grafana 스택을 이용해 수집을 하고 있습니다. confluent helm에서 제공하는 기본 설정이 connect의 jmx-configmap.yaml에 있기는 하지만, 간단한 정보만 제공하고 있기 때문에 link에서 제공하는 자료를 참고해 모니터링 스택을 구축했습니다.
  • Kafka Connect 특성상 커넥터에서 N개의 task를 생성해 데이터를 전송하게 됩니다. 하지만 각 task의 실패가 커넥터로 전파되지 않기 때문에, 커넥터 상태는 정상이지만, 하위 task의 상태가 failed인 경우가 종종 발생하게 됩니다. 이러한 이슈 때문에 에러 발생을 파악하기 위해서는 커넥터와 task의 상태를 모두 모니터링해야 합니다.
connect — running, task — fail 상태
  • 커넥터에 장애가 발생했다면 자동으로 재시작되지 않기 때문에 주기적으로 rest api를 이용해 커넥터 클러스터의 상태를 체크하고 다시 작동시키고 있습니다.

5. Fargate를 이용한 serverless 운영

  • AWS 에서 제공해주는 EKS 환경은 EC2와 동일하게 직접 서버를 관리해주어야 합니다. (AMI, 노드 수, ..) 이러한 단순 작업 또한 운영 리소스가 들어가기 때문에 fargate를 이용해 CDC 플랫폼을 배포 및 운영 하고 있습니다. fargate는 아래와 같이 별도의 소스 수정 없이 pods의 label만 변경해 사용 중입니다.

Connector 설정

모든 커넥터 설정은 debezium, confluent 문서를 참고해서 생성하고 있으며, 마이리얼트립에서 사용한 커넥터 별 설정은 아래와 같습니다.

1. postgresql

1–1) logical decoding output plugin

  • wal2json : toast table에 저장된 컬럼 값이 업데이트 된 경우 해당 컬럼의 스키마가 빠진 상태로 전송되는 이슈가 있습니다. 이러한 이슈 때문에 schema registry에서 스키마 검증에 실패 하게 되고 전송 실패로 이어질 수 있기 때문에 안정적인 운영이 어렵습니다.
  • decoderbufs : debezuim에서 제공하는 plugin이지만 aws RDS에는 기본적으로 설치가 되어 있지 않아 사용이 어렵습니다.
  • pgoutput : wal2json에서 발생하는 toast table과 동일하게 컬럼의 실제 값은 읽어 오지 못하지만 스키마와 __debezium_unavailable_value 의 형식으로 데이터를 전송하기 때문에 전송 실패는 일어나지 않습니다. 그래서 마이리얼트립에서는 pgoutput 플러그인을 선택했습니다.

1–2) heart beat

  • postgresql logical replication 사용 시 wal disk space consumption 문제를 해결하기 위해 주기적으로 heartbeat.action.query를 수행하고 있습니다. 만약 heartbeat.action.query가 정상적으로 수행되지 않을 시, 아래와 같이 pg_old_repl_slot이 점점 쌓이게 되고, 결국 disk full 에러가 발생하게 됩니다.
  • 1–3) publication
  • postgresql connect를 실행하게 되면 자동으로 debezium이라는 이름의 publication을 생성해줍니다. 하지만 특정 테이블(primary key가 없는 테이블)의 경우 테이블 설정을 ‘REPLICA IDENTITY’로 변경해주어야 합니다. 이러한 이슈를 방지하기 위해 모든 테이블이 아닌 타겟 테이블 대상으로 publication을 생성해 연결하고 있습니다.

2. Bigquery

  • AWS → GCP로 데이터를 전송하면 클러스터 상태 또는 각 메세지의 크기에 따라 타임아웃이 나는 경우가 있습니다. 이러한 장애를 방지하기 위해 아래와 같이 kafka consumer의 설정을 override해 각 커넥터 별 설정을 지정하고 있습니다.

CDC 적용후 달라진점

CDC를 적용하기 전 마이리얼트립에서는 airflow를 이용한 일 단위 배치 작업을 통해 데이터를 bigquery로 전송하였고, 이로 인해 D-1일 시점의 데이터까지만 분석이 가능하였습니다. 하지만 CDC를 적용한 이후에는 실시간으로 서비스 지표를 확인할 수 있기 때문에 ‘방금 예약이 발생한 상품은 무엇인지’, ‘방금 후기가 등록된 상품은 무엇인지’ 등에 대한 실시간 정보도 고객에게 전달할 수 있게 되었습니다.

이외에도 회원, 파트너, 항공, 호텔, 결제 등 마이리얼트립 내 다양한 서비스에서 CDC를 통해 가져온 데이터를 활용하고 있습니다. 다음 포스팅에서는 어떻게 이를 구현했는지 소개해 드리고자 합니다. 감사합니다.

--

--