Navigating SCDs in Snowflake: A Quickstart Guide

Sarthak Sarbahi
Towards Data Engineering
19 min read · Dec 14, 2023


Photo by Damian McCoig on Unsplash

In this story, I’ll guide you through a simple yet exciting tutorial on Snowflake, focusing on a concept called SCD — or “Slowly Changing Dimensions.” Think of SCD as a way to keep track of changes in important data details or dimensions, like customer info or product specifics, which are crucial for understanding and reporting data. By the time we wrap up, you’ll have a clear grasp of the various types of SCDs and how you can put them into action using Snowflake. So, let’s dive in and start exploring!

What is Snowflake?

Snowflake is a cloud-based data warehousing platform that provides a fully managed, scalable solution for data storage, processing, and analysis. Its unique architecture allows for the separation of storage and compute resources, enabling users to scale each independently and pay only for what they use.

This platform supports structured and semi-structured data, such as JSON, and offers robust support for SQL queries. It integrates easily with various data integration tools and business intelligence applications. Snowflake’s design optimizes performance for both large-scale data operations and complex query execution.

Key features include automatic scaling, data sharing capabilities, and near-zero maintenance, making it a compelling choice for organizations looking to implement a flexible, cost-effective data solution in the cloud.

How can I access Snowflake?

You can access Snowflake with their 30-day free trial by visiting this link. Just choose the cloud platform you prefer for hosting your Snowflake instance and pick the edition that suits your needs. There are options like the Enterprise or Business Critical editions. I opted for the Enterprise edition hosted on Google Cloud Platform.

What are the prerequisites for this tutorial?

  • Snowflake: You can get started with a Snowflake instance by visiting this link.
  • Google Cloud Storage (GCS): You will need access to a Google Cloud Platform (GCP) account. You can sign up for a 3-month free trial by visiting this link. Once you have access to a GCP account, you will need to create a GCS bucket. Don’t worry if you’re new to GCP. I’ll take you through the steps.
  • Sample data: You can find the sample data here. Make sure to download both CSV files. We will require them as part of this tutorial.

Checked off all three items on the list? Fantastic! Now, let’s dive right in and get started!

Key Snowflake ideas to know

Before we dive into the practical side of things, it’s essential to grasp some key Snowflake concepts. We’ll be using these throughout the tutorial, so it’s important to get a good handle on them first.

  1. Warehouse: In Snowflake, a ‘warehouse’ isn’t a building for storing goods, but a virtual compute engine that does the work of analysing and processing your data. These warehouses come in different “sizes” depending on the compute power required.
  2. Roles: In Snowflake, roles define what each user can see and do. It helps in managing security and ensuring that people only access the data they’re supposed to.
  3. Stage: A ‘stage’ in Snowflake is a named location that holds your data files before you load them into tables. It can be internal (managed by Snowflake) or external, pointing at cloud storage such as GCS. Think of it as the waiting room for your data, where files sit and get ready to be processed and organized.
  4. Integration Object: Simply put, an integration object in Snowflake is used to securely connect Snowflake with external systems, like your apps or other data sources. It’s like a safe bridge that allows data to flow between Snowflake and other places without compromising security.
  5. Stream: A Stream in Snowflake is a cool feature that keeps an eye on a table and notes down every change — like when data is added, updated, or deleted. This concept is also referred to as “Change data capture”. It’s like having a surveillance camera on your data table, recording every move, so you know exactly what changed and when.

Now that we’ve covered the essential concepts, it’s time to roll up our sleeves and dive into the hands-on part of our journey.

Load data in Google Cloud Storage

Navigate to the GCP console by visiting this link. Once there, use the search bar at the top to look for “Cloud Storage”. Next, hit the CREATE button.

Google cloud platform — the console

You’ll need to name your bucket; I named mine scd-snowflake. Keep in mind that the bucket name must be globally unique.

Creating bucket in Google Cloud Storage

Leave the other settings as they are by default. Click CREATE. If a pop-up appears, confirm to disable public access to your bucket. This step is important to ensure your data isn’t publicly available on the internet. Then, go ahead and click on UPLOAD FILES to upload the two CSV files you downloaded earlier in this tutorial.

Files uploaded to the bucket

Check that both files are successfully uploaded. Now it’s time to head over to Snowflake.

Getting started with Snowflake

Launch Snowflake by using the instructions provided in your email. After logging in, navigate to the Worksheets tab on the left side. Click on the + icon at the top right and choose SQL Worksheet. This will be our space for crafting SQL commands in Snowflake.

New SQL worksheet in Snowflake

Feel free to name your worksheet. Just double-click on the tab at the top, which currently shows a timestamp. I have named the worksheet scd_with_snowflake.

Our first task is to set up the necessary role. We’ll use the ACCOUNTADMIN role to create a warehouse in Snowflake, as it’s the highest-level role with complete permissions.

USE ROLE ACCOUNTADMIN;

To run a query, type it into the worksheet, highlight it, and press the “Play” button (in blue) at the top right of the worksheet. This action executes the query.

Running the first query in the worksheet

Now, we’ll move on to creating a warehouse, essential for data processing in Snowflake.

CREATE OR REPLACE WAREHOUSE COMPUTE_WAREHOUSE
WITH
WAREHOUSE_SIZE = XSMALL
AUTO_SUSPEND = 300
AUTO_RESUME = TRUE
INITIALLY_SUSPENDED = TRUE;

The command above sets up a warehouse named COMPUTE_WAREHOUSE. We’re starting with an XSMALL size for minimal computing resources. The warehouse will automatically suspend after 5 minutes (300 seconds) of inactivity, starts out in a suspended state, and will automatically resume whenever it’s needed for processing data.
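If you later find queries crawling, you can resize the warehouse at any time. The statements below are a minimal sketch; they assume the same warehouse name as above, so adjust them to whatever you created.

ALTER WAREHOUSE COMPUTE_WAREHOUSE SET WAREHOUSE_SIZE = SMALL;

-- Stop it immediately instead of waiting for AUTO_SUSPEND
ALTER WAREHOUSE COMPUTE_WAREHOUSE SUSPEND;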

Time to integrate GCP and Snowflake

Next, we’re going to set up a storage integration object in Snowflake. This will help us read data from our GCP cloud storage.

CREATE STORAGE INTEGRATION gcp_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = GCS
ENABLED = TRUE
STORAGE_ALLOWED_LOCATIONS = ('gcs://<bucket_name>/');

Remember to swap out <bucket_name> with your GCS bucket’s actual name. Now, let’s take a peek at the integration object’s properties.

DESC STORAGE INTEGRATION gcp_integration;

Viewing these properties gives us insight into the storage integration. But this is just one part of the Snowflake-GCS integration process. Our next step involves adding the STORAGE_GCP_SERVICE_ACCOUNT property value as a member to our GCS bucket. This grants it access to the data in the bucket. For now, just copy the property_value.

Viewing properties of storage integration
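If you’d rather pull out just that value instead of eyeballing the full DESC output, one option is the sketch below. It assumes you run it immediately after the DESC statement (RESULT_SCAN reads the previous query’s result) and that the output columns come back under the lowercase names "property" and "property_value".

SELECT "property_value"
FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()))
WHERE "property" = 'STORAGE_GCP_SERVICE_ACCOUNT';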

Time to go back to our bucket in Google Cloud Storage. Click on PERMISSIONS. In the VIEW BY PRINCIPALS section, select GRANT ACCESS.

Permissions tab of the bucket

Paste the property value into New principals. For the role, choose Storage Object User. After this, hit Save.

Add principal and assign the role

Finally, we’ll create the database and schema in Snowflake.

CREATE OR REPLACE DATABASE DEMO_DB;
CREATE OR REPLACE SCHEMA DEMO_SCHEMA;

And there you have it! We’ve successfully integrated Snowflake with our GCS bucket.

Creating a stage in Snowflake

Now, let’s move on to creating the stage object with the query provided.

CREATE OR REPLACE stage DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
URL = 'gcs://<bucket_name>/'
STORAGE_INTEGRATION = gcp_integration;

Make sure to replace <bucket_name> with the real name of your GCS bucket. With the stage object in place, we can easily check out what’s inside our GCS bucket.

LIST @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE;

Get ready to see this integration come to life!

Viewing GCS bucket contents in Snowflake

As you’ll notice, we can now peek into the contents of the GCS bucket right from Snowflake. This is a clear indication that our integration is functioning just as we hoped.
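As a quick sanity check, you can even peek at the raw file contents straight from the stage. The sketch below assumes Snowflake’s default CSV parsing (comma-delimited), so expect the header row to show up as well.

SELECT $1, $2, $3
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE/household_items.csv;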

Working with tables in Snowflake

Let’s begin by creating a table to store our data from the CSV files. We’ll call this table ITEMS.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin VARCHAR(15)
);

Following that, we’ll set up a staging table. Think of this as a temporary spot for data before it moves to the final ITEMS table.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin VARCHAR(15)
);

Next up, we’re going to create a stream object in Snowflake. This stream will track any changes made to the ITEMS_STAGING table, including inserts, updates, and deletes. It’s important to know that a stream clears itself (its offset advances) once its contents are consumed by a DML statement such as an INSERT or MERGE; a plain SELECT doesn’t empty it. We’ll use this stream to incrementally load new records into our final ITEMS table.

CREATE OR REPLACE STREAM DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM ON TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;
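If you want to confirm whether the stream currently holds unconsumed changes, Snowflake provides the SYSTEM$STREAM_HAS_DATA function. Right now it should return FALSE, since we haven’t loaded anything into the staging table yet.

SELECT SYSTEM$STREAM_HAS_DATA('DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM');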

Now, it’s time to load our first CSV file (household_items.csv) into the staging table. We’ll use Snowflake’s COPY command for this. The COPY command is quite versatile, allowing us to load various file types like CSV and JSON into Snowflake tables.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items.csv')
VALIDATION_MODE = RETURN_ERRORS;

In our COPY command, we’re targeting the ITEMS_STAGING table and reading from the stage object we created earlier. We’ve defined how the data should be parsed, specified the file name, and used the VALIDATION_MODE = RETURN_ERRORS parameter. This parameter is really handy for testing the COPY command and spotting any errors up front; no data is loaded while it’s in place.

No errors returned with COPY

Next, we’ll execute the COPY command again, but this time without the VALIDATION_MODE parameter, so the data actually gets loaded.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items.csv')
ON_ERROR = 'CONTINUE';

The ON_ERROR = ‘CONTINUE’ parameter means the command will run and bypass any errors that might crop up.

Records are inserted in the staging table

Great, our staging table now has the data! You can check out what’s inside the staging table with this query.

SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;
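As an aside, since we’ll be repeating the same inline file format in every COPY command, you could define a named file format once and reference it instead. This is optional, and the rest of the tutorial sticks with the inline version; CSV_FORMAT below is just a name I’ve made up for illustration.

CREATE OR REPLACE FILE FORMAT DEMO_DB.DEMO_SCHEMA.CSV_FORMAT
TYPE = CSV
FIELD_DELIMITER = ','
SKIP_HEADER = 1;

-- The COPY command would then look like this
COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
FILE_FORMAT = (FORMAT_NAME = 'DEMO_DB.DEMO_SCHEMA.CSV_FORMAT')
files = ('household_items.csv')
ON_ERROR = 'CONTINUE';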

Now, let’s see what’s happening in our stream object. We’ll use SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM;. Here, METADATA$ACTION = INSERT indicates the rows that have been inserted into the staging table.

View contents of the stream object
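If you’d like the change-tracking columns spelled out explicitly, a stream exposes METADATA$ACTION, METADATA$ISUPDATE, and METADATA$ROW_ID alongside the table’s own columns. A small sketch:

SELECT
item_serial_number,
item_name,
country_of_origin,
METADATA$ACTION,
METADATA$ISUPDATE,
METADATA$ROW_ID
FROM DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM;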

Our next step is to transfer data into our final ITEMS table using the stream.

INSERT INTO DEMO_DB.DEMO_SCHEMA.ITEMS SELECT ITEM_SERIAL_NUMBER, ITEM_NAME, COUNTRY_OF_ORIGIN FROM DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM;

And voilà! You can view the data in the ITEMS table with SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS;. Next, let’s understand the first kind of SCD.

SCD Type 0: No changes in the dimension column

Let’s take a closer look at the data within our ITEMS table. It consists of three columns:

  • item_serial_number: This column lists the serial number of each household item.
  • item_name: Here, you’ll find the name of each household item.
  • country_of_origin: This indicates the country where the item was manufactured.

In this tutorial, we’ll focus on the country_of_origin column as our dimension column that might change over time.

Data in the ITEMS table

SCD Type 0 essentially means that if there are any changes in the data, they won’t affect the dimension column. For instance, if a Fan’s country of origin shifts from Spain to Portugal, the information in our table will stay the same. This SCD type is typically used for data that doesn’t change often, like lookup or reference information. Now, let’s move on to exploring SCD Type 1.
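Before we do, here’s a minimal sketch of what a Type 0 load could look like in Snowflake. It isn’t part of the original walkthrough, but it boils down to an insert-only MERGE: brand-new serial numbers are added, while existing rows are deliberately left untouched.

MERGE INTO DEMO_DB.DEMO_SCHEMA.ITEMS AS I
USING DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM AS S
ON I.item_serial_number = S.item_serial_number
WHEN NOT MATCHED THEN
INSERT (item_serial_number, item_name, country_of_origin) VALUES (S.item_serial_number, S.item_name, S.country_of_origin);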

SCD Type 1: Overwriting the dimension column

In SCD Type 1, we update the dimension column with new values. For example, if a Fan’s country of origin changes from Spain to Portugal, our table will reflect this new information by updating the existing record.

Now, let’s put this into practice in Snowflake. Up until now, we’ve loaded data from just one CSV. We’re going to load from the second CSV now (household_items_error.csv), following the same steps as before. First, we’ll run a validation on our COPY command to ensure everything’s set.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items_error.csv')
VALIDATION_MODE = RETURN_ERRORS;

Oops, it looks like we encountered an error. But don’t worry, this is part of the learning process.

Error while loading

Errors while loading data from CSVs are common in real-world scenarios. If you check the CSV, you’ll notice that the country_of_origin column has a value that’s way too long (###############################), exceeding the 15-character limit we set for the column (VARCHAR(15)).

Knowing there’s an error, we can take it a step further. It’s often useful to store incorrect records separately for later analysis. Let’s create a new table for data with errors.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.REJECTED_RECORDS (
rejected_record TEXT
);

After setting up the new table, we’ll insert the erroneous record into it. Rerun the COPY command with VALIDATION_MODE = RETURN_ERRORS for household_items_error.csv, and then execute the following query.

INSERT INTO DEMO_DB.DEMO_SCHEMA.REJECTED_RECORDS SELECT rejected_record FROM TABLE(RESULT_SCAN(LAST_QUERY_ID()));

This adds one record to our REJECTED_RECORDS table. The last bit in the above query TABLE(RESULT_SCAN(LAST_QUERY_ID())) essentially takes the output of the last query and uses it here. You can view the error records with SELECT * FROM DEMO_DB.DEMO_SCHEMA.REJECTED_RECORDS;. Great, you’ve learned how to manage error records! Now, let’s try loading data into the staging table again.

Run the COPY command once more.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items_error.csv')
ON_ERROR = 'CONTINUE';

This time, it’ll add four rows to the staging table and skip the erroneous one. But we don’t need to worry about the error record for now.

Four rows inserted in the staging table
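As another aside, if you ever want to audit a load after the fact, Snowflake’s COPY_HISTORY table function reports per-file row counts and the first error message. A sketch (column names as I recall them; double-check against your account):

SELECT file_name, row_count, row_parsed, error_count, first_error_message
FROM TABLE(DEMO_DB.INFORMATION_SCHEMA.COPY_HISTORY(
TABLE_NAME => 'ITEMS_STAGING',
START_TIME => DATEADD('hour', -1, CURRENT_TIMESTAMP())
));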

Since our staging table has new records, the stream will capture these changes. We’ll use the stream for an UPSERT operation on our ITEMS table, which is a combination of UPDATE and INSERT.

MERGE INTO DEMO_DB.DEMO_SCHEMA.ITEMS AS I
USING DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM AS S
ON I.item_serial_number = S.item_serial_number
WHEN MATCHED THEN
UPDATE SET I.country_of_origin = S.country_of_origin
WHEN NOT MATCHED THEN
INSERT (item_serial_number, item_name, country_of_origin) VALUES (S.item_serial_number, S.item_name, S.country_of_origin);

After performing this operation, two rows are inserted and two are updated in the ITEMS table. To confirm, let’s examine the final ITEMS table with SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS;.

Data in ITEMS table

You’ll see two new items (Wardrobe and Gas Stove) added, and the country_of_origin column for two items (Fan and Heater) updated. This demonstrates SCD Type 1. But what about retaining historical data, including both old and new records? For that, we need SCD Type 2.

If you encounter an error about “no active warehouse”, just run USE WAREHOUSE COMPUTE_WAREHOUSE;.

Before we delve into SCD Type 2, let’s clear out the staging table with TRUNCATE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;.

SCD Type 2: Maintain old and new data with row versioning

SCD Type 2 is a go-to method in data warehousing where we keep both the old and new records. The trick is telling them apart, and for that, we add extra columns to our table. Let’s dive into an example for clarity.

We’ll start by creating a new table named ITEMS_HIST.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_HIST (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin VARCHAR(15),
ingestion_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
feed_key VARCHAR(20) DEFAULT TO_VARCHAR(DATE_PART('YEAR', CURRENT_TIMESTAMP), 'FM0000')||TO_VARCHAR(DATE_PART('MONTH', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('DAY', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('HOUR', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('MINUTE', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('SECOND', CURRENT_TIMESTAMP), 'FM00'),
crnt_flag INT DEFAULT 1
);

This table differs from our previous ones. The first three columns are unchanged, but there are three new ones to note:

  • ingestion_timestamp: Captures the exact time a row is added. New records will have a more recent timestamp than older ones.
  • feed_key: Identifies a batch of records. Rows loaded from household_items.csv will carry a different feed_key than those from household_items_error.csv. This key is built by concatenating the parts of a timestamp (Year + Month + Day + Hour + Minute + Second); a more compact way to build it is sketched right after this list.
  • crnt_flag: This column is particularly useful. It indicates the most current row for each household item.
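If that long feed_key expression looks unwieldy, an equivalent and more compact way to build the same YYYYMMDDHHMMSS string (a sketch, not the expression actually used in the table above) is:

SELECT TO_CHAR(CURRENT_TIMESTAMP, 'YYYYMMDDHH24MISS') AS feed_key;

-- or, as the column default:
-- feed_key VARCHAR(20) DEFAULT TO_CHAR(CURRENT_TIMESTAMP, 'YYYYMMDDHH24MISS')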

Next, we’ll set up the staging table and stream again.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin VARCHAR(15)
);

CREATE OR REPLACE STREAM DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM ON TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;

Then, we load the household_items.csv into the staging table.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items.csv')
ON_ERROR = 'CONTINUE';

We’ll then proceed to insert data into ITEMS_HIST, using both the stream and staging table. This involves a three-step process, which needs to be done in order.

-- Step 1: Insert into final table using stream
MERGE INTO DEMO_DB.DEMO_SCHEMA.ITEMS_HIST AS I
USING DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM AS S
ON I.item_serial_number = S.item_serial_number
WHEN MATCHED AND S.METADATA$ACTION = 'INSERT' THEN
UPDATE SET I.crnt_flag = 0
WHEN NOT MATCHED AND S.METADATA$ACTION = 'INSERT' THEN
INSERT (item_serial_number, item_name, country_of_origin) VALUES (S.item_serial_number, S.item_name, S.country_of_origin);

-- Step 2: Insert into final table using staging table
INSERT INTO DEMO_DB.DEMO_SCHEMA.ITEMS_HIST
SELECT
item_serial_number,
item_name,
country_of_origin,
CURRENT_TIMESTAMP AS ingestion_timestamp,
TO_VARCHAR(DATE_PART('YEAR', CURRENT_TIMESTAMP), 'FM0000')||TO_VARCHAR(DATE_PART('MONTH', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('DAY', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('HOUR', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('MINUTE', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('SECOND', CURRENT_TIMESTAMP), 'FM00') AS feed_key,
1 AS crnt_flag
FROM DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
WHERE item_serial_number IN (SELECT DISTINCT(item_serial_number) FROM DEMO_DB.DEMO_SCHEMA.ITEMS_HIST WHERE crnt_flag = 0);

-- Step 3: Truncate staging table
TRUNCATE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;

Step 1 involves using the stream to add new items to ITEMS_HIST (those not already there). If an item exists, its crnt_flag is set to 0, marking it as no longer the most current.

Step 2 uses the staging table to add new rows for items already in ITEMS_HIST, marked with a crnt_flag of 1. However, in this step using household_items.csv, no new records are added since they’re already inserted in Step 1. This step becomes relevant when loading data from the second CSV, household_items_error.csv.

Step 3 involves clearing the staging table for future data loads. Let’s examine the data after completing these steps with SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS_HIST ORDER BY item_serial_number;.

Data in ITEMS_HIST table

You’ll notice each item has one row, with a crnt_flag of 1, indicating it’s the most current. Now, let’s load the second CSV into the staging table and repeat Steps 1 to 3.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items_error.csv')
ON_ERROR = 'CONTINUE';

After repeating Steps 1 to 3, the output you get will look something like this.

Two rows for an item

The result will show that for items like Fan and Heater, there are two rows: the older one with a crnt_flag of 0 and a new one with a crnt_flag of 1. The feed_key also helps distinguish between the two sets of data. This is SCD Type 2 in action. But what if we want just one row per item with both the old and new dimension values? That’s where SCD Type 3 comes into play.
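Before switching gears, it’s worth noting how a Type 2 table is typically queried: consumers filter on crnt_flag for the current picture, and drop the filter when they need history. A couple of quick sketches using the tables above:

-- Current state: one row per item
SELECT item_serial_number, item_name, country_of_origin
FROM DEMO_DB.DEMO_SCHEMA.ITEMS_HIST
WHERE crnt_flag = 1;

-- Full history for a single item, newest first
SELECT *
FROM DEMO_DB.DEMO_SCHEMA.ITEMS_HIST
WHERE item_name = 'Fan'
ORDER BY ingestion_timestamp DESC;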

SCD Type 3: Maintain old and new dimension values in the same row

Returning to our household item example, the Fan, let’s say it was initially made in Spain, but later production moved to Portugal. In this scenario, SCD Type 3 keeps just one row for the Fan in our table. However, the dimension column is split into two: one for the old value (Spain) and another for the new one (Portugal).

But what if manufacturing shifts again, this time to Italy? The old value column then shows Portugal, and the new one updates to Italy. It’s important to note that in this process, we lose the initial detail that the Fan was once made in Spain.

That’s the essence of SCD Type 3. Let’s see how this works practically. We’ll start by re-creating our final ITEMS table.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin_current VARCHAR(15),
country_of_origin_previous VARCHAR(15)
);

You’ll notice a small change in the ITEMS table. There are now two columns for the country of origin — one for the previous (country_of_origin_previous) and one for the current location (country_of_origin_current). Next, we’ll also set up our staging table and stream again.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin VARCHAR(15)
);

CREATE OR REPLACE STREAM DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM ON TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;

Now, let’s load data into the staging table using the first CSV.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items.csv')
ON_ERROR = 'CONTINUE';

Then, we insert data into the final ITEMS table using the MERGE statement.

MERGE INTO DEMO_DB.DEMO_SCHEMA.ITEMS AS I
USING DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM AS S
ON I.item_serial_number = S.item_serial_number
WHEN MATCHED THEN
UPDATE SET
I.country_of_origin_current = S.country_of_origin,
I.country_of_origin_previous = I.country_of_origin_current
WHEN NOT MATCHED THEN
INSERT (item_serial_number, item_name, country_of_origin_current, country_of_origin_previous) VALUES (S.item_serial_number, S.item_name, S.country_of_origin, NULL);

This query inserts household items based on their serial number. Where there’s no match, we add the records with a NULL in the country_of_origin_previous column (as this is the first load, there’s no previous data).

For existing items, we update the country of origin columns. Let’s check the initial results in the final table with SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS;.

NULL values for country_of_origin_previous

As expected, the country_of_origin_previous column is NULL because these rows represent the latest information for each item. Now, let’s load the second CSV into the staging table.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items_error.csv')
ON_ERROR = 'CONTINUE';

Repeat the MERGE statement, and then let’s look at the updated data in the final table with SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS;.

SCD Type 3 performed

You’ll see we still have one row per item, but now, for items like Fan and Heater, both the old and new dimension values are visible. Next, we’ll explore SCD Type 4.

SCD Type 4: Maintain old and new rows in two different tables

SCD Type 4 blends the approaches of Type 1 and Type 2. In this method, we manage two tables: one for the most recent record of an item and another for both old and new records. Let’s understand this with a practical example.

We’ll create two tables:

  • ITEMS_LATEST: This table will hold only the latest record for each item.
  • ITEMS_HIST: This table will store both past and current records. It includes an ingestion_timestamp to mark when data is added and a feed_key to distinguish between different batches of ingestions.

Let’s execute the following queries to set up these tables.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_LATEST (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin VARCHAR(15)
);

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_HIST (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin VARCHAR(15),
ingestion_timestamp TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
feed_key VARCHAR(20) DEFAULT TO_VARCHAR(DATE_PART('YEAR', CURRENT_TIMESTAMP), 'FM0000')||TO_VARCHAR(DATE_PART('MONTH', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('DAY', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('HOUR', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('MINUTE', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('SECOND', CURRENT_TIMESTAMP), 'FM00')
);

Then, we’ll recreate the staging table and stream.

CREATE OR REPLACE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING (
item_serial_number INT,
item_name VARCHAR(20),
country_of_origin VARCHAR(15)
);

CREATE OR REPLACE STREAM DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM ON TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;

Now, let’s import data from the first CSV into the staging table.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items.csv')
ON_ERROR = 'CONTINUE';

First, we’ll load data in the ITEMS_LATEST table with a new MERGE statement.

MERGE INTO DEMO_DB.DEMO_SCHEMA.ITEMS_LATEST AS I
USING DEMO_DB.DEMO_SCHEMA.ITEMS_STREAM AS S
ON I.item_serial_number = S.item_serial_number
WHEN MATCHED AND S.METADATA$ACTION = 'INSERT' THEN
UPDATE SET I.country_of_origin = S.country_of_origin
WHEN NOT MATCHED AND S.METADATA$ACTION = 'INSERT' THEN
INSERT (item_serial_number, item_name, country_of_origin) VALUES (S.item_serial_number, S.item_name, S.country_of_origin);

This MERGE operation looks for the item’s serial number. If it doesn’t find the serial number (meaning the item doesn’t exist), it adds the row. If the item is already there, it updates the country_of_origin.

Next, we’ll add records to the ITEMS_HIST table.

INSERT INTO DEMO_DB.DEMO_SCHEMA.ITEMS_HIST 
SELECT
item_serial_number,
item_name,
country_of_origin,
CURRENT_TIMESTAMP AS ingestion_timestamp,
TO_VARCHAR(DATE_PART('YEAR', CURRENT_TIMESTAMP), 'FM0000')||TO_VARCHAR(DATE_PART('MONTH', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('DAY', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('HOUR', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('MINUTE', CURRENT_TIMESTAMP), 'FM00')||TO_VARCHAR(DATE_PART('SECOND', CURRENT_TIMESTAMP), 'FM00') AS feed_key
FROM DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;

Here, we simply append data to ITEMS_HIST using the staging table. Then, we prepare for the next data batch by truncating the staging table with TRUNCATE TABLE DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING;.

Now, let’s check the ITEMS_LATEST table with SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS_LATEST ORDER BY item_serial_number;.

Data in ITEMS_LATEST table after the first batch of ingestion

Next, let’s view the historical table ITEMS_HIST using the query SELECT * FROM DEMO_DB.DEMO_SCHEMA.ITEMS_HIST ORDER BY item_serial_number;.

Data in ITEMS_HIST table after the first batch of ingestion

Next, we’ll load the second batch of data into the staging table using the COPY command.

COPY INTO DEMO_DB.DEMO_SCHEMA.ITEMS_STAGING
FROM @DEMO_DB.DEMO_SCHEMA.DEMO_STAGE
file_format= (type = csv field_delimiter=',' skip_header=1)
files = ('household_items_error.csv')
ON_ERROR = 'CONTINUE';

We’ll rerun the MERGE statement for ITEMS_LATEST and the INSERT INTO query for ITEMS_HIST.

Let’s view the ITEMS_LATEST table’s data.

Latest dimension value for every item present in ITEMS_LATEST

Here, you can see only the latest values for each item’s country_of_origin. Now, let’s check the ITEMS_HIST table.

All values for dimension column present for every item in ITEMS_HIST

In ITEMS_HIST, every record of the country_of_origin for each item is maintained. This concludes our exploration of SCD Type 4.
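One nice side effect of keeping ITEMS_HIST around: even without ITEMS_LATEST, you can derive the latest state per item straight from the history table. A quick sketch using Snowflake’s QUALIFY clause:

SELECT *
FROM DEMO_DB.DEMO_SCHEMA.ITEMS_HIST
QUALIFY ROW_NUMBER() OVER (PARTITION BY item_serial_number ORDER BY ingestion_timestamp DESC) = 1;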

Lastly, there’s SCD Type 6, which is essentially a mix of Types 1, 2, and 3 (hence 1 + 2 + 3 = 6). It’s not commonly used, so we won’t delve into the details here.

Conclusion

And that brings us to the conclusion of our tutorial. We kicked things off with an introduction to Snowflake and its essential features. We ventured through creating a bucket in Google Cloud Storage and seamlessly integrated it with Snowflake. Our journey took us through the various types of SCD concepts, and we mastered loading data into Snowflake tables using CSVs with the COPY command. We tackled UPSERT operations with the MERGE statement, navigated erroneous data, and got hands-on with the stream object in Snowflake.

I sincerely hope this guide was beneficial for you. Should you have any questions about this tutorial, please don’t hesitate to drop them in the comments below.
