The One Billion Row Challenge with Snowflake

I recently learned about the One Billion Row Challenge initiated by Gunnar Morling. The challenge is to create a Java program to process a CSV file containing 1 billion rows, where each row contains the name of a weather station and a recorded temperature. The program needs to calculate the minimum, average, and maximum temperatures for each weather station.

Originally tailored for Java, the challenge has attracted interest in other programming languages, as well as approaches using databases and SQL. Initially, I considered using this challenge as an opportunity to delve into Rust or Go, two languages I’ve been eager to try.

However, I was inspired by Robin Moffatt and Francesco Tisiot, who successfully tackled the challenge using SQL with DuckDB, PostgreSQL, and ClickHouse. Motivated by their approach, I chose to undertake the challenge using Snowflake.

This article details my various experiments and performance results.

Things to Note on Measuring Performance

In the actual challenge, all programs are run on a Hetzner Cloud CCX33 instance with 8 dedicated vCPUs and 32 GB of RAM. The time program is used to measure execution times.

Robin and Francesco ran their databases locally. Clearly, that's not possible with Snowflake, as it is only available in the cloud.

Additionally, the compute power for Snowflake is determined by the virtual warehouse size. You can think of a virtual warehouse a bit like a remote machine that does compute for you. A larger warehouse is going to give you more query processing power to reduce the overall time, so this is not going to be an exact apples to apples comparison, but hey, this is just for fun :-).

For the purposes of my testing, I used the Large warehouse option. In my testing, a Large warehouse was about 5 times faster during a table copy than an X-Small warehouse.
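For reference, the warehouse size is just a property on the warehouse, set at creation time or changed later. Here's a minimal sketch, assuming a warehouse named brc_wh (the name is my own placeholder):

CREATE OR REPLACE WAREHOUSE brc_wh
WAREHOUSE_SIZE = 'LARGE' -- size used for most of the tests in this article
AUTO_SUSPEND = 60        -- suspend after 60 seconds of inactivity to avoid burning credits
AUTO_RESUME = TRUE;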

Loading and Processing CSV Files in Snowflake

The first step is to load the CSV file into Snowflake. There are a number of ways to do this, from using Snowsight directly, to SnowSQL, to hosting the data outside of Snowflake in an S3 bucket. For this particular situation, Snowsight is not a viable option because the file we need to process is 13 GB and Snowsight's file upload limit is 50 MB.

For the challenge, I tested out two primary approaches: an internal stage and an external stage, along with a few variations of each.

A Snowflake stage is used for loading files into Snowflake tables or unloading data from tables into files. An internal stage stores data within Snowflake, while an external stage references data in a location outside of Snowflake, such as an S3 bucket.

In the first approach, with an internal stage, I used SnowSQL to move the file from my local machine into a Snowflake stage, then copied the file into a table and executed the SQL against the table.

The second approach used an external stage that points to an AWS S3 bucket and an external table. I kept all data external to Snowflake and avoided loading a native table.

There are pros and cons to both approaches. Let's start by looking at the internal stage approach.

The Internal Stage Approach

After cloning the 1brc repo and generating the dataset, the data needs to be uploaded into an internal stage. With the data uploaded, we’ll copy the data into a table where we can run a query to produce the required output. The flow for this process is similar to what’s shown in the diagram below.

Example flow for moving measurement files into a table and the query operation needed to produce the challenge output.

To move the measurement data into an internal stage, I used the following commands.

CREATE OR REPLACE FILE FORMAT measurement_format
TYPE = 'CSV'
FIELD_DELIMITER = ';';

CREATE OR REPLACE STAGE measurements_record_stage
FILE_FORMAT = measurement_format;

PUT file:///LOCATION_TO_FILE/1brc/measurements* @measurements_record_stage
AUTO_COMPRESS=TRUE PARALLEL=20;

AUTO_COMPRESS=TRUE gzip-compresses the CSV file as it's uploaded, and the PARALLEL parameter helps speed up the upload by using multiple threads. The output of the PUT command looks like this:

Example of internal stage content after uploading measurements.txt.

Note that I didn’t consider staging the file to be part of performance measurement. It’s essentially equivalent to moving the file onto a Hetzner Cloud CCX33 instance as done in the main challenge.
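If you want to sanity-check the upload before copying, the LIST command shows what landed in the stage, including the compressed file sizes. A quick sketch:

LIST @measurements_record_stage;

-- Or narrow the listing down to the measurement files with a regex
LIST @measurements_record_stage PATTERN = '.*measurements.*';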

Creating a Table from the Internal Stage

The next step is to copy the internal stage data into a table.

First I created a table called measurements.

CREATE OR REPLACE TABLE measurements (
location VARCHAR(50),
temperature NUMBER(4, 1)
);

Next, I used the COPY INTO command to move the stage data into the measurements table.

COPY INTO measurements
FROM @measurements_record_stage
FILE_FORMAT = (FORMAT_NAME = measurement_format)
ON_ERROR = 'skip_file';

Doing this with the 13 GB CSV file is quite slow, as you can see in the performance table below. By keeping all the data in a single file, we miss out on the parallel processing power of a cloud-based platform like Snowflake. The documentation recommends keeping files under 250 MB compressed to optimize for parallel loading.

To address this, I split the measurements file into a collection of smaller files. I experimented with splitting the file into different size chunks to see how that impacted performance. The best result was with 10 million records per file and a total of 100 files.

Time to copy files into a table based on how the file was split up.

Moving the data from the stage into a table is where the bulk of the operational cost is going to be. Breaking the file up into chunks so the records can be processed in parallel during the table copy saves a lot of time, but there's still a significant cost.

In contrast, when handling this challenge in a programming language like Java, you can process the file and simultaneously compute the required values, storing them in a data structure tailored for the intended output. A specialized, tailored solution is likely going to be less costly than loading everything into a general-purpose database.

However, in the world outside of this competition, the advantage of a database is that you can run a myriad of queries against the dataset with very high performance. The flexibility and efficiency in querying typically outweigh the initial data loading costs.

Querying the Data in Snowflake

The last step is to query the data and produce the results in the format as specified by the One Billion Row Challenge.

Creating a list of results grouped by location along with the minimum, average, and maximum values is pretty straightforward.

SELECT location, 
MIN(temperature) AS min_temperature,
CAST(AVG(temperature) AS NUMBER(8, 1)) AS mean_temperature,
MAX(temperature) AS max_temperature
FROM measurements
GROUP BY location;

However, the challenge specifies that the data must be printed in alphabetical order in a single line similar to what’s shown below:

{Abha=5.0/18.0/27.4, Abidjan=15.7/26.0/34.1, Abéché=12.1/29.4/35.6, Accra=14.7/26.4/33.1, Addis Ababa=2.1/16.0/24.3, Adelaide=4.1/17.3/29.7, …}

To satisfy this requirement, I created an outer query that concatenates the bracket, slash, and comma characters as needed and flattens the results using the LISTAGG function.

SELECT '{' || LISTAGG(location || '='
|| CONCAT_WS('/', min_temperature, mean_temperature, max_temperature), ', ')
WITHIN GROUP (ORDER BY location) || '}'
FROM (
SELECT location,
MIN(temperature) AS min_temperature,
CAST(AVG(temperature) AS NUMBER(8, 1)) AS mean_temperature,
MAX(temperature) AS max_temperature
FROM measurements
GROUP BY location);

With this in place, the average query time with caching disabled is 1.77 seconds.

This gives the following overall timings:

  • 23.03 seconds total
    - 21.26 seconds for the table copy
    - 1.77 seconds for the query

If query caching is on, then once the first query is run, subsequent queries are about 10 times faster, with an average time of 0.182 seconds.
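If you want to measure cold versus cached runs yourself, the result cache can be toggled at the session level. A minimal sketch:

-- Disable the result cache so every run does the full scan and aggregation
ALTER SESSION SET USE_CACHED_RESULT = FALSE;

-- Turn it back on to see cached performance
ALTER SESSION SET USE_CACHED_RESULT = TRUE;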

An overall time of 23.03 seconds is decent, but not spectacular. The best Java programs at the time of this writing are close to 6 seconds.

Avoiding the Table Copy Operation

Since the bulk of the operational cost of the first approach is the table copy, I attempted to see how I could avoid that.

Snowflake supports querying directly against an internal stage. The actual query syntax is very similar to querying against a table as shown below.

SELECT '{' || LISTAGG(location || '='
|| CONCAT_WS('/', min_temperature, mean_temperature, max_temperature), ', ')
WITHIN GROUP (ORDER BY location) || '}'
FROM (
SELECT t.$1 AS location,
MIN(t.$2::NUMBER(4, 1)) AS min_temperature,
CAST(AVG(t.$2::NUMBER(4, 1)) AS NUMBER(8, 1)) AS mean_temperature,
MAX(t.$2::NUMBER(4, 1)) AS max_temperature
FROM @measurements_record_stage t
GROUP BY t.$1);

Snowflake’s documentation explicitly states that querying a stage is intended to only be used for simple queries and not a replacement for moving data into a table.

I ignored this warning just to see what the performance would be like. Querying the billion records contained within a single file was super slow. So slow that I killed the query operation.

However, similar to the optimization I made for the table copy, I split the records across multiple files. Using the 100 files with 10 million records each, the total time to calculate the minimum, average, and maximum temperatures and print the result dropped to 12.33 seconds. That cuts the time roughly in half compared to the first method!
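For the multi-file run, you can also be explicit about which staged files a query reads by passing file format and pattern options inline on the stage reference. Here's an abbreviated sketch of the inner query with those options (the pattern is an assumption based on how the split files are named):

SELECT t.$1 AS location,
MIN(t.$2::NUMBER(4, 1)) AS min_temperature
FROM @measurements_record_stage (
FILE_FORMAT => 'measurement_format',
PATTERN => '.*measurements.*'
) t
GROUP BY t.$1;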

The External Stage Approach

I also wanted to see how an external stage would perform. My goal with the external stage was to avoid copying the data into Snowflake. Under normal circumstances, there could be financial reasons for doing this, but for this test I was interested in comparing the performance with the internal stage approach previously discussed.

To create the external stage, I first uploaded the temperature measurement files to an S3 bucket. Similar to the internal stage approach, I broke up the single file into 100 separate files.

AWS S3 bucket after uploading 100 measurement files.

There are a few AWS IAM permissioning steps required to allow Snowflake to read from or write to an S3 bucket you own. Once I had those configurations in place, I created a storage integration as shown below.

CREATE STORAGE INTEGRATION s3_one_brc
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::<AWS_ROLE_ARN>'
STORAGE_ALLOWED_LOCATIONS = ('s3://one-brc-sean/');
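Part of that IAM setup pulls values from the integration itself: DESC STORAGE INTEGRATION returns the Snowflake-side IAM user ARN and external ID, which go into the trust policy of the AWS role referenced above. A quick sketch:

-- Look for STORAGE_AWS_IAM_USER_ARN and STORAGE_AWS_EXTERNAL_ID in the output
DESC STORAGE INTEGRATION s3_one_brc;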

Then I created the external stage referencing the storage integration.

CREATE STAGE s3_brc_stage
STORAGE_INTEGRATION = s3_one_brc
URL = 's3://one-brc-sean/'
FILE_FORMAT = measurement_format;

To query the external stage, I created an external table as shown below.

CREATE OR REPLACE EXTERNAL TABLE external_measurements(
location VARCHAR AS (value:c1::varchar),
temperature NUMBER(4, 1) AS (value:c2::number(4, 1))
)
WITH LOCATION = @s3_brc_stage
FILE_FORMAT = (TYPE = CSV FIELD_DELIMITER = ';' SKIP_HEADER = 1);
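One thing to watch with external tables: unless auto-refresh is wired up through S3 event notifications, the table's file metadata may need a manual refresh before the staged files show up in query results. A sketch:

-- Register the files currently in the stage location with the external table metadata
ALTER EXTERNAL TABLE external_measurements REFRESH;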

Finally, I can query the data much like I would a native table.

SELECT '{' || LISTAGG(location || '='
|| CONCAT_WS('/', min_temperature, mean_temperature, max_temperature), ', ')
WITHIN GROUP (ORDER BY location) || '}'
FROM (
SELECT location,
MIN(temperature) AS min_temperature,
CAST(AVG(temperature) AS NUMBER(8,1)) AS mean_temperature,
MAX(temperature) AS max_temperature
FROM external_measurements
GROUP BY location);

Using this approach, I got an average end-to-end time of 24.83 seconds, which is similar to the total time of the first approach with an internal stage and a table copy operation. With caching turned on, subsequent queries average 0.232 seconds.

I also tried seeing if I could improve performance with a materialized view. This turned out to be much worse. The query time against the materialized view was about the same as the external table, but there’s an additional cost to create the view, which took on average about 40 seconds.

CREATE OR REPLACE MATERIALIZED VIEW measurements_view AS                                       
SELECT location, temperature FROM external_measurements;
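As a side note on checking timings like this after the fact, Snowflake exposes recent query durations through the QUERY_HISTORY table function. Here's a sketch of one way to pull them (just an example of reading elapsed times, not the exact methodology behind the numbers above):

SELECT query_text,
total_elapsed_time / 1000 AS elapsed_seconds
FROM TABLE(INFORMATION_SCHEMA.QUERY_HISTORY(RESULT_LIMIT => 10))
ORDER BY start_time DESC;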

Cranking Up My Warehouse Size

Throughout the prior experiments, I used a Large warehouse. Just for fun and to see how things would compare, I spun up a 4X-Large warehouse instance, and re-ran the internal stage steps I outlined previously.
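Resizing is a one-liner. Here's a sketch, reusing the placeholder warehouse name from earlier:

ALTER WAREHOUSE brc_wh SET WAREHOUSE_SIZE = '4X-LARGE';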

With a 4X-Large warehouse, the table copy time was cut from 21.26 seconds to 14.66 seconds, bringing the total time to copy and execute the query down to 16.50 seconds.

I also tried querying the internal stage directly as before to avoid the table copy. This cut the total cost from 12.33 seconds to 6.67 seconds.

Wrapping Up

This was a fun little challenge where I got to dive into some features of Snowflake I don’t use on a regular basis.

The best performance I achieved with the Large warehouse was 12.33 seconds where I split the file up into 100 smaller files, used an internal stage, and queried the stage directly. I’m clearly cheating a bit by splitting the file up and not counting that pre-processing step towards the total time.

Snowflake, or any kind of database, isn't really the ideal approach to this challenge, since the challenge is all about how quickly you can produce the required string a single time. In the context of the challenge, there's no value in speeding up repeated queries or in having the flexibility to do other types of analysis. But it's fun to try :-).

If you have any suggestions or ideas about how to optimize the approach, please let me know.

Sean Falconer
Snowflake Builders Blog: Data Engineers, App Developers, AI/ML, & Data Science