How to safely migrate millions of rows across Postgres Production tables?

Dmytro Harazdovskiy · Shelf Engineering · Nov 28, 2023

TL;DR: Most of the time, a couple of SQL queries are enough to move your data from one table to another. But when there are millions of rows, things get rough.

Why did we need it?

As mentioned in my previous article, we faced challenges scaling a Postgres table while reading millions 🍋 of events, especially with numerous joins.

Suppose you have figured out how to tune the table and it’s time to test it. But how do you do that?

Since the chosen approach requires a whole new table, another issue arises: how to move 50🍋+ records and about 10 GB from one table to another?

It can be solved easily with a couple of 3-month range queries, and all the data is synced. But what about production? A query like that would simply time out, or at least put tons of pressure on the cluster, especially during working hours. For us, a single 3-month insert consumed 30–40% CPU, and user requests started timing out.

Since various data syncs run at night, the migration has to be load-safe. We don’t want our table improvement to achieve the complete opposite 🙂

Custom script

To control every aspect of execution, we decided to create a simple custom script that could easily be run on a bastion instance or some other kind of remote machine.

The smallest adequate time range we could use for a single SQL insert was 1 day, roughly 30k rows. We found this range experimentally: it did not put too much pressure on the database cluster and had a decent response time. It also works as a single unit of migration work, because daily event counts are pretty stable.
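
Before committing to a 1-day batch size, it is worth checking that daily volumes really are uniform. Below is a quick, optional check we could run for that purpose; it is a sketch rather than part of the migration script, and it assumes the same Knex connection helper (getClient) and the events.events table used throughout this post:

import {getClient} from './knex-connection'; // same helper the migration script uses

// Count rows per day to confirm that daily volumes are roughly stable
// (around 30k rows/day in our case) before settling on a 1-day batch.
export const checkDailyVolumes = async (): Promise<void> => {
  const result = await getClient().raw(`
    SELECT DATE(created_at) AS day, COUNT(*) AS row_count
    FROM events.events
    GROUP BY DATE(created_at)
    ORDER BY day
  `);
  console.table(result.rows); // the pg driver exposes result rows under `rows`
};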

So all you need is a script that iterates over a couple of years, day by day, and meticulously inserts each day’s range of rows from the source table into the destination one.

In case the migration fails, we also need a state-saving mechanism that persists the last processed date, for example into a file. Then, even when an error arises, the script can recover from the last saved date.

In our scenario, there was no need to read or modify data during migration, so we used the INSERT INTO … SELECT query combination: Postgres selects the 1-day range and puts it straight into the new table. This lets us manipulate data entirely on the cluster, without downloading it and uploading it back.

To complete this logic, we’ve written a TypeScript script for Node.js that you can easily adapt to fit your specific needs. Let’s dive in and examine the details:

import fs from 'fs';
import {getClient} from './knex-connection'; //instance of knex client with injected credentials

const finalTable = 'events_partitioned_by_date';
const initialTable = 'events';

export const move = async (): Promise<void> => {
  const {startCreationDate} = await getClient()
    .table(`events.${initialTable}`)
    .min('created_at as startCreationDate')
    .first();

  const {endCreationDate} = await getClient()
    .table(`events.${initialTable}`)
    .max('created_at as endCreationDate')
    .first();

  const initialTableCount = await getClient().table(`events.${initialTable}`).count('*');

  console.log({initialTableCount});

  // Read progress from JSON file
  let progress;
  try {
    const rawProgress = fs.readFileSync('progress.json', 'utf8');
    progress = JSON.parse(rawProgress);
  } catch (err) {
    // If no progress file is found, use the startCreationDate as progress
    progress = startCreationDate;
  }

  // +2 days from now just to be sure all data were synced in the timeframe
  const terminationDate = new Date();
  terminationDate.setDate(terminationDate.getDate() + 2);

  console.log({startCreationDate, endCreationDate, terminationDate, progress});

  // Loop over all days from progress to terminationDate
  for (let day = new Date(progress); day <= terminationDate; day.setDate(day.getDate() + 1)) {
    try {
      // Convert to yyyy-mm-dd format
      const standardFormatDate = day.toISOString().split('T')[0];

      // Execute the query
      const query = getClient().raw(
        `
          INSERT INTO events.${finalTable}
          SELECT customer_id,
                 event_id,
                 event_name,
                 ...
          FROM events.${initialTable}
          WHERE DATE(created_at) = ?
          ON CONFLICT DO NOTHING
        `,
        [standardFormatDate]
      );

      console.time(`Inserted data for ${standardFormatDate}`);
      await query;
      console.timeEnd(`Inserted data for ${standardFormatDate}`);

      // Calculate the days left to insert
      const left = terminationDate.getTime() - day.getTime();
      const remainingDays = Math.ceil(left / (1000 * 60 * 60 * 24));
      console.log(`Inserted data for ${standardFormatDate}. ${remainingDays} day(s) left to insert.`);

      // Save progress to the JSON file
      fs.writeFileSync('progress.json', JSON.stringify(standardFormatDate));

      // Log initial table count and target table count every 10 days
      if (remainingDays % 10 === 0) {
        const finalTableCount = await getClient().table(`events.${finalTable}`).count('*');
        console.log({initialTableCount, finalTableCount});
      }
    } catch (err) {
      console.error(`Failed to handle day ${day.toISOString()}. Error: ${err.message}`);
      // You can rethrow if the error must stop the execution
      // throw err;
    }
  }

  const finalTableCount = await getClient().table(`events.${finalTable}`).count('*');
  console.log('Process completed!', {initialTableCount, finalTableCount});
};
  1. Our task is to move data from the initial ‘events’ table to a final ‘events_partitioned_by_date’ table, where the data is partitioned by date. We use the Knex.js library, a powerful SQL builder, together with our ‘@shelf/rds’ module to interact with the database (a sketch of such a connection helper is shown after this list).
  2. The initial part of the move() function calculates the range of data to work with, defined by startCreationDate and endCreationDate. These are the earliest and the latest dates for which the source table has records, respectively.
  3. One important feature of this script is progress tracking. In a try-catch block it attempts to read a progress.json file. If the file exists, the script picks up the date where it last left off and continues from there. If the file doesn’t exist, the process starts from the startCreationDate acquired earlier.
  4. A loop runs for each day within the range from the saved progress to the termination date, which is defined as two days in the future from the time of script execution.
  5. Within the loop, we construct a daily query that moves the data from the initial ‘events’ table to the final ‘events_partitioned_by_date’ table. The query selects the desired columns from the initial table and inserts them into the final table. Note the ON CONFLICT DO NOTHING clause: it skips any rows that would violate a unique constraint on the destination table, so re-running a day does not insert duplicates.
  6. After each day’s operation, the script logs the remaining days left to process, providing a rough estimate of completion time. It then writes the current date to progress.json, ensuring that progress is saved after every day’s work so the script can be re-run from that point if it is halted or interrupted for any reason.
  7. To keep an eye on the data count in both tables and make sure that the migration is going well, the script logs the record counts of both the initial and the final table every 10 days of processed data, and once more when it completes.
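
For completeness, here is a minimal sketch of what the ./knex-connection helper imported by the script might look like. The real @shelf/rds module wires up credentials differently, so treat the host, environment variable names, and pool settings below as assumptions for illustration only:

import knex, {Knex} from 'knex';

// Hypothetical connection helper: credentials and pool sizes are placeholders.
let client: Knex | undefined;

export const getClient = (): Knex => {
  if (!client) {
    client = knex({
      client: 'pg',
      connection: {
        host: process.env.PG_HOST, // assumed env variable names
        user: process.env.PG_USER,
        password: process.env.PG_PASSWORD,
        database: process.env.PG_DATABASE,
      },
      // A small pool is enough: the migration issues one query at a time
      pool: {min: 1, max: 5},
    });
  }
  return client;
};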

The exception handling within the loop ensures that if any day’s operation fails, the script logs the error message and moves on to the next day. This is crucial for keeping the migration running in the face of unexpected errors.
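
One practical note: move() is only exported, so you still need a small entry point to actually run it on the bastion host. A minimal sketch with a hypothetical file name is below; given the multi-hour runtime, it is worth launching it under nohup, tmux, or screen so it survives an SSH disconnect:

// run.ts (hypothetical entry point), started with `npx ts-node run.ts`
import {move} from './move'; // path to the migration script above (name assumed)

move()
  .then(() => {
    console.log('Migration finished');
    process.exit(0);
  })
  .catch((err) => {
    console.error('Migration failed:', err);
    process.exit(1);
  });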

Conclusions

As tested in production, this script did not increase CPU and RAM usage by more than 10–15%, but it took about 10 hours to complete. I think it was worth it, since only minor pressure was put on the database. Consequently, the whole team could keep working productively without worrying that something would fall over or degrade the users’ experience.

Hope your migrations won't put any stress on you 🙃 Have a great day!
