How to configure Snowflake Connector for Kafka
Apache Kafka is an open-source, distributed publish-subscribe messaging system designed to provide fast, scalable, and fault-tolerant handling of large volumes of data feeds in real time.
Kafka Connect is a free, open-source component of Apache Kafka that connects with external systems such as databases, key-value stores, search indexes, and file systems. A source connector collects data from a system (such as databases, stream tables, or message brokers). A sink connector delivers data from Kafka topics into other systems. The Snowflake Connector for Kafka ("Kafka connector") reads data from one or more Apache Kafka topics and loads the data into a Snowflake table.
In this tutorial, we are going to learn how to configure the Snowflake Connector for Kafka.
For this exercise, we need:
- A Kafka source. For this exercise, we are going to create a Kafka server in GCP.
- A free Databricks account to write Python code that sends a few messages to the Kafka source.
Creating a Kafka server in GCP:
1. Use the following URL to create a free account in GCP. You will get $300 in free credits:
https://cloud.google.com/free/
2. Create a free account in Databricks Community Edition:
https://databricks.com/try-databricks
3. Log in to the GCP console and create a VM under Compute Engine. You can pick an Apache Kafka server image on CentOS or an Ubuntu server.
4. Once the server is created, go to the command line and edit the listeners, advertised.listeners, and listener.security.protocol.map parameters in the server.properties file located in the /opt/kafka/config folder.
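A minimal example of what those entries might look like (assuming a plaintext listener and that xx.xxx.xxx.x is your VM's external IP address; adjust to your setup):
listeners=PLAINTEXT://0.0.0.0:9092
advertised.listeners=PLAINTEXT://xx.xxx.xxx.x:9092
listener.security.protocol.map=PLAINTEXT:PLAINTEXT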
5. Download the snowflake-kafka-connector JAR file and its dependency JARs using the following commands, and move them to the /opt/kafka/libs folder.
sudo wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar
sudo wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar
sudo cp *.jar /opt/kafka/libs
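The two JARs above are the BouncyCastle FIPS dependencies the connector needs to read an encrypted private key; the connector JAR itself can be fetched from Maven Central the same way. For example (version 1.5.0 is shown purely as an illustration; check Maven Central for the current release):
sudo wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.5.0/snowflake-kafka-connector-1.5.0.jar
sudo cp snowflake-kafka-connector-1.5.0.jar /opt/kafka/libs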
6. The Kafka connector relies on key pair authentication rather than basic authentication (i.e., username and password). We need to create a public-private key pair using OpenSSL. The public key is assigned to the Snowflake user defined in the configuration file.
Private Key: In Windows, open a command prompt and enter the following command.
openssl genrsa 2048 | openssl pkcs8 -topk8 -v2 aes256 -inform PEM -out C:\tmp\cert\snowflake_key_v1.p8
Note the encryption passphrase you enter (e.g., 12345678); you will need it later.
Public Key:
openssl rsa -in C:\tmp\cert\snowflake_key_v1.p8 -pubout -out C:\tmp\cert\snowflake_key_pub.pub (enter the passphrase from the previous step, e.g., 12345678)
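When you paste the public key into Snowflake in the next step, use only the key body: strip the -----BEGIN PUBLIC KEY----- / -----END PUBLIC KEY----- header and footer and the line breaks. On Linux or Git Bash, an optional convenience command that prints the key in that form is:
grep -v "PUBLIC KEY" snowflake_key_pub.pub | tr -d '\n'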
7. Log in to your Snowflake account and complete the following activities.
-- change the role
USE ROLE ACCOUNTADMIN;
-- create database, schema, table, and warehouse
CREATE DATABASE KAFKA_CONNECT_POC;
CREATE SCHEMA KAFKA_CONNECT_POC.KAFKA_CONNECT_MESSAGES;
CREATE TABLE KAFKA_CONNECT_MESSAGES.KAFKA_MESSAGES (RECORD_METADATA VARIANT, RECORD_CONTENT VARIANT);
CREATE WAREHOUSE KAFKA_WAREHOUSE WITH WAREHOUSE_SIZE = 'XSMALL' WAREHOUSE_TYPE = 'STANDARD' AUTO_SUSPEND = 180 AUTO_RESUME = TRUE;
-- create db role
CREATE ROLE KAFKA_CONNECT_POC_ROLE;
-- create necessary grants
GRANT USAGE ON DATABASE KAFKA_CONNECT_POC TO ROLE KAFKA_CONNECT_POC_ROLE;
GRANT USAGE ON SCHEMA KAFKA_CONNECT_POC.KAFKA_CONNECT_MESSAGES TO ROLE KAFKA_CONNECT_POC_ROLE;
GRANT CREATE TABLE ON SCHEMA KAFKA_CONNECT_POC.KAFKA_CONNECT_MESSAGES TO ROLE KAFKA_CONNECT_POC_ROLE;
GRANT CREATE STAGE ON SCHEMA KAFKA_CONNECT_POC.KAFKA_CONNECT_MESSAGES TO ROLE KAFKA_CONNECT_POC_ROLE;
GRANT CREATE PIPE ON SCHEMA KAFKA_CONNECT_POC.KAFKA_CONNECT_MESSAGES TO ROLE KAFKA_CONNECT_POC_ROLE;
GRANT OWNERSHIP ON TABLE KAFKA_CONNECT_POC.KAFKA_CONNECT_MESSAGES.KAFKA_MESSAGES TO ROLE KAFKA_CONNECT_POC_ROLE;
-- assign role to snowflake user
GRANT ROLE "KAFKA_CONNECT_POC_ROLE" TO USER "XXKAFKASNOWFLAKE";
GRANT USAGE ON WAREHOUSE "KAFKA_WAREHOUSE" TO ROLE "KAFKA_CONNECT_POC_ROLE";
-- assign default role to snowflake user
ALTER USER XXKAFKASNOWFLAKE SET DEFAULT_ROLE = KAFKA_CONNECT_POC_ROLE;
-- assign public key to snowflake user (replace the rsa_public_key value with the public key created in the earlier step)
ALTER USER XXKAFKASNOWFLAKE SET rsa_public_key='MIIBIjA…………M68fEE2FbXgZQIDAQAB';
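To verify the key was registered, you can optionally describe the user; the RSA_PUBLIC_KEY_FP property should now show a fingerprint:
DESC USER XXKAFKASNOWFLAKE;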
8. Connect to the Kafka VM and create a file named snowflake_connect.properties in the /opt/kafka/config folder with the following information. You will have to change the parameter values for snowflake.url.name, snowflake.user.name, snowflake.private.key, and topics. The snowflake.private.key value is the contents of the .p8 file created in step 6, with the header, footer, and line breaks removed.
name=mykafkaconnectsnowflake
connector.class=com.snowflake.kafka.connector.SnowflakeSinkConnector
tasks.max=8
topics=poc
snowflake.topic2table.map=poc:KAFKA_TABLE_IN
buffer.count.records=1
buffer.flush.time=10
buffer.size.bytes=5
snowflake.url.name=CE91XX.us-central1.gcp.snowflakecomputing.com:443
snowflake.user.name=XXKAFKASNOWFLAKE
snowflake.private.key=MIIFL……………………+uEkN1pBH9Pvo9errDMhd
snowflake.private.key.passphrase=12345678
snowflake.database.name=KAFKA_CONNECT_POC
snowflake.schema.name=KAFKA_CONNECT_MESSAGES
value.converter.schema.registry.url=
key.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
value.converter=com.snowflake.kafka.connector.records.SnowflakeJsonConverter
9. Connect to the Kafka VM and start the ZooKeeper service and the broker service. You can open multiple command prompts and run the following commands.
Start the ZooKeeper service:
1. cd /opt/kafka/
2. sudo bin/zookeeper-server-start.sh config/zookeeper.properties
Start the broker service:
1. cd /opt/kafka/
2. sudo bin/kafka-server-start.sh config/server.properties
Create the 'poc' topic (replace xx.xxx.xxx.x with your Kafka VM public IP address):
1. cd /opt/kafka/
2. sudo bin/kafka-topics.sh --create --topic poc --bootstrap-server xx.xxx.xxx.x:9092
List the topics:
1. cd /opt/kafka/
2. sudo bin/kafka-topics.sh --list --zookeeper localhost:2181
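Note that the connector configuration from step 8 still has to be loaded by a Kafka Connect worker. One way to do this (a sketch, assuming the stock config/connect-standalone.properties that ships with Kafka and the snowflake_connect.properties file created in step 8) is to run Connect in standalone mode from another prompt:
1. cd /opt/kafka/
2. sudo bin/connect-standalone.sh config/connect-standalone.properties config/snowflake_connect.properties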
10. Log in to Databricks Community Edition and open a notebook.
Add a new cell to the notebook, enter the following in Cell 1, and run it:
%sh
pip install kafka-python==2.0.0
Add another cell, add the following code, and run it. You will have to replace xx.xxx.xx.x with the public IP of the Kafka VM.
from kafka import KafkaProducer

bootstrap_servers = ['xx.xxx.xx.x:9092']
topicName = 'poc'
producer = KafkaProducer(bootstrap_servers = bootstrap_servers)

# JSON payload sent as the message value
dict_poc = b'{"message": "Hello World"}'

# send 100 messages and wait for the broker to acknowledge each one
for i in range(100):
    ack = producer.send(topicName, dict_poc)
    metadata = ack.get()
    #print(metadata.topic)
    #print(metadata.partition)
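Optionally, before checking Snowflake, you can confirm the messages reached the topic by running a console consumer on the Kafka VM (assuming the broker is listening on localhost:9092):
cd /opt/kafka/
sudo bin/kafka-console-consumer.sh --topic poc --from-beginning --bootstrap-server localhost:9092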
11. Log in to your Snowflake account and query the table (the connector creates the KAFKA_TABLE_IN table automatically based on the snowflake.topic2table.map setting).
select * from "KAFKA_CONNECT_POC"."KAFKA_CONNECT_MESSAGES"."KAFKA_TABLE_IN";
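If the table is empty, give the connector a minute or two to flush and ingest the buffered records. You can also confirm that the connector created its internal stage and pipe objects for the topic:
SHOW STAGES IN SCHEMA KAFKA_CONNECT_POC.KAFKA_CONNECT_MESSAGES;
SHOW PIPES IN SCHEMA KAFKA_CONNECT_POC.KAFKA_CONNECT_MESSAGES;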
If you would like us to evaluate and review your current data pipeline or help you modernize your data processing pipeline, please email us at info@datasurge.com or complete the form on our contact us page.