Everyday Data Engineering: Databand’s blog series — A guide on common data engineering activities, written for data engineers by data engineers.

Everyday Data Engineering: Data Replication to Snowflake with Python

Kelvin Ma
Databand, an IBM Company

--

A big portion of what data engineers do is moving data from one database to another, also known as data replication and migration. Every data team has gone through a data replication project, whether to move data to a database better suited for analytical querying, to protect an operational database from high analytical load, or as part of a cloud migration.

In these kinds of processes, it’s critical to know that data is being consistently and accurately moved. Otherwise, analytics will be broken, ML models will underperform, and engineers will spend all day troubleshooting. In this guide, we will demonstrate how to replicate data to Snowflake with Python using Snowflake best practices, while ensuring data integrity. The data replication process will take data from an on-premise PostgreSQL database to a Snowflake cloud database.

[Image: Workflow covered in this guide (staging and data ingestion)]

Setting up the Prerequisites

We will be using SQLAlchemy to interact with the on-premise PostgreSQL database, Snowflake’s Python connector to interact with Snowflake, and Databand’s open source library (“DBND”) to track our data and check for data integrity. We will also be using Pandas to efficiently perform transformations.

You can install all the tools used in this guide with the following command:
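
One way to install everything is the pip command below; psycopg2-binary is assumed here as the PostgreSQL driver for SQLAlchemy, and exact package names may vary between releases:

pip install dbnd dbnd-snowflake snowflake-connector-python SQLAlchemy pandas psycopg2-binary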

If any dependencies are missing, you can find more detailed installation instructions in the documentation for each of these tools.

You will also need a PostgreSQL database, and a Snowflake account with privileges to create tables and stages. For information on account privileges, visit https://docs.snowflake.com/en/user-guide/security-access-control-privileges.html.

Connecting to the Databases

We will need two connections established: one to our PostgreSQL database (using a SQLAlchemy engine) and another to Snowflake (using the Snowflake Python Connector):
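
Here is a minimal sketch of both connections; the credentials, hostnames, warehouse, and database names are placeholders:

from sqlalchemy import create_engine
import snowflake.connector

# PostgreSQL connection through a SQLAlchemy engine (placeholder credentials)
pg_engine = create_engine(
    "postgresql+psycopg2://pg_user:pg_password@localhost:5432/transactions_db"
)

# Snowflake connection through the Snowflake Python Connector (placeholder credentials)
snowflake_conn = snowflake.connector.connect(
    user="SNOWFLAKE_USER",
    password="SNOWFLAKE_PASSWORD",
    account="xyz12345.us-east-2.aws",  # full account name, including region/cloud if applicable
    warehouse="COMPUTE_WH",
    database="TRANSACTIONS_DB",
    schema="PUBLIC",
)

# Connection string used by DBND to track Snowflake (format explained below)
snowflake_connection_string = (
    "snowflake://SNOWFLAKE_USER:SNOWFLAKE_PASSWORD@xyz12345.us-east-2.aws"
    "/?account=xyz12345.us-east-2.aws"
)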

The connection string is used by DBND to track Snowflake resource usage and tables. It should be in the following format:

snowflake://<user>:<password>@<full account name>/?account=<full account name>

Where the full account name is your full Snowflake account name, including the region and cloud platform, if applicable. For example, if your account name is xyz12345 and your region and cloud platform are US East (Ohio) and AWS respectively, then your full account name would be

xyz12345.us-east-2.aws

Exploring the Example Data

Before we continue with the guide on data replication, let’s take a quick look at the example data used in this guide. This will help you adapt the steps in this guide to better suit your workflows.

The table columns are:

  • index
    – The row indices of the PostgreSQL table.
  • invoice_ID
    – Unique ID generated for each transaction.
  • transaction_amt
    – Amount exchanged in the transaction.
  • transaction_time
    – POSIX timestamp of the transaction.
  • transaction_fulfilled
    – Whether the transaction has been fulfilled.
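
As a quick sanity check, you can preview a few rows with pandas; the table name transactions is an assumption used throughout the sketches in this guide, so substitute your own:

import pandas as pd

# Peek at the source table to confirm the columns described above
# (the table name "transactions" is assumed for this guide)
preview_df = pd.read_sql("SELECT * FROM transactions LIMIT 5", con=pg_engine)
print(preview_df.dtypes)
print(preview_df.head())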

Querying Data from PostgreSQL

Next, we query data from our on-premise PostgreSQL database. We can do this by creating a function that uses the COPY command to store a piece of our table, or the full table, in a local file. We will use this file for staging in a later section. For larger datasets, it is recommended to COPY the data incrementally instead.

After copying the data to a file, we will track our data in the form of a dataframe using DBND’s log_dataframe function. This dataframe will be used to perform a data integrity check at the end of our workflow. We will also change the index column to pg_index. This will prevent any confusion in the future. If the index column is no longer required for your application, you can drop that column here or modify the SQL query to not select the index column.
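
Here is a sketch of that extraction step, assuming the transactions table and a local CSV file; the COPY runs through psycopg2's copy_expert on a raw SQLAlchemy connection:

import pandas as pd
from dbnd import log_dataframe

def extract_from_postgres(pg_engine, local_file="postgres_extract.csv"):
    # COPY the transactions table (or a slice of it) from PostgreSQL into a local CSV
    copy_sql = "COPY (SELECT * FROM transactions) TO STDOUT WITH CSV HEADER"
    raw_conn = pg_engine.raw_connection()
    try:
        with raw_conn.cursor() as cursor, open(local_file, "w") as f:
            cursor.copy_expert(copy_sql, f)
    finally:
        raw_conn.close()

    # Rename the index column to pg_index and track the extracted data with DBND
    postgres_data = pd.read_csv(local_file).rename(columns={"index": "pg_index"})
    postgres_data.to_csv(local_file, index=False)
    log_dataframe("postgres_data", postgres_data, with_histograms=True)
    return postgres_data, local_file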

Creating the Snowflake Tables

Before we stage the file to be loaded into a Snowflake table, we have to first create the tables we need. Snowflake's architecture allows for row-level updates, making for easier delta data migration processes. The best way to do this is by loading the extracted Postgres data into an intermediate table, then updating or adding rows to the final table as required.

Snowflake provides support for three types of tables: temporary tables, transient tables, and permanent tables. Temporary tables are session-based and only exist until the session in which they are created has ended; when the session ends, the data in these temporary tables is completely purged. Transient tables are more similar to permanent tables in that they persist past the session and must be explicitly dropped, but they differ in that there is no Fail-safe period for the data stored within them. Temporary and transient tables are useful for storing data that does not need to be maintained for extended periods of time.

In this guide, our intermediate table will be a temporary table used to load data into our final, permanent table. To create these tables in Snowflake:
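
Here is a sketch of both CREATE TABLE statements, with column names mirroring the example data and assumed table names (transactions_intermediate and transactions):

def create_snowflake_tables(snowflake_conn):
    # Column definitions mirror the example PostgreSQL table
    columns_ddl = """
        pg_index INTEGER,
        invoice_ID VARCHAR(50),
        transaction_amt NUMBER(38, 2),
        transaction_time NUMBER(38, 0),
        transaction_fulfilled BOOLEAN
    """
    cursor = snowflake_conn.cursor()
    try:
        # Temporary table: exists only for this session and is used for loading
        cursor.execute(
            f"CREATE OR REPLACE TEMPORARY TABLE transactions_intermediate ({columns_ddl})"
        )
        # Permanent table: the final destination of the replicated data
        cursor.execute(f"CREATE TABLE IF NOT EXISTS transactions ({columns_ddl})")
    finally:
        cursor.close()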

Snowflake supports a variety of data types; throughout this guide, we use simple ones.

Staging

There are two types of stages — internal and external. In either case, a stage is a reference to a location where you can store files to load or unload from Snowflake tables.

Internal Staging

An internal stage is a Snowflake-managed location into which data file(s) are uploaded from the local machine (with the PUT command) so they can be loaded into a table. An internal stage has the advantage of being easier to set up, but it does not support automated continuous data ingestion. For a one-shot bulk data migration, it is often easier to create an internal stage.

After creating an internal stage, we will load its contents into an intermediate table, which will allow us to update our final table accordingly.

We will also be keeping track of the Query ID of each of our queries — this will later be used to discover the Snowflake resource usage (credits, etc.) and any performance bottlenecks of our queries.
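
Here is a sketch of the internal staging flow, using the tables created above and an assumed stage name; each Query ID is captured from cursor.sfqid:

import os

def load_via_internal_stage(snowflake_conn, local_file):
    # Create the stage, upload the local file, and copy it into the intermediate table
    query_ids = []
    cursor = snowflake_conn.cursor()
    try:
        for query in (
            "CREATE OR REPLACE TEMPORARY STAGE pg_internal_stage "
            "FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)",
            f"PUT file://{os.path.abspath(local_file)} @pg_internal_stage",
            "COPY INTO transactions_intermediate FROM @pg_internal_stage",
        ):
            cursor.execute(query)
            query_ids.append(cursor.sfqid)  # Query ID, used later for resource tracking
    finally:
        cursor.close()
    return query_ids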

External Staging

An external stage is a specified cloud storage location where data file(s) are stored so that they can be loaded into a table. At the time of writing, Snowflake supports the following cloud storage services:

  • Amazon S3 Buckets
  • Google Cloud Storage Buckets
  • Microsoft Azure Containers

In this guide, we are using an AWS S3 Bucket as our external stage. For the required parameters of your cloud storage service, visit Snowflake’s official documentation.

External stages require only a few extra steps but are more versatile. External stages support continuous data ingestion that can be automated with Snowpipe.

We can also create a permanent storage integration for our Snowflake database that will allow Snowflake to read data from and write data to our AWS bucket. Note that recreating a storage integration overwrites the previous one and breaks its association with any existing stages, so those stages would need to be re-associated.

As with the internal staging process, we will also be keeping track of the Query ID of each of our queries — this will later be used to discover the Snowflake resource usage (credits, etc.) and any performance bottlenecks of our queries.
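
Here is a sketch of the external staging flow with an S3 bucket; the bucket path, IAM role ARN, and integration/stage names are placeholders, and the extracted file is assumed to have already been uploaded to the bucket (for example with the AWS CLI or boto3):

def load_via_external_stage(snowflake_conn):
    # Create the storage integration and external stage, then copy into the intermediate table
    query_ids = []
    cursor = snowflake_conn.cursor()
    try:
        for query in (
            """CREATE STORAGE INTEGRATION IF NOT EXISTS s3_integration
                 TYPE = EXTERNAL_STAGE
                 STORAGE_PROVIDER = 'S3'
                 ENABLED = TRUE
                 STORAGE_AWS_ROLE_ARN = 'arn:aws:iam::000000000000:role/snowflake_access_role'
                 STORAGE_ALLOWED_LOCATIONS = ('s3://my-replication-bucket/postgres/')""",
            """CREATE OR REPLACE STAGE pg_external_stage
                 STORAGE_INTEGRATION = s3_integration
                 URL = 's3://my-replication-bucket/postgres/'
                 FILE_FORMAT = (TYPE = CSV SKIP_HEADER = 1)""",
            "COPY INTO transactions_intermediate FROM @pg_external_stage",
        ):
            cursor.execute(query)
            query_ids.append(cursor.sfqid)  # Query ID, used later for resource tracking
    finally:
        cursor.close()
    return query_ids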

Tracking Our Data

Before we complete the data replication process by updating the final Snowflake table, we will use Pandas and Databand DBND to make sure our data was replicated successfully into the intermediate table. To do this, we can select all the data from our intermediate table, then compare the contents in the intermediate table with the contents extracted from our PostgreSQL database. If the contents of both tables match, DBND will log True; otherwise, DBND logs False. To keep things simple in this guide, we will allow unmatched data to pass onto the final table, but you can fail your pipeline here or report the discrepancy to an alerting system if you require your data to be completely identical.
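
Here is a sketch of that check, comparing the intermediate table's contents against the dataframe extracted from PostgreSQL:

import pandas as pd
from dbnd import log_dataframe, log_metric

def check_data_integrity(snowflake_conn, postgres_data):
    # Pull the intermediate table back into a dataframe
    cursor = snowflake_conn.cursor()
    try:
        cursor.execute("SELECT * FROM transactions_intermediate")
        snowflake_data = pd.DataFrame(
            cursor.fetchall(), columns=[col[0] for col in cursor.description]
        )
    finally:
        cursor.close()
    log_dataframe("snowflake_intermediate", snowflake_data, with_histograms=True)

    # Normalize column names and row order before comparing; depending on your data,
    # dtype differences (e.g., Decimal vs. float) may also need to be reconciled
    snowflake_data.columns = [c.lower() for c in snowflake_data.columns]
    postgres_data = postgres_data.rename(columns=str.lower)
    snowflake_data = snowflake_data.sort_values("pg_index").reset_index(drop=True)
    postgres_data = postgres_data.sort_values("pg_index").reset_index(drop=True)

    data_matches = snowflake_data.equals(postgres_data)
    log_metric("data_replicated_successfully", data_matches)
    return data_matches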

The Full Workflow

Now that we have all the pieces set up, we simply have to connect them together. Here is an example of what a batch replication workflow would look like with an external stage:
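
The following sketch ties together the functions from the earlier snippets; the MERGE statement, table names, and database/schema names follow the assumptions above, and the exact arguments of log_snowflake_resource_usage and log_snowflake_table may differ between dbnd-snowflake versions, so check the DBND documentation for your release:

from dbnd_snowflake import log_snowflake_resource_usage, log_snowflake_table

def replicate_postgres_to_snowflake():
    # 1. Extract from PostgreSQL and track the dataframe with DBND
    postgres_data, local_file = extract_from_postgres(pg_engine)

    # 2. Create the intermediate (temporary) and final (permanent) tables
    create_snowflake_tables(snowflake_conn)

    # 3. Load the extracted data through the external stage
    #    (the file is assumed to already be in the S3 bucket)
    query_ids = load_via_external_stage(snowflake_conn)

    # 4. Check data integrity on the intermediate table
    check_data_integrity(snowflake_conn, postgres_data)

    # 5. Merge the intermediate table into the final table
    cursor = snowflake_conn.cursor()
    try:
        cursor.execute("""
            MERGE INTO transactions AS final
            USING transactions_intermediate AS intermediate
                ON final.invoice_ID = intermediate.invoice_ID
            WHEN MATCHED THEN UPDATE SET
                final.transaction_amt = intermediate.transaction_amt,
                final.transaction_time = intermediate.transaction_time,
                final.transaction_fulfilled = intermediate.transaction_fulfilled
            WHEN NOT MATCHED THEN INSERT
                (pg_index, invoice_ID, transaction_amt, transaction_time, transaction_fulfilled)
                VALUES (intermediate.pg_index, intermediate.invoice_ID, intermediate.transaction_amt,
                        intermediate.transaction_time, intermediate.transaction_fulfilled)
        """)
        query_ids.append(cursor.sfqid)
        cursor.execute("SELECT CURRENT_SESSION()")
        session_id = cursor.fetchone()[0]
    finally:
        cursor.close()

    # 6. Log Snowflake resource usage and the final table with DBND
    log_snowflake_resource_usage(
        database="TRANSACTIONS_DB",
        connection_string=snowflake_connection_string,
        query_ids=query_ids,
        session_id=int(session_id),
    )
    log_snowflake_table(
        table_name="transactions",
        connection_string=snowflake_connection_string,
        database="TRANSACTIONS_DB",
        schema="PUBLIC",
    )

replicate_postgres_to_snowflake()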

log_snowflake_resource_usage allows us to log the resources consumed throughout the whole process, including credits used, compilation time, bytes scanned, and more. log_snowflake_table allows us to log the full Snowflake table with schema information and previews.

Conclusion

This guide covers how to set up data replication using best practices from Snowflake, and good fundamentals for tracking data integrity. It uses the example of PostgreSQL to Snowflake, but the same logic can be applied to multiple databases or tables with slight modifications to the queries. Likewise, the data integrity checks and tracking functionality would operate similarly in other cloud databases like Google BigQuery and AWS Redshift.

What are the next steps? If you’re dealing with large scale, you may want to tune your approach to integrity checks, like only collecting schema information or more specific metrics, rather than matching entire tables. This will have performance benefits, but come at the cost of full assurance of 1:1 data replication. You also may want to explore operational monitoring or alerting tools to let you know when the job itself or your integrity checks fail, which is where Databand can come in to help.

Happy engineering!

To learn more about how Databand can help with data replication and data integrity — sign up for a free trial or request a demo.
