Use Snowflake ❄️ to Load Tables from CSV and Parquet Data in Amazon S3

Amiri McCain
10 min read · Jul 5, 2024


We will be working with the database from Open Food Facts

The Solution 👨🏽‍🔬 🧪

To skip the scenic route and go straight to the code solution, scroll down👇🏽 or cmd+f 🍎 (or ctrl+f 🪟) to “Solution Code” or “Solution Code Steps.” Otherwise, enjoy the stroll 🚶🚶‍♀🚶‍♂️️

Abstract 🎨

This is the fourth article in the series. When we say “loading” or “load,” or as the title of this article says, “Load Tables from… Amazon S3,” in the context of Snowflake we are talking about loading database tables that we create in Snowflake with some sort of external data (external in our case meaning data that lives in the cloud). We will continue to use Open Food Facts’ data.

Photo by Sri Lanka on Unsplash

Previous Articles in this Series ↩

Loading Structured/semi-structured Data from an External Stage (Amazon S3)

Bulk loading from Amazon S3 | Snowflake Documentation

The data is, for the most part, structured. Looking at the data in all of the columns, we can see that some of it is nested (unflattened) data, so we could call it semi-structured. We’re not going to flatten those columns at this time, but rather just pull it all into one table in Snowflake. Note that the data in S3 is stored as both CSV and Parquet files.

Procedure at a High Level 🚁

  1. Create an Amazon S3 bucket and an IAM role, then create the Snowflake virtual warehouse (see CREATE WAREHOUSE | Snowflake Documentation). You can do this programmatically or with the user interface (UI); Snowflake’s UI is called Snowsight. In this case, I created the Snowflake warehouse using Snowsight.
  2. Set up the role, warehouse, and database.
  3. Create external_stages, and file_formats schemas.
  4. Create a storage integration.
  5. Create file_formats object.
  6. Create external stage — specific to Amazon S3.
  7. ✨ Create the table using INFER_SCHEMA along with ARRAY_AGG(OBJECT_CONSTRUCT(*)) in the CREATE TABLE query. This will allow us to programmatically extract column names and data types without explicitly defining them in our CREATE TABLE query. Very cool. Note that the SELECT * FROM TABLE(INFER_SCHEMA()) code you see in the scripts was for inspection and development purposes only.
  8. Load data from the external stage (S3) into the new Snowflake table using the COPY INTO command.
  9. View the results, handle any import/loading errors.

Solution Code 🧭

CSV
Parquet

Let’s break down this code (CSV version) in the next section below. The Parquet version largely does the same thing. The main difference is that the Parquet version of my script does not create the storage integration, since I did that step in the CSV version; instead, the Parquet version runs an ALTER STORAGE INTEGRATION query to add the S3 path to the Parquet files.
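For reference, an ALTER STORAGE INTEGRATION statement of that kind looks roughly like the sketch below. The Parquet path shown is illustrative, not the exact path from my bucket; also note that SET STORAGE_ALLOWED_LOCATIONS replaces the whole list, so the CSV path is repeated.

-- Sketch: add the Parquet location to the existing storage integration (paths illustrative)
ALTER STORAGE INTEGRATION s3_int
SET STORAGE_ALLOWED_LOCATIONS = (
's3://amiriscratch/openfoodfacts/chunk/csv/',
's3://amiriscratch/openfoodfacts/chunk/parquet/' -- hypothetical Parquet folder
);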

Solution Code Steps🪜

  1. Create the virtual warehouse and the database:
CREATE WAREHOUSE COMPUTE_WH
WITH WAREHOUSE_SIZE = 'X-SMALL'
WAREHOUSE_TYPE = 'STANDARD'
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE
MIN_CLUSTER_COUNT = 1
MAX_CLUSTER_COUNT = 1
SCALING_POLICY = 'STANDARD';
--SHOW WAREHOUSES LIKE 'COMPUTE_WH';
--DROP WAREHOUSE IF EXISTS COMPUTE_WH;

CREATE DATABASE IF NOT EXISTS AMIRI_DB;
--DROP DATABASE IF EXISTS AMIRI_DB;

-- Get details of database
SHOW databases like 'AMIRI_DB';

Create the Snowflake virtual warehouse so that we can run queries and perform any needed DML operations. The warehouse size of X-SMALL is plenty for our operation, and the configuration that you see in the above code block is typical. Next, create the database. I used the SYSADMIN role throughout this entire project.
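If the warehouse size or auto-suspend settings ever need tweaking later, they can be changed in place; here is a quick sketch (the new values are just examples):

-- Sketch: adjust or suspend the warehouse after creation (values illustrative)
ALTER WAREHOUSE COMPUTE_WH SET WAREHOUSE_SIZE = 'SMALL' AUTO_SUSPEND = 120;
ALTER WAREHOUSE COMPUTE_WH SUSPEND;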

2. Select the appropriate role, warehouse, and database:

-- Select role, warehouse, and db
USE ROLE SYSADMIN;
USE WAREHOUSE COMPUTE_WH;
USE DATABASE AMIRI_DB;

If you have just created the warehouse and the database while in the role of SYSADMIN, running these commands is probably not needed. However, when you come back to this (e.g. log off and log back in), you may need to run these commands and so it is a good habit to get into.
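If you are ever unsure which context a worksheet is running in, a quick sanity check (not part of the original script) is to query the session context functions:

-- Sketch: confirm the current session context
SELECT CURRENT_ROLE(), CURRENT_WAREHOUSE(), CURRENT_DATABASE(), CURRENT_SCHEMA();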

3. Create schemas:

-- Create schemas
CREATE OR REPLACE SCHEMA external_stages;
CREATE OR REPLACE SCHEMA file_formats;

Schemas get created as database objects. “A schema is a logical grouping of database objects (tables, views, etc.). Each schema belongs to a single database.”
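To confirm that both schemas landed in the right database, something like this works (a minimal check, not part of the original script):

-- Sketch: list the schemas that now exist in the database
SHOW SCHEMAS IN DATABASE AMIRI_DB;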

4. Create the Storage Integration Object:

-- Create storage integration object (SYSADMIN role)
-- Must update AWS role Trust relationships > Trusted entities policy after running create storage integration below
-- DO NOT RERUN OR WILL NEED TO UPDATE AWS
CREATE OR REPLACE STORAGE INTEGRATION s3_int
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = S3
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::012345678901:role/snowflake-access-role'
STORAGE_ALLOWED_LOCATIONS = ('s3://amiriscratch/openfoodfacts/chunk/csv/') -- can have multiple paths here
COMMENT = 'Storage integration object for openfoodfacts';

-- Describe
DESC INTEGRATION s3_int;

A Snowflake storage integration allows us to securely connect with a cloud provider’s cloud storage (Amazon S3, Google Cloud Storage, or Azure Storage). The DESC INTEGRATION s3_int; query allows you to see details of the integration that you just created.

When you run the CREATE STORAGE INTEGRATION query, you will need to update the AWS role’s “Trust relationships.” If you’re not familiar with how to do this, head on over to my article Create Storage Integration for Snowflake ❄️ to Securely Access Amazon S3 (Step By Step).
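If it helps, the two values that go into the AWS role’s trust policy come straight from the DESC INTEGRATION output; a small sketch of what to look for:

-- Sketch: the DESC output contains the values needed for the AWS trust relationship
--   STORAGE_AWS_IAM_USER_ARN -> the AWS principal that assumes your role
--   STORAGE_AWS_EXTERNAL_ID  -> the external ID used in the trust policy condition
DESC INTEGRATION s3_int;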

5. Create File Format Object:

 -- Create file format object
CREATE OR REPLACE FILE FORMAT AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_CSV_FILEFORMAT
TYPE = csv
FIELD_DELIMITER = ','
NULL_IF = ('NULL', 'null')
EMPTY_FIELD_AS_NULL = TRUE -- this alone didn't fix the issue; I also had to fill empty fields with null
ERROR_ON_COLUMN_COUNT_MISMATCH = FALSE
PARSE_HEADER = TRUE -- required to retrieve column names; INFER_SCHEMA works only in a limited way when this is FALSE
FIELD_OPTIONALLY_ENCLOSED_BY = '"';

-- Describe
DESC FILE FORMAT AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_CSV_FILEFORMAT;

The Snowflake file format object specifies all sorts of settings that determine how to read the file it is to import: file type, delimiter, date format, etc. After creating this object, run the DESC FILE FORMAT query to see all of the configuration details; some of these property values can be changed.

Results table for DESC FILE FORMAT <name> query
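As an example of changing one of those property values after the fact, an ALTER FILE FORMAT statement along these lines would do it (the option and value shown are just illustrative):

-- Sketch: change a file format property after creation (option/value illustrative)
ALTER FILE FORMAT AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_CSV_FILEFORMAT
SET SKIP_BLANK_LINES = TRUE;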

6. Create External Stage:

-- Create external stage (created with SYSADMIN, use EXTERNAL_STAGES)
CREATE OR REPLACE STAGE AMIRI_DB.EXTERNAL_STAGES.CSV_FOLDER
URL='s3://amiriscratch/openfoodfacts/chunk/csv/en.openfoodfacts.org.products_0.csv' -- would not work when looking at all files
STORAGE_INTEGRATION = s3_int
FILE_FORMAT = AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_CSV_FILEFORMAT
COPY_OPTIONS = (on_error = 'continue');
-- Notes on external stage creation:
-- Files that cause INFER_SCHEMA to fail: 2, 3, 4, 5, 6, 9
-- Files that allow INFER_SCHEMA to work properly: 0, 1, 7, 8
-- Did not have this problem with the Parquet files

-- Use the fully qualified name or make sure the current schema is set to EXTERNAL_STAGES
-- Describe
DESC STAGE AMIRI_DB.EXTERNAL_STAGES.CSV_FOLDER;

CREATE STAGE | Snowflake Documentation

An external stage allows Snowflake to reference data files that are stored in a location outside of the Snowflake environment, such as an Amazon S3 bucket.

After creating this object, run the DESC STAGE query to see all of the configuration details; some of these property values can be changed.

Results table for DESC STAGE <name> query
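A handy sanity check at this point (not part of the original script) is to list the file(s) that the stage can actually see:

-- Sketch: list the files visible through the external stage
LIST @AMIRI_DB.EXTERNAL_STAGES.CSV_FOLDER;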

Use INFER_SCHEMA to get the list of column names and identify the data types from the CSV (or Parquet) file(s):

-- Infer schema to get the list of column names; returns a table (SYSADMIN, amiri_db.external_stages)
-- Have to be in amiri_db.external_stages or use the fully qualified name, otherwise I get an error
SELECT *
FROM TABLE(
INFER_SCHEMA(
LOCATION=> '@AMIRI_DB.EXTERNAL_STAGES.CSV_FOLDER' --'@csv_folder' --'@csv_folder/en.openfoodfacts.org.products_0'
, FILE_FORMAT=>'AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_CSV_FILEFORMAT'
)
);

This was for inspection purposes only, as I was examining how this command works. This query took 1m 45s to complete. You will notice, in the next step, that we also use INFER_SCHEMA along with ARRAY_AGG(OBJECT_CONSTRUCT(*)) in the CREATE TABLE query.

Results table from SELECT * FROM TABLE(INFER_SCHEMA()) query

7. Create table using the INFER_SCHEMA function:

-- Create table using the INFER_SCHEMA function; problem is that the columns get created out of order relative to the original CSV file(s)
CREATE OR REPLACE TABLE amiri_db.public.openfoodfacts
USING template (
SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
FROM TABLE(
INFER_SCHEMA(
LOCATION=>'@AMIRI_DB.EXTERNAL_STAGES.CSV_FOLDER', -- external stage
FILE_FORMAT=>'AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_CSV_FILEFORMAT' -- command was failing without fully qualified name
)
));

This command allows us to create a table without manually typing in the 203 column names and data types. INFER_SCHEMA along with ARRAY_AGG(OBJECT_CONSTRUCT(*)) grabs all of the column header names from the CSV file and uses them as the table column names, and it also inspects the data in each column to automatically determine the data type. This is amazing! 🤩

Had this not worked, the alternative would have been to manually type all 203 column names and their data types into the CREATE TABLE query, or perhaps to do this in a way that was partially automated (e.g. programmatically get the column names and their respective data types) and partially manual (e.g. manually paste those column names/data types into my CREATE TABLE statement). However, as it turned out, I was able to automate this process completely, and that was a game changer. This query took 1m 46s to complete.

Early on in the development process, I was able to programmatically retrieve the column names. See the image below, where I copied/pasted the results from the GENERATE_COLUMN_DESCRIPTION(ARRAY_AGG(OBJECT_CONSTRUCT(*))) query into a blank worksheet. Note that some further data massaging would need to be done to get the data into a format that CREATE TABLE would accept, not to mention that a half-manual, half-automated process is more error prone than a fully automated (and vetted) process.

Sample of the would-be alternative method
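For context, the manual alternative would have looked something like the sketch below; the column names and types here are purely illustrative, and the real table has 203 columns pasted in from the generated description.

-- Sketch of the manual alternative (column names/types illustrative only)
CREATE OR REPLACE TABLE amiri_db.public.openfoodfacts (
"code" TEXT,
"product_name" TEXT,
"created_datetime" TEXT
-- ...and roughly 200 more columns, pasted and cleaned up by hand
);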

To quickly check out our shiny new (empty) table, let’s run the query: SELECT * FROM openfoodfacts LIMIT 1000;.

Use GENERATE_COLUMN_DESCRIPTION query for inspection:

-- Query the GENERATE_COLUMN_DESCRIPTION function.
SELECT GENERATE_COLUMN_DESCRIPTION(ARRAY_AGG(OBJECT_CONSTRUCT(*)), 'external_table') AS COLUMNS -- both 'table' and 'external_table' works here but response is slightly different format
FROM TABLE (
INFER_SCHEMA(
LOCATION=>'@AMIRI_DB.EXTERNAL_STAGES.CSV_FOLDER',
FILE_FORMAT=>'AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_CSV_FILEFORMAT'
)
);

GENERATE_COLUMN_DESCRIPTION does some cool stuff. I was able to use it to retrieve the column names and data types along with additional formatting information; however, I was not able to use it as part of my CREATE TABLE statement.

8. Load data from the external stage (S3) into the Snowflake table using the COPY INTO command:

-- Load data from external stages into table
COPY INTO AMIRI_DB.PUBLIC.OPENFOODFACTS
FROM '@AMIRI_DB.EXTERNAL_STAGES.CSV_FOLDER'
file_format = (FORMAT_NAME = AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_CSV_FILEFORMAT, PARSE_HEADER = FALSE, SKIP_HEADER=1);

This is where the rubber meets the road — everything has worked so far. Will the final step of loading the data from S3 into our Snowflake table be successful? Yes, I’m on the edge of my seat too! 😬

9. View the results, handle any import/loading errors:

Results from COPY INTO command from external stage (CSV)

Success (sort of)! We loaded more than 300,000 rows from S3 in 28 seconds with no errors. However, those 300,000+ rows represent only about 10% of the total number of rows: one file out of the 10 CSV files I have in S3, each of which is about 1GB in size.

I did have some issues with the CSV external stage when all files were included. I suspect that several of the files contain some improperly formatted columns/rows, so I decided to make the stage point to just one file for the purpose of this demonstration. Perhaps I will come back to this in a future article and go through my debugging process.
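When a load does report errors (especially with on_error = 'continue'), there are a couple of ways to inspect what was rejected; here is a sketch of the two I would reach for, assuming the COPY has just run:

-- Sketch: inspect rows rejected by the most recent COPY INTO for this table
SELECT * FROM TABLE(VALIDATE(AMIRI_DB.PUBLIC.OPENFOODFACTS, JOB_ID => '_last'));

-- Sketch: review recent load history and error counts for the table
SELECT * FROM TABLE(INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'AMIRI_DB.PUBLIC.OPENFOODFACTS',
START_TIME => DATEADD(HOUR, -1, CURRENT_TIMESTAMP())
));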

Photo by Sebastiano Piazzi on Unsplash

I did not have any such issues with the Parquet stage and the Parquet-formatted files. I created the CSV files and Parquet files from the same source (i.e. one ginormous TSV file saved as a .csv file), but evidently there is something I’m missing in the formatting of the CSV files.

Results from COPY INTO command from external stage (Parquet)

Parquet data: I was able to load 3,027,506 rows of data from all 10 of my Parquet files (the exact same data content as the CSV-formatted files) into my table in 1m 26s with zero errors! Not to mention that the Parquet files take up only around 10% of the disk space of the CSV files.
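For completeness, here is a minimal sketch of what the Parquet-side objects might look like; the object names and S3 path are illustrative, and my actual script differs only in the details described earlier (e.g. it reuses the existing storage integration).

-- Sketch: Parquet file format, stage, and load (names, path, and target table illustrative)
CREATE OR REPLACE FILE FORMAT AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_PARQUET_FILEFORMAT
TYPE = parquet;

CREATE OR REPLACE STAGE AMIRI_DB.EXTERNAL_STAGES.PARQUET_FOLDER
URL = 's3://amiriscratch/openfoodfacts/chunk/parquet/' -- hypothetical path
STORAGE_INTEGRATION = s3_int
FILE_FORMAT = AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_PARQUET_FILEFORMAT;

COPY INTO AMIRI_DB.PUBLIC.OPENFOODFACTS
FROM '@AMIRI_DB.EXTERNAL_STAGES.PARQUET_FOLDER'
FILE_FORMAT = (FORMAT_NAME = AMIRI_DB.FILE_FORMATS.OPENFOODFACTS_PARQUET_FILEFORMAT)
MATCH_BY_COLUMN_NAME = CASE_INSENSITIVE; -- map Parquet columns to table columns by name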

Photo by Mark Basarab on Unsplash

Now let’s take a look at our shiny new data in our shiny new table: SELECT * FROM AMIRI_DB.PUBLIC.OPENFOODFACTS LIMIT 1000;

Data loaded from CSV
Data loaded from Parquet

Again, all of the Parquet-formatted data loaded with zero errors. However, you can see that the ordering of the columns is wrong and does not follow the column order of the original files. I plan to address this in a future article.

Summary

The INFER_SCHEMA function along with ARRAY_AGG(OBJECT_CONSTRUCT(*)) proved to be quite useful for automating the creation of a table in Snowflake; this task would have been painful without these two functions. On our journey, we also discovered that the Parquet file format has a much smaller storage footprint, which is very desirable for a number of reasons, especially in production. With all of the SQL code and Snowflake functions carefully planned out and tested, we were able to build a connection (a data pipeline, if you will) from S3 to Snowflake and load nearly 10GB of data from the cloud into our table in less than one and a half minutes.

Useful Snowflake Functions During Development and Debug 📃

  • INFER_SCHEMA
  • SELECT ARRAY_AGG(OBJECT_CONSTRUCT(*))
  • SELECT GENERATE_COLUMN_DESCRIPTION(ARRAY_AGG(OBJECT_CONSTRUCT(*)), 'external_table')

What’s Next?

I will continue to build on this project. Next, in upcoming articles:

  • Debug: We will debug the CSV formatting issues that we are having.
  • Reorder columns: Create a query that will rearrange our columns to the desired order.
  • Use dbt to modularize our queries and to create transformations on Snowflake ❄️ tables.
dbt Labs | Transform Data in Your Warehouse (getdbt.com)

Before you go!

Did you see what happens when you click and hold the clap 👏🏽 button? You could clap up to 5️⃣0️⃣ times, that is, if you enjoyed the article. Try it out! 😀
