SimStreamer: From Zero to Snowpipe Streaming in 4 Steps

Introduction

Snowflake recently introduced the Snowpipe Streaming API in public preview.

Prior to Snowpipe Streaming, Snowpipe offered the ability to micro-batch ingestion of data in near real-time with a couple of minutes latency. To address the use case of a continuous ingestion of streaming data, the recommendation was to use Kafka, and leverage the Snowflake Kafka connector for Snowpipe to ingest the data into Snowflake. The Snowflake Kafka connector was then able to micro-batch the ingestion using either a time based interval, memory threshold, or number of messages. This approach under the covers uses micro-batching and serializes the data onto files into stages before loading them onto Snowflake.

Snowpipe Streaming addresses the use case of ingesting continuous real-time streaming data directly within Snowflake with low latency, within seconds, without having to land the data. Snowpipe Streaming ingest mode can be used either using the Snowflake Kafka Connector, which can ingest rows directly from Kafka to target tables, or through a Java SDK.

The following diagram from the Snowflake documentation illustrates both approaches using the Snowflake Kafka Connector:

Kafka Connector Snowpipe Streaming vs Snowpipe (source: docs.snowflake.com)

The Snowpipe Streaming service is currently implemented as a set of APIs for the Snowflake Ingest Java SDK, available to download from the Maven Central Repository. It allows clients to implement custom Java Applications to be able to ingest data from any source and ingest directly into Snowflake.

The motivation for this article is to provide a sample program that can be executed from a client workstation in order to ingest rows into Snowflake leveraging the Snowpipe Streaming API.

SimStreamer

In order to experiment with Snowpipe Streaming API, SimStreamer does not require you to ingest rows from an actual streaming data source. SimStream leverages the Java Faker library to produce a set of fake but realistic rowsets matching the TPC-H LINEITEM table. Snowflake comes with the TPC-H Sample dataset schema, making it easy to experiment with Snowflake.

The Java program can easily be modified to customize to generate streaming data matching your schema & use case. Snowpipe Streaming can be used using 3 steps.

Step 1: Clone the Repo

git clone https://github.com/Snowflake-Labs/simstreamer.git
  • The program was developed and compiled using OpenJDK version 19.0.2. No other JDK version has been tested.

Step 2: Create Target Table in Snowflake

As mentioned above, SimStreamer generates data for TPC-H LINEITEM table. In order to set-up the target table, you can create the target database and table as follows after setting the appropriate role:

create or replace database snowpipe_test;
create or replace table public.lineitem like snowflake_sample_data.tpch_sf1.lineitem;

Step 3: Customize the connections and simulation settings

The repo comes with a connection.json file that you will need to customize according to your target Snowflake account, credentials and target database & table.

One prerequisite is to have key-pair authentication setup with your target Snowflake account. Detailed instructions on how to setup private key-pair authentication can be found in the Snowflake Documentation. The current program makes use of unencrypted keys (passphrase parameter is not used).

Once you have configured the private key-pair authentication, the private key (contents of private_key.pem) needs to be copied in the private_key parameter (without the BEGIN/END ENCRYPTED PRIVATE KEY lines). You also need to remove the return characters using an appropriate text editor.

The account name can be retrieved from the target account URL you are using. The rest of the parameters are self-explanatory. Please make sure to use users credentials which have write access to the previously created table.

{
"user": "<user-id>",
"url": "https://<account-name>.snowflakecomputing.com:443",
"account": "<account-name>",
"private_key": "<private_key.pem>",
"port": 443,
"host": "<account-name>.snowflakecomputing.com",
"schema": "public",
"scheme": "https",
"database": "<db-name>",
"connect_string": "jdbc:snowflake://<account-name>.snowflakecomputing.com:443",
"ssl": "on",
"warehouse": "<warehouse-name>",
"role": "<role>"
}

The default simulation settings can be left as-is. The range of data is customized to match data from schema SNOWFLAKE_SAMPLE_DATA.TPCH_SF1 in order to do joins with other dimension tables. The following parameters governs the simulation:

  • totalRowsinTable: Number of rows to stream.
  • num_channels: Number of channels to create and load data in parallel.
  • sleep_ms: Introduces latency in ms between each row. By default, it’s set to zero.
{
"l_orderkey_start": 1,
"l_partkey_min": 1,
"l_partkey_max": 200000,
"l_suppkey_min": 1,
"l_suppkey_max": 10000,
"l_shipdate_min": "1992-01-02",
"l_shipdate_max": "1998-12-01",
"totalRowsinTable": 1000000,
"num_channels": 4,
"sleep_ms": 0
}

Step 4: Running the simulation

For convenience, you can move the SimStreamer.jar file from the target directory to the parent directory of the repo where the connection.json and simulation.json files are located:

cp target/SimStreamer.jar .

The simulation can be ran as follows:

java -jar SimStreamer.jar -u tpch

SimStreamer will spin-up 4 different threads (based on the simulation parameters) and produce and push rows via 4 channels via a single client connection. A channel represents a logical, named streaming connection to Snowflake for loading data into a table.

The channels can be queried from Snowflake as follows:

show channels in table lineitem;

This will show the different channels that were spinned-up by SimStreamer:

Channels created by SimStreamer on SNOWPIPE_TEST.PUBLIC.LINEITEM

The name of the channels created by SimStreamer are arbitrary. The following columns are of interest:

  • client_sequencer: Counter that is incremented each time a channel is reopened by a client application. For SimStreamer, this value will always be zero as each execution spins-up new channels, and does not have any logic built-in to reuse channels.
  • row_sequencer: Counter incremented each time rows are flushed and ingested through the channel. This can be used to monitor the progression of the ingestion.
  • offset_token: Value set by the application when inserting the row. SimStreamer passes the row number in the rowset allocated to a channel.

One can notice the difference between row_sequencer and offset_token as row_sequencer only represents number of committed rows.

The table LINEITEM can be queried to check the rows being ingested:

select * from LINEITEM limit 10;

This gives the following output:

Since SimStreamer generates fake but realistic dataset within the parameters of TPC-H dataset, we can run TPC-H Q3 query for example which will join it with existing SNOWFLAKE_SAMPLE_DATA database:

-- TPCH Q3
SELECT
l_orderkey,
SUM(l_extendedprice * (1 - l_discount)) AS revenue,
o_orderdate,
o_shippriority
FROM SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.CUSTOMER,
SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.orders,
lineitem
WHERE c_mktsegment = 'BUILDING'
AND c_custkey = o_custkey
AND l_orderkey = o_orderkey
AND o_orderdate < '1992-03-15'
AND l_shipdate > '1992-02-15'
GROUP BY l_orderkey, o_orderdate, o_shippriority
ORDER BY revenue DESC, o_orderdate
LIMIT 10;

This returns the top 10 unshipped orders with the highest value:

TPC-H Q3 Output

We can monitor the throughput of the ingestion by creating a table to track down the number of rows ingested at a given timestamp:

create or replace table ingest_rate (ts time, num_rows int);

Then, issue the query:

insert into ingest_rate
select current_time(), count(*) from lineitem;

We can query the ingest_rate table:

select * from ingest_rate;
ingest_rate output

We can see that SimStreamer is able to push approximately 1800 rows/s across 4 different channels. Note that this is in no way representative of the ingest throughput that snowpipe streaming can achieve. This workload is mainly CPU bound at my local workstation (as Java Faker is not best optimized to generate data at a fast throughput rate, and it is a CPU intensive operation to generate fake rows across many columns). One way to increase the throughput could be to increase the number of channels. Results will vary based on the CPU capacity of your client workstation, your network and the number of channels used.

We can also monitor the amount of time spent loading data into Snowflake tables using Snowpipe Streaming with the SNOWPIPE_STREAMING_CLIENT_HISTORY View.

Note that the view can have up to 2 hours latency before you can see any output:

SELECT * FROM SNOWFLAKE.ACCOUNT_USAGE.SNOWPIPE_STREAMING_CLIENT_HISTORY 
where client_name = 'ICESTREAM' order by event_timestamp desc limit 3;
SNOWPIPE_STREAMING_CLIENT_HISTORY output

The following query will return the hourly credits consumed by each client loading data into Snowflake tables using Snowpipe Streaming within the last 365 days:

SELECT COUNT(DISTINCT event_timestamp) AS client_seconds, date_trunc('hour',event_timestamp) AS event_hour, 
client_seconds*0.000002777777778 as credits, client_name, snowflake_provided_id
FROM SNOWFLAKE.ACCOUNT_USAGE.SNOWPIPE_STREAMING_CLIENT_HISTORY
where client_name = 'ICESTREAM'
GROUP BY event_hour, client_name, snowflake_provided_id
order by event_hour desc
limit 5;
Cost of Streaming Ingest

Once you have done experimenting with the SimStreamer, you can simply exit the program by sending an interrupt with Ctrl+C on the command line where you invoked it in the first place.

Conclusion

SimStreamer provides you with an easy way to experiment with Snowpipe Streaming API without having to provide any streaming source, nor setting-up any Kafka connector. Its source code can easily be customized to fit the schema you want to match your actual use case. You would essentially just need to create a function to build a row like BuildRowTCPH() in file RunnableChannel.java and invoke it in ProcessChannelRowSet().

Dynamic Tables are also available in Private Preview which allows to implement declarative streaming data pipelines in Snowflake. SimStreamer can be used to experiment with Dynamic Tables as well, given that it provides a streaming data source, for the well known TPCH schema available as a sample dataset into any Snowflake account.

This will be part 2. Stay Tuned.

--

--