Python Solution for Data Migration: Technical Issues and How We Resolved Them

Shantanu Dahiya
Cashify Engineering
Sep 26, 2018

In the previous article, I described how the data team at Cashify built an in-house Python solution for migrating data from S3 to a PostgreSQL Data Warehouse. In this article, I cover some of the major technical issues we encountered and how we solved them.

Data Migration

Three of the issues we encountered are explained below, along with the solutions we came up with for them.

(1) Inserting values containing CRLF sequences (\r\n) into the database

Our data had many CRLF sequences in certain columns. Values containing these sequences could not be inserted into our PostgreSQL database, which did not accept them during the load.

The only solution we could find was to remove the carriage return (\r) from our data wherever it occurred, leaving only the newline (\n):

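import io

# Input and output are in-memory byte streams (io.BytesIO)
def remove_carriage_return(inp):
    inp.seek(0)
    # Drop every carriage return so that CRLF becomes a plain LF.
    # The stream holds bytes, so the pattern must be bytes as well.
    replaced_str = inp.getvalue().replace(b'\r', b'')
    output = io.BytesIO()
    output.write(replaced_str)
    output.flush()
    output.seek(0)
    return output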

(2) Flattening of JSON data

Data from MongoDB was exported into our data lake as CSV files with JSON fields in each row. Since our Data Warehouse is relational (i.e. stores tabular data), we had to flatten the JSON objects into columns:

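import json
import pandas as pd

def flatten_json(df, data_source):
    # MongoDB exports keep the whole document as a JSON string in the _doc column
    if data_source == 'mongodb':
        records = df['_doc'].apply(json.loads).tolist()
        # pd.json_normalize needs pandas 1.0+; older versions expose it
        # as pd.io.json.json_normalize
        return pd.json_normalize(records)
    else:
        return df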
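To make the effect of flattening concrete, here is a minimal sketch on two made-up rows. The field names and values are hypothetical and only stand in for a real MongoDB export; the sketch assumes pandas 1.0 or later, where json_normalize is available at the top level.

```python
import json
import pandas as pd

# Hypothetical rows, shaped like a MongoDB export with one JSON string per row
raw = pd.DataFrame({
    "_doc": [
        '{"id": 1, "device": {"brand": "Apple", "model": "iPhone 7"}}',
        '{"id": 2, "device": {"brand": "Samsung", "model": "Galaxy S8"}}',
    ]
})

# Parse each JSON string into a dict, then expand nested keys into columns
records = raw["_doc"].apply(json.loads).tolist()
flat = pd.json_normalize(records)

# Nested keys become dot-separated column names such as "device.brand"
flat_columns = sorted(flat.columns)
```

Each nested object becomes a set of columns named after its path, which is the shape a relational table expects.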

(3) Bulk insertion into the Data Warehouse

Our modified data, held in a Pandas dataframe, had to be inserted into the Data Warehouse. Pandas provides a built-in to_sql() method that inserts a dataframe using an SQLAlchemy engine. The only problem was that the insertion took a long time to complete. After some digging around, we found that this was a known problem, caused by Pandas inserting the data row by row rather than in bulk.
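As an aside, pandas 0.24 and later accept a method argument on to_sql() that batches several rows into each INSERT statement. This is not the approach we took; the sketch below only illustrates the option, using an in-memory SQLite database as a stand-in for the warehouse and a hypothetical table name and dataset.

```python
import sqlite3
import pandas as pd

# Hypothetical data and table name, for illustration only
df = pd.DataFrame({"id": range(1000), "value": ["x"] * 1000})

# An in-memory SQLite database stands in for the warehouse here
conn = sqlite3.connect(":memory:")

# method="multi" packs many rows into each INSERT statement;
# chunksize caps the rows per statement (2 columns x 100 rows = 200 parameters)
df.to_sql("demo_table", conn, index=False, method="multi", chunksize=100)

row_count = conn.execute("SELECT COUNT(*) FROM demo_table").fetchone()[0]
conn.close()
```

Whether this is fast enough depends on the database and driver, so it is worth measuring against a COPY-based load before relying on it.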

The workaround we used was to export the data-frame into a CSV object (in memory) and then bulk insert the CSV object using Psycopg’s copy_expert() method:

import csv
import io
import psycopg2

# hostname, username, password, database and logger are assumed to be
# defined elsewhere in the module (for example, loaded from configuration)
def load_dataframe_into_db_table(df, table_name):
    # Create in-memory CSV object from dataframe
    csv_text = df.to_csv(sep='\t', header=False, index=False,
                         quoting=csv.QUOTE_MINIMAL)
    inp = io.BytesIO(csv_text.encode('utf-8'))
    output = remove_carriage_return(inp)

    # Generate copy query (table_name must come from trusted code,
    # since it is interpolated directly into the SQL)
    copy_query = """COPY %s FROM STDIN WITH
                    (FORMAT CSV, DELIMITER '\t',
                    HEADER FALSE)""" % table_name

    # Execute copy expert
    conn = psycopg2.connect(host=hostname,
                            user=username,
                            password=password,
                            database=database)
    cur = conn.cursor()
    try:
        cur.copy_expert(copy_query, output)
        conn.commit()
        logger.info('Copied into table successfully!')
    except Exception:
        conn.rollback()
        logger.error('Copy into table failed. Rolling back')
        raise
    finally:
        cur.close()
        conn.close()
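The in-memory CSV step can be tried on its own, without a database connection. The following is a minimal sketch with a hypothetical helper name and made-up sample data; it shows how a value with an embedded CRLF ends up in the payload that would be handed to COPY.

```python
import csv
import io
import pandas as pd

def dataframe_to_copy_buffer(df):
    # Hypothetical helper: serialise the dataframe to tab-separated CSV in memory
    csv_text = df.to_csv(sep="\t", header=False, index=False,
                         quoting=csv.QUOTE_MINIMAL)
    # Strip carriage returns so only plain newlines remain
    cleaned = csv_text.replace("\r", "")
    return io.BytesIO(cleaned.encode("utf-8"))

# Made-up sample with a CRLF embedded in one value
sample = pd.DataFrame({"id": [1, 2],
                       "note": ["line one\r\nline two", "plain"]})
payload = dataframe_to_copy_buffer(sample).getvalue()
```

Inspecting payload before wiring in the database makes it easier to confirm that delimiters, quoting and newlines look the way COPY expects.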

As these examples illustrate, the best part about our Python solution is that it is completely customisable, since we developed it in-house. Any technical difficulty can be fixed then and there, instead of raising an issue with a third party and waiting for its resolution. Likewise, any new feature can be added with ease.

This solution has been deployed for more than two months now, and so far the performance has been stellar. No unforeseen issues have cropped up, and data migrates as smoothly as butter.

Thanks for reading!
