Shitcode Wednesday is back! Operation “Postgres 2 Elastic” in full swing

Marina Lohova
Published in n3rdiii blog · Apr 5, 2019
[Image caption: actual footage of us syncing our data sources to Elasticsearch]

Shitcode Wednesday is back! Back with a bang! For those of you who are new here, I’d like to take a moment to clarify the meaning of the word “shit”.

From the Oxford dictionary:

shit — faeces; a contemptible or worthless person; something worthless, rubbish, nonsense; things or stuff, especially personal belongings.

By extension, code that is overly complex, that “boasts” poor readability and a lack of insight, code that truly STINKS, shall from now onwards and for all time be known as shit code.

Today I’m gonna entertain you with one fine piece of shité code that I found on my daily grind as The Greatest Software Developer of all time (tm). This particular time I was hoping to trace how the sync between Postgres and Elasticsearch is executed within our system.

It’s worth saying that our app boasts four databases hosted across two separate AWS accounts. One of the databases is a remnant from the Ruby days. The eldest dates all the way back to when the codebase was written in ASP.NET (ASP.NET, you guys!). Elasticsearch was introduced at some point to speed up read operations, with the benefits of this upgrade vastly diminished by the need to sync all our data sources with each other, and now with Elastic, too.

I’m not gonna delve into the nitty-gritty details of establishing a RabbitMQ service to queue the sync jobs, or a Celery app written in Python, or another tool written in TypeScript, nor will I mention the body of stored procedures found within each database to facilitate the sync. I’ll save the journey through this whole beautiful pipeline of code for another day.

Today I’m only gonna show you this little snippet:

export function _queryGenerator(tableName, start) {
  switch (tableName) {
    case 'accounts':
      return {
        text: `SELECT * FROM accounts WHERE id < $1 AND id >= $2
               ORDER BY id ASC;`,
        values: [start, start - SEARCHLIMIT],
      }
    ...
  }
}

Essentially, a SQL query,

SELECT * FROM accounts WHERE id < $1 AND id >= $2 ORDER BY id ASC

to be utilized as such,

const maxId = await _getMaxId(this.tableName.toLowerCase())

while (true) {
  const migrationQuery = _queryGenerator(this.tableName, start)
  const results = await _DBGetData(migrationQuery)
  await bulkIndex(results, this.indexName, this.tableName)

  if (start > maxId + SEARCHLIMIT) {
    await newDb.end()
    break
  } else {
    start += SEARCHLIMIT
  }
} // end while

Where maxId is the maximum primary key value,

SELECT max(id) FROM accounts
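
The _getMaxId helper itself never shows up in the excerpt, but assuming newDb is the same node-postgres client the rest of the code uses, it presumably boils down to something like this (my sketch, not the actual code):

// A guess at _getMaxId, assuming newDb is a node-postgres Client or Pool.
// Only the SELECT max(id) query is visible in the original code.
async function _getMaxId(tableName: string): Promise<number> {
  // Table names can't be bound as query parameters, hence the interpolation.
  const result = await newDb.query(`SELECT max(id) AS max FROM ${tableName};`)
  return Number(result.rows[0].max)
}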

What happens next is us looping through the data chunks returned by the _queryGenerator function in a mighty while loop,

while (true) {
  const migrationQuery = _queryGenerator(this.tableName, start)
  const results = await _DBGetData(migrationQuery)
  await bulkIndex(results, this.indexName, this.tableName)
}

The bulkIndex function is a call to the Elastic bulk API,

export async function bulkIndex(rows: any[], index: string, type: string) {
  if (rows.length === 0) {
    debug(chalk.yellow('Empty result set...'))
    return []
  }
  const body = formatBulkCommands(rows, index, type)
  try {
    const result = (await elastic.bulk({ body, _source: true })) as BulkResponse
    ...
  }
}
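
formatBulkCommands isn’t shown either. With the legacy elasticsearch JS client, bulk accepts body as an array of alternating action and document entries, so the helper presumably does something along these lines (a sketch under that assumption, not the article’s actual code):

// A sketch of formatBulkCommands: pair each row with an index action,
// reusing the row's primary key as the Elasticsearch document id.
function formatBulkCommands(rows: any[], index: string, type: string) {
  const body: any[] = []
  for (const row of rows) {
    body.push({ index: { _index: index, _type: type, _id: row.id } })
    body.push(row)
  }
  return body
}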

The loop runs in increments of SEARCHLIMIT (1500) from start = 1 and breaks when the counter exceeds the maximum primary key value,

if (start > maxId + SEARCHLIMIT) {
  await newDb.end()
  break
} else {
  start += SEARCHLIMIT
}

Let it sink in. Let it sink, you guys.

Here we see the completely fucking MANUAL implementation of SQL LIMIT and OFFSET with a JavaScript “while” loop. It’s just so wacky it might even be remarkable.

Here it is again, remarkable in full bloom:

async migrate(start) {
  const maxId = await _getMaxId(this.tableName.toLowerCase())

  while (true) {
    const migrationQuery = _queryGenerator(this.tableName, start)
    const results = await _DBGetData(migrationQuery)
    await bulkIndex(results, this.indexName, this.tableName)

    if (start > maxId + SEARCHLIMIT) {
      await newDb.end()
      break
    } else {
      start += SEARCHLIMIT
    }
  }
}

Besides the extreme case of re-inventing the wheel, the code doesn’t even work as described.

It is totally cool that we are getting the max primary key value, but in real life primary keys are rarely contiguous: there can be large gaps in values due to deleted records and other table manipulations.

In our case,

SELECT max(id) FROM accounts;

would output

54'524'497

while

select count(*) from accounts;

would result in

1'010'436

Peep this, 54 million something something to only 1 million actual records.

54 million (max id) / 1500 (increment) = 36K iterations, or 36K potential elastic.bulk operations, oftentimes to insert 1, 2, 3, or some other minuscule number of rows whose ids happened to fall within the current iteration’s range.
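
Back-of-the-envelope, using the numbers above (and keep in mind the ~28 rows per call is just the average; plenty of chunks land far below it):

// Rough math on the original approach, using the figures quoted above.
const maxId = 54524497
const rowCount = 1010436
const SEARCHLIMIT = 1500

const iterations = Math.ceil(maxId / SEARCHLIMIT) // ≈ 36,350 elastic.bulk calls
const avgRowsPerCall = rowCount / iterations      // ≈ 28 rows per call, on average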

I guess what I’m trying to say is you need to learn to crawl before you learn to walk. You need to learn your SQL before attempting to iterate through an entire million-record table with JavaScript.

A few minutes later, I was able to cut down the PG >>> ES import time from literally ALL NIGHT to an hour. If only I’d known that LIMIT and OFFSET existed in SQL…

export function _queryGenerator(tableName, start) {
  switch (tableName) {
    case 'accounts':
      return {
        text: `
          SELECT *
          FROM elastic_transfer
          ORDER BY id ASC
          OFFSET $1
          LIMIT 5000;
        `,
        values: [start],
      }
  }
}
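
For what it’s worth, the { text, values } shape is exactly what node-postgres accepts, so _DBGetData presumably hands the query config straight to the driver, something like this (my assumption, the actual wrapper isn’t shown):

// A guess at _DBGetData: pass the { text, values } query config to pg and return the rows.
async function _DBGetData(query: { text: string; values: any[] }) {
  const result = await newDb.query(query)
  return result.rows
}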

Having increased SEARCHLIMIT to 5K, I was now inserting 1'000'000 records in chunks of 5K, which totaled roughly 200 elastic.bulk operations.
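
Same back-of-the-envelope for the new version:

// Rough math on the OFFSET/LIMIT version, using the same row count as above.
const chunk = 5000
const calls = Math.ceil(1010436 / chunk) // ≈ 203 elastic.bulk calls, each carrying ~5000 rows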

Quick ’n’ dirty:

let start = 0;

while (true) {
  const migrationQuery = _queryGenerator(this.tableName, start)
  const results = await _DBGetData(migrationQuery)

  await bulkIndex(results, this.indexName, this.tableName)

  if (results.length === 0) {
    await newDb.end()
    debug('done')
    break
  } else {
    start += 5000
  }
}
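
If you want to squeeze out a bit more: OFFSET still makes Postgres walk past every skipped row on each chunk, so keyset pagination, filtering on the last id you saw, scales better on big tables. A sketch of that variant, not what actually shipped:

// Keyset pagination sketch: remember the last id seen and ask for the next chunk after it.
let lastId = 0
while (true) {
  const results = await _DBGetData({
    text: `SELECT * FROM elastic_transfer WHERE id > $1 ORDER BY id ASC LIMIT 5000;`,
    values: [lastId],
  })
  if (results.length === 0) break
  await bulkIndex(results, this.indexName, this.tableName)
  lastId = results[results.length - 1].id
}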

And thus, I haz once again confirmed my status as the Smartest Female Dev of all time. JK. Hope you had fun and learned something important today.

Much love,
Marina
