Automated Ingestion into Snowflake via DMS/s3

Matt Feeser
Published in In the weeds
11 min read · Aug 4, 2023

Ingestion of large amounts of change data capture (CDC) data from s3 into Snowflake can be a challenge. How would you manage that ingestion for nearly 300 tables from 8 different AWS RDS instances populated to s3 via AWS Database Migration Service (DMS)? It seems like a daunting task, but with some good planning and design, it can be easy to implement and support. Here’s how we did it at Greenhouse.

If you’re not familiar with AWS DMS, it’s a tool that can be used to read from a source system, typically a database, and propagate the data from that system into a target system. In our case we use AWS DMS to read from the Write-Ahead-Log (WAL) in PostgreSQL RDS instances and populate parquet files in s3. The specifics of the DMS implementation are for another post. In this post we’ll focus on the ingestion of the s3 parquet files into Snowflake.

Frequency of Ingestion

The first decision point we made on ingestion was whether to load in near real-time or via batch.

Initially we looked into near real-time ingestion using Snowpipe, which can be triggered by s3 bucket event notifications so that ingestion starts as soon as a source file arrives in s3. However, we found a limitation: AWS only supports up to 100 event notifications per bucket. With our 8 RDS instances and ~300 tables all writing to the same s3 bucket, we would need 2,400 bucket event notifications (8 × 300). We considered two workarounds: 1) using a single bucket event notification to call a lambda function that identifies the specific file and routes it to the correct SNS topic or SQS queue, and 2) splitting the DMS target across multiple buckets. Ultimately we chose to pursue Snowflake Tasks and process daily batches of data from s3. The complete daily ingestion using Tasks averages just over 20 minutes, which is fast enough for us to increase the frequency of ingestion should such a business case arise.

The use of Snowflake Tasks proved to be a good decision for cost containment as well. Since tasks utilize a virtual warehouse, we can closely control our credit consumption. This could be more difficult to control with an out-of-the-box Snowpipe implementation that scales up based on data volume.

Here’s our final ingestion pipeline design using Snowflake Tasks.

Figure: Ingestion Pipeline

Task Design

Snowflake Tasks are SQL-based units of work that can execute either on a schedule or on the completion of a predecessor task (when building a DAG). We chose to use User-Managed tasks so that we could fully utilize a specified warehouse and run all tasks concurrently.

Each task is responsible for loading a single table in our Datalake database, which pulls in data from the same logical table across all RDS instances. For example, the user table exists in all 8 RDS instances. A single task is responsible for ingesting the user table from each of the 8 instances. This is achieved by using a consistent file prefix for all of the DMS tasks so that the RDS instance and table name can easily be identified.

Stored Procedures

We built a series of stored procedures which are called by the tasks. The primary ingestion procedure performs the following steps for a single logical table:

  1. Dynamically generate a “COPY INTO” statement with a file pattern targeting the table within each RDS instance
  2. Execute the “COPY INTO” statement

A single call to the stored procedure results in N “COPY INTO” statements executing, one per source RDS instance.
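A minimal sketch of these two steps in Snowflake Scripting is shown here. It is an illustration, not the procedure used at Greenhouse: the procedure name INGEST_ONE_INSTANCE_SKETCH and its parameters are hypothetical, and it handles one source instance per call, where the real procedure issues one statement per RDS instance.

```sql
-- Hypothetical sketch: builds and runs one COPY INTO for a single source instance.
CREATE OR REPLACE PROCEDURE DATALAKE.APP.INGEST_ONE_INSTANCE_SKETCH(
    P_INSTANCE VARCHAR,  -- e.g. 'rds_instance_1'
    P_TABLE    VARCHAR,  -- e.g. 'USERS'
    P_PK       VARCHAR,  -- e.g. 'id'
    P_PREFIX   VARCHAR   -- 'LOAD' for full-load files, or a date partition such as '2023/06/05/'
)
RETURNS VARCHAR
LANGUAGE SQL
AS
$$
DECLARE
    stmt VARCHAR;
BEGIN
    -- Step 1: generate the statement, narrowing the stage path to one instance, table, and prefix.
    stmt := 'COPY INTO DATALAKE.APP.' || P_TABLE ||
            ' (RECORD, PK, _OPERATION, _COMMIT_TIMESTAMP, _LOAD_TIMESTAMP, _FILE_NAME, _FILE_ROW_NUMBER)' ||
            ' FROM (SELECT $1::variant, $1:' || P_PK || '::varchar, $1:Op::varchar,' ||
            ' $1:_commit_timestamp::TIMESTAMP_TZ, current_timestamp::TIMESTAMP_TZ,' ||
            ' metadata$filename, metadata$file_row_number' ||
            ' FROM @DATALAKE.APP.STAGE_DMS_S3_DATALAKE/' || P_INSTANCE || '/public/' ||
            LOWER(P_TABLE) || '/' || P_PREFIX || ')' ||
            ' FILE_FORMAT = DATALAKE.APP.PARQUET_FORMAT ON_ERROR = skip_file';

    -- Step 2: execute the generated statement.
    EXECUTE IMMEDIATE :stmt;

    -- Return the statement text so the caller can see exactly what ran.
    RETURN stmt;
END;
$$;

-- Example call for one instance and one date partition.
CALL DATALAKE.APP.INGEST_ONE_INSTANCE_SKETCH('rds_instance_1', 'USERS', 'id', '2023/06/05/');
```

Because the arguments are concatenated directly into SQL, a sketch like this is only appropriate when the inputs come from trusted configuration rather than from end users.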

A separate wrapper stored procedure acts as the interface between the task and the stored procedure that performs the ingestion. When called in “incremental” mode, the wrapper determines the s3 file prefix date partition to load and passes it to the ingestion procedure. When called in “initial” mode, the wrapper simply calls the ingestion procedure without a date partition. There are a few benefits to this approach:

  1. By determining the s3 prefix date partition, we are able to minimize our cloud services layer credit usage in Snowflake by narrowly focusing the file pattern. When loading files with too general a file pattern in s3, you will see higher cloud services layer credit usage. This is due to the long list times on buckets with a large number of files.
  2. The stored procedure that performs the ingestion can be called directly during maintenance or support rotations. If a file did not get ingested, the correct date partition can be passed to it and the file will be loaded.
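To make the wrapper's job concrete, here is one way the date partition could be derived, followed by a manual backfill call. Both are sketches under stated assumptions: the post does not say which day the wrapper targets, so loading the previous UTC day is a guess, and INGEST_APP_TABLE_FOR_DATE_PROC is a hypothetical name for the ingestion procedure.

```sql
-- Assumption: an incremental run loads the previous UTC day's partition.
-- SYSDATE() returns the current timestamp in UTC.
SELECT TO_CHAR(DATEADD('day', -1, SYSDATE()), 'YYYY/MM/DD') || '/' AS date_partition;

-- Manual backfill during a support rotation: pass the missed partition directly.
-- The procedure name and argument order here are hypothetical.
CALL DATALAKE.APP.INGEST_APP_TABLE_FOR_DATE_PROC('USERS', 'id', '2023/06/01/');
```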

When using s3 as the target endpoint for DMS, the default file naming convention produces different filenames for the full-load and CDC files. Full-load files are stored within the root table prefix and begin with the word LOAD. CDC files, on the other hand, use timestamps for the file name based on when the records were written to the file. In addition, we configured DMS to create the CDC files in date-based folders using the DatePartitionEnabled, DatePartitionSequence, and DatePartitionDelimiter endpoint configuration parameters, so the CDC files contain the date they were created in their file prefix. Finally, we also included the source RDS instance as a prefix to the file name. For example, the full-load file and CDC file can be found in the s3 bucket with the following prefixes and file names.

Full File:

dms-s3-bucket-name/rds_instance_1/public/users/LOAD000000001.parquet

CDC (aka incremental file):

dms-s3-bucket-name/rds_instance_1/public/users/2023/06/01/20230601-004208649.parquet
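Before running a load, it can help to check which files a given prefix actually matches. Assuming the external stage used in the examples in this post points at the bucket root, a LIST command on the same path shows the candidate files:

```sql
-- Full-load files for the users table from one instance.
LIST @DATALAKE.APP.STAGE_DMS_S3_DATALAKE/rds_instance_1/public/users/LOAD;

-- CDC files for a single date partition.
LIST @DATALAKE.APP.STAGE_DMS_S3_DATALAKE/rds_instance_1/public/users/2023/06/01/;
```

The narrower the path, the fewer objects Snowflake has to list in s3, which is the same reason the incremental load targets one date partition at a time.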

Example: Initial Load Dynamically Generated Copy Into Statement (no date partition)

COPY INTO DATALAKE.APP.USERS (
    RECORD,
    PK,
    _OPERATION,
    _COMMIT_TIMESTAMP,
    _LOAD_TIMESTAMP,
    _FILE_NAME,
    _FILE_ROW_NUMBER
)
FROM (
    SELECT
        $1::variant as RECORD,
        $1:id::varchar as PK,
        $1:Op::varchar as _OPERATION,
        $1:_commit_timestamp::TIMESTAMP_TZ as _COMMIT_TIMESTAMP,
        current_timestamp::TIMESTAMP_TZ as _LOAD_TIMESTAMP,
        metadata$filename as _FILE_NAME,
        metadata$file_row_number as _FILE_ROW_NUMBER
    FROM @DATALAKE.APP.STAGE_DMS_S3_DATALAKE/rds_instance_1/public/users/LOAD
)
FILE_FORMAT = DATALAKE.APP.PARQUET_FORMAT
ON_ERROR = skip_file;

Example: Incremental Dynamically Generated Copy Into Statement

COPY INTO DATALAKE.APP.USERS (
    RECORD,
    PK,
    _OPERATION,
    _COMMIT_TIMESTAMP,
    _LOAD_TIMESTAMP,
    _FILE_NAME,
    _FILE_ROW_NUMBER
)
FROM (
    SELECT
        $1::variant as RECORD,
        $1:id::varchar as PK,
        $1:Op::varchar as _OPERATION,
        $1:_commit_timestamp::TIMESTAMP_TZ as _COMMIT_TIMESTAMP,
        current_timestamp::TIMESTAMP_TZ as _LOAD_TIMESTAMP,
        metadata$filename as _FILE_NAME,
        metadata$file_row_number as _FILE_ROW_NUMBER
    FROM @DATALAKE.APP.STAGE_DMS_S3_DATALAKE/rds_instance_1/public/users/2023/06/05/
)
FILE_FORMAT = DATALAKE.APP.PARQUET_FORMAT
ON_ERROR = skip_file;

For the “COPY INTO” statements to execute successfully, they need access to s3. We accomplished this by configuring an External Stage, Storage Integration, File Format, and the necessary IAM roles/privileges via Terraform.
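For readers who are not using Terraform, the same three Snowflake objects can be sketched in plain SQL. The object names match the ones used elsewhere in this post, while the bucket URL and IAM role ARN are placeholders, and the IAM trust relationship on the AWS side still has to be set up separately.

```sql
-- Storage integration: lets Snowflake assume an IAM role to read the bucket.
CREATE STORAGE INTEGRATION IF NOT EXISTS PROD_S3_STORAGE_INTEGRATION
  TYPE = EXTERNAL_STAGE
  STORAGE_PROVIDER = 'S3'
  ENABLED = TRUE
  STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::123456789:role/use1-prod-snowflake-storage-integration'  -- placeholder ARN
  STORAGE_ALLOWED_LOCATIONS = ('s3://dms-s3-bucket-name/');

-- File format: the DMS output files are parquet.
CREATE FILE FORMAT IF NOT EXISTS DATALAKE.APP.PARQUET_FORMAT
  TYPE = PARQUET;

-- External stage: the named location that the COPY INTO statements read from.
CREATE STAGE IF NOT EXISTS DATALAKE.APP.STAGE_DMS_S3_DATALAKE
  URL = 's3://dms-s3-bucket-name/'
  STORAGE_INTEGRATION = PROD_S3_STORAGE_INTEGRATION
  FILE_FORMAT = (FORMAT_NAME = 'DATALAKE.APP.PARQUET_FORMAT');
```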

Triggering the Tasks

Snowflake tasks can be triggered by a cron schedule, or by the successful execution of a predecessor task. In our case, all of the tasks are independent, so we used a cron schedule. All tasks trigger at the same time.
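As a rough SQL equivalent of what one scheduled task looks like, assuming the task, warehouse, and procedure names that appear in the Terraform example later in this post:

```sql
-- One task per logical table; every task shares the same cron schedule.
CREATE TASK IF NOT EXISTS DATALAKE.APP.TASK_USERS
  WAREHOUSE = DMS_S3_INGEST_APP
  SCHEDULE = 'USING CRON 30 0 * * * UTC'   -- daily at 00:30 UTC
AS
  CALL DATALAKE.APP.INGEST_APP_TABLE_PROC('USERS', 'id', 'incremental');
```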

Monitoring/Alerting

By default, Snowflake tasks time out after 60 minutes. On days with increased data volume from the source system, we occasionally hit this default limit and the task times out or gets canceled.

We decided not to increase the default timeout for the tasks, as this could lead to runaway ingestion tasks that consume excessive credits. Instead, the team decided that an alert sent to our data-alerts Slack channel would be sufficient for notifying the support person of the issue. The tasks could then be investigated, manually restarted, and monitored as needed.
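When an alert arrives, one way to investigate is to query the task history for recent failures and then re-run the affected task by hand. This is a sketch of that workflow rather than a documented runbook, and TASK_USERS is just an example task name.

```sql
-- Tasks that failed or were cancelled in the last 24 hours.
SELECT name, state, error_message, scheduled_time, completed_time
FROM TABLE(DATALAKE.INFORMATION_SCHEMA.TASK_HISTORY(
    SCHEDULED_TIME_RANGE_START => DATEADD('hour', -24, CURRENT_TIMESTAMP()),
    RESULT_LIMIT => 1000
))
WHERE state IN ('FAILED', 'CANCELLED')
ORDER BY scheduled_time DESC;

-- Manually trigger a single run of one task, outside its schedule.
EXECUTE TASK DATALAKE.APP.TASK_USERS;
```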

This was implemented via an error notification integration in Snowflake to an AWS SNS topic which invoked a lambda function to send the alert to the Slack webhook.
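In SQL terms, the Snowflake side of that setup could look roughly like the following. The integration name is the one used in the Terraform example later in this post; the SNS topic and IAM role ARNs are placeholders, and the lambda function and Slack webhook live entirely on the AWS side.

```sql
-- Outbound notification integration that publishes task errors to an SNS topic.
CREATE NOTIFICATION INTEGRATION IF NOT EXISTS PROD_NOTIFICATION_INGTEGRATION
  ENABLED = TRUE
  DIRECTION = OUTBOUND
  TYPE = QUEUE
  NOTIFICATION_PROVIDER = AWS_SNS
  AWS_SNS_TOPIC_ARN = 'arn:aws:sns:us-east-1:123456789:snowflake-task-errors'       -- placeholder
  AWS_SNS_ROLE_ARN = 'arn:aws:iam::123456789:role/snowflake-error-notifications';   -- placeholder

-- Attach the integration to an existing task (suspended while it is modified).
ALTER TASK DATALAKE.APP.TASK_USERS SUSPEND;
ALTER TASK DATALAKE.APP.TASK_USERS SET ERROR_INTEGRATION = PROD_NOTIFICATION_INGTEGRATION;
ALTER TASK DATALAKE.APP.TASK_USERS RESUME;
```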

Provisioning the Resources

We used Terraform to provision all of the resources to support the ingestion process. This included:

Snowflake

  • Databases
  • Schemas
  • Tables
  • Stored Procedures
  • Tasks
  • Storage Integrations
  • External Stages
  • File Formats

AWS

  • IAM roles/permissions

A module was created to provision all of the resources needed for the ingestion of a single logical table across the RDS instances. With this design, we were able to use a single list variable in Terraform to store all of the source table names to be ingested and call the module in a for_each loop. If a new table needs to be added to the ingestion in the future, all that needs to happen is to add the table name to the list variable. The task and target table in Snowflake to handle the ingestion will be created on the next Terraform apply command.

locals {
  tables = [
    "table_1",
    "table_2",
    "table_3",
    "table_4",
  ]
}
module "snowflake_s3_ingestion" {
  source = "../../../../snowflake_s3_ingestion"

  for_each = toset(local.tables)
  region = "use1"
  bucket_arn = "arn:aws:s3:::your-s3-bucket-name"
  database = "DATALAKE"
  schema = "APP"
  stage_name = "STAGE_DMS_S3_DATALAKE"
  table_name = upper(each.key)
  file_format = "PARQUET_FORMAT"
  storage_integration = "PROD_S3_STORAGE_INTEGRATION"
  storage_aws_iam_user_arn = "arn:aws:iam::123456789:role/use1-prod-snowflake-storage-integration"
  automation_type = "task"
  automation_enabled = true
  sql_statement = format("CALL %s.%s.%s('%s','%s','incremental')", "DATALAKE", "APP", "INGEST_APP_TABLE_PROC", upper(each.key), "id")
  task_warehouse = "DMS_S3_INGEST_APP"
  task_schedule = "USING CRON 30 0 * * * UTC"
  snowflake_notification_integration_name = "PROD_NOTIFICATION_INGTEGRATION"
}

Snowflake Tasks Lessons Learned

  1. By default, all Snowflake Tasks are deployed in a suspended state. This means that they will not execute on their specified schedule until they are manually resumed.
  2. Because the task’s state is changed outside of Terraform, we implemented a lifecycle policy to ignore changes on the enabled attribute of the task. This reduced the noise in subsequent Terraform plans and kept future deployments from overwriting our intended state.
  3. Terraform and Snowflake don’t always agree on column data types. What you provision in Terraform may appear slightly different in Snowflake. For example, specifying VARCHAR(100) will end up being VARCHAR with no length constraint in Snowflake. This causes constant drift in Terraform plans. To remedy this, we also put in a lifecycle policy to ignore changes to column data types. This will likely be fixed in a newer version of the Terraform provider for Snowflake.
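For the first lesson, resuming a task and checking its state are both single statements. The task name below is an example following the TASK_<TABLE> naming used by the module.

```sql
-- Resume one task so that it starts running on its schedule.
ALTER TASK DATALAKE.APP.TASK_USERS RESUME;

-- Check the "state" column (started or suspended) for the ingestion tasks.
SHOW TASKS LIKE 'TASK_%' IN SCHEMA DATALAKE.APP;
```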

Right-Sizing the Ingestion Warehouse

In order to attribute cost and to ensure we could independently scale the ingestion workload, we created a separate warehouse for the sole purpose of ingestion. We ran a number of tests to determine the right size for the warehouse.

Before embarking on this effort, we set a goal: complete the entire ingestion in under 30 minutes while minimizing the number of credits used. In addition, we took the time to review the actual workload. This is important because in some cases increasing the number of clusters or adjusting the scaling policy may not have the expected impact on overall runtime.

The Workload

  • All 300 tasks execute daily on the same cron schedule
  • Each task executes 8 “COPY INTO” statements (one for each RDS instance)

The workload is constant, with the same number of statements being run daily. Though the data volume may change in s3, the number of tasks/statements to complete with each ingestion does not fluctuate between runs.

This led us to believe that we would get the best results with a multi-cluster warehouse running in Maximized mode, so that all clusters are available as soon as the warehouse is started.
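A multi-cluster warehouse runs in Maximized mode when its minimum and maximum cluster counts are set to the same value greater than one. The sketch below shows the shape of such a definition; the size, cluster count, and auto-suspend values are illustrative assumptions, not the settings we tested, and multi-cluster warehouses require Enterprise Edition or higher.

```sql
-- Illustrative values only: min = max > 1 means all clusters start together.
CREATE WAREHOUSE IF NOT EXISTS DMS_S3_INGEST_APP
  WAREHOUSE_SIZE = 'XSMALL'
  MIN_CLUSTER_COUNT = 4
  MAX_CLUSTER_COUNT = 4
  AUTO_SUSPEND = 60          -- seconds of inactivity before suspending
  AUTO_RESUME = TRUE
  INITIALLY_SUSPENDED = TRUE;
```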

Warehouse Configurations Tested

For configuration 1, we saw a high volume of query queuing and runtimes that exceeded the 60-minute default task timeout.

With configuration 2, we still observed high queuing and runtimes that exceeded the 60-minute default task timeout. This supported our earlier notion that auto-scaling the clusters would not significantly improve performance.

Using configuration 3, there was still some queuing, but it was minimal. Tasks completed at a reasonable rate, with the entire ingestion taking about 20 minutes.
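One way to quantify queuing when comparing configurations is the queued_overload_time column in the account usage query history. The query below is a sketch of that kind of check, not the exact measurement we used; note that ACCOUNT_USAGE views lag behind real time.

```sql
-- Queuing on the ingestion warehouse over the last day (times are stored in milliseconds).
SELECT COUNT(*)                         AS statements,
       AVG(queued_overload_time) / 1000 AS avg_queued_seconds,
       MAX(queued_overload_time) / 1000 AS max_queued_seconds
FROM SNOWFLAKE.ACCOUNT_USAGE.QUERY_HISTORY
WHERE warehouse_name = 'DMS_S3_INGEST_APP'
  AND start_time >= DATEADD('day', -1, CURRENT_TIMESTAMP());
```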

At the time of writing this, we are averaging only 4.8 credits used daily for this ingestion warehouse.
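Daily credit usage for a dedicated warehouse can be tracked with the warehouse metering view. A sketch, assuming the warehouse name used in the Terraform example:

```sql
-- Credits consumed per day by the ingestion warehouse over the last 30 days.
SELECT TO_DATE(start_time) AS usage_date,
       SUM(credits_used)   AS credits
FROM SNOWFLAKE.ACCOUNT_USAGE.WAREHOUSE_METERING_HISTORY
WHERE warehouse_name = 'DMS_S3_INGEST_APP'
  AND start_time >= DATEADD('day', -30, CURRENT_TIMESTAMP())
GROUP BY 1
ORDER BY 1;
```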

Structure of the Datalake tables

One of the challenges in loading CDC changes into a database is schema change over time. If a new column gets added to a table in the source system, that new column also needs to be added to the target table during the ingestion. Fortunately, AWS DMS configured with an s3 target can automatically add these new columns to the output CDC files. With the added column present in the file in s3, we just need our load process to incorporate it.

To ensure no data loss, and that all columns were present in our raw tables in Snowflake, we use a VARIANT column to store the entire content of the record from the source system. In addition to the VARIANT column, we keep some metadata about the load process and source s3 file. Finally, we parse out some important fields from the variant column and store them as individual columns on the table. This is done to simplify the process of sequencing the CDC changes to identify the current image of a record. Those columns include the primary key column from the source table, the timestamp of when the record was committed in the source system, and the DML operation (Insert, Update, or Delete). This results in all of the tables in our Snowflake Datalake databases having a common structure of:

Figure: Datalake Table Structure
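With the primary key, commit timestamp, and operation broken out as columns, the current image of each record can be derived with a window function. The query below is one possible sketch. It assumes DMS's single-letter operation codes (I, U, D), treats rows without an operation as inserts, assumes primary keys do not collide across RDS instances, and uses file name and row number as a tie-breaker, none of which is confirmed by the pipeline described here.

```sql
-- Latest version of each record, excluding records whose latest change is a delete.
SELECT *
FROM (
    SELECT *
    FROM DATALAKE.APP.USERS
    QUALIFY ROW_NUMBER() OVER (
        PARTITION BY PK
        -- NULLS LAST keeps rows without a commit timestamp from sorting ahead of real changes.
        ORDER BY _COMMIT_TIMESTAMP DESC NULLS LAST,
                 _FILE_NAME DESC,
                 _FILE_ROW_NUMBER DESC
    ) = 1
) AS latest
WHERE COALESCE(_OPERATION, 'I') <> 'D';
```

If the same primary key can exist in more than one RDS instance, the instance would need to be part of the PARTITION BY clause as well, for example by deriving it from the leading segment of _FILE_NAME.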

We configured the DMS tasks to propagate source table DDL changes to the s3 target endpoint, so the parquet files in s3 include any new columns that get added to the sources. Because the COPY INTO statement in the Snowflake Task maps the entire source record to the RECORD VARIANT column, all new columns show up automatically. There is no need for code changes to the ingestion process for new columns to appear in the Snowflake Datalake.
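As a small example of what this looks like downstream, a column added to the source table can be read straight out of RECORD with no change to the load. The column name time_zone is hypothetical.

```sql
-- Rows loaded before the column existed simply return NULL for it.
SELECT PK,
       RECORD:time_zone::varchar AS time_zone,   -- hypothetical newly added source column
       _COMMIT_TIMESTAMP
FROM DATALAKE.APP.USERS
ORDER BY _COMMIT_TIMESTAMP DESC
LIMIT 10;
```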

Key Takeaways

Some of my favorite features from this implementation were:

  • Ease of adding a new table. Simply add the table name to the Terraform list variable.
  • Ease of adding a new RDS instance. Simply add the RDS instance name to a list variable in Terraform that is used by the Snowflake Stored Procedure.
  • Source schema change resilience. Any newly added columns to the source tables automagically show up in Snowflake.
  • Low Snowflake credit usage. I am still amazed that we can ingest so much data from our source application for less than 5 credits per day.

Conclusion

This was a very fun project. It took about 8 weeks from the design and initial POC to implementation. I learned a ton about Terraform, Snowflake, s3, and DMS along the way. I hope this helps you in your development of ingestion patterns into Snowflake.

Update July 15, 2024:

I was asked in the comments to provide a sample of the module code that was used in this example. For context, this version of the module code contains support for serverless tasks, which we found to work better when a large number of tasks execute simultaneously on the same warehouse. Additionally, this version removes support for Snowpipe, as the tasks method of ingestion is working fine. This module does nothing more than provision the target table for ingestion and the task based on the details provided in the module call.

locals {
  task_schedule = var.task_schedule
  task_name = "TASK_${upper(var.table_name)}"
  snowflake_notification_integration_name = var.snowflake_notification_integration_name
  user_task_managed_initial_warehouse_size = "XSMALL"
}

resource "snowflake_table" "this" {
  database = var.database
  schema = var.schema
  name = var.table_name
  comment = "Raw ingest table for ${var.table_name}"
  change_tracking = false

  data_retention_time_in_days = var.data_retention_time_in_days

  column {
    name = "PK"
    type = "VARCHAR"
  }

  column {
    name = "RECORD"
    type = "VARIANT"
  }

  column {
    name = "_OPERATION"
    type = "VARCHAR"
  }

  column {
    name = "_COMMIT_TIMESTAMP"
    type = "TIMESTAMP_TZ(9)"
  }

  column {
    name = "_FILE_NAME"
    type = "VARCHAR"
    nullable = false
  }

  column {
    name = "_FILE_ROW_NUMBER"
    type = "NUMBER(38,0)"
    nullable = false
  }

  column {
    name = "_LOAD_TIMESTAMP"
    type = "TIMESTAMP_TZ(9)"
  }
}

resource "snowflake_task" "this" {
  database = var.database
  schema = var.schema
  warehouse = var.task_warehouse != "" ? var.task_warehouse : null

  name = local.task_name
  sql_statement = var.sql_statement
  schedule = var.task_schedule
  enabled = var.automation_enabled
  error_integration = var.snowflake_notification_integration_name

  user_task_managed_initial_warehouse_size = var.task_warehouse != "" ? null : local.user_task_managed_initial_warehouse_size

  lifecycle {
    ignore_changes = [
      enabled
    ]
  }
}
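For comparison, the serverless branch of the module (no warehouse, with an initial managed warehouse size) corresponds roughly to the following SQL. This is a sketch of the equivalent statement, not output generated by the provider, and it reuses the example task and procedure names from earlier in the post.

```sql
-- Serverless task: omit WAREHOUSE and let Snowflake manage the compute.
CREATE TASK IF NOT EXISTS DATALAKE.APP.TASK_USERS
  USER_TASK_MANAGED_INITIAL_WAREHOUSE_SIZE = 'XSMALL'
  SCHEDULE = 'USING CRON 30 0 * * * UTC'
  ERROR_INTEGRATION = PROD_NOTIFICATION_INGTEGRATION
AS
  CALL DATALAKE.APP.INGEST_APP_TABLE_PROC('USERS', 'id', 'incremental');
```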
