Unlocking Real-Time Insights: Streaming Azure Event Hub Data to Snowflake

Streaming data from platforms such as Apache Kafka and Azure Event Hubs is a common practice in modern data processing and analytics. These platforms are designed to handle large volumes of real-time data and provide a reliable way to ingest and process streaming data. Azure Event Hubs is commonly used in scenarios where data needs to be processed and analyzed in near real time, such as monitoring, analytics, and event-driven applications.

Getting data out of Azure Event Hub and into a database for near real-time insight presents several challenges, primarily around data processing, transformation, infrastructure management, and monitoring. Many Snowflake customers sink data from Azure Event Hub into blob storage before loading it into Snowflake, which increases overall latency. Low latency is essential for many applications because it directly impacts responsiveness and user experience; with clickstream data, for example, high latency leads to delayed insights and missed opportunities.

To overcome these challenges, Snowflake offers features such as Snowpipe Streaming, the Kafka Connector, Snowpark Container Services (Private Preview in AWS), and Streamlit in Snowflake (Public Preview in AWS), which let you build and monitor a data pipeline without managing any infrastructure; every part of the pipeline runs as a managed service.

To build a pipeline from Azure Event Hub to Snowflake, you normally need a compute cluster or VM to host Kafka Connect, and that cluster is a burden to manage. With Snowpark Container Services, you can run Kafka Connect without managing that infrastructure yourself.

Data Flow from Event Hub to Snowflake

Snowpark Container Services (SPCS) is a fully managed container service based on Kubernetes. SPCS allows SREs and developers to bring their own Docker container, upload it to a private repository within Snowflake, and run it on the compute pool of their choice (GPU, high memory, etc.). Using SPCS, you can deploy a Kafka Connect container to pull data from Azure Event Hub. You can check this article to get more details on Snowpark Container Services.

The Snowflake Kafka Connector works with Kafka Connect without a single line of code; all you need are the configuration details of your Snowflake account.

To set up the data pipeline, you need to install a couple of tools on your computer: Docker Desktop, to create a container image and push it to the private repository, and Snowcli (optional), to upload files to a Snowflake stage. You also need to set up an Azure Event Hub or, if one already exists, have its configuration details handy. Check here for how to find them.

Here is a simple three-step approach to accomplish this:

Step 1: Create a Docker Container for Kafka Connect:

Create a folder and create the following four files in that folder:
1. Dockerfile: To create a docker container (no need to change this file)
2. startup.sh: Execute when the docker container starts, (no need to change this file)
3. source-connect-distributed.properties: Config for Azure Event Hub (you must change some fields based on your setup)
4. snowflake_config.json: Snowflake configuration file (you must change some fields based on your setup)
Please note: in the code below, change only the text inside the <CHANGE CAPITAL LETTER TEXT> placeholders; everything else stays the same unless you know what you are doing.

#Dockerfile
FROM confluentinc/cp-kafka-connect:latest
ENV CONNECT_PLUGIN_PATH="/usr/share/confluent-hub-components"
# install the snowflake kafka connector plugin
RUN confluent-hub install --no-prompt snowflakeinc/snowflake-kafka-connector:latest
# run as the image's non-root user
USER appuser
# source (Azure Event Hub) and sink (Snowflake) config properties
COPY --chown=appuser:appuser source-connect-distributed.properties /home/appuser/
COPY --chown=appuser:appuser snowflake_config.json /home/appuser/
# startup script, made executable for appuser
COPY --chown=appuser:appuser startup.sh /home/appuser/startup.sh
RUN chmod +x /home/appuser/startup.sh
ENTRYPOINT ["/home/appuser/startup.sh"]
#!/bin/bash
#startup.sh file
# advertise the SPCS host name so the Kafka Connect REST worker is reachable
export CONNECT_REST_ADVERTISED_HOST_NAME=$HOSTNAME
# substitute the <<spcshost>> placeholder in the properties template
cat "$HOME/source-connect-distributed.properties" | sed "s/<<spcshost>>/$CONNECT_REST_ADVERTISED_HOST_NAME/g" > "$HOME/eventhub-connect-distributed.properties"
# wait for the worker to start, then register the Snowflake sink connector via the Connect REST API
sleep 15 && curl -X PUT -H "Content-Type: application/json" --data @/home/appuser/snowflake_config.json http://localhost:8084/connectors/snow-connect/config &
# start Kafka Connect in distributed mode
/usr/bin/connect-distributed "$HOME/eventhub-connect-distributed.properties"
#source-connect-distributed.properties file

bootstrap.servers=<CHANGE YOUR-EVENTHUB-NAMESPACE>.servicebus.windows.net:9093
group.id=connect-cluster-group
#event-hub-for-snowpipe-streaming-consumer-group-01

# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status

# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1

rest.advertised.host.name=<<spcshost>>
rest.advertised.listener=http
rest.advertised.port=8084
listeners=http://localhost:8084
offset.flush.interval.ms=2000

key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.json.JsonConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter

internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false

# Namespace sas
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<CHANGE EVENT HUB CONNECTION STRING, starts with Endpoint=sb://>";

consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<CHANGE EVENT HUB CONNECTION STRING, starts with Endpoint=sb://>";

plugin.path=/usr/share/confluent-hub-components
#snowflake_config.json file
{
"connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
"tasks.max": "8",
"snowflake.enable.schematization":"TRUE",
"topics": "<CHANGE YOUR TOPIC NAME>",
"snowflake.topic2table.map": "<CHANGE YOUR TOPIC NAME>:<CHANGE SNOWFLAKE TABLE NAME>",
"buffer.count.records": "100",
"buffer.flush.time": "2",
"buffer.size.bytes": "20000",
"snowflake.url.name": "<CHANGE YOUR SNOWFLAKE ACCOUNT>.snowflakecomputing.com:443",
"snowflake.user.name": "<CHANGE SNOWFLAKE USER>",
"snowflake.private.key":"<CHANGE YOUR PRIVATE KEY, SHOULD BE IN ONE LINE>",
"snowflake.database.name": "<CHANGE YOUR SNOWFLAKE DB NAME>",
"snowflake.schema.name": "<CHANGE YOUR SNOWFLAKE SCHEMA NAME>",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"snowflake.role.name": "kafka_connector_role",
"snowflake.ingestion.method": "SNOWPIPE_STREAMING",
"value.converter.schemas.enable": "false"
}
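The connector authenticates with key-pair authentication and writes using the role named in snowflake.role.name. If that user and role are not set up yet, here is a minimal sketch of the Snowflake side, using the placeholder names from the config above and the privileges the Snowflake Kafka connector documentation calls for; adjust to your environment and generate the RSA key pair as described in that documentation:

-- run as a role that can manage roles and grants (e.g. ACCOUNTADMIN)
CREATE ROLE IF NOT EXISTS kafka_connector_role;
GRANT USAGE ON DATABASE <CHANGE YOUR SNOWFLAKE DB NAME> TO ROLE kafka_connector_role;
GRANT USAGE ON SCHEMA <CHANGE YOUR SNOWFLAKE DB NAME>.<CHANGE YOUR SNOWFLAKE SCHEMA NAME> TO ROLE kafka_connector_role;
GRANT CREATE TABLE, CREATE STAGE, CREATE PIPE ON SCHEMA <CHANGE YOUR SNOWFLAKE DB NAME>.<CHANGE YOUR SNOWFLAKE SCHEMA NAME> TO ROLE kafka_connector_role;
GRANT ROLE kafka_connector_role TO USER <CHANGE SNOWFLAKE USER>;
ALTER USER <CHANGE SNOWFLAKE USER> SET DEFAULT_ROLE = kafka_connector_role;
-- public key that pairs with snowflake.private.key in snowflake_config.json (one line, no header/footer)
ALTER USER <CHANGE SNOWFLAKE USER> SET RSA_PUBLIC_KEY = '<CHANGE YOUR PUBLIC KEY>';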

Once you have all four files in the folder, run the following commands in a shell to build the Docker image (make sure you are in the folder where the files exist):

# build the docker image
docker build -t kafkaconnect:dev .

# test it: open a shell in the container and check that your snowflake config files were picked up
docker run --rm -it --entrypoint /bin/bash kafkaconnect:dev

# your docker image is now ready to run in Snowflake

Step 2: Create the various objects related to Snowpark Container Services. Log in to Snowflake Snowsight, make sure to change your role to ACCOUNTADMIN, and run the following commands. (You only have to run this once!)

USE ROLE ACCOUNTADMIN;
SET PWD = '<CHANGE SOMETHING YOU REMEMBER>';
SET USER = 'SPCS_USER';

CREATE USER IF NOT EXISTS IDENTIFIER($USER) PASSWORD=$PWD
COMMENT='FOR SNOWPARK CONTAINER SERVICE USER';
CREATE ROLE if not exists spcs_role;
GRANT ROLE SYSADMIN TO ROLE spcs_role;
GRANT IMPORTED PRIVILEGES ON DATABASE snowflake TO ROLE spcs_role;
GRANT ROLE spcs_role TO USER IDENTIFIER($USER);
GRANT BIND SERVICE ENDPOINT ON ACCOUNT TO ROLE spcs_role;

-- this step is important so this role applies when logging in from the container
ALTER USER IDENTIFIER($USER) SET DEFAULT_ROLE = spcs_role;

CREATE SECURITY INTEGRATION IF NOT EXISTS snowservices_ingress_oauth
TYPE=oauth
OAUTH_CLIENT=snowservices_ingress
ENABLED=true;

-- change role to create objects using the custom role we created
USE ROLE SPCS_ROLE;

CREATE OR REPLACE DATABASE spcs_db;

CREATE OR REPLACE WAREHOUSE spcs_wh
WAREHOUSE_SIZE = XSMALL
AUTO_SUSPEND = 120
AUTO_RESUME = TRUE;

ALTER USER IDENTIFIER($USER) SET
DEFAULT_WAREHOUSE=spcs_wh
DEFAULT_NAMESPACE=spcs_db.public;

-- Internal secure stage for yaml files
CREATE STAGE IF NOT EXISTS specs
ENCRYPTION = (TYPE='SNOWFLAKE_SSE');

-- private image repository
CREATE OR REPLACE IMAGE REPOSITORY images;

CREATE COMPUTE POOL IF NOT EXISTS std1_pool
MIN_NODES = 1
MAX_NODES = 1
INSTANCE_FAMILY = standard_1;

SHOW IMAGE REPOSITORIES;
-- note the repository_url and extract the hostname:
-- orgname-accountname.registry.snowflakecomputing.com
-- this hostname is what you use to log in to the registry

-- some useful commands for your reference
describe compute pool std1_pool;
SHOW IMAGE REPOSITORIES;
show compute pools;
alter compute pool std1_pool resume;
-- to stop consuming credits, stop all services on the pool and suspend it
alter compute pool std1_pool stop all;
alter compute pool std1_pool suspend;

Step 3: Create the Service in Snowpark Container Services

First, push the container image you created in Step 1 to the private image repository you created above. To do that, log in to the Snowflake registry with the SPCS_USER you created in Step 2.

# run on shell
# login to registry
docker login <CHANGE orgname-accountname>.registry.snowflakecomputing.com/spcs_db/public/images -u spcs_user -p <CHANGE>
# tag the docker image
docker tag kafkaconnect:dev <CHANGE orgname-accountname>.registry.snowflakecomputing.com/spcs_db/public/images/kafkaconnect:dev
# push docker image to snowflake private image repository
docker push <CHANGE orgname-accountname>.registry.snowflakecomputing.com/spcs_db/public/images/kafkaconnect:dev
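One quick way to confirm the image landed in the repository is to list its contents from Snowsight (run as spcs_role; the repository path is the one created in Step 2):

SHOW IMAGES IN IMAGE REPOSITORY spcs_db.public.images;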

Second, create the YAML specification file below and upload it to the internal stage SPECS created in Step 2. This file identifies which Docker image to use and defines various service parameters used when the service is created.

#kafka-connect.yaml file
spec:
  containers:
  - name: kafkaconnect
    image: <CHANGE ORGNAME-ACCOUNTNAME>.registry.snowflakecomputing.com/spcs_db/public/images/kafkaconnect:dev
  endpoints:
  - name: kafkaconnect
    port: 8084
    public: true
  networkPolicyConfig:
    allowInternetEgress: true
# Run on shell
# if you have installed snowcli, use the following to upload the file to the stage,
# or use snowsight to upload the yaml file into the stage "SPECS" you created,
# or use snowsql

snow stage put ./kafka-connect.yaml specs --overwrite --connection spcs_env
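You can confirm the upload from Snowsight before creating the service (run as spcs_role, in the spcs_db.public context):

LIST @specs;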

By now you have a Docker image in the Snowflake image repository, a compute pool to host the container, and a YAML specification file in the internal stage. Finally, you can create the Snowpark Container Services service.

-- make sure the compute pool is in an ACTIVE or IDLE state
SHOW COMPUTE POOLS;

-- create the service - this deploys the docker container in the compute pool
CREATE SERVICE kafkaconn_service
IN COMPUTE POOL std1_pool
FROM @specs SPEC='kafka-connect.yaml';

-- check the status of service
CALL SYSTEM$GET_SERVICE_STATUS('kafkaconn_service');

-- check the logs of the docker container
CALL SYSTEM$GET_SERVICE_LOGS('kafkaconn_service', '0', 'kafkaconnect',1000);
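Once the service and the connector are running, rows should start landing in the target table within a few seconds. A quick sanity check (the database, schema, and table names below are the ones used by the dashboard later in this article; yours will match snowflake_config.json):

SELECT count(*) AS rows_loaded,
       max(to_timestamp_ltz(to_number(record_metadata:CreateTime)/1000)) AS latest_event
FROM streamdb.eventhub.eventhub_data;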

That is all you need to set up the data pipeline; everything is managed once deployed. You can monitor the service and do analytics in near real time using Streamlit in Snowflake. Here is what the data flow looks like:

Snowflake Data Cloud Platform

Here is the overall data pipeline with transformation, visualized using Streamlit in Snowflake (see the Streamlit code below). Latency from Azure Event Hub to Snowflake is about 3 seconds, and with transformation the end-to-end latency is about 11 seconds. I used the triggered task feature for the transformation, as described in my previous article.

Snowpipe Streaming Latency - Raw: 3 sec, End-to-End with Transformation: 11sec
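For reference, here is a minimal sketch of that transformation step: a stream on the raw table feeds a triggered task (a task with no schedule that fires when the stream has data). The stream, task, and column names are assumptions based on the dashboard queries below, so adapt the SELECT to your own transformation logic.

-- stream on the raw table the Kafka connector writes to (assumed names)
CREATE OR REPLACE STREAM streamdb.model.eventhub_raw_stream
  ON TABLE streamdb.eventhub.eventhub_data;

-- triggered task: no SCHEDULE, runs whenever the stream has new rows
CREATE OR REPLACE TASK streamdb.model.transform_eventhub_task
  WAREHOUSE = spcs_wh
  WHEN SYSTEM$STREAM_HAS_DATA('streamdb.model.eventhub_raw_stream')
AS
  INSERT INTO streamdb.model.evenhub_final (campaign, cost, orig_ts, last_ts)
  SELECT campaign,
         cost,
         to_timestamp_ltz(to_number(timestamp)) AS orig_ts,
         current_timestamp() AS last_ts
  FROM streamdb.model.eventhub_raw_stream;

ALTER TASK streamdb.model.transform_eventhub_task RESUME;

The Streamlit in Snowflake dashboard code follows: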
#streamlit in snowflake
import streamlit as st  # web app framework
import pandas as pd
import time  # to refresh the dashboard in a near real-time loop
from snowflake.snowpark.context import get_active_session

sess = get_active_session()

def exec_sql(sess, query):
    # run a query and return the result as a pandas DataFrame (empty on error)
    try:
        rowset = sess.sql(query)
    except Exception as e:
        st.error(f"Oops! error executing {query}: {e}")
        return pd.DataFrame()
    try:
        return pd.DataFrame(rowset.collect())
    except Exception as e1:
        st.error(str(e1))
        return pd.DataFrame()

st.set_page_config(
    page_title='Real-Time Dashboard for Advertisement Spend',
    page_icon='✅',
    layout='wide'
)

# dashboard title
st.title("Real-Time Advertising Spend Dashboard")

placeholder = st.empty()

# near real-time / live message queries
cquery = 'select channel, round(sum(cost), 0) spent, count(*) clicks \
          from streamdb.eventhub.eventhub_data \
          group by channel;'
cquery2 = 'select campaign, round(sum(cost), 0) spent, count(*) clicks \
           from streamdb.model.evenhub_final \
           group by campaign;'

# or: while True:
for seconds in range(100000):
    # Snowpipe Streaming latency: event create time vs. original event timestamp
    df1 = exec_sql(sess, "with a as ( \
            select \
              to_timestamp_ltz(to_number(record_metadata:CreateTime)/1000) cts, \
              to_timestamp_ltz(to_number(timestamp)) ots, \
              datediff('ms', ots, cts) latency \
            from streamdb.eventhub.eventhub_data \
          ) \
          select count(*) cnt, round(avg(abs(latency)/100)) lat from a;")

    sps_lat = int(df1["LAT"].iloc[0])
    sps_cnt = int(df1["CNT"].iloc[0])

    # end-to-end latency including the transformation step
    df2 = exec_sql(sess, "select round(avg(datediff('second', orig_ts, last_ts))) e2elat \
                          from streamdb.model.evenhub_final")
    e2elat = int(df2["E2ELAT"].iloc[0])

    with placeholder.container():
        # create three columns
        kpi1, kpi2, kpi3 = st.columns(3)

        # fill in those three columns with the respective metrics or KPIs
        kpi1.metric(label="Latency Snowpipe Streaming (in sec)", value=sps_lat)
        kpi2.metric(label="Count", value=sps_cnt)
        kpi3.metric(label="End to End Latency (in sec)", value=e2elat)

        # create two columns for the data view and the chart
        adf = exec_sql(sess, cquery2)
        bdf = exec_sql(sess, cquery)
        fig_col1, fig_col2 = st.columns(2)
        with fig_col1:
            st.markdown("### By Channel - Data View")
            st.dataframe(bdf)
        with fig_col2:
            st.markdown("### By Campaign - Chart")
            tdf = adf.set_index('CAMPAIGN')
            st.bar_chart(tdf)

    time.sleep(1)

Snowpark Container Services gives you a unified experience: you can deploy images securely on managed, cost-effective compute infrastructure. This is just one use case; many others can run on this service, such as bringing and running your own LLM next to your data in Snowflake, training ML models on NVIDIA GPUs, and building data-intensive applications. You can also use it to build applications with the Snowflake Native Apps framework. The idea behind the service is to bring code close to the data, avoiding the latency, security and governance headaches, performance issues, and egress and management costs of moving data outside of Snowflake.

I hope this will be useful in your data engineering project!

Many Thanks to Vishal Verma for helping with this.

Important: This article uses the https://hub.docker.com/r/confluentinc/cp-kafka-connect Docker image, so please read its license section.

Disclaimer: The opinions expressed in this post are my own and not necessarily those of my employer (Snowflake).
