Snowflake 로 스트림 데이터 처리하기 (feat. Kafka connector)

MJ Lee
Snowflake Korea
Published in
15 min readOct 31, 2022

Apache Kafka 는 분산 이벤트 스트림 처리 플랫폼으로서, 안정적인 대용량 메시지 처리, 낮은 latency, 높은 throupuht 등의 장점을 기반으로 비동기 방식의 실시간 데이터 처리를 위해 대다수 기업에서 사용되고 있는 솔루션입니다.

Kafka Cluster 와 (Snowflake 같은) 외부 시스템을 연동하기 위한 프레임워크가 바로 Kafka Connector 로, Kafka Cluster 와는 별도의 클러스터로 운영됩니다. Snowflake 가 제공하는 Kafka connector는 Kafka Cluster 토픽으로부터 데이터를 읽어 Snowflake 로write 하도록 설계되었습니다. 이번 글과 다음 글을 통해 1) Snowflake Kafka connector의 동작 방식, 설정 방법 및 2) JMX 를 활용한 Kafka Connector 모니터링 방법과 3) 이를 개선한 Snowpipe Streaming 를 활용하는 방식에 대해 자세히 알아보겠습니다.

  1. Snowflake Kafka Connector 동작 방식 및 설정 방법
  2. JMX 를 활용해서 Prometheus/Grafana 로 Kafka Connector 모니터링 하기
  3. Snowpipe Streaming 을 활용한 Snowflake Kafka Connector (Preview 기능)

1.Snowflake Kafka Connector 동작 방식 및 설정 방법

일반적으로 Kafka 메시지가 row 한 건을 포함한다고 봤을 때, Kafka 토픽의 이런 row 단위 스트림이 Snowflake 의 테이블로 insert 됩니다. 다른 Pub/Sub 플랫폼처럼 Kafka 도 Publisher 와 Subscriber 의 다대다 관계를 지원하는데, Snowflake 에서는 토픽 하나가 Snowflake 의 테이블 하나로 매핑 되는 패턴이 일반적입니다. 토픽에 대응하는 Snowflake 의 테이블은 커넥터 설정파일에서 지정할 수 있으며 대응하는 테이블이 없을 경우 토픽 이름을 참조하여 자동으로 생성됩니다. 미리 타겟 테이블을 만들 경우 variant 타입으로 RECORD_CONTENT, RECORD_METADATA 두 컬럼으로 구성된 테이블을 생성하면 됩니다. RECORD_CONTENT 컬럼에는 Kafka 메시지가 저장되며, RECORD_METADATA 에는 topic 이름, partition, offset, createTime(메시지가 토픽에 도달한 시각), key, schema_id 등 메타정보가 저장됩니다.

Snowpipe + Kafka Connector 아키텍처

동작 방식은 다음과 같습니다. 먼저 Producer 애플리케이션이 메시지를 Kafka 클러스터로 publish 하면 해당 데이터는 파티션으로 전송됩니다. Snowflake Kafka connector 가 Topic 에서 메시지를 가져옵니다. 이때 Kafka connector 에 설정 파일 (./kafka_connector_install_path/config/SF_connect.properties) 에 명시된 아래 세 값이 임계치가 되는데,

buffer.count.records

buffer.flush.time

buffer.size.bytes

각각은 버퍼에 쌓인 레코드 수, flush 주기, 버퍼 사이즈에 해당하며 이 중 한 가지에 먼저 도달하면 커넥터는 버퍼에 있는 메시지들을 임시 파일 형태로 Snowflake Internal stage 에 write 합니다. 토픽 당 하나의 stage 가 생성되며 파티션의 개수만큼 Snowpipe 가 생성됩니다. 이후 커넥터는 각 Snowpipe 를 트리거해서 임시 파일들이 Snowflake 타겟 테이블로 적재되도록 합니다.

이후 커넥터는 Snowpipe 를 모니터링하고 임시 파일이 타겟 테이블로 로딩된 것을 확인한 후 각 파일들을 삭제합니다. 파일 로딩이 실패하면 커넥터는 해당 파일을 (해당 토픽에 대응하는 테이블의) table stage 로 이동시키고 에러 메시지를 발생시킵니다.

임계치 중 buffer.flush.time 는 비용/latency 사이에 tradeoff 관계가 있으니 이 점을 고려해서 튜닝이 필요합니다. Flush time 을 줄이면 latency 가 줄어들겠지만 그만큼 작은 파일들이 더 많이 생성되기 때문에 더 자주 snowpipe 가 동작하게 됨으로써 비용이 높아지게 됩니다. buffer.size.bytes 는 OOM 을 피하기 위한 커넥터 노드의 heap memory 사이즈를 고려해서 설정해야 합니다. 대략 해당 노드에 할당된 파티션 갯수 * buffer size + alpha 가 커넥터가 사용하는 max 메모리가 될 것입니다. 결론적으로 위 세가지 임계치를 모두 증가시키면 latency 는 증가하지만 비용은 감소하고, 파티션과 토픽 개수를 줄여도 latency 는 증가하지만 비용은 감소하게 됩니다.

Kafka Connector 설정 방법은 Snowflake 메뉴얼에 상세히 기술되어 있으니 다음 링크를 참고하세요. https://docs.snowflake.com/en/user-guide/kafka-connector-install.html 이 메뉴얼에서 key pair 설정 시 encrypted private key 방식을 따를 경우 커넥터 외에 bcpkix-fips.jar bc-fips.jar 를 추가로 다운받아야 하는 점에 유의해야 합니다.

커넥터 클러스터를 분산 모드로 띄운다면 ./kafka_connector_install_path/config 아래 connect-distributed.properties 파일을, standalone 모드로 띄운다면 connect-standalone.properties 파일에 bootstrap.servers 가 Kafka Cluster 를 바라보도록 설정하는 것도 필요합니다.

2. JMX 를 활용해서 Prometheus/Grafana 로 Kafka Connector 모니터링 하기

Snowflake Kafka Connector 는 Kafka 환경에 대한 메트릭을 수집하는 데 활용할 수 있는 JMX MBean(Managed Bean)을 제공합니다. 이 정보를 Prometheus 및 Grafana 로 로드하면 Kafka 환경을 쉽게 모니터링할 수 있습니다. JMX 기능은 Snowflake Kafka Connector 에 기본으로 enable 되어 있으니 별도의 설정이 필요 없습니다.

2.1 Kafka Connector 에 JMX enable 하기

Snowflake Kafka Connector 에 JMX 를 enable 하기 위해서는 다음 순서를 따릅니다.

a. 원격에서 실행중인 Kafka 로 JMX 커넥션을 생성하기 위해서는 Kafka 실행 스크립트에 KAFKA_JMX_OPTS 환경 변수를 설정합니다. hostname IP 는 현재 환경에 맞게 설정합니다.

export KAFKA_JMX_OPTS="-Dcom.sun.management.jmxremote 
-Dcom.sun.management.jmxremote.authenticate=false
-Dcom.sun.management.jmxremote.ssl=false
-Djava.rmi.server.hostname=192.168.0.11
-Dcom.sun.management.jmxremote.rmi.port=9584"

b. 동일 서버에서 실행중인 Kafka 로 JMX 커넥션을 생성하기 위해서는 JMX_PORT 환경변수를 Kafka 실행 스크립트에 설정합니다.

export JMX_PORT=9584

이렇게 하면 커넥터가 제공하는 MBean을 사용해서 Kafka Connector 를 모니터링 할 수 있습니다. 이제 Prometheus 와 Grapana 를 사용해서 모니터링 대시보드를 만들어 보겠습니다.

2.2 Prometheus JMX exporter 다운로드 및 설정

우선 커넥터가 실행중인 노드 /opt/prometheus 에 Prometheus JMX exporter 를 다운로드 합니다. 커넥터에서 JMX 메트릭을 주기적으로 scraping 한 후 Prometheus 로 전송하는 역할을 하게 됩니다.

sudo wget https://repo1.maven.org/maven2/io/prometheus/jmx/jmx_prometheus_javaagent/0.3.0/jmx_prometheus_javaagent-0.3.0.jar

Prometheus JMX exporter 설정 파일을 kafka.yml 이름으로 만듭니다. 모든 메트릭을 scraping 할 것이므로 아래와 같이 설정합니다.

lowercaseOutputName: true
rules:
- pattern : ".*"

환경 변수를 아래와 같이 설정하고 커넥터를 재시작합니다.

export KAFKA_OPTS="-javaagent:/opt/prometheus/jmx_prometheus_javaagent-0.3.0.jar=7073:/opt/prometheus/kafka.yml"

2.3 Prometheus 설치

Prometheus 는 메트릭을 시계열 데이터로 수집하고 저장하는 오픈소스 시스템 모니터링 툴킷으로, 메트릭 정보는 key-value 를 label 로 하여 타임스탬프와 함께 저장됩니다. 다음 링크 https://prometheus.io/download 에서 Prometheus 가 설치될 서버의 OS 에 맞는 파일을 다운로드 하고 압축을 해제합니다. prometheus.yml 파일에 다음과 같이 JMX exporter 의 정보를 추가합니다.

scrape_configs:
- job_name: "prometheus"
static_configs:
- targets: ["localhost:9090"]
- job_name: "kafka-connector"
static_configs:
- targets: ["localhost:7073"]

Prometheus 를 재시작합니다.

./prometheus --config.file=prometheus.yml

2.4 Grafana 설치

Grafana 는 오픈소스 시각화 웹 애플리케이션으로, Prometheus 와 같은 데이터 소스에 연결되어 웹 차트, 그래프, 알람 등을 제공합니다. 다음 링크 https://grafana.com/docs/grafana/latest/setup-grafana/installation/ 안내에 따라 Grafana 를 설치하고 실행합니다.

2.5 수집 메트릭의 이해

Prometheus 가 메트릭을 잘 수집하는지 확인해보겠습니다. http://localhost:9090/ 로 이동한 후 아래처럼 Snowflake_kafka 로 시작하는 메트릭 이름이 검색되는지 확인합니다.

Snowflake Kafka Connector 의 JMX MBean 이 제공하는 메트릭을 Prometheus 에서 수집하는 화면

메트릭은 아래와 같은 포맷으로 수집되는데 이 값을 참고해서 JMX exporter 설정 시 사용했던 kafka.yml 파일의 pattern 에 regular expression 을 활용한 필터링이 가능합니다.

snowflake_kafka_connector_snowflakesink_count{category="buffer", instance="localhost:7073", job="kafka-connector", name="buffer-record-count", pipe="SNOWFLAKE_KAFKA_CONNECTOR_snowflakesink_PIPE_test_0"}

여기서 Snowflake Kafka Connector 가 제공하는 메트릭의 category 와 name 을 살펴보면 어떤 값을 모니터링 할 수 있는지 알 수 있습니다.

Snowflake Kafka Connector 에서 제공하는 메트릭 (일부)

전체 메트릭은 여기를 참고하세요. Category 를 latencies 로 선택하면 kafka-lag, commit-lag, ingestion-lag 로 latency 를 모니터링 할 수 있습니다.

kafka-lag: 레코드가 Kafka 에 넣어진 시간과 해당 레코드가 커넥터로 fetch 된 시간 사이의 차이(초)

commit-lag: 파일이 internal stage 로 업로드된 시간과 insertFiles REST API가 호출된 시간 사이의 차이(초)

ingestion-lag: 파일이 internal stage 로 업로드된 시간과 파일 수집 상태가 insertReport 또는 loadHistoryScan API를 통해 리포팅 된 시간 간의 차이(초)

2.6 대시보드 구성하기

Grafana 화면으로 이동합니다. http://localhost:3000/datasources 에서 Prometheus 기본 정보를 입력하면 Prometheus 를 Data source 로 쉽게 설정할 수 있고, 메인 화면에서 Create -> Dashboard 를 클릭하면 대시보드를 만들 수 있습니다.

빈 대시보드에서 Add a new panel 을 클릭하면 아래와 같은 화면이 보일텐데요. Data source 를 앞서 설정한 Prometheus 로 선택한 다음 1 번에서 metric 이름을 검색하고 2번에서 모니터링 하고자 하는 메트릭의 label (예, category, name, pipe) 을 선택한 후 3번에서 각 label 에 대한 값을 선택하면 해당 메트릭을 시계열로 보여주는 패널이 생성되며 이후 대시보드에 저장됩니다.

Grafana 에서 패널을 생성하는 화면

같은 방식으로 모니터링 하고자 하는 메트릭들을 하나의 대시보드로 구성한 화면입니다. latency 와 JVM 등 원하는 메트릭을 모니터링 할 수 있으며 Kafka Cluster 에도 앞서 설명한 JMX 설정과 JMX exporter 를 설치하면 모니터링이 가능합니다.

Grafana 로 생성한 Snowflake Kafka Connector 의 모니터링 대시보드

3. Snowpipe Streaming 을 활용한 Snowflake Kafka Connector (Preview 기능)

Snowflake Kafka Connector 버전 1.8.1 이상부터는 Snowpipe Streaming 을 활용할 수 있습니다. 앞서 설명한 세 가지 임계값 (flush 시간, buffer 메모리, 메시지 갯수) 중 어느 하나에 먼저 도달하면 Snowpipe Streaming API 가 호출되어 internal stage 에 임시 파일로 로딩되는 과정 없이 rowset 단위 데이터가 바로 Snowflake 테이블에 write 됩니다. 따라서 latency 와 비용이 더 낮아지는 개선된 아키텍처 입니다.

Snowpipe Streaming + Kafka Connector 아키텍처

설정 방법은 간단합니다. Kafka Connector 설정 파일 (SF_connect.properties) 에snowflake.ingestion.method 값을 SNOWPIPE_STREAMING으로 명시하면 Snowpipe 를 사용하지 않고 Snowpipe Streaming 가 사용되도록 동작합니다. 또한 타겟 테이블에서 Kafka 메시지의 중복이 없도록 exactly-once 처리가 보장됩니다. 버전 1.8.1 이상에서는 delivery.guarantee 의 default 값이 EXACTLY_ONCE 이므로 별도로 값을 명시하지 않아도 됩니다. 참고로 delivery.guarantee 는 1.7.0 에서부터 지원된 설정으로 1.7.0 에서 default 값은 AT_LEAST_ONCE 인데, 이를 Snowpipe 를 사용하는 아키텍처에서 EXACTLY_ONCE로 명시하면 exactly-once 가 보장됩니다.

지금까지 Snowflake Kafka Connector 로 스트림 데이터를 처리하는 방법에 대해 알아봤습니다. Snowpipe Streaming 아키텍처는 Snowpipe 를 사용하는 아키텍처 대비 latency 와 비용 측면에서 상당한 개선이 있을텐데요. 다음 글에서는 Snowpipe Streaming 아키텍처와 성능에 대해 좀 더 자세히 알아보도록 하겠습니다.

--

--