검색엔진 색인 파이프라인 교체 CDC, Kafka, Spark, Elasticsearch

sg
네이버 쇼핑 개발 블로그
16 min readFeb 23, 2023

쇼핑카탈로그개발팀에서 검색엔진(Solr -> Elasticsearch) 색인 파이프라인 변경 프로젝트를 진행하며 얻은 경험을 공유드립니다.

네이버 쇼핑은 꾸준히 연 10~20% 의 지속 성장을 해왔습니다. 그 결과 상품수는 약 18억건 이상, 카탈로그 약 1억건 이상의 데이터를 다루는 거대 시스템이 되었습니다.
데이터의 증가에 따라 초기 설계되었던 검색엔진 파이프라인은 Throughput을 감당하기 힘들어졌으며 데이터 정합성과 Legacy dependancy에 대한 유지보수 공수가 나날히 증가하였습니다.
신규 버티컬과 레거시를 커버하는 새로운 시스템의 필요성이 대두되어. 총 문서 20억건, 일 변경량 3억건 이상의 Data Flow를 처리할 수 있는 새로운 파이프라인을 설계 및 전환하였습니다.

기술 스택

  • CDC(Change Data Capture)
  • Kafka, Kafka-connect, KSQL
  • Spark
  • Elasticsearch

파이프라인 구조

(기존) Solr 파이프라인
(신규) Elasticsearch 파이프라인

이전 기술의 문제점 및 개선 방법

1. 색인 누락

이전 기술와 가장 근본적인 차이는 이벤트의 Source입니다. 이전 파이프라인에서 이벤트의 Source는 어플리케이션이었습니다. 운영자가 데이터를 변경하거나 시스템이 데이터를 변경하였을 때 이벤트를 전파하여 색인을 하게 됩니다. 색인이 필요한 데이터를 각 어플리케이션에서 이벤트를 발생시켜줘야 기에 누락 발생 여지가 있었습니다. 데이터 source가 여러군데이고 정보 구성 방식이 불명확하기에 누락 시 문제 원인을 파악하기 쉽지 않았습니다.

신규 파이프라인은 색인 체계를 단일화 시키고 데이터 흐름을 명확히 하였습니다. 우선 이벤트의 Source는 DB(CDC) 입니다. 어떤식으로 데이터가 변경이 되었든 최종 결과를 가지고 색인을 합니다. 또한 최종적으로 색인을 담당하는건 kafka-connect로 단일화 하였습니다. CDC 이벤트에 담겨진 정보 이외의 추가정보가 색인 필요할 경우 별도 App에서 해당 정보를 채우지만 App에서 색인 하지않고 connect에 위임함으로서 색인 과정을 단일화하여 모니터링 용이성을 취하였습니다.

2. 색인 속도

색인 프로세스를 기존 복잡한 과정에서 간결화 시킨것도 속도에 큰 도움이 되었지만 변경 이벤트를 대부분 Stream 처리하고 K8S기반으로 유동적 resource를 관리함으로서 모든 색인에 대하여 지연없는 적용이 가능해졌습니다.

3. 낮은 Solr 버전으로 신규 프로젝트 의존성 문제

기존 프로젝트들에선 검색엔진 활용을 내부 공통 모듈을 이용하거나 Solr 라이브러리를 직접 참조하여 의존성 문제가 있었습니다. 신규 검색엔진의 경우 조회 API 하나로 두어 infra 의존성을 끊었습니다. 기존 Legacy 프로젝트들 모두 해당 API를 바라보도록 변경하였습니다.

4. 전체색인 데이터 디펜던시 (Oracle snapshot)

기존 파이프라인은 하루에 한번 필히 전체 색인을 수행하였습니다. 그런데 Oracle snapshot 데이터를 기반으로 실행되었기 때문에 snapshot 과정의 오류로 인해 프로세스가 돌지 못하는 경우가 존재하였습니다.
신규 파이프라인은 특별한 사유(필드 추가)에만 전체색인을 수행합니다. 전체색인을 진행할때는 PG와 sync가 맞춰진 kudu 스토리지를 바라보기에 전체색인이 돌지 못하는 문제는 개선되었습니다.

기술 스택 선정 이유

  • CDC는 데이터 변경의 후속처리를 하는 기술이기에 이벤트와 데이터의 관심사항을 분리할 수 있었고 파이프라인의 복잡성을 줄일 수 있어 채택하였습니다.
  • KSQL, kafka-connect는 현재 메세지 처리 패러다임의 중심에 있는 kafka 엔진 활용 및 스트림 처리와 scale out 이 원활한점, 편리한 사용법을 이유로 채택하였습니다.
  • Elasticsearch는 기존 Solr 와 동일한 Lucene 엔진을 사용하는 검색엔진으로 현재 검색엔진의 대표격이며 많은 유즈케이스와 좋은 성능으로 채택하였습니다.

기술 구현 문제 해결

1. 문서 삭제, 부분삭제 증감색인 문제 (kafka-connect key 수정)

  • ES Document 중 일부 필드 삭제 : 카탈로그 혹은 상품을 구성하는 일부 특성이 삭제되는 경우 해당 테이블의 Key를 가진 ES Document의 특정 필드를 부분 삭제한다.
  • ES Document 전체 삭제 : 카탈로그 혹은 상품이 삭제되는 경우 해당 테이블의 Key를 가진 ES Document도 함께 삭제합니다.

색인을 위해 두가지 삭제가 진행되어야하는데, ES Document Version Conflict 로 Document 삭제가 불가능한 문제가 있었습니다.

  • Version Conflict
    - ES에서 업데이트 충돌을 막기 위해 Document 별로 자체적으로 관리되는 version이 존재하며, Kafka Sink Connect를 통한 ES 색인 시 메세지의 Kafka Offset을 version으로 사용한다. 만약 수정하고자 하는 Document의 version보다 어플리케이션에서 받은 토픽 메세지 파티션이 작거나 같으면 Version Conflict 문제가 발생
    - 현재 파이프라인의 구조는 Document 생성/삭제 이벤트 토픽과 부분 수정 이벤트 토픽이 분리되어 있고 생성/삭제 및 수정 이벤트가 빈번히 발생하기 때문에 아래와 같은 상황이 발생할 수 있음

Kafka Sink Connect의 version 설정은 다음과 같습니다.

  • “key.ignore” 옵션에 따라 ES version 정책이 나뉨
    - key.ignore : true → Kafka Connect에서 Key를 “topic+partition+offset” 형식으로 새로 정의하여 Document ID로 사용 → ES “Internal versioning”
    - key.ignore: false → 카프카 메세지의 Key를 Document ID로 사용 → ES “External Versioning”

Document ID를 네이버 쇼핑 카탈로그 및 상품의 고유 ID로 사용하고 있으므로 External Versioning을 통해서 Kafka Offset을 통한 version으로 결정되고 있던 것 이었습니다.

이를 해결하기 위해 external version을 무시할 수 있도록 Kafka Sink Connect를 커스텀 수정하였으며 삭제를 담당하는 Connect Task 설정에 추가하여 활용중입니다.

2. 형태소 분석기

기존의 Solr에서는 ngram 방식을 통해 토크나이징을 진행했으나 이번에 전환에서 검색엔진의 장점을 이끌어내기 위해 여러 형태소 분석기 도입을 검토했습니다.

저희가 풀고자했던 텍스트 검색 문제는 아래와 같습니다

  • 대소문자 구분없이 검색 결과가 동일하게 나와야 한다.
  • 특수문자가 포함된 상품 및 카탈로그도 정상적으로 검색되어야 한다.
  • 숫자+의존명사 검색시 정상적으로 검색되어야 한다.
  • 검색어와 연관이 높은 상품 및 카탈로그가 상위 노출되어야 한다.

그에 따라 시도해 본 형태소 분석기는 다음과 같습니다.

  1. nori analyzer : Elasticsearch 한국어 공식 플러그인으로 내부적으로 mecab 사용자 사전을 사용한다.
    - 준수한 전체색인 속도 : 8600만건 색인 시 약 40분 소요
    - 특수문자, 숫자+의존명사, 상위노출이 제대로 발생하지 않는 문제가 존재
    - mecab 에서 제공되는 용어사전 외의 새로운 고유 명사를 별도로 관리해야 함
  2. linguist2 analyzer : 네이버 검색 시스템을 위한 기본 NLP 라이브러리로 한국어 기준으로 네이버에서 관리되는 hanakma 용어 사전을 사용한다.
    - 다소 느린 전체색인 속도 : 8600만건 색인 시 약 1시간 40분 소요
    - 형태소 분석기를 통해 tokenizing 된 토큰수가 많고 테스트 시 동의어 사전 옵션을 사용하고 있어서로 추정
    - 특수문자의 경우 유니코드 U+10000 이상인 이모지가 포함될 경우 색인 불가 이슈가 존재한다
    - 숫자 + 의존명사 문제 해결 가능
    - 동의어 사전을 제공함으로써 사전에 등록된 단어와 동일한 의미의 토큰을 가진 카탈로그 및 상품도 함께 검색되어 검색 퀄리티를 높일 수 있다.
  3. ngram tokenizer: 임의의 개수 n을 정하여 n개의 연속적인 단어 나열 단위로 끊어 토큰을 저장한다.
    - 준수한 전체색인 속도 : 8600만건 색인 시 약 40분 소요
    - 위의 대소문자, 특수문자, 숫자+의존명사, 상위 노출 문제 모두 해결 가능
    - 형태소 분석, 동의어 검색, 오타 발생 시 연관 검색어 추천 등과 같은 추가 기능 제공이 어려움
    - 기존 검색 방식과 유사하여 사용자 사용 호환성 유지
    - 토큰 길이 제한 필요

정리하면 다음과 같으며 Index 요구사항에 따라 취사선택합니다.

3. denormalize 이슈

상품 Index에 브랜드 정보가 필요한 경우, 브랜드의 정보가 변경될 때 브랜드에 속한 모든 상품에 정보를 update 해주어야합니다. 이런 field를 Denormalized Field라고 하는데요.
브랜드, 몰에 따라 정보 변경 한번에 천만건 이상의 update가 일어날 수 있기 때문에 효율적인 방법을 찾아야 했습니다.
denormalized field를 색인, 조회 하기위한 방법으로 다음 4가지로 추리고 진행했습니다.

  1. Index Join — Elasticsearch join field type을 이용하는것으로 parent/child 구분하여 색인처리
  2. 실시간 bulk update application — 별도 application에서 실시간 denormalized target을 조회하여 색인처리
  3. Udate_by_query API — bulk update를 위한 Elasticsearch 내부 API로 간편한 사용법 parallelize, throttling 조절 옵션 존재
  4. Spark Job — 트래픽이 적은 시간대 spark job으로 일괄 처리하는 방식

여러가지 방법을 시도해보았으나, 실시간 증감색인의 version conflict issue가 있었으며 우회하여도 유지보수에 비효율 증대가 문제되었습니다.
결론적으로 저희는 denormalize field를 없애고 현재 사용되는 Client 사용처(legacy)를 변경하여 검색 depth를 주는것으로 선회하였습니다.

4. 실패처리 방안 (DLQ 파이프라인, Kafka-connect fail to DLQ)

전체색인을 수행하지 않고 증감색인만으로 유지되는 파이프라인을 가정하여 운영플랜을 계획하였기 때문에 실패처리에 대하여 Robust한 시스템을 구성해야했습니다.

우선 색인 실패를 유발하는 케이스는 두가지로 나뉠 수 있습니다.

  1. Temporary failure — 일시적 문제로, 별 다른 처리를 하지 않아도 상태가 괜찮아 질 수 있는 실패 (ex. Network issue)
  2. Permanent failure — 색인 불가능한 경우. Data, Plugin 등 예상하지 못한 곳에서 발생하며 운영 초기 판단 및 예상 불가 (ex. Analyzer U+10000이상 글자 색인 문제)

실패를 대응 방법으로는 3가지로 나뉠 수 있습니다.

  1. 최대한 Retry
  2. 몇번의 Retry 후 해당 Message skip (DLQ, 실패 log 등 실패 event 별도 관리)
  3. 몇번의 Retry 후 DLQ(Dead Letter Queue) 전송 후 refresh 파이프라인

대응 방법 별 실패 유발 케이스를 정리하면 다음과 같습니다.

[1. 최대한 Retry]

[2. Retry 후 Skip]

[3. Retry 후 Refresh pipeline]

저희는 운영 초기 Permanent failure 예상이 힘들기 때문에 3번 방법으로 운영 후 파이프라인이 안정화 됬을 때 간결화를 위해 2번 방법을 가는것으로 정책 합의를 하여 적용하였습니다.

그런데, Kafka-connect 에서는 모든 Error를 DLQ로 보내지 않습니다.

위와 같이 확인한 바, Kafka-connect에서 핸들링 할 수 없는 에러에 대해서는 관리하지 않겠다는 기조인데요. 저희는 Error 정책에 따라 모든 실패 케이스가 DLQ pipeline을 타야한다고 생각했습니다.

Kafka-connect에서 Elasticsearch bulk request 실패를 처리하는 방법은 2가지로 나뉩니다.

  1. Elasticsearch에서 실패 메세지를 직접 받는 경우 (mapper_parsing_exception, illegal_argument_exception, action_request_validation_exception)
  2. 네트워크 오류 및 기타 오류

1번 케이스는 DLQ로 처리되고 있었지만 2번 케이스는 제대로 처리되고 있지 않았습니다.

Kafka-connect의 실패 관련 수정 이력(Githup)을 살펴보면 11.0.4 ver 에서 retry 를 추가한 후 Retries hang forever issue로 원복 되는 등 이력을 파악했구요. 저희는 모든 Error가 커버될 수 있게 Fail case 전부 DLQ로 보내도록 DLQ handling 수정을 하여 적용하였습니다.

5. 전체색인 속도 최적화

전체색인은 평소에 수행되지 않지만 Index 초기 세팅시와 부분 변경 등 수행해야 할 때가 있습니다. 색인 용량이 대용량이기도 하고 서비스에 지장이 없도록 최소한의 시간으로 수행해야하기에 최적화를 진행하였습니다.

spark 배치를 통한 전체색인 과정
- kudu table에서 필요한 데이터 추출 > dataframe → sink connector 또는 ‘elasticsearch-spark’ 패키지를 통해 타겟 인덱스에 색인

최적화를 위한 시도들

  • ES hadoop config 설정
    - es.batch.size.bytes: ElasticSearch bulk API에서 사용하는 batch byte size 설정
    - es.batch.size.entries: ElasticSearch bulk API에서 사용하는 batch entry size 설정
    - refresh_interval: Elasticsearch 에서 세그먼트가 만들어지는 refresh interval 설정
    (es.batch.write.refresh=true로 사용시 모든 multiple bulk 가 끝났을 때 refresh하기 때문에, refresh_interval를 조정해주는건 영향이 없었습니다)
    [위 설정은 수행 시간 차이에 큰 영향이 없음]
  • Dataframe repartition
    - es_rejected_execution_exception 에러 핸들링 (threadpool.write.queue_size 증가: write 하기까지 queue에 쌓아놓은 size 설정)
    [Dataframe 추출 수행시간은 축소되었으나, 색인 성능 자체에는 큰 변화가 없음]
  • 색인 방식 변경
    - spark에서 ES로 바로 색인하는 방법과 kafka message를 발행하여 sink-connector로 색인하는 방법 속도 비교
    [실제 색인되는 속도에는 큰 차이가 없음]
  • Index shard size 조절
    - shard size를 20 / 30 / 60 등 설정하여 색인 속도를 측정
    [Index 설계시 참고하였던 문건들의 shard당 추천 용량이 10~50GB인데, 이 수치 부터 색인 속도는 saturation 되는것을 확인]
  • Index mapping의 analyzer로 인한 색인 속도 저하 확인
    - analyzer 필드가 다수 존재하는 인덱스와 그렇지 않은 인덱스에 각각 전체색인을 진행한 결과
    [색인 속도의 차이가 발생하는 것을 확인하였으며 analyzer간의 차이도 유의미함]
  • Index replica 0으로 셋팅
    - 전체색인을 하는 중 데이터를 안전하게 delivery할 필요가 없기 때문에 replica를 0으로 설정
    [색인 속도가 크게 향상]
  • Kafka max.in.flight.requests 설정
    - 단일 브로커에 대해 병렬적으로 실행될 produce request의 개수 설정
    - 메시지의 순서 역시 보장되어야 하기 때문에, 이 설정을 10 → 1 로 변경하였습니다.
    [색인 속도적인 측면에서는 오히려 오버헤드가 없어져 속도가 향상되는 것을 확인]
  • Spark sql 쿼리 최적화
    - 복잡한 서브 쿼리를 별도의 temp view로 추출하여 사용
    - spark의 skewed data 이슈를 피하기 위해 self-join 시, join 조건 컬럼이 null 인 row와 그렇지 않은 row로 분리하여 join 수행
    [데이터 추출, 색인 시간 유의미한 향상]
  • Spark에서 user-defined-function 사용 지양
  • Executor/Driver memory size 증가
  • Sink-connector task 개수 조절
    - 색인되는 전체 데이터의 수에 따라 적절한 connector task 개수를 설정

위와 같은 다양한 시도를 통해 최적화 진행 후 최종적으로 18억 건 색인 시, 소요 시간 약 3시간으로 단축하였습니다.

6. 운영 Index 교체/파이프라인 교체 시나리오

저희는 한 클러스터로 Down time 없이 서비스를 제공하기 때문에 필드 변경 등 Index 교체가 필요 할 때 대응 방법이 필요하였습니다.

Alias를 이용하여 client의 target을 보장해주어야 하는데, 문제는 kafka-connect에서 색인 시 alias를 이용하지 못합니다.
- 현재 시스템에 적용한 kafka connect base(11.0.8 ver)는 해당기능이 없으며 11.1.x ver부터 기능이 추가되었지만 사용 시 throughput이 굉장히 낮아집니다. 이유는 매 bulk request 마다 alias를 조회하여 데이터를 넣어주기 때문입니다.

이를 해결하기 위해 정책으로 조회와 색인을 Alias, Index로 분리하고 Index 교체 시 Clone API 를 활용하도록 하였습니다.

또한 여러 Index 에서 같은 Stream topic을 활용하는 경우가 있습니다. 이때 파이프라인 수정으로 Schema가 변경된다면 메세지 전파 시 호환성에 문제가 생길 수 있습니다.

호환성을 보장하도록 파이프라인 앞단부터 정지하여 Lag이 소화 된 후 Connector resume하도록 정책화 하였습니다.

마치며

현재 쇼핑카탈로그개발팀의 검색엔진 파이프라인은 위 이슈들을 해결 후 기존 파이프라인을 대체하여 원활히 서비스되고 있습니다. 본 글에서는 다루지 않았지만 각 시스템별 Log는 별도 Elasticsearch로 전송하여 관리되고 있으며, Grafana 와 사내시스템으로 모니터링하고 있습니다. 색인 파이프라인을 구성중이신 분들에게 이 글이 도움이 되었으면 좋겠습니다.

--

--