How to migrate from Redshift to Snowflake using Airflow

Javier Lopez
Tech at PromoFarma by DocMorris

--

A migration like this is painful. Things get harder than you initially foresaw, and Murphy makes sure his law applies. At PromoFarma by DocMorris we have learned (the hard way) some useful tips that we want to share with the community, along with our approach to the process. In the end, completing this migration took several months (not full time, though, as we had other tasks) and involved the whole team working together towards the final goal.

Given the title, you might have already guessed that we took advantage of Airflow to make our lives easier when migrating.

We followed a five-step process:
1. Getting ready: preparation of custom pieces of code, tables, infrastructure…
2. Data migration: moving all the data from one data warehouse, Redshift, to the other, Snowflake.
3. DAGs migration: creating “duplicates” of all the processes so that they ran in parallel in both Snowflake and Redshift.
4. Validation: checking that all the new data in Snowflake was indeed correct, using DAGs that compare the same tables in both databases with pandas and SnowflakeHook (see the sketch right after this list).
5. Cleaning up: deleting all Redshift-related code in Airflow, turning off Redshift…
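
We will not go into validation in detail in this post, but as a taste of step 4, here is a minimal, hypothetical sketch of such a comparison (the connection ids and the column-name normalization are assumptions, not our production code):

from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.providers.snowflake.hooks.snowflake import SnowflakeHook


def compare_table(schema: str, table: str) -> bool:
    """Return True if the table has identical contents in Redshift and Snowflake."""
    query = f"SELECT * FROM {schema}.{table} ORDER BY 1"
    redshift_df = PostgresHook(postgres_conn_id='redshift_default').get_pandas_df(query)
    snowflake_df = SnowflakeHook(snowflake_conn_id='snowflake_default').get_pandas_df(query)
    # Snowflake returns upper-case column names by default, so normalize before comparing
    redshift_df.columns = [col.lower() for col in redshift_df.columns]
    snowflake_df.columns = [col.lower() for col in snowflake_df.columns]
    return redshift_df.equals(snowflake_df)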

In this blog post, we are covering steps 1 and 2.

Getting ready

The first step is, obviously, setting up your Snowflake account. We recommend managing it through Terraform, and the key insight is to use Terraform right from the start: if you have already done part of the setup manually in the Snowflake interface, you will have to spend extra time making Terraform work. We had no prior experience with Terraform before reading the linked article, but thanks to the expertise of our DevOps colleagues and other materials on the Internet we managed to configure it in a couple of days, so don’t hesitate: go for it.

Then, we wanted to create in Snowflake all the tables from our existing Redshift data warehouse. Since we had the CREATE statements for all our tables in a git repository, it was as simple as writing a Python script that read them all and executed the code in Snowflake. However, we had been following Redshift best practices, so our CREATE statements contained encodings, distkeys, sortkeys, etc., which do not work in Snowflake. For that reason, we wrote a conversion function in Python, based on regular expressions, that converts SQL code from Redshift syntax to Snowflake syntax. Please note that the function is based on our coding style, so you might have to change some lines to get it fully working with your SQL queries. Reading the code carefully is also a good way to learn about several syntax differences:

Convert syntax function
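
In case the embedded snippet does not load, here is a heavily simplified, hypothetical sketch of what such a conversion function can look like; our real function handles many more cases and is tailored to our coding style:

import re


def convert_sql(sql: str) -> str:
    """Simplified sketch: strip Redshift-only clauses that Snowflake does not accept."""
    sql = re.sub(r'\bENCODE\s+\w+', '', sql, flags=re.IGNORECASE)
    sql = re.sub(r'\bDISTSTYLE\s+\w+', '', sql, flags=re.IGNORECASE)
    sql = re.sub(r'\bDISTKEY\s*\([^)]*\)', '', sql, flags=re.IGNORECASE)
    sql = re.sub(r'\b(?:COMPOUND\s+|INTERLEAVED\s+)?SORTKEY\s*\([^)]*\)', '', sql, flags=re.IGNORECASE)
    return sql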

To manage this migration and our day-to-day creation and alteration of tables, we started using yoyo-migrations, an open-source database schema migration tool. Although it is not mandatory, we strongly encourage using it because, among other advantages, you get version control of the changes made to the tables in your Snowflake account. How to use it is out of scope for this article, but you can find more information here.

Our convert_sql function was integrated into our yoyo-migrations flow, so starting from the Redshift CREATE statements, all the CREATEs were automatically converted and executed in Snowflake; but you can also use it as a regular function in a Python script. If you don’t have the CREATE statements at all, don’t worry: you can get them directly from Redshift, so in pseudo-code you would do something like this:

import psycopg2
import snowflake.connector

redshift = psycopg2.connect(...)       # your Redshift credentials
sf = snowflake.connector.connect(...)  # your Snowflake credentials

tables = get_tables_from_redshift(redshift)    # e.g. query pg_tables / SVV_TABLE_INFO
for table in tables:
    ddl = get_ddl(redshift, table)             # e.g. via the v_generate_tbl_ddl admin view
    sf.cursor().execute(convert_sql(ddl))

Once all our tables were created, we obviously had to start filling them with data, using Airflow.

We created customized SnowflakeOperator and S3ToSnowflakeOperator operators with more features than the vanilla ones. They also include a boolean argument called convert_sql which, when True, passes the queries through our convert_sql function before running them in Snowflake. We added this argument because, while in limbo with the migration not yet finished, any new processes were written directly in Snowflake SQL and therefore did not need to be converted; however, for the already existing processes that we “duplicated”, we had to keep the Redshift version running at the same time as the Snowflake one. You can find this code, along with extensive documentation of each operator, in our GitHub repository:

S3ToSnowflake
Snowflake
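
As an illustration, this is roughly how the convert_sql flag can be used inside a DAG definition (the task ids and SQL files are placeholders; the remaining arguments are documented in the repository):

# Hypothetical task: the SQL file is still written in Redshift syntax,
# so we let the operator translate it before executing it in Snowflake.
load_sales = SnowflakeOperator(
    task_id='load_sales',
    sql='sql/load_sales.sql',
    convert_sql=True,
)

# Hypothetical task: a brand new process written directly in Snowflake SQL,
# so no conversion is needed.
load_returns = SnowflakeOperator(
    task_id='load_returns',
    sql='sql/load_returns.sql',
    convert_sql=False,
)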

Moving data

Now that we had our support functions and Airflow plugins, the infrastructure correctly created and managed in Snowflake through Terraform, and all our tables created, it was about time to start migrating the actual data to our brand-new Snowflake data warehouse. For that, we created an Airflow DAG that unloaded all the data from Redshift to S3 and then copied it into Snowflake. The code is the following:

from airflow import DAG
# RedshiftToS3Operator and S3ToSnowflakeOperator are the custom operators described in this post;
# process_name and default_args are defined elsewhere in the DAG file.

with DAG(process_name,
         default_args=default_args,
         schedule_interval=None
         ) as dag:

    for schema in ['public']:  # schema to be modified manually
        files_route = f'/opt/airflow/dags/migrate_redshift_to_snowflake/mig_{schema}'
        with open(files_route) as f:
            tables_list = f.read()
        tables_list = tables_list.split('\n')

        operators = []
        for table in tables_list:

            unload = RedshiftToS3Operator(
                task_id=f'unload_{schema}_{table}',
                s3_key=f'migration/{schema}/{table}/{table}',
                table=table,
                schema=schema
            )
            operators.append(unload)

            copy = S3ToSnowflakeOperator(
                task_id=f'copy_{schema}_{table}',
                s3_key=f'migration/{schema}/{table}/{table}',
                table=table,
                schema=schema,
                database='dwh_es'
            )
            operators.append(copy)

        # Chain all tasks sequentially: unload table A, copy table A, unload table B, ...
        for i in range(len(operators) - 1):
            operators[i] >> operators[i + 1]

We started with that tiny, naive DAG, but then had to expand it a lot to cover all the weird cases that happen when transferring data between two different data warehouse technologies. One thing we had to do was use many different Snowflake file formats and Redshift unload options, so we modified the DAG a little. Additionally, because of timeouts when migrating our largest tables, we had to come up with a way of migrating by periods of time according to some column(s). And finally, we even added functionality to process the data before and/or after unloading it to S3. Because, you know, in real life s**t happens and the data is far from ideal. You can find our final DAG with the features mentioned above at these links:

dag_migrate_by_period
dag_migrate (simpler)
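
The period-based migration boils down to generating one unload/copy pair per time slice instead of one per table. A simplified, hypothetical sketch of the idea (the table, date column and list of months are placeholders; the real implementation is in the links above):

periods = ['2020-01', '2020-02', '2020-03']  # placeholder list of months

for period in periods:
    unload = RedshiftToS3Operator(
        task_id=f'unload_public_sales_{period}',
        s3_key=f'migration/public/sales/sales_{period}',
        query=f"SELECT * FROM public.sales "
              f"WHERE TO_CHAR(created_at, 'YYYY-MM') = '{period}'",
    )
    copy = S3ToSnowflakeOperator(
        task_id=f'copy_public_sales_{period}',
        s3_key=f'migration/public/sales/sales_{period}',
        table='sales',
        schema='public',
        database='dwh_es',
    )
    unload >> copy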

We also had to create our own RedshiftToS3Operator to include some of the features you have seen in the migration DAGs:

from typing import List, Optional, Union

from airflow.models import BaseOperator
from airflow.providers.postgres.hooks.postgres import PostgresHook
from airflow.utils.decorators import apply_defaults


class RedshiftToS3Operator(BaseOperator):

    template_fields = ('s3_key', 's3_bucket', 'table', 'redshift_conn_id', 'query',)
    template_ext = ('.sql',)
    ui_color = '#ededed'

    @apply_defaults
    def __init__(  # pylint: disable=too-many-arguments
            self,
            s3_key: str,
            schema: Optional[str] = None,
            table: Optional[str] = None,
            query: Optional[str] = None,
            s3_bucket: str = "{{ var.json.get('variables_secret').s3_bucket }}",
            redshift_conn_id: str = "{{ var.json.get('variables_secret').redshift_conn_id }}",
            aws_conn_id: str = 'aws_default',
            verify: Optional[Union[bool, str]] = None,
            unload_options: Optional[List] = None,
            autocommit: bool = True,
            include_header: bool = False,
            include_load_datetime: bool = False,
            load_datetime_value: str = 'NULL',
            *args, **kwargs) -> None:
        super().__init__(*args, **kwargs)
        self.s3_bucket = s3_bucket
        self.s3_key = s3_key
        self.redshift_conn_id = redshift_conn_id
        self.aws_conn_id = aws_conn_id
        self.verify = verify
        self.unload_options = unload_options or []  # type: List
        self.autocommit = autocommit
        self.include_header = include_header
        self.schema = schema
        self.table = table
        self.query = query
        self.include_load_datetime = include_load_datetime
        self.load_datetime_value = load_datetime_value

        if self.include_header and 'HEADER' not in [uo.upper().strip() for uo in self.unload_options]:
            self.unload_options = list(self.unload_options) + ['HEADER', ]

    def execute(self, context):
        postgres_hook = PostgresHook(postgres_conn_id=self.redshift_conn_id)

        unload_options = '\n\t\t\t'.join(self.unload_options)

        if not self.query:
            if self.include_load_datetime:
                self.query = f"SELECT *, {self.load_datetime_value} AS load_datetime FROM {self.schema}.{self.table}"
            else:
                self.query = f"SELECT * FROM {self.schema}.{self.table}"

        unload_query = """
            UNLOAD ($${select_query}$$)
            TO 's3://{s3_bucket}/{s3_key}'
            iam_role 'arn:aws:iam::role'
            {unload_options};
        """.format(select_query=self.query,
                   s3_bucket=self.s3_bucket,
                   s3_key=self.s3_key,
                   unload_options=unload_options)

        self.log.info('Executing UNLOAD command...')
        postgres_hook.run(unload_query, self.autocommit)
        self.log.info("UNLOAD command complete...")

Once we finished this, we finally had data in all our Snowflake tables. The next step was to migrate the Airflow DAGs from Redshift to Snowflake.

All the code from this post can be found in PromoFarma’s GitHub repository. I hope you found this article helpful; don’t hesitate to post below any doubts or comments you might have.
