카프카 커넥트 적용기

권태진
SSG TECH BLOG

--

안녕하세요. SSG 공통서비스개발팀 권태진입니다. “What’s on my desk”에 이어 이번에는 카프카 커넥트 적용기라는 제목으로 글을 쓰게 되었습니다. 해당 글을 통해 최근 팀에서 카프카 커넥트를 이용, postgresql의 데이터 동기화를 진행했던 사례를 여러분과 공유하고자 합니다. 카프카 커넥트 적용을 고려하고 있는 분들에게 작게나마 도움이 되는 글 이였으면 좋겠습니다.

이 글은 다음의 순서로 진행됩니다.

  1. 카프카 커넥트란?
  2. 왜 카프카 커넥트인가?
  3. 카프카 커넥트 적용
  4. 주의사항

1) 카프카 커넥트란

먼저 카프카 커넥트가 무엇인지 알아보겠습니다. 공식사이트에서는 카프카 커넥트를 각각 다음과 같이 설명하고 있습니다.

confluent.io (https://www.confluent.io/)
Kafka Connect allows you to integrate Apache Kafka® with other apps and data systems with no new code.

apache.org (https://kafka.apache.org/documentation/#connect)
Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other systems.

요약하면 카프카 커넥트란 다음과 같이 정의할 수 있을 것 같습니다.

카프카 커넥트란 “카프카"와 “다른 시스템"간 데이터를 코드없이 연결해주는 툴이다.

조금 더 이해를 돕기 위해 카프카 커넥트의 구성도를 같이 살펴보도록 하겠습니다.

카프카 커넥트 구성(1)
카프카 커넥트 구성(2)

이미지 출처 : https://www.confluent.io/ko-kr/blog/kafka-connect-deep-dive-converters-serialization-explained/

카프카 커넥트는 정의와 그 구성에서 알 수 있듯이 일종의 중개자라고 할 수 있습니다.

① 소스와 카프카의 중간에 위치해서 변경된 데이터를 카프카로 전송하거나,
② 카프카와 타겟의 중간에 위치해서 카프카에서 타겟으로 데이터를 전송할 수 있습니다.
(참고로 해당 글은 ①번 구성에 대한 적용기입니다. ②번 구성에 대한 글은 다음에 기회가 된다면 추가로 연재하도록 하겠습니다.)

그리고 정의에서 알 수 있듯이 메시지 전송을 위해 별도의 코딩은 필요없으며 몇 가지 설정으로 원하는 결과를 얻을 수 있는 편리한 도구입니다.
예를 들어 카프카 커넥트를 이용하면 별도의 프로듀서나 컨슈머에 대한 코딩 없이 MySQL의 데이터 변경분을 엘라스틱서치로 바로 전송할 수도 있습니다.

카프카 커넥트에 대한 간단한 FAQ로 해당 단락은 마무리 하도록 하겠습니다.

  1. 카프카 커넥트는 그래서 어디에 설치하는 건가요?
    ➥ 본인 환경이 허락하는 내에서 어디에 설치해도 무방합니다. 카프카 커넥트는 원격으로 연결되기 때문에 물리적인 위치는 크게 상관없습니다. 구성도에 나와 있는 카프카 커넥트의 위치도 개념적인 위치일 뿐 물리적인 위치는 아닙니다.
  2. 외부시스템에 카프카 커넥트를 사용하면 좋다고 합니다. 그런데 카프카 커넥트를 사용할 수 있는 환경이라면 결국 내부 시스템 아닌가요? 그렇다면 굳이 프로듀서/컨슈머가 아닌 카프카 커넥트를 사용할 필요가 있나요?
    ➥ 네. 맞습니다. 카프카 커넥트를 사용할 수 있는 환경이면 거의 대부분 내부 시스템일 것입니다. 우리가 관리하지 않는 외부시스템에서 카프카 커넥트를 허용해줄 가능성은 거의 없죠. 그럼에도 카프카 커넥트를 사용할 일은 생기게 됩니다. 그 이유는 다음 장에서 설명드리도록 하겠습니다.
  3. 정말 별도의 코드가 없나요?
    ➥ 설치하고 몇 가지 설정만 한다면 별도의 코드없이 사용가능합니다.
  4. 어떤 시스템에든 적용이 가능한가요?
    ➥ 그렇지는 않습니다. 공개된 커넥터가 있는 시스템에만 적용이 가능합니다. 커넥터 목록은 컨플루언트(Confluent)에서 유지 및 관리됩니다.(https://www.confluent.io/product/connectors/)

2) 왜 카프카 커넥트인가

저희팀은 재작년부터 모놀리틱 시스템을 MSA로 전환중에 있으며 공통적으로 CQRS 패턴으로 쿼리를 구현하고 있습니다. 팀마다 사용하는 DB는 다르며 저희팀은 Postgresql DB를 커맨드를 위한 DB로, Mongo DB를 view를 위한 DB로 사용하고 있습니다. 이 때 Postgresql DB와 Mongo DB 사이에 데이터 동기화가 필요했고 전사적으로 사용 가능한 카프카가 존재했기에 새로운 메시지 큐를 구축하기 보다는 기존의 카프카를 활용하기로 결정했습니다. 그리고 view전용의 자료구조 형태로 데이터를 가공해서 저장해야 했기에 카프카에서 Mongo DB로 데이터를 업데이트 하는 것은 Consumer로 구현하기로 했습니다.

남은 고민은 데이터 변경분을 어떻게 감지하고 메시지를 생성할 것 인가 였습니다. 처음 생각했던 방안은 2가지 였습니다.

① 데이터 변경을 감지하기 위한 공통DAO나 annotation 사용
② 배치를 이용하여 변경분을 조회하고 메시지를 생성

하지만 위 두 가지 방식은 각각 분명한 단점이 있었습니다.

① 데이터 변경을 감지하기 위한 공통DAO나 annotation 사용

공통 DAO나 annotation을 붙이지 않으면 누락되는 데이터가 발생합니다. 하나의 팀 하나의 시스템을 통해서만 데이터가 변경된다면 어느 정도 고려해볼가치가 있겠지만 그렇지 않은 데이터들이 대부분이였기 때문에 누락이 생길 가능성이 높았습니다. 또 다른 문제점은 운영하다 보면 DB에서 값을 직접 변경하는 경우가 생기게 됩니다. 이럴 때는 데이터 변경분에 대해서 수동으로 메시지를 발생시켜줘야 하는데 이 역시 누락될 가능성이 높습니다.

② 배치를 이용하여 변경분을 조회하고 메시지를 생성

①번의 방식보다는 데이터 누락에 대한 이슈는 줄어들겠지만 다음과 같은 문제가 발생합니다. 먼저 변경분에 대한 딜레이가 커집니다. 변경분이 적다면 수초이내 동기화가 되겠지만 변경분이 늘어날 수록 동기화가 느려집니다. 그리고 계속해서 DB를 조회해야 하기 때문에 DB부하도 늘어나게 되죠. 이런 현상은 동기화 데이터가 많아질수록 점점 큰 문제로 다가오게 됩니다.

정책상 위 두가지 방식의 단점을 어느 정도 허용할 수 있다면 둘 중 나은 방법을 선택해도 상관없습니다. 다만 저희는 내부적으로 둘 모두 허용할 수 없었고 결국 카프카 커넥트가 최선의 대안이 될 수 있다는 결론에 도달하게 되었습니다.

카프카 커넥트 적용시 얻을 수 있는 이점은 다음과 같았습니다.

  • 변경분을 별도로 감지하지 않아도 되기 때문에 변경분 누락에 대해서 신경쓰지 않아도 된다.
  • 변경분을 거의 실시간으로 전달받을 수 있다.

3) 카프카 커넥트 적용

DB설정변경

카프카 커넥트를 사용하기 위해서는 DB의 설정에도 약간의 변경이 필요합니다. 저희는 DBA팀에서 Postgresql DB를 관리하고 있었기에 카프카 커넥트를 사용할 수 있도록 설정 변경을 요청하였고 DBA팀에서 다음과 같은 처리를 해주셨습니다.

✔︎ 파라미터 설정
1)
shared_preload_libraries = ‘pg_stat_statements,pg_hint_plan,repmgr,wal2json,decoderbufs
2)wal_level = logical

✔︎ extention : wal2json, decodebufs

✔︎ KAFKA 전용계정생성

카프카 커넥트 설치

저희는 개발은 도커이미지를 통해 커넥트를 설치했으며 운영은 별도 설치파일로 커넥트를 설치하고 운영중입니다. Postgresql 플러그인은 개발/운영 동일하게 debezium postgresql plugin을 사용하고 있습니다. 카프카 커넥트 설치는 설명의 편의를 위해 도커 이미지를 기준으로 하도록 하겠습니다.

✔︎ 사용한 카프카 커넥트 이미지 : https://hub.docker.com/r/confluentinc/cp-kafka-connect

✔︎ postgresql 플러그인 다운로드 : https://debezium.io/documentation/reference/stable/install.html → Postgres Connector plugin archive

Postgresql 플러그인만 우선 다운로드 하시고 다음을 진행해주시면 됩니다. 플러그인 path 부분은 조금 헷갈릴 수 있어 부연설명을 하자면 다운로드 사이트에서 debezium-debezium-connector-postgresql-1.8.1.zip 파일을 다운로드 받았고 그 경로가 “/etc/kafka-connect/” 이며 압축해제후 최종 경로가 “/etc/kafka-connect/debezium-debezium-connector-postgresql-1.8.1/”라고 한다면 플러그인의 경로(CONNECT_PLUGIN_PATH)는 “etc/kafka-connect/”가 됩니다. 이 부분만 주의하시면 설치에 큰 어려움은 없습니다.

카프카 커넥트 Docker 이미지 설치 명령어

위 두 가지 명령만 수행하시면 커넥트 설치는 끝이 납니다. 각 옵션명을 보시면 아시겠지만 카프카를 어느 정도 알고 계시다면 옵션들이 어떤 의미인지 쉽게 이해하실 수 있습니다. 옵션에 대한 자세한 설명은https://docs.confluent.io/platform/current/installation/docker/config-reference.html 에서 확인 가능합니다.

카프카 커넥트 등록

설치까지 완료되었다면 실제로 커넥트를 등록해야 원하는 동작을 수행하게 됩니다.

커넥트 등록 및 수정, 재시작, 상태확인 등은 모두 rest api를 통해서 이루어지게 됩니다. 그래서 커넥트 설정시 hostname과 rest_port를 설정해주고 있습니다.

커넥트 등록은 크게 ①설정파일 만들기, ② 등록api 호출의 두 과정으로 이루어 집니다.

설정파일 만들기

설정파일을 보기전 카프카 커넥트의 기본적인 메시지 포맷과 저희팀에서 필요했던 메시지 전송에 대한 요구사항을 먼저 알아보도록 하겠습니다.

카프카 커넥트의 기본적인 메시지 포맷은 INSERT, UPDATE, DELETE 이벤트 별로 약간 상이하긴 하나 기본적으로 다음 값들을 포함하고 있습니다.

▪︎ 스키마

▪︎ before 데이터

▪︎ after 데이터

▪︎ 기타 META 데이터

아래 메시지는 INSERT 이벤트에 대한 메시지 포맷예시입니다.

UPDATE, DELETE 이벤트 메시지 내용은 INSERT 이벤트와 대동소이합니다. UPDATE는 payload의 before가 있고, DELETE는 payload의 after가 없다 정도의 차이만 있다고 보시면 됩니다.

그리고 저희가 원했던 메시지 전송에 대한 요구사항은

  1. 특정 테이블의 변경 데이터만 필요하다.
  2. schema 정보는 불필요하다.
  3. after data만 필요하다.
  4. META 데이터중 “op”, “table”, “ts_ms”만 필요하다
  5. 각 메시지는 정해진 토픽으로 전송되어야 한다.

카프카 커넥트가 기본적으로 생산하는 메시지 및 기본 메시지 전송정책과 저희가 원했던 메시지 전송에 대한 요구사항을 적용했을 때 아래와 같은 config파일이 만들어 졌습니다.

요구사항과 config를 매핑시켜보면

  1. 특정 테이블의 변경 데이터만 필요하다.
    - table.include.list: devdmsadb.dpage,devdmsadb.dpage_cmpt
    테이블 목록에는 스키마이름도 포함됩니다. 테이블 목록의 기본포맷은 “schemaName.tableName” 입니다.
  2. schema 정보는 불필요하다
    - key.converter.schemas.enable: false
    - value.converter.schemas.enable: false
  3. after data만 필요하다
    - transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
    - transforms.unwrap.delete.handling.mode: rewrite

    delete event의 경우에는 after data가 기본적으로 없기 때문에 “rewrite” 옵션이 반드시 필요합니다.
  4. META 데이터중 “op”, “table”, “ts_ms”만 필요하다
    - transforms.unwrap.add.fields: op, table, source.ts_ms
  5. 각 메시지는 정해진 토픽으로 전송되어야 한다.
    - transforms.Reroute.type: io.debezium.transforms.ByLogicalTableRouter
    - transforms.Reroute.topic.regex: [^\\s]+(.*?)\\.(dpage.*|disp_apl_tgt_salestr)$
    - transforms.Reroute.topic.replacement: front-$2_chng

    카프카 커넥트가 생성하는 기본 토픽명의 형식은 “serverName.schemaName.tableName” 입니다. 카프카 커넥트가 생성하는 기본 토픽명은 기존에 저희가 사용하던 토픽명의 정책과 맞지 않아 변경이 필요했습니다.

config역시 그 명칭이 직관적이라 config명 만으로도 대략 어떤 설정이구나 라는 것을 쉽게 알 수 있습니다.

위와 같이 설정했을 때 최종적인 메시지 포맷은 다음과 같습니다.

카프카 메시지.최종

그 밖의 설정값에 대한 자세한 설명은 아래 링크를 참고하셔서 사용하시면 됩니다.

등록API 호출

설정 파일을 만든 후에는 작성된 설정파일을 통해 등록 API를 호출합니다. API 호출 이후부터 각 이벤트에 대한 메시지가 정상적으로 발생하는 것을 볼 수 있습니다.

등록 API(with config.json)

앞서 얘기한 것처럼 카프카 커넥트와는 REST API를 통해 통신할 수 있습니다.
자주 사용하는 기타 다른 명령어는 다음과 같습니다.

자주 사용 하는 명령어 (https://docs.confluent.io/platform/current/connect/references/restapi.html)

4. 주의사항

마지막 단락에서는 카프카 커넥트를 적용하면서 발견했던 몇 가지 주의사항을 공유하고자 합니다.

  1. 카프카 커넥트가 비정상적으로 종료되었을 경우에 postgresql의 wal파일이 계속 증가할 수 있습니다. 모니터링을 통해 카프카 커넥트의 비정상 종료를 감지하고 wal 파일 사이즈에 대해서도 계속 체크를 해야 합니다. 그렇지 않으면 postgresql 서버에 문제가 발생할 수 있습니다.
  2. 카프카 커넥트 설정을 변경하는 경우에는 PUT method를 호출해야 합니다.
  3. 토픽이 자동생성되는 환경이 아니라면 필요한 토픽을 미리 생성해야 합니다. 메시지가 전송되는 토픽뿐만 아니라 카프카 커넥트와 관련된 환경변수를 관리하는 토픽도 만들어져야 합니다. 카프카 설치시 환경변수(CONNECT_CONFIG_STORAGE_TOPIC,CONNECT_OFFSET_STORAGE_TOPIC,CONNECT_STATUS_STORAGE_TOPIC) 로 지정했던 3개의 토픽과 hearbeat로 사용하게되는 __debezium-hearbeat.<schema> 토픽 총 4개의 토픽이 대상입니다.

이상으로 카프카 커넥트 적용기에 대한 설명은 모두 끝이 났습니다. 긴 글 읽어주셔서 감사드리며 카프카 커넥트 도입을 고려하고 있는 분들에게 작게나마 도움이 되는 글이였으면 좋겠습니다.

--

--