Transferring Large Datasets Between Postgres Instances Using Python and Psycopg2

Barrett Carpenter
Published in Inside the Embassy
6 min read · Apr 17, 2020

Recently the Ambassador Core engineering team has been working on scaling and strengthening our app infrastructure. A big part of this has involved breaking off pieces of our API and main database and distributing them as microservices. One challenge has been finding a way to transfer potentially massive amounts of data from our primary datastore into new databases. To solve this we came up with a lightweight Python script that is easy to use and can be adapted for different use cases. This article covers the basics of how to create a similar script using Python, the Psycopg2 library, and PostgreSQL’s foreign data wrapper.

Before transferring any data between databases, we use pg_dump (https://www.postgresql.org/docs/devel/app-pgdump.html) to export the table schemas from the old database so we can import them into the new one. I won’t be going over the steps to set up a separate database in this tutorial, but in our use case at Ambassador we used an RDS Postgres database from Amazon Web Services.

To create the schema dump, run this command from the terminal:

pg_dump \
  -U "<old db username>" \
  -h "<old db host>" \
  -s <old db name> > schema.sql

The -s flag dumps only the schema, not the data, and the shell redirect writes it to schema.sql in the directory where you run the command. To target specific tables rather than dumping the entire datastore, use -t <table_name>.

Once the pg_dump is finished, import the schemas into the new database with:

psql -U new_db_user_name -d new_db_name -h new_db_host -f schema.sql
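
If you would rather drive the dump and import from Python than type them by hand, the two commands above can be wrapped with subprocess. This is only a sketch: the helper names (pg_dump_schema_args, psql_import_args, copy_schema), the dict shape for the connection details, and the use of pg_dump's -f flag in place of the shell redirect are assumptions for illustration. It also assumes pg_dump and psql are on your PATH and that passwords are supplied outside the script, for example through a ~/.pgpass file.

```python
import subprocess


def pg_dump_schema_args(user, host, dbname, out_file="schema.sql", tables=()):
    """Build the argument list for a schema-only pg_dump."""
    args = ["pg_dump", "-U", user, "-h", host, "-s", "-f", out_file]
    for table in tables:
        # one -t flag per table we want to include
        args += ["-t", table]
    args.append(dbname)
    return args


def psql_import_args(user, host, dbname, in_file="schema.sql"):
    """Build the argument list for importing the dumped schema with psql."""
    return ["psql", "-U", user, "-d", dbname, "-h", host, "-f", in_file]


def copy_schema(old, new, tables=()):
    """Dump the schema from `old` and load it into `new`.

    `old` and `new` are dicts with "user", "host" and "dbname" keys
    (a hypothetical shape chosen for this sketch).
    """
    dump = pg_dump_schema_args(old["user"], old["host"], old["dbname"], tables=tables)
    subprocess.run(dump, check=True)  # raises CalledProcessError on failure
    load = psql_import_args(new["user"], new["host"], new["dbname"])
    subprocess.run(load, check=True)
```

Passing the arguments as a list rather than a single shell string means table names and hosts are never interpreted by the shell.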

Now that the database is set up with the table schemas we can create a Postgres foreign server and foreign data wrapper (https://www.postgresql.org/docs/9.5/postgres-fdw.html). This will allow us to query our original datastore directly from the new database instance. Run the following queries on the new database.

-- necessary extensions
CREATE EXTENSION IF NOT EXISTS postgres_fdw;
CREATE EXTENSION IF NOT EXISTS dblink;
CREATE EXTENSION IF NOT EXISTS hstore;

-- connect to the old DB via SERVER and USER MAPPING
-- (the server options must point at the old database, the one being read from)
CREATE SERVER foreign_server
  FOREIGN DATA WRAPPER postgres_fdw
  OPTIONS (host 'old_db_host', port 'old_db_port', dbname 'old_db_name', sslmode 'require');

CREATE USER MAPPING FOR CURRENT_USER
  SERVER foreign_server
  OPTIONS (user '', password '');  -- fill in the old database's credentials

-- create new schema if it doesn't exist and import from old DB into this DB
CREATE SCHEMA IF NOT EXISTS fdw_sch;
IMPORT FOREIGN SCHEMA public FROM SERVER foreign_server INTO fdw_sch;

Now if you check the new db you should see two sets of table schemas: public and fdw_sch. All of the tables in public should be empty, but the tables in fdw_sch will appear populated with all the data from your old database. Pretty cool huh? Postgres’ foreign data wrapper creates a connection to the old db and makes the data available in your new db. The data doesn’t actually live in the new db yet, but it is easy to access and run queries on.

Once our database is set up and connected to the old database we can start writing some code. First we will want to install Psycopg2 (https://www.psycopg.org/docs/index.html), an awesome Python library for interacting with Postgres. Start your Python project with your favorite virtual environment tool. Once your environment is ready, install Psycopg2.

$ pip install psycopg2

Create a file in your project directory named:

transfer-data.py

At the top of transfer-data.py, import Psycopg2:

import psycopg2

Our project directory is set up with our environment and libraries and it’s finally time to write our first bit of code. Start with a db connection method:

def connect_to_db():
    # create db connection and return db cursor
    db = psycopg2.connect(
        dbname='',
        user='',
        password='',
        host='',
    )
    db.set_session(autocommit=True)
    # Open a cursor to perform database operations
    cur = db.cursor()
    print('Connected to db')
    return cur

The above block of code creates a “db” connection object using the psycopg2.connect() method. Remember to fill in the proper connection info for the new database in the arguments to .connect(). Then, using that db object, we can create and return a cursor to perform operations on the database.

The line db.set_session(autocommit=True) puts the connection in autocommit mode, so each statement is committed as soon as it executes. The alternative would be to call db.commit() manually after our cur.execute() calls.
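
For contrast, here is a rough sketch of that manual alternative, with autocommit left off. The helper name run_in_transaction is made up for illustration; db.cursor(), db.commit(), and db.rollback() are the standard Psycopg2 connection methods. The statements are committed together, or rolled back together if any of them fails.

```python
def run_in_transaction(db, statements):
    """Execute (query, params) pairs on one connection as a single transaction.

    `db` is assumed to be a psycopg2 connection with autocommit off (the default).
    """
    cur = db.cursor()
    try:
        for query, params in statements:
            cur.execute(query, params)
        # Nothing is visible to other sessions until this commit
        db.commit()
    except Exception:
        # Undo every statement from this batch, then re-raise the error
        db.rollback()
        raise
    finally:
        cur.close()
```

For a long-running transfer, autocommit has the practical advantage that batches already copied stay copied if a later one fails.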

Next we can construct our base SQL query using an INSERT INTO ... SELECT statement (https://www.postgresql.org/docs/9.5/sql-insert.html).

"INSERT INTO public.table_name SELECT * FROM fdw_sch.table_name;"

This is where PostgreSQL’s foreign data wrapper makes this super easy. We can now query fdw_sch in our new database to get data from the old one! The above INSERT INTO SQL query will select all of the rows from the targeted fdw_sch table and copy them into the public table. This type of insert using “*” to select all columns will only work if the table schemas match up exactly. If you want to target specific columns the query would be:

"INSERT INTO public.table_name (list_column_names_here)
SELECT list_column_names_here
FROM fdw_sch.table_name;"

To get more specific you can add a WHERE clause:

"INSERT INTO public.table_name (list_column_names_here)
SELECT list_column_names_here FROM fdw_sch.table_name
WHERE some_condition=something;"

In order to execute our SQL we use a simple Python script:

def main():
    # create db cursor using the connect_to_db method
    cur = connect_to_db()
    SQL = "INSERT INTO public.table_name SELECT * FROM fdw_sch.table_name WHERE user_id=1;"
    cur.execute(SQL)
    # close the cursor (note the parentheses: cur.close alone does nothing)
    cur.close()


if __name__ == "__main__":
    main()

Here we transfer all of the rows where user_id is 1 from some table into our new database’s matching table. We use cur.execute() to run our SQL query and then close the cursor. Boom! Data transfer complete.

At this point we haven’t done anything that couldn’t be done by directly running queries on the database. Let’s write a script that takes advantage of the dynamic power we get from combining Python, Psycopg2, and PostgreSQL.

This time let’s assume we need to transfer all of the data from a list of tables for a set of users. Psycopg2 allows us to make our SQL dynamic while still being safe from SQL injection attacks with its psycopg2.sql module (https://www.psycopg.org/docs/sql.html).

from psycopg2 import sql

SQL = sql.SQL("""
    INSERT INTO public.{t}
    SELECT * FROM fdw_sch.{t}
    WHERE user_id=%s;
""").format(t=sql.Identifier(table))

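The psycopg2.sql module provides sql.SQL(...).format() and sql.Identifier so we can safely insert Python strings such as table names into our SQL as identifiers. The %s is a placeholder for a query parameter: Psycopg2 fills it in with a properly escaped value (here an integer user id) when the query is executed. Now the cur.execute call will look like: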

cur.execute(SQL, (user,))

This passes the parameter values, in this case just user, as a tuple in the second argument to fill in %s.

Now our script will be:

import psycopg2
from psycopg2 import sql

user_ids = [1, 2, 3, 4, 5]
# table names must be strings so they can be wrapped in sql.Identifier
tables = ['contact_info', 'transactions', 'pizza_spots', 'important_data_stored_here']


def transfer_data(user, cur):
    for table in tables:
        SQL = sql.SQL("INSERT INTO public.{t} SELECT * FROM fdw_sch.{t} WHERE user_id=%s;").format(t=sql.Identifier(table))
        cur.execute(SQL, (user,))


def main():
    # connect_to_db() is the connection method defined earlier
    cur = connect_to_db()
    for user in user_ids:
        transfer_data(user, cur)

    cur.close()

We’ve got a useful program that can process a lot of data dynamically! However, in a real world scenario many tables could potentially have millions of rows that need to be transferred. If we attempt to transfer that much data in one query we’re likely to hit a database timeout. The solution is to run the transfer in batches. We can do this by implementing keyset pagination in the SQL query, and recursion with Python.

The new SQL will look like this:

sql.SQL("""
    INSERT INTO public.{t}
    SELECT * FROM fdw_sch.{t}
    WHERE (user_id=%s AND id>%s)
    ORDER BY id ASC
    LIMIT 20000
    RETURNING id;
""").format(t=sql.Identifier(table))

Let’s break this down. We use the table’s id (index) field as our keyset and limit each page to 20,000 rows, so each query now processes at most 20,000 rows. We order by id ascending so we can start with a key of 0, grab the first 20,000 rows with an id greater than 0, and then return the inserted ids so we know where to start the next page.
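
To see how the key advances from page to page without touching a database, here is a small pure-Python simulation of the same idea. The function name keyset_pages and the in-memory list of ids are stand-ins for illustration; the filter, sort, and slice mirror the WHERE id>%s, ORDER BY id ASC, and LIMIT clauses.

```python
def keyset_pages(ids, batch_size, key=0):
    """Yield pages of ids the way the paginated query would return them."""
    while True:
        # WHERE id > key ORDER BY id ASC LIMIT batch_size
        page = sorted(i for i in ids if i > key)[:batch_size]
        if page:
            yield page
        # A short (or empty) page means there is nothing left after it
        if len(page) < batch_size:
            return
        # The highest id on this page is where the next page starts
        key = page[-1]


for page in keyset_pages([3, 1, 7, 12, 9, 20, 15], batch_size=3):
    print(page)
# prints [1, 3, 7], then [9, 12, 15], then [20]
```

Note that when the row count is an exact multiple of the batch size, one extra query that returns nothing is needed to discover the end.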

To implement our new SQL dynamically we need a way to keep track of the highest id returned from the query in order to get the next page of data. To accomplish this we can write a method called insert_chunked_data using recursion. Finally! An actual application for recursion in the wild!

BATCH_SIZE = 20000  # must match the LIMIT in the query below


def insert_chunked_data(user, cur, table, key=0):
    SQL = sql.SQL("INSERT INTO public.{t} SELECT * FROM fdw_sch.{t} WHERE (user_id=%s AND id>%s) ORDER BY id ASC LIMIT 20000 RETURNING id;").format(t=sql.Identifier(table))
    # Execute SQL passing user and key int vars
    cur.execute(SQL, (user, key))
    # fetchall() fetches the returned ids as a list of 1-tuples, e.g. [(1,), (2,)]
    results = cur.fetchall()

    # If there were fewer than BATCH_SIZE rows then we've moved our last page of data
    if len(results) < BATCH_SIZE:
        return
    else:
        # Find highest returned id in results to get next key
        next_key = max(row[0] for row in results)
        return insert_chunked_data(user, cur, table, next_key)
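
One caveat with the recursive version: Python's default recursion limit is 1000 frames, so a user whose rows need roughly a thousand batches in one table could raise RecursionError. A loop avoids that. The sketch below is an alternative to the recursive method described here, not a copy of it. The name insert_in_batches is hypothetical, and it assumes query is the composed statement from above with the hard-coded LIMIT 20000 swapped for a third %s placeholder so the batch size can be passed as a parameter.

```python
def insert_in_batches(cur, query, user, batch_size=20000):
    """Run the paginated INSERT repeatedly until a short page comes back.

    Assumes `query` takes three parameters in order: user_id, key, LIMIT.
    Returns the total number of rows moved.
    """
    key = 0
    moved = 0
    while True:
        cur.execute(query, (user, key, batch_size))
        rows = cur.fetchall()  # list of 1-tuples from RETURNING id
        moved += len(rows)
        if len(rows) < batch_size:
            return moved
        # Highest id in this batch becomes the key for the next one
        key = max(row[0] for row in rows)
```

Inside the loop over users and tables, the call would look like insert_in_batches(cur, SQL, user), and the returned count is handy for logging progress.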

Finally our script will look like this:

from psycopg2 import sql

user_ids = [1, 2, 3, 4, 5]
tables = ['contact_info', 'transactions', 'pizza_spots', 'important_data_stored_here']

def connect_to_db():
    ...  # body as defined earlier

def insert_chunked_data(user, cur, table, key=0):
    ...  # body as defined earlier

def main():
    cur = connect_to_db()
    for user in user_ids:
        for table in tables:
            insert_chunked_data(user, cur, table)

    cur.close()

There you go, a dynamic, easily adaptable script to move your data around between databases!
