Real-time data streaming from a database (MySQL) to Snowflake using the Snowpipe Streaming API

Parag Jain
5 min read · Apr 27, 2023


Stream data into Snowflake without message broker services (Low code)

Real-time data helps you generate insights faster, act on new data, and make business decisions sooner. There is no doubt that real-time data pipelines bring benefits, but it is important to track cost-performance metrics to curb any runaway costs. Data streaming also comes with other challenges, such as data velocity, latency, scalability, and performance. But a cost-prohibitive solution certainly creates a barrier to entry into real-time data streaming.

Snowflake recently announced a cost-effective and low-latency solution for streaming data from sources and message brokers (like Kafka). Today, we will put it to the test and try to ingest 1 million rows from a MySQL database into Snowflake.

What you will learn

  1. The Snowflake Ingest SDK has a streaming API that can be used to replicate all data, or to ingest delta records, from your database or transaction system into Snowflake, with or without message brokers.
  2. How to create a custom Java application that builds a streaming pipeline into Snowflake.
  3. How to create streaming data ingestion pipelines without any message broker like Kafka.

Snowflake recently announced the public preview (at the time of writing) of the Snowpipe Streaming ingest API. It is a Java-based, open-source API that can be integrated easily into your programs, applications, and project builds. More information is available in the Maven repo and the Snowflake Ingest SDK GitHub repository.

To begin, add the Snowflake Ingest Service SDK to your Maven project:

<!-- Add this to your Maven project's pom.xml -->
<dependency>
  <groupId>net.snowflake</groupId>
  <artifactId>snowflake-ingest-sdk</artifactId>
  <version>{version}</version>
</dependency>

This API provides very low latency for streaming data ROWS (yes, you read that right, rows, not files) into Snowflake tables. It can reduce the overall engineering effort and cost of bringing data into Snowflake. Connectors and partner tools can be built on top of this API in the future to ingest data into Snowflake; for example, the Kafka connector for Snowflake now uses Snowpipe Streaming.

Cost-effective data streaming

We are going to use the MySQL JDBC connector in a Docker container to retrieve the records from the source and upload them to Snowflake using a Java app built on the Snowpipe Streaming API.
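
As a rough sketch of the source side (the exact wiring lives in the repo), the app reads rows from MySQL over JDBC before handing them to the streaming channel. The connection URL, credentials, and table name below are placeholders, not the demo's actual values:

// Minimal JDBC read from the MySQL source; URL, user, password and table name are assumed.
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class MySqlReaderSketch {
    public static void main(String[] args) throws Exception {
        String url = "jdbc:mysql://db:3306/demo"; // "db" would be the Compose service name
        try (Connection conn = DriverManager.getConnection(url, "demo_user", "demo_password");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM store_sales")) {
            while (rs.next()) {
                // Each ResultSet row is later mapped to a Map<String, Object> and
                // handed to the Snowpipe Streaming channel (shown further below).
            }
        }
    }
}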

The methods provided in the Snowflake Ingest API help us create a streaming Client, then open a Channel, and finally insert the data into Snowflake using the insertRow method.

Sounds simple? Well, it is!

All of this can be done in a single Java program with a few lines of code. I built two services, app and db, running in separate Docker containers to achieve this.

Our source data in the MySQL table is a sample dataset borrowed from the Snowflake STORE_SALES table in the SNOWFLAKE_SAMPLE_DATA database under the TPCDS_SF10TCL schema. All columns in the dataset are numeric, and that is fine, because we mainly want to test the engineering effort, speed, and cost.

Code

GitHub: snowpipe-streaming-api-demo

$ git clone https://github.com/sfc-gh-pjain/snowpipe-streaming-api-demo

There are Dockerfiles and a Docker Compose file to build the images and run the container services (app and db).

The App.java file (here) is where we make use of the Snowflake Snowpipe Streaming API, in a few simple steps.

Create a Snowpipe Streaming API client

We need to create a Client and a Channel to communicate with Snowflake, which will be used to stream rows of data into a table.

Snowpipe Streaming client-channel mapping
SnowflakeStreamingIngestClient client = SnowflakeStreamingIngestClientFactory
    .builder(props.getProperty("clientName"))
    .setProperties(props)
    .build();
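
For context, props above is a plain java.util.Properties object populated from the repo's snowflake_account_config.properties file (your account details and single-line private key, plus the clientName, channel, database, schema, and table values used in these builders). A minimal sketch of loading it, assuming the file sits in the app's working directory:

// Sketch only: load the connection settings used by the client and channel builders.
import java.io.FileInputStream;
import java.util.Properties;

Properties props = new Properties();
try (FileInputStream in = new FileInputStream("snowflake_account_config.properties")) {
    props.load(in);
}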

Open a channel to stream the data.

OpenChannelRequest request1 = OpenChannelRequest.builder(props.getProperty("channel"))
    .setDBName(props.getProperty("database"))
    .setSchemaName(props.getProperty("schema"))
    .setTableName(props.getProperty("table"))
    .setOnErrorOption(OpenChannelRequest.OnErrorOption.CONTINUE)
    .build();

SnowflakeStreamingIngestChannel channel1 = client.openChannel(request1);

Lastly, call the insertRow API to load the data.

InsertValidationResponse response = channel1.insertRow(row, String.valueOf(totalRowsInTable));
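
Here, row is a Map<String, Object> keyed by the target table's column names, and the offset token passed as the second argument is simply the running row count, which lets the channel track ingestion progress. The loop below is only a sketch of how that might look; it assumes java.util.Map/HashMap and the SDK's InsertValidationResponse are imported, rs is the JDBC ResultSet from the MySQL read, channel1 is the channel opened above, and the column names are illustrative TPCDS STORE_SALES columns:

// Illustrative read-and-insert loop; column mapping and variable names are assumptions.
long totalRowsInTable = 0;
while (rs.next()) {
    Map<String, Object> row = new HashMap<>();
    row.put("SS_SOLD_DATE_SK", rs.getObject("ss_sold_date_sk"));
    row.put("SS_ITEM_SK", rs.getObject("ss_item_sk"));
    // ...map the remaining STORE_SALES columns the same way...

    totalRowsInTable++;
    InsertValidationResponse response = channel1.insertRow(row, String.valueOf(totalRowsInTable));
    if (response.hasErrors()) {
        // Surface the first insert error instead of silently dropping rows.
        throw response.getInsertErrors().get(0).getException();
    }
}
channel1.close().get(); // flush any buffered rows and close the channel when done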

How to run this demo

Prerequisites:

1. You can deploy this with your own Docker client. We will not cover how to install Docker, but it's very easy; see https://www.docker.com/get-started/

2. Some familiarity with Java programming

3. We are using the community version of MySQL Server for demo purposes only; the user and password are simple, but in production you should follow your own strong authentication guidelines.

Setup and Run:

After you have checked that Docker is running, here are the steps to reproduce this streaming test:

  1. Clone, fork, or download this repo to your local machine or wherever you want to run this demo.
  2. We are using key-pair authentication to communicate with Snowflake (the default). You can set up key-pair authentication using this link: Snowflake key pair authentication setup
  3. Update your account-specific information in the snowflake_account_config.properties file. Paste your PEM private key into this file on a single line, and do not include the "BEGIN" and "END" header and footer lines. We are not using a passphrase, for simplicity, since this demo is about streaming rather than key hardening.
  4. Open the setup_snowflake.sql file in a text editor, copy and paste its contents into a Snowflake worksheet, and click Run All. This will set up a database and the target table. Change it to your specific role if needed.
  5. Go to snowpipe-streaming-api-demo (the root folder) and run the following command in a terminal. It will take a while to download all the packages the first time.
docker compose up

6. Once the streaming load is complete, you can verify your data in the Snowflake target table and bring the Docker services down by running

docker compose down

Conclusion

We saw that Snowflake can ingest massive amounts of data in seconds, without in-flight delays or a staging layer for ingestion. This opens up a variety of use cases, such as IoT data integration, source database integration, change data capture from a source, and end-to-end ELT on the ingested data inside Snowflake.

Many features such as Dynamic Tables (in private preview at the time of writing), stored procedures (Python, Java, Scala, JavaScript, SQL), UDFs, UDTFs, Snowpark (Python, Java & Scala), Tasks, and Streams can be used to build an end-to-end data pipeline and host it without the need for any external tools or orchestration.

Reference:

Check out the full capabilities of Snowpipe Streaming here.

Disclaimer: Any opinions expressed are solely my own and do not express the views or opinions of my employer.
