Using AWS DMS For Migrating Data Into Snowflake
By Joshua Gitter
Migrating Transactional Data
Transactional data is often at the heart of meaningful analysis. At Endeavor, transactional data helps us to predict the likelihood a fan will watch a pay-per-view UFC fight card or attend a match at the Miami Open. It helps us understand what car someone may be interested in seeing at a Barrett-Jackson car auction, or what event might be the hot ticket item at this year’s Olympic Summer Games in Paris. But before our data scientists are able to do impactful analysis, data needs to be made available in our data warehouse. The process of ingestion is the bedrock of a powerful data warehouse, and that’s where our data engineering team comes in.
One of the core responsibilities of data engineering at Endeavor is to ingest data from transactional systems into our data warehouse for analysis. This data can come from a variety of sources, including AWS services, FTP servers, and SQL servers hosted both on-premises and in the cloud.
To help better serve our fans, Endeavor continually integrates with new partners and vendors. Some of these partners keep their data in transactional databases that we need to migrate from. Database migrations can be technically challenging, expensive, and time consuming, and the difficulty, cost, and time all go up when migrating data at scale and in real time. AWS Database Migration Service (DMS) is a great option for this type of real-time migration, and this article discusses how we migrate data into Snowflake using DMS.
Why We Chose DMS
There are several database migration solutions out there, such as AWS DMS, Fivetran, and Google Cloud Database Migration Service. Any of them could be a fit for a given use case, but for teams already in the AWS ecosystem, DMS is an attractive choice. DMS can also be cheaper than no-code solutions such as Fivetran or Airbyte; these can be excellent migration choices but quickly become expensive as monthly active rows increase. DMS offers options to run replication using either on-demand or serverless compute resources, so let's compare the two.
AWS On-Demand
Pros:
- Good choice for predictable migration workloads and for those who want granular control over resource allocation.
- More documentation is available, since on-demand launched in 2016 while serverless only launched in 2023.
- More databases are supported as source and target endpoints. Full lists of supported sources and targets are available in the AWS DMS documentation.
Cons:
- Generally more expensive — Users need to provision a replication instance, which means choosing its instance class and size up front and paying for it hourly.
- More complex to set up — Users need to set up a migration task and create a source and target endpoint for the task to use.
AWS Serverless
Pros:
- Cheaper — Users don’t need to provision replication instances as the capacity is automatically provisioned and managed. You only pay for what you use.
- Simple setup — Migration tasks and endpoints are created as part of the serverless instance and don’t need to be created separately.
Cons:
- Fewer databases are supported; a full list is available in the AWS DMS Serverless documentation.
- Documentation isn’t as readily available.
Whichever you choose, DMS allows us to migrate data in near real time. Real-time data has become critical for running a modern data-driven operation. It lets us put information about transactions and inventory directly in front of analysts, enabling them to make timely decisions and get to value faster. It also means that both analytical and operational workloads can be supported in Snowflake.
The Solution
While DMS generally works as an excellent off-the-shelf solution, there are a limited number of supported destinations. Our data warehouse of choice is Snowflake, which is not one of the supported target databases for DMS. The good news is that DMS can write data to S3 buckets which we can leverage to our advantage.
Configuring DMS to write data to an S3 bucket is straightforward, since S3 is a natively supported target. Once the data lands in the bucket, Snowflake has its own set of objects that can carry the migration the rest of the way.
A Snowflake stage is a holding area for files being loaded into Snowflake from outside sources such as S3 or other storage locations. For a stage over external storage to work, we also need a storage integration: a Snowflake object that stores an IAM role for the storage location so the stage can access its files without credentials being passed around.
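As a rough sketch, the wiring looks something like the following. The bucket path, integration name, and role ARN are placeholders rather than our production values, the IAM role itself still needs to be created and trusted on the AWS side, and the file format assumes the default CSV output from DMS:
-- Storage integration holding the IAM role Snowflake assumes to read the bucket
CREATE STORAGE INTEGRATION s3_dms_integration
TYPE = EXTERNAL_STAGE
STORAGE_PROVIDER = 'S3'
ENABLED = TRUE
STORAGE_AWS_ROLE_ARN = '<iam_role_arn>'
STORAGE_ALLOWED_LOCATIONS = ('s3://my-dms-bucket/');
-- File format describing the CSV files DMS writes out
CREATE FILE FORMAT my_file_format
TYPE = CSV
FIELD_OPTIONALLY_ENCLOSED_BY = '"';
-- External stage pointing at the DMS output prefix
CREATE STAGE my_stage
URL = 's3://my-dms-bucket/dms-output/'
STORAGE_INTEGRATION = s3_dms_integration
FILE_FORMAT = (FORMAT_NAME = 'my_file_format');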
Once data hits the stage, dynamic tables and tasks are the two main objects that aid in migration as they allow for the scheduling of queries. We can use these queries to move data from the staging area to our final destination.
Snowpipe and streams also offer viable options for migrating data into Snowflake, and we will discuss the differences in greater depth later in this article.
The solution we came up with needed to handle both full loads and change data capture (CDC). Full loads are a snapshot of a table at a certain time. CDC is a way of capturing changes made to a table over time. It allows for streaming inserts, updates, and deletes in real time as records are changed in the source database.
These two work well in tandem because the full load provides a starting point for the data migration and then CDC takes over for any changes made to that data once the full load is complete. Full loads on their own may be useful for datasets that are finalized and will not change. CDC on its own may be beneficial if a dataset is already migrated and ongoing changes need to be captured. Thankfully, Snowflake offers a range of tools that enable us to easily load both full datasets and capture data being written in real time to an S3 bucket.
Architecture
We have found four possible architectures that work for this migration. The first choice is whether to use a Snowflake task or Snowpipe with an Amazon Simple Notification Service (SNS) topic to load data into the staging table. The second is whether to use dynamic tables or streams and tasks to move data to its final destination. The path the data takes from the source database to the Snowflake stage is the same in every case, as is the need for a staging table. All four architectures are viable, and which one you choose will depend on your use case. We explain the differences in more depth below.
Task vs Snowpipe with SNS Topic
Task
A Snowflake task is a way to schedule SQL queries to run on a given schedule. Imagine we had the following query:
CREATE TASK example_task
WAREHOUSE = my_warehouse
SCHEDULE = '1 MINUTE'
AS
COPY INTO my_staging_table
FROM @my_stage
FILE_FORMAT = (FORMAT_NAME = 'my_file_format')
PATTERN = 'my_file_pattern';
ALTER TASK example_task RESUME;
This statement creates a task that executes the COPY INTO every minute. Tasks are created in a SUSPENDED state, so we need to make sure to RESUME the task after creation. Tasks are very powerful and can help us perform complex data transformations at scale.
In our architecture, we use tasks to run automated COPY INTO statements every minute. These statements copy files from the stage and load the data into a staging table. The difference between the stage and the staging table is that the stage points at the files in our S3 bucket, while the staging table is where the data from those files is actually loaded. As new files are added to the S3 bucket, they automatically become visible through the stage.
Pros:
- All COPY INTO parameters are supported
- SELECT statements using a WHERE clause are supported. This is useful if you want to load only certain rows from the stage instead of copying everything (see the sketch after this list).
Cons:
- The task will run every minute whether or not a new file was actually dropped off. This is a waste of compute resources as tasks run on virtual warehouses and consume credits which will incur a cost.
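For example, here is a minimal sketch of a task that filters staged rows rather than running a blanket COPY INTO; the positional columns and the filter condition are hypothetical and would need to match your file layout:
CREATE TASK filtered_load_task
WAREHOUSE = my_warehouse
SCHEDULE = '1 MINUTE'
AS
INSERT INTO my_staging_table
-- $1, $2, $3 are positional references to columns in the staged files
SELECT $1, $2, $3
FROM @my_stage (FILE_FORMAT => 'my_file_format')
WHERE $1 IS NOT NULL;
ALTER TASK filtered_load_task RESUME;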
Snowpipe with SNS Topic
Snowpipe is another way to automate COPY INTO statements with the added benefit of being able to run only when new files are added to an S3 bucket. If data is being written as a file object to S3, then Snowpipe can automatically be notified of new files with the use of an Amazon SNS Topic.
An SNS topic acts as a communication channel that broadcasts messages about new files in an S3 bucket to its subscribers. Snowpipe subscribes to this topic and starts the COPY INTO job whenever it is notified about a new file. The following code shows how to create a Snowpipe that automates a COPY INTO statement based on an SNS topic:
CREATE PIPE my_pipe
AUTO_INGEST = TRUE
AWS_SNS_TOPIC = '<sns_topic_arn>'
AS
COPY INTO my_staging_table
FROM @my_stage
FILE_FORMAT = (FORMAT_NAME = 'my_file_format');
An SNS topic still needs to be configured on the AWS side with this approach.
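Two Snowflake system functions are handy while wiring this up: the first returns the IAM policy to attach to the SNS topic so that Snowflake's SQS queue can subscribe to it, and the second confirms the pipe is running once everything is connected (the topic ARN is a placeholder):
-- Policy to attach to the SNS topic so Snowflake's SQS queue can subscribe
SELECT SYSTEM$GET_AWS_SNS_IAM_POLICY('<sns_topic_arn>');
-- Check that the pipe is running and receiving notifications
SELECT SYSTEM$PIPE_STATUS('my_pipe');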
Pros:
- Snowpipe will only run when there is a new file to process, so compute is not wasted.
Cons:
- Snowpipe doesn’t allow certain parameters in its COPY INTO statements, including ON_ERROR, PURGE, FORCE, and SIZE_LIMIT.
- Only simple SELECT statements are supported. Filtering using a WHERE clause is not supported.
Dynamic Tables vs Streams and Tasks
Next, let’s discuss how to materialize the migrated data. When using CDC, the DMS migration files include the ongoing changes made to a source table. Along with these changes comes a column called ‘Op’ that denotes what happened to a row: ‘D’ indicates the row was deleted, ‘I’ that it was inserted, and ‘U’ that it was updated. This tells us which transformation to apply to a given row.
Handling this transformation and materializing the data in a final table can be done using dynamic tables or streams with tasks.
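As a rough sketch, the staging table receiving these files might look like the following. The business column is hypothetical; the pieces the downstream queries rely on are the ‘Op’ flag, a primary key, and a change timestamp:
CREATE TABLE my_staging_table (
    "Op"              VARCHAR(1),      -- 'I' = insert, 'U' = update, 'D' = delete
    "primary_key"     NUMBER,          -- primary key carried over from the source table
    "datetime_column" TIMESTAMP_NTZ,   -- change timestamp used to pick the newest row
    "customer_name"   VARCHAR          -- hypothetical business column
);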
Dynamic Tables
A dynamic table is a table that materializes from the results of a query. By default, data in the table is refreshed incrementally but it can be configured to refresh with full loads. The refresh works by using change tracking applied on base table(s) to track which rows were modified since the last refresh. Akin to tasks, this will run at a given cadence automatically. Here is an example of a dynamic table creation query:
CREATE DYNAMIC TABLE final_table
TARGET_LAG = '1 minute'
WAREHOUSE = my_warehouse
AS
SELECT t.* EXCLUDE ("Op")
FROM my_staging_table t
QUALIFY "datetime_column" = MAX("datetime_column") OVER (PARTITION BY "primary_key") AND "Op" != 'D';
This query will materialize the results of the SELECT query as a dynamic table. The data in the dynamic table will be no more than 1 minute older than the base table. 1 minute is the absolute minimum lag possible for a dynamic table or a task. Also akin to tasks, dynamic tables are very helpful for data transformation as the table creation and data transformation are done in the same step.
During our dynamic table creation we write the SELECT portion of the query to get the newest row for every primary key. In plain English, we are trying to find the most up to date row for every unique customer/transaction in the database. We also only select from the staging table where the ‘Op’ is ‘U’ or ‘I’ so that deletes are not included in the dynamic table. Because we are doing this, all tables migrated with this solution require a primary key and a datetime column.
Pros:
- Snowflake recommends dynamic tables as the preferred way to build out data pipelines
- Transformation of data is done in the same step that loads it into the final table.
Cons:
- Change tracking needs to be enabled on the base table(s) that the dynamic table selects from (a one-line change, shown below).
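Enabling it is a one-line statement on the staging table:
-- Allow the dynamic table to track changes on its base table
ALTER TABLE my_staging_table SET CHANGE_TRACKING = TRUE;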
Streams and Tasks
We have already discussed tasks and their effectiveness, but on their own they can’t replace dynamic tables. Streams are Snowflake objects that track changes made to underlying objects. The following query creates a stream on our staging table:
CREATE STREAM table_stream ON TABLE my_staging_table;
Next, a task will need to be created. When creating the task we can have it check every minute to see if the stream has any new data:
CREATE TASK example_task
WAREHOUSE = my_warehouse
SCHEDULE = '1 MINUTE'
WHEN SYSTEM$STREAM_HAS_DATA('table_stream')
AS
MERGE INTO final_table f
USING (
    SELECT t.*
    FROM table_stream t
    QUALIFY "datetime_column" = MAX("datetime_column") OVER (PARTITION BY "primary_key")
) s
ON f."primary_key" = s."primary_key"
WHEN MATCHED AND s."Op" = 'D' THEN
    DELETE
WHEN MATCHED AND s."Op" = 'U' THEN
    UPDATE SET f."primary_key" = s."primary_key",
    ...
WHEN NOT MATCHED AND s."Op" = 'I' THEN
    INSERT ("primary_key", ...)
    VALUES (s."primary_key", ...);
ALTER TASK example_task RESUME;
Whenever the stream has data, we merge from the staging table into the final table. Similar to the dynamic table query, we get the newest row for each primary key and use that row’s ‘Op’ to perform the data transformation.
Pros:
- Can be used without enabling change tracking on base table(s)
Cons:
- The merge query here is more complex than the query used for the dynamic table.
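Whichever pattern you pick, it helps to keep an eye on the moving parts. Here is a small monitoring sketch, using the object names from the examples above:
-- Recent runs of the merge task, newest first
SELECT name, state, scheduled_time, error_message
FROM TABLE(INFORMATION_SCHEMA.TASK_HISTORY(TASK_NAME => 'example_task'))
ORDER BY scheduled_time DESC;
-- Check whether the stream currently has unconsumed changes
SELECT SYSTEM$STREAM_HAS_DATA('table_stream');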
Closing Remarks
Migrating data easily, at scale, in near real time, and without significant cost is truly a substantial challenge. Amazon is able to provide a practical solution for this with AWS DMS.
Though Snowflake is not a supported DMS target, built-in Snowflake objects such as dynamic tables and tasks help us bridge the gap. Completing migrations like these helps Endeavor succeed by making real-time data available for applications such as operational reporting, machine learning, and business analytics.
We believe this post can work as a guide for those looking to migrate data from various databases into Snowflake and to do so in real time.
To learn more — reach out to the author here.