YugabyteDB CDC、Kafka、SnowflakeSinkConnectorを使ったデータストリーミング

Tomohiro Ichimura
The Distributed SQL Blog
24 min readSep 17, 2022

YugabyteDBは、クラウド・ネイティブ・アプリケーション向けの高性能かつトランザクションに対応した分散データベースですです。YugabyteDBの変更データキャプチャ(CDC: Change Data Capture)は、データの変更(挿入、更新、削除)を確実に識別、キャプチャし、自動的に別のデータリポジトリインスタンスに適用することで、アプリケーションやツールからの利用を可能とします。
Snowflakeは、クラウドベースのデータウェアハウスおよび分析サービスです。

本ブログでは、Confluentクラウド上のSnowflakeSinkConnectorを使って、YugabyteDBのChange Data Capture機能からKafka connect経由でSnowflakeへデータをストリームする方法を説明します。

1. 環境

  • Cloud: AWS
  • Snowflake: Standard Edition
  • Kafka: 3.2.0
  • Docker: 20.10.17

ここでは、id、name、marksの3つのカラムを持つstudentというテーブルを作成しました。このテーブルを使って、YugabyteDBのCDCを経由しSnowflakeのシンクにストリーミングします。

CREATE TABLE STUDENT(id int primary key, name text, marks int);yugabyte=# \d student
Table "public.student"
Column | Type | Collation | Nullable | Default
--------+---------+-----------+----------+----------------------
id | integer | | not null |
name | text | | | 'default_name'::text
marks | integer | | | 0
Indexes:
"student_pkey" PRIMARY KEY, lsm (id HASH)

2. Confluent Cloud上のKafka Sinkの設定

  • https://confluent.cloud/login にログインします。
  • Environments “に移動します。
  • クラスタを選択するか、新規にクラスタを作成します。

このページで、Schema Registryに移動し、有効にします。これは、Schema Registryベースのフォーマット(例えば、Avro、JSON_SR(JSON Schema)、またはProtobufなど)を使うために必須となります。この後、クラスターに移動します。

このページから、「データ統合」→「クライアント」→「新規クライアント」→「Javaクライアント」と進みます。

ここで、Create Kafka Cluster API Keyをクリックし、ダウンロードして続行します。

これらの認証情報を使用して、Kafka Connectを設定します。

これが完了したら、左側のパネルから「トピック」をクリックします。ここで新しいトピックを作成し、Kafka Connectがメッセージを送信する場所にします。トピックの名前は、<database.server.name>.<table.include.list>とします。(3.4 を参照)

トピックが作成されると、YugabyteDBの接続を開始することができます。

3. Confluent CloudでYugabyteDBとKafkaトピックを接続

Confluent Cloud 上で YugabyteDB と Kafka Topic を接続するには、2つの方法があります。

今回はKafka Connectを使います。

YugabyteDB から Snowflake にデータをストリーミングするには、まず Confluent クラウド上の Kafka Topicに接続する必要があります。

今回は docker を使用しますので、dockerdocker-compose を事前にインストールする必要があります。

1. YugabyteDB Debezium Connector .jar ファイルをダウンロード

作業用のディレクトリを作成し、YugabyteDB Debezium Connectorのjarファイルを、下記のコマンドでダウンロードします。

wget https://github.com/yugabyte/debezium-connector-yugabytedb/releases/download/v1.7.0.8-BETA/debezium-connector-yugabytedb-1.7.0.8-BETA.jar

2. Dockerfileの作成

次に、Dockerfileを作成します。

vi Dockerfile

作成したDockerfileに、下記の内容を追加します。

FROM confluentinc/cp-kafka-connect-base:6.0.1
COPY debezium-connector-yugabytedb-1.7.0.8-BETA.jar /usr/share/java/kafka

3. Image Fileの作成

下記のコマンドでImageを作成します。

docker build . -t <image-name>

4. Docker-Compose Fileを作成

「docker-compose.yaml」というファイルを作成します。

vi docker-compose.yaml

このファイルに以下の内容を追加し、適切な <stream-id>, <user-name>, <password>, <bootstrap-server>, <host-IP> を入れます。 <user-name> は Confluent クラウドから取得するAPI キー、<password> は API シークレットです。

version: '3'
services:
kafka-connect-ccloud:
image: <image-name>
container_name: kafka-connect-ccloud
ports:
- 8083:8083
environment:
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_CUB_KAFKA_TIMEOUT: 300
CONNECT_BOOTSTRAP_SERVERS: "<bootstrap-server>"
CONNECT_REST_ADVERTISED_HOST_NAME: 'kafka-connect-ccloud'
CONNECT_REST_PORT: 8083
CONNECT_GROUP_ID: kafka-connect-group-01-v04
CONNECT_CONFIG_STORAGE_TOPIC: _kafka-connect-group-01-v04-configs
CONNECT_OFFSET_STORAGE_TOPIC: _kafka-connect-group-01-v04-offsets
CONNECT_STATUS_STORAGE_TOPIC: _kafka-connect-group-01-v04-status
CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
CONNECT_LOG4J_ROOT_LOGLEVEL: 'INFO'
CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: '3'
CONNECT_PLUGIN_PATH: '/usr/share/java,/usr/share/confluent-hub-components/'
# Confluent Cloud config
CONNECT_REQUEST_TIMEOUT_MS: "20000"
CONNECT_RETRY_BACKOFF_MS: "500"
CONNECT_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_SASL_MECHANISM: "PLAIN"
CONNECT_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"
#
CONNECT_CONSUMER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_CONSUMER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_CONSUMER_SASL_MECHANISM: "PLAIN"
CONNECT_CONSUMER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"
CONNECT_CONSUMER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_CONSUMER_RETRY_BACKOFF_MS: "500"
#
CONNECT_PRODUCER_SECURITY_PROTOCOL: "SASL_SSL"
CONNECT_PRODUCER_SSL_ENDPOINT_IDENTIFICATION_ALGORITHM: "https"
CONNECT_PRODUCER_SASL_MECHANISM: "PLAIN"
CONNECT_PRODUCER_SASL_JAAS_CONFIG: "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"<username>\" password=\"<password>\";"
CONNECT_PRODUCER_REQUEST_TIMEOUT_MS: "20000"
CONNECT_PRODUCER_RETRY_BACKOFF_MS: "500"
command:
- bash
- -c
- |
echo "Launching Kafka Connect worker"
/etc/confluent/docker/run &
#
echo "Waiting for Kafka Connect to start listening on localhost:8083 ⏳"
while : ; do
curl_status=$(curl -s -o /dev/null -w %{http_code} http://localhost:8083/connectors)
echo -e $(date) " Kafka Connect listener HTTP state: " $curl_status " (waiting for 200)"
if [ $curl_status -eq 200 ] ; then
break
fi
sleep 5
done
echo -e "\n--\n+> Creating Kafka Connect source connectors"
curl -s -X PUT -H "Content-Type:application/json" \
http://localhost:8083/connectors/ybconnector1/config \
-d '{
"tasks.max":"1",
"connector.class": "io.debezium.connector.yugabytedb.YugabyteDBConnector",
"database.hostname":"<host-IP>",
"database.master.addresses":"<host-IP>:7100",
"database.port":"5433",
"database.user": "yugabyte",
"database.password":"yugabyte",
"database.dbname":"yugabyte",
"database.server.name": "dbserver",
"snapshot.mode":"never",
"table.include.list":"public.test",
"database.streamid":"<stream-ID>"
}'
#
sleep infinity

5. 作成したDocker-Compose Fileを実行

Docker-compose.yamlファイルを下記のコマンドで実行します。

docker-compose up

4. Kafkaにレコードが送信されているかどうかを確認

ストリーミングデータ用に作成したテーブルに、いくつかのレコードを挿入します。ここで、Confluent のクラウドページに移動し、トピックをクリックします。そして、トピックを選択します。

メッセージセクションの下に、最新の挿入を反映したメッセージが見えるはずです。(メッセージを見るには offset = 0 を挿入する必要があるかもしれません)。

5. Snowflakeのセットアップ:キーの作成と追加

こちらのリンクでSnowflakeアカウントにログインするか、アカウントをお持ちでない場合、新規に作成してください。

Kafka SnowflakeSinkConnectorがSnowflakeにデータをシンクする前に、キーペアを作成する必要があります。公開鍵はSnowflakeユーザーに追加され、Kafkaコネクターは認証のために秘密鍵が必要になります。

キーペアを作成するために、以下のコマンドを実行します:

openssl genrsa -out snowflake_key.pem 2048

これで秘密鍵が作成されます。公開鍵を作成するには:

openssl rsa -in snowflake_key.pem -pubout -out snowflake_key.pub

2つのファイル(snowflake_key.pemとsnowflake_key.pub)が作成されます。ここで、公開鍵(snowflake_key.pub)をコピーします。ヘッダーやフッターはコピーせず、鍵だけをコピーするように注意してください。

また、コピーするキーは1行にまとめ、複数行に分かれていないことを確認してください。下記のコマンドを実行することで、必要なキーを取得することができます:

grep -v “BEGIN PUBLIC” snowflake_key.pub | grep -v “END PUBLIC”|tr -d ‘\r\n’

次に、Snowflakeで「Worksheets」をクリックし、新しいワークシートを作成します。右上のSECURITYADMINロールに切り替えます。

ワークシートで、以下のクエリを実行します:

CREATE USER <user-name> RSA_PUBLIC_KEY=’<public-key>’;

お好みの<user-name>を入力することが可能です。このユーザー名は、Kafkaコネクタの設定で必要になります。

Kafkaにメッセージをプッシュさせたいデータベースを作成する必要があります。

CREATE DATABASE <db-name>;

必要であれば、スキーマとKafkaメッセージを常駐させるテーブルを作成することができます。今回の例では、データベースを作成したときに自動的に作成されるPUBLICスキーマを使用しました。

ユーザーとデータベースを作成したので、このユーザーに必要な特権を割り当てる必要があります。Kafka Snowflake sink connectorのドキュメントでは、usage, create table, create pipe, create stageの権限のみが必要であると記載されています。しかし、「不正アクセス」や「権限不足」のエラーを防ぐため、作成したユーザーにデータベースとスキーマの全権限を付与します。

USE ROLE SECURITYADMIN;
GRANT ALL PRIVILEGES ON DATABASE TO ROLE securityadmin;
GRANT ALL PRIVILEGES ON SCHEMA <db-name>.PUBLIC TO ROLE securityadmin;
GRANT ROLE securityadmin TO USER <user-name>;
ALTER USER <user-name> SET DEFAULT_ROLE=securityadmin;

下記のコマンドでsecurityadminロールが割り当てられたことを確認します:

DESC USER <user-name>

下記のコマンドでsecurityadminロールが全ての権限が割り当てられたことを確認します:

SHOW GRANTS TO ROLE securityadmin

6. Confluent CloudでのKafka SnowflakeSinkConnectorのセットアップ

Confluent クラウドにアクセスし、左側のパネルで、Data Integration -> Connectors に進み、右側で「Add Connector」ボタンをクリックします。

ここで、コネクタの一覧からSnowflake Sinkを選択します。

その上、下記の情報が必要です:

  • 接続URL

・接続URLのフォーマットは、<アカウント名>.<地域ID>.<クラウド>.snowflakecomputing.comです。

・注意:snowflakeの場合、<アカウント名>はログインユーザー名と同じではありません。必要なアカウント名はsnowflakeのURLの中にあります。

https://app.snowflake.com/<region_id>.<cloud>/<account-name>/...
  • 接続ユーザー名

・先ほど作成した<ユーザー名>です。

  • 秘密鍵

・これは先ほど作成したsnowflake_key.pemの鍵ですが、ヘッダやフッタがなく、鍵の内容を1行のみにまとめたものとなります。

・下記のコマンドで秘密鍵から必要な部分のみを取得することができます。

grep -v "BEGIN RSA PRIVATE KEY" snowflake_key.pem | grep -v "END RSA PRIVATE KEY"|tr -d '\r\n'
  • 秘密鍵の復号化キー

・暗号化された秘密鍵を使用する場合は、その復号化鍵を指定する必要があります。今のところ空白にしておいてください。

  • Snowflake データベース名

・先ほど作成したデータベース名。

  • スキーマ名

・スキーマが作成されている場合は、そのスキーマの名前を指定してください。なお、今回はデフォルトのPUBLICスキーマを使用しました。

  • トピックとテーブルのマッピング

・Kafkaトピックからのメッセージを特定のテーブルに投入したい場合は、そのマッピングを指定する必要があります。このフィールドを空白にすると、SnowflakeSinkConnector は新しいテーブルを作成し、その中にメッセージを投入します。

すべて完了したら、「続ける」をクリックします。

適切な入力形式を選択してください。ここではJSONを使用しました。Advanced Configurationsでは、様々な接続の詳細を変更することができます。つまり、キャッシュデータをフラッシュするレコード数、コネクタレコードバッファのサイズ、メタデータを含めるかどうかなどのパラメータを決定することができます。また、必要に応じて、ここでトランスフォームを追加することもできます。今回はデフォルト値を使用しました。

上記すべてが完了したら、次のページに移動し、コネクタに適切な名前を付け、すべての設定を確認してください。ここで続行をクリックすると、SnowflakeSinkConnector が起動します。

何かエラーが発生した場合は、下記のトラブルシューティングのセクションを参照してください。

最後に Confluent Cloud 上で、コネクタをクリックすると下記のような画面が表示されるはずです。

7. Snowflakeでクエリを実行して関連情報を抽出する(最終テスト)

Kafkaコネクタは、メッセージを単一のJSONとして送信します。SnowflakeSinkConnectorからレコードをコピーしたテーブルには、1レコードにつき1つのJSONがあります。JSONレコードのサンプルは下記となります:

{
"content": {
"__deleted": "false",
"id": 1,
"marks": 100,
"name": "sumukh"
},
"meta": {
"CreateTime": 1659072043765,
"key": {
"payload": {
"id": 1
},
"schema": {
"fields": [
{
"field": "id",
"optional": false,
"type": "int32"
}
],
"name": "topic4.public.student.Key",
"optional": false,
"type": "struct"
}
},
"offset": 0,
"partition": 0,
"topic": "topic4.public.student"
}
}

有用な情報を抽出するためには、少々工夫したSQLを使う必要があります:

SELECT f.path,f.value FROM <table-name> p, lateral flatten(input => p.message:content, recursive => true) f WHERE f.seq=100;

以下のような結果が得られるはずです:

8. トラブルシューティング

ここでは、いくつかのよくある問題点をご紹介します。

  • “Failed to create a pipe due to insufficient privileges on the table.”

この問題は、SnowflakeSinkConnectorが新しいテーブルを作成するときに発生します。デフォルトでは、作成されたユーザーはこのテーブルへの書き込みに必要な権限を持っていません。これを解決するには、Snowflake で下記のクエリを実行します:

SHOW TABLES;

上記のコマンドで、すべてのテーブルのリストが表示されます。自動作成されたテーブル名はトピック名で始まり、その後に数字が続きますので、その<table-name>をコピーし、下記のコマンドを実行します:

GRANT ALL PRIVILEGES ON TABLE <table-name> TO ROLE securityadmin;
  • “Failed. Please make sure that the stage provided exists and the user has the right permission to operate on the SnowflakeSink stage.”

SnowflakeSinkConnector は内部ステージにレコードを保存しますが、このステージは自動作成され、必要な権限は付与されていません。

SHOW STAGES;

上記のコマンドで、すべてのステージのリストが表示されます。自動作成されたステージ名はコネクタ名で始まり、末尾にテーブル名を持つので、その<stage-name>をコピーし、下記のコマンドを実行します:

GRANT ALL PRIVILEGES ON STAGE <stage-name> TO ROLE securityadmin;
  • Snowflakeにデータが反映されていないが、KafkaのSnowflakeSinkConnectorが動作し、メッセージの処理が行われている。

可能性は2つあります。

”Not enough records”: 一つ目はレコード数が足りない場合です。キャッシュデータをフラッシュするレコード数のデフォルト値(最小値でもある)は10,000です。十分なレコードがない場合、それらはSnowflakeに反映されません。

”Enough records, but still no rows populated in Snowflake”; 二つ目はレコード数は十分ですが、それでもSnowflakeに入力されていない場合です。

この問題はSnowflakeSinkConnector が作成するテーブルに、バリアント型 (RECORD_CONTENT と RECORD_METADATA) の 2 つのカラムがあるが、メッセージには 1 つの JSON がある場合に発生します。

この問題が発生した場合、データはステージに存在しますが、テーブルには入力されません。この問題を解決するには、ステージからテーブルにコンテンツを手動でコピーする必要があります。

CREATE TABLE <new-table-name> (message variant);
GRANT ALL PRIVILEGES ON <new-table-name> TO ROLE securityadmin;
COPY into <new-table-name> FROM @<stage-name> file_format = (type = json);

9. まとめ

このブログでは、Kafka connectを使用してYugabyteDBからSnowflakeにデータをストリームするパイプラインを設定する手順を説明しました。CDCメッセージは単一のJSONとして保存され、それを照会して関連情報を抽出するために必要なすべてのツールを備えていることが、おわかりいただけたと思います。

ご質問はお気軽にお寄せください。また、当社のコミュニティSlackに参加して、エンジニア、エキスパート、ユーザーとリアルタイムでチャットすることも可能です。

本記事は、The Distributed SQL Blogsにて2022年8月16日に公開されたData Streaming Using YugabyteDB CDC, Kafka, and SnowflakeSinkConnectorを翻訳および一部訳注を追加しております。最新情報は英語版の記事を参照してください。

--

--