Simplifying Real-Time Data Ingestion: Stream Azure Event Hubs events into Snowflake with Snowpipe Streaming

Dated: 25-May-2023

Contributors: Nagesh Cherukuri, Adrian Lee Xinhan

Introduction

In today’s data-driven world, real-time data processing is becoming increasingly crucial for businesses to gain insights, make informed decisions, and deliver great customer experiences. Azure Event Hubs is a highly scalable event streaming platform for managing real-time data ingestion and processing. To simplify and enhance the real-time data processing pipeline, Snowflake’s Snowpipe Streaming integration with Azure Event Hubs offers a powerful solution that ensures seamless data flow, reliability, and scalability. In this blog post, we will explore the benefits of using Snowpipe Streaming with Azure Event Hubs and how it can simplify real-time data processing.

This post covers the steps involved in streaming data from Azure Event Hubs using the Snowflake Connector for Kafka; streaming with the Snowflake Ingest SDK will be covered in part 2 of the blog. We have also included a set of best practices and fundamental settings that can be used to configure Snowpipe Streaming for high throughput and lower latency. Implementing these recommendations can significantly enhance the overall performance of the streaming ingest process.

Why Snowpipe Streaming

Why do we need Snowpipe Streaming when we already have Snowpipe?

To understand why we need Snowpipe Streaming, let’s look at some of the limitations of Snowpipe.

Snowpipe is a powerful tool for ingesting data into Snowflake, but it does have some limitations to consider:

  • When there are a large number of small files, ingestion can be throttled. This happens because Snowpipe is optimized for a certain file size, and processing many small files creates overhead that slows down the ingest process. To avoid this issue, it’s recommended to consolidate small files into larger files of roughly 100–250 MB (or larger) before ingesting them with Snowpipe.
  • Another consideration with Snowpipe is the cost of ingestion. The cost is broken down into two parts: the charge for the Snowflake-managed compute used to load the data from the files into Snowflake tables, and an overhead charge for every 1,000 files staged before loading into Snowflake tables. While Snowpipe can be an efficient way to ingest data, it’s important to keep these costs in mind and optimize your usage accordingly. For example, consolidating 200,000 files of 1 MB into 1,000 files of 200 MB loads the same volume of data while cutting the per-file overhead charge by a factor of 200.

Snowpipe Streaming avoids this problem by using an ingest API that writes rows of data directly to Snowflake tables, unlike bulk data loads or Snowpipe, which write data from staged files. This architecture results in lower load latencies, with correspondingly lower costs for loading similar volumes of data, which makes it a powerful feature for handling real-time data streams.

Note: Snowpipe Streaming is very useful in scenarios where you receive a continuous stream of (smaller sets of) data throughout the day and you want to ingest those events into Snowflake with low latency, i.e. within a few seconds. However, if you are ingesting files from cloud storage, or you want to ingest data from Kafka topics without tight latency SLAs, then Snowpipe is still a very good option.

https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview

As shown in the architecture overview at the link above, the Snowpipe Streaming service can ingest data from Kafka topics, files, or weblogs using the Snowflake Ingest SDK. The Snowflake Connector for Kafka leverages the Snowflake Ingest SDK to seamlessly stream data from Kafka topics directly into Snowflake tables. This integration only requires the inclusion of one extra key in the Kafka Connect configuration file:

“snowflake.ingestion.method”:”SNOWPIPE_STREAMING”

The sections below provide details about how to ingest data using Snowpipe Streaming.

Steps to Ingest data from Azure Event Hubs using Snowflake Connector for Kafka

Requirements

Below are the services used for the setup.

  1. Azure Event Hubs (Standard/Premium)
  2. Azure VM (Standard D4s v3 (4 vcpus, 16 GiB memory)) to run Kafka Connect, in the same region as the Azure Event Hub. You can use a Docker container or any VM.
  3. A VM/Docker container to push data to Event Hubs. For this blog we used an Azure VM in the same region as the Event Hub.
  4. Snowflake Connector for Kafka 1.9.1 or later
  5. Snowflake Account

Create Azure Event Hubs

You can follow the instructions in the link below to create an Azure Event Hubs namespace and an event hub. Make sure to choose the Standard (or higher) pricing tier and not Basic, as Basic does not support Kafka endpoints.

https://learn.microsoft.com/en-us/azure/event-hubs/event-hubs-create

While choosing the partition count, you have to experiment to find the right number for your workload. Initially, you can start with 2 or 3 partitions to assess the overall throughput and increase the value based on your throughput needs. It is crucial to use appropriate values for TUs (Throughput Units), or PUs (Processing Units) when working with the Premium tier; insufficient TU provisioning can negatively impact ingestion throughput. You may explore the option of enabling auto-inflate and setting a maximum value that suits your workload, service level agreements (SLAs), and pricing considerations. Below is an example of the Event Hubs setup that we are using for this blog post.
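If you prefer the Azure CLI over the portal, a minimal sketch of the same setup could look like the following. The resource group name and location are placeholders, and the partition count and auto-inflate ceiling are illustrative values to be tuned for your own workload; the namespace (kafkaeh) and event hub (vehicle_partition) names match the ones used later in this post.

$ az eventhubs namespace create --resource-group <resource-group> --name kafkaeh --location eastus --sku Standard --enable-auto-inflate true --maximum-throughput-units 10
$ az eventhubs eventhub create --resource-group <resource-group> --namespace-name kafkaeh --name vehicle_partition --partition-count 4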

Take note of the event hub namespace, which is kafkaeh in this case. Also take note of the host name, which in this case would be kafkaeh.servicebus.windows.net. This will be used as the value for bootstrap.servers in the Kafka Connect configuration.

Setup Kafka Connect

To set up Kafka Connect with one worker node for Snowflake, the following steps need to be performed. You can run Kafka Connect on an Azure VM or in a Docker container.

Download and install Kafka on the worker node (VM/Docker) by following the steps below:

  • Create a Downloads folder in your home directory: $ mkdir Downloads
  • Run the following command, which downloads Kafka version 2.8.2:
$curl "https://downloads.apache.org/kafka/2.8.2/kafka_2.13-2.8.2.tgz" -o ~/Downloads/kafka.tgz
  • Create a directory named kafka in your home directory (/home/azureuser):
$ mkdir kafka && cd /home/azureuser/kafka
  • Extract the tar file by running
$ tar -xvzf ~/Downloads/kafka.tgz --strip 1
  • Once Kafka is installed, download the Snowflake Kafka connector libraries and move them to the /home/azureuser/kafka/libs folder. You can run the commands below to download the required libraries for the snowflake-kafka-connector. Ensure you are downloading the latest version; at the time of writing this article the latest version was 1.9.1.
$ sudo wget https://repo1.maven.org/maven2/com/snowflake/snowflake-kafka-connector/1.9.1/snowflake-kafka-connector-1.9.1.jar -P /home/azureuser/kafka/libs/
$ sudo wget https://repo1.maven.org/maven2/org/bouncycastle/bc-fips/1.0.1/bc-fips-1.0.1.jar -P /home/azureuser/kafka/libs/
$ sudo wget https://repo1.maven.org/maven2/org/bouncycastle/bcpkix-fips/1.0.3/bcpkix-fips-1.0.3.jar -P /home/azureuser/kafka/libs/
  • Ensure the Java JDK is installed; you can check by running $ java -version. If Java is not installed, you can run the command below to install OpenJDK.
$ sudo apt-get install openjdk-11-jdk
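  • (Optional) As a quick sanity check before moving on, you can confirm that the connector jars landed in the plugin path and that Java is available:
$ ls /home/azureuser/kafka/libs/ | grep -E 'snowflake|fips'
$ java -version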

Next, edit the config/connect-distributed.properties file under the Kafka home directory and replace the following parameters with the values from your own environment (Event Hubs namespace, connection string, and so on):

bootstrap.servers=testkafhkeh.servicebus.windows.net:9093
group.id=connect-cluster-group
# connect internal topic names, auto-created if not exists
config.storage.topic=connect-cluster-configs
offset.storage.topic=connect-cluster-offsets
status.storage.topic=connect-cluster-status
# internal topic replication factors - auto 3x replication in Azure Storage
config.storage.replication.factor=1
offset.storage.replication.factor=1
status.storage.replication.factor=1
rest.advertised.host.name=connect
offset.flush.interval.ms=10000
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter=org.apache.kafka.connect.storage.StringConverter
internal.value.converter=org.apache.kafka.connect.json.JsonConverter
internal.key.converter.schemas.enable=false
internal.value.converter.schemas.enable=false
# required EH Kafka security settings
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://testkafhkeh.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=dfdfdfdsfdf";
producer.security.protocol=SASL_SSL
producer.sasl.mechanism=PLAIN
producer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://testkafhkeh.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=dfdfdfdsfdf";
consumer.security.protocol=SASL_SSL
consumer.sasl.mechanism=PLAIN
consumer.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://testkafhkeh.servicebus.windows.net/;SharedAccessKeyName=RootManageSharedAccessKey;SharedAccessKey=dfdfdfdsfdf";
plugin.path=/home/azureuser/kafka/libs

The value for the password in the above configuration is the connection string of your Event Hubs namespace, which you can copy from the Azure Portal under the namespace’s Shared access policies. Ensure each sasl.jaas.config line ends with a semicolon (;) after the closing quote.
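Alternatively, a command along the following lines should return the same connection string from the Azure CLI (the resource group and namespace names are placeholders):

$ az eventhubs namespace authorization-rule keys list --resource-group <resource-group> --namespace-name <namespace> --name RootManageSharedAccessKey --query primaryConnectionString -o tsv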

Once you have updated the connect-distributed.properties file, you need to create another file that contains the Snowflake account details and the Kafka topic name. Create a file named SF_Connect.json and add the following lines. To learn more about the configuration options, refer to the link below.

https://docs.snowflake.com/en/user-guide/kafka-connector-install#distributed-mode

{
  "name": "snowflakesink",
  "config": {
    "connector.class": "com.snowflake.kafka.connector.SnowflakeSinkConnector",
    "tasks.max": "16",
    "topics": "vehicle_partition",
    "buffer.count.records": "1000",
    "buffer.flush.time": "2",
    "buffer.size.bytes": "2000000",
    "snowflake.url.name": "ana95816.snowflakecomputing.com:443",
    "snowflake.user.name": "snowflake_user_name",
    "snowflake.private.key": "MIIEowIBAAKCAQEAsJFqSunLz+pA016RVK",
    "snowflake.database.name": "snowpipe_streaming",
    "snowflake.schema.name": "dev",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter.schemas.enable": "false",
    "snowflake.role.name": "accountadmin",
    "snowflake.ingestion.method": "SNOWPIPE_STREAMING"
  }
}

I have kept buffer.flush.time at 2 seconds to reduce latency.
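One prerequisite worth calling out: the value of snowflake.private.key comes from a key pair registered for the Snowflake user, and the target database and schema referenced in SF_Connect.json must already exist. A minimal sketch, assuming an unencrypted key and the names used above, could look like this; paste the contents of rsa_key.p8 (without the header/footer lines and without line breaks) into snowflake.private.key.

$ openssl genrsa 2048 | openssl pkcs8 -topk8 -inform PEM -out rsa_key.p8 -nocrypt
$ openssl rsa -in rsa_key.p8 -pubout -out rsa_key.pub

-- Run in Snowflake: register the public key and create the target objects
ALTER USER snowflake_user_name SET RSA_PUBLIC_KEY='<contents of rsa_key.pub without the BEGIN/END lines>';
CREATE DATABASE IF NOT EXISTS snowpipe_streaming;
CREATE SCHEMA IF NOT EXISTS snowpipe_streaming.dev;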

  • Start Kafka Connect in distributed mode. Go to the Kafka folder and run the command below.
$ ./bin/connect-distributed.sh config/connect-distributed.properties
  • After starting Kafka Connect, open a new terminal and navigate to the folder where you created SF_Connect.json (for example, the config folder within the Kafka home directory), then run the following curl command to register the connector:
$ curl -X POST -H "Content-Type: application/json" --data @SF_Connect.json http://localhost:8083/connectors
  • To check all the connectors, run the following command.
$ curl -s -X GET "http://localhost:8083/connectors/"
  • To check the name of the topic for the connector, run the following command.
$ curl -s -X GET "http://localhost:8083/connectors/snowflakesink/topics" | jq '.'
  • To delete a connector, run the following command.
$ curl -s -X DELETE "http://localhost:8083/connectors/snowflakesink"
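  • To check the status of the connector and its tasks, you can also call the standard Kafka Connect REST status endpoint.
$ curl -s -X GET "http://localhost:8083/connectors/snowflakesink/status" | jq '.'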

By running through the above steps, you should be able to successfully ingest streaming data from Azure Event Hubs into Snowflake tables.
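If you do not yet have a producer pushing events to the event hub, a quick way to test the pipeline end to end is the console producer that ships with Kafka. The sketch below assumes the client SASL settings are saved in a small properties file; the namespace and topic names are the examples used earlier, so replace them and the connection string with your own.

$ cat > ~/kafka/config/eh-client.properties <<'EOF'
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="<your Event Hubs connection string>";
EOF
$ cd ~/kafka
$ ./bin/kafka-console-producer.sh --bootstrap-server kafkaeh.servicebus.windows.net:9093 --topic vehicle_partition --producer.config config/eh-client.properties
>{"vehicle_id": 1, "speed": 72, "ts": "2023-05-25T10:00:00Z"}

Once a few records have been produced, they should show up within a few seconds in the target table, which the connector creates automatically (named after the topic) if it does not already exist.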

Capturing Costs

In order to calculate the total cost of your streaming pipeline, you need to look at the three cost components below:

  • Client Cost: calculated per hour for the time the Snowpipe Streaming client spends loading data into Snowflake tables
  • File Migration Cost: credits consumed to migrate the streamed data into Snowflake tables
  • Cloud Services Cost: charged at an hourly rate of 0.01 Snowflake credits per client instance

You can query the two Account Usage views below to identify the total credits consumed.

snowpipe_streaming_file_migration_history

snowpipe_streaming_client_history

  • Running the query below gives you the credits consumed for the duration of time the Kafka client spent loading data into Snowflake. Here we filter on the client name, which is based on the name we set in the Kafka Connect configuration file.

SELECT COUNT(DISTINCT event_timestamp) AS client_seconds,
       DATE_TRUNC('hour', event_timestamp) AS event_hour,
       client_seconds * 0.000002777777778 AS credits,
       client_name,
       snowflake_provided_id
FROM SNOWFLAKE.ACCOUNT_USAGE.SNOWPIPE_STREAMING_CLIENT_HISTORY
WHERE client_name LIKE '%KC_CLIENT_sf_sink_premium_3machines%'
GROUP BY event_hour, client_name, snowflake_provided_id
ORDER BY event_hour DESC;

The output displays the credit consumption in relation to how long each client streamed data into Snowflake. The number of tasks in the Kafka configuration file for this run was set to 8, and you can see the corresponding credit usage by each client based on its runtime.

  • The query below gives you the credits consumed by the Snowpipe Streaming service to load the data into Snowflake tables, based on the volume of the data.
-- File migration cost for each table
SELECT ROUND(credits_used, 5) AS credits, *
FROM SNOWFLAKE.ACCOUNT_USAGE.SNOWPIPE_STREAMING_FILE_MIGRATION_HISTORY
WHERE table_name LIKE 'VEHICLESENSORDATAPREMIUM_3MACHINES%';

The query output presents the credit consumption by the service during the process of loading data into Snowflake tables. This consumption is calculated based on the number of bytes migrated, which in this specific scenario corresponds to 3 million records.

File Migration Credits

Cloud Service Cost

  • This is calculated per hour of each client’s runtime multiplied by 0.01 credits. A client in this context is a machine on which Kafka Connect is running; if you are using 2 machines, then the number of clients is 2.

Cloud Service Cost = client runtime in hours * number of clients * 0.01

  • If there is only one Kafka Connect client, running from 3:00pm to 3:15pm, then you will be charged only for those 15 minutes (0.25 hours). So the Cloud Service cost will be:

0.25 * 1 * 0.01 = 0.0025 credits

Please note that since Snowpipe Streaming is in public preview, pricing may change before it is made generally available.

Best Practices for Ingesting Streaming Data with High throughput and Lower Latency

Below are some of the best practices for ingesting streaming data with high throughput and lower latency using Azure Event Hubs and Snowpipe Streaming:

  • Use a dedicated event hub for each Snowpipe Streaming pipeline. This helps ensure that there is no contention for resources between different streams.
  • Ensure you are using the right number of Azure Event Hubs TUs or PUs to maximize throughput. Under-provisioning TUs on Standard Event Hubs can lead to throttling.
  • Increase the number of partitions on the event hub to improve throughput. Match the partition count with the number of consumers, and also with the number of TUs if you are using Standard Event Hubs.
  • In the SF_Connect.json file, keep the value for buffer.flush.time as low as possible; the minimum value is 1 (second). This helps reduce the latency before records become visible in Snowflake tables.
  • Set the value of tasks.max equal to the number of cores available on the VM/Docker container where the Kafka Connect client is running to improve throughput.
  • Consider running multiple Kafka Connect instances to read data from different topics. This approach increases the overall ingestion throughput. A consolidated example of these tuning settings is sketched below.
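For reference, a latency/throughput-oriented variant of the SF_Connect.json settings discussed above might look like the fragment below. The values are illustrative only and should be tuned against your own workload; here tasks.max matches the 4 vCPUs of the Standard D4s v3 VM used in this setup.

"tasks.max":"4",
"buffer.flush.time":"1",
"buffer.count.records":"10000",
"buffer.size.bytes":"5000000",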

Conclusion

The integration of Azure Event Hubs with Snowpipe Streaming simplifies the process, enabling businesses to ingest, process, and analyze real-time data with ease. With its scalability, reliability, and cost-effectiveness, the combination of these two powerful services unlocks new possibilities for customers to gain insights and derive value from their streaming data.

Please note that Snowpipe Streaming was in public preview and had not yet reached general availability at the time of writing this blog.

References

Snowpipe Streaming: https://docs.snowflake.com/en/user-guide/data-load-snowpipe-streaming-overview

Snowpipe Streaming Cost:

Kafka Support on Azure Event Hubs
