Implementing Petabyte Scale Cloud Data Migration

By Sandipayan. Published in Analytics Vidhya, Dec 26, 2019. 8 min read.
Migrating data platform to cloud
Cloud Data Migration

A very large fast food retail chain was migrating its data platform to the AWS cloud. In its on-premise data center it ran IBM DataStage as the structured ETL tool, Teradata and Microsoft SQL Server as OLAP databases, Tableau as the reporting layer, and a Hortonworks Hadoop distribution. In the new platform the plan was to build a data lake on S3, use Redshift as the OLAP database and Talend for data integration, and rewire Tableau to Redshift to serve the reporting layer. A 10-node, long-running EMR cluster of r4.2xlarge instances was commissioned to process semi-structured data such as XML and JSON from Clickstream, the Kitchen Video System and similar sources; it replaced the on-premise Hortonworks Hadoop cluster.

The migration was meant to be a “lift-and-shift”. I will not discuss the strategic or management considerations here, only the implementation. Four main tasks had to be completed for this migration:

  • Rewrite all the necessary DataStage data integration jobs in Talend.
  • Migrate existing data from on premise to cloud — to S3 and then copy to Redshift.
  • Set up the migrated ETL pipelines to run on both the on-prem and cloud platforms, and validate the data for a certain period before turning off the on-prem pipeline.
  • Rewire reporting layer to new data source as needed.

While the overall migration needed extensive planning, this article covers only the second task in the list above: the challenges we faced and the generic solution we built, which saved significant time and effort and let us complete the task smoothly.

Problem:

Petabyte-scale data was locked in Teradata and SQL Server. It had to be extracted by partition, zipped and copied to S3, and then loaded from S3 into Redshift, with validation steps throughout to make sure the data travelled correctly. Initially the team did this one table at a time, copying data over the internet through the VPC private/public subnets and creating a DataStage ETL job for each table with its specific metadata, partition key, validation logic and so on. Building a pipeline per table took too long, the data transfer was slow, and the whole process needed babysitting from start to end. We did not find a generic solution on the market that met our specific needs, so we needed a bespoke, highly configurable one that could run on its own from configured values, validate the data, and send an automated report when the tasks completed. We categorized our needs as below:

  • Need to create a highly configurable, generic process to do the end-to-end task.
  • Need to use a bulk data transfer method, such as AWS Snowball, alongside the public internet.

Solution:

We built a piece of software, mainly using Unix shell scripts and Python. Below I discuss how it was used to accomplish each individual task:

Data Migration — Task Flow

Data Extraction to File:

Major database vendors provide utilities to bulk import and export data. Teradata and SQL Server have theirs too: FastExport and bcp respectively. Most of our data was in Teradata and a small percentage in SQL Server, so Teradata is used to explain the process, but the same process can be run with a ‘bcp’ configuration or any other database's bulk export utility as needed.

FastExport exports data from a Teradata table in 64K blocks, which is very useful for extracting large volumes of data. The utility needs the columns to be named explicitly and cast to ‘varchar’; it does not work with “select *”. That is not a problem, as the column names can be read from a system table, and the configuration file has room to omit columns, add derived columns, or add filters. Another problem is that the default delimiter in the exported data is ‘\t’, which may not be what we want: the data itself can contain ‘\t’, making it hard to tell data from delimiter. Fortunately, Teradata supports user-written OUTMOD routines, which can preprocess data before it is written to a file, so one can be used to replace the delimiter as desired. I found a routine written in C and compiled as dlmt_vchar.so which does the job.
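To make the query-building step concrete, here is a minimal Python sketch of how such a select statement could be assembled from a column list. It is not the project's actual code (which is in ksh); the function name `build_export_query`, the table `sales.orders`, its columns and the `VARCHAR(1000)` length are all hypothetical values chosen for illustration.

```python
from typing import Iterable, Optional


def build_export_query(
    table: str,
    columns: Iterable[str],
    exclude: Iterable[str] = (),
    where: Optional[str] = None,
    varchar_len: int = 1000,  # assumed length, tune per table
) -> str:
    """Build a SELECT that names every column and casts it to VARCHAR."""
    skipped = {name.strip().upper() for name in exclude}
    # Metadata values may come back space padded, so strip them first.
    selected = [c.strip() for c in columns if c.strip().upper() not in skipped]
    if not selected:
        raise ValueError(f"no columns left to export for {table}")
    casts = ",\n       ".join(
        f"CAST({col} AS VARCHAR({varchar_len})) AS {col}" for col in selected
    )
    query = f"SELECT {casts}\nFROM {table}"
    if where:
        query += f"\nWHERE {where}"
    return query + ";"


# Hypothetical column names, as they might be read from the system table.
cols = ["store_id ", "order_ts", "etl_batch_id", "amount"]
sql = build_export_query(
    "sales.orders",
    cols,
    exclude=["etl_batch_id"],
    where="order_ts >= DATE '2019-01-01'",
)
print(sql)
```

The sketch trusts its inputs: table and column names are pasted into the statement unquoted, which is only reasonable when they come from the database's own metadata and a controlled configuration file.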

The source code for this project can be found at the GitHub link below:

https://github.com/sandipayan/Onprem2Aws

Major components of this process are:

snowball.ini: This is the master configuration file. It contains credentials for the source databases, the various paths, the delimiter, the zip option, column exclusions, custom SQL, whether to copy to Snowball or via the internet (for small tables, as Snowball is a physical device which needs to be manually shipped; more on this later), etc.

input_list: A list of tables, one per line, each with comma-separated filter conditions if any.

snowball_main.ksh: This is the master script which is called to run the process. It reads the table names (and any additional info given) from the ‘input_list’ file and iterates over the list. At the beginning it creates a master log file, to which it writes summary information for each table: start time, end time, time taken, raw file size, zipped size, file path, etc. In each iteration it finds the table’s metadata from dbc.columns and forms the select query with the columns cast to varchar, adding filter information if any, or takes a custom SQL query defining which data is to be extracted.

snowball_cs_trfm.ksh: This script formats the metadata retrieved from the dbc.columns system table and creates an intermediate {variable}.cs.dat file, which is used in the main script.

snowball_cs.fe: This takes the SQL file created above and inputs from snowball.ini and creates an intermediate file, snowball_cs_seded{FE_LOG_SUFFIX}.fe, with all parameters replaced, which is used in the next step to run the FastExport extraction from a table.

snowball_main.fe: This is the final FastExport script, which uses the output of the previous step and actually produces the data extract file.

dlmt_vchar.so: This is the shared object (compiled C library) which is used in the above script to preprocess the extracted data, for example to change the delimiter. It works with the environment variable FEXP_DELIMITER, which is configured in the ‘.ini’ file.

zip_move_to_s3.ksh: Based on the configured values, this script zips the extracted data file and moves it to the derived S3 path, either via the public internet or by writing to the Snowball device.
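As an illustration of how a driver could consume the configuration file and the table list, here is a small Python sketch. The real project does this in ksh, and the actual key names in snowball.ini are not shown in this article, so the `[export]` section, its keys, the sample tables and the exact line layout of input_list (table name first, then filter conditions joined with AND) are all assumptions.

```python
import configparser
from typing import List, NamedTuple, Optional


class ExportTask(NamedTuple):
    table: str
    where: Optional[str]  # None when the line carries no filter


def parse_input_list(text: str) -> List[ExportTask]:
    """Parse lines of the assumed form 'table,filter1,filter2'."""
    tasks = []
    for raw in text.splitlines():
        line = raw.strip()
        if not line or line.startswith("#"):  # '#' comments are an assumption
            continue
        table, *filters = [part.strip() for part in line.split(",")]
        conditions = [f for f in filters if f]
        tasks.append(ExportTask(table, " AND ".join(conditions) or None))
    return tasks


# Hypothetical stand-ins for snowball.ini and input_list.
SAMPLE_INI = """
[export]
delimiter = |
zip = yes
target = snowball
"""

SAMPLE_INPUT_LIST = """
sales.orders,order_dt >= DATE '2019-01-01',region = 'EAST'
sales.stores
"""

config = configparser.ConfigParser()
config.read_string(SAMPLE_INI)
use_zip = config.getboolean("export", "zip")

tasks = parse_input_list(SAMPLE_INPUT_LIST)
for task in tasks:
    print(task.table, "|", task.where, "| zip:", use_zip)
```

Splitting on commas is the simplest reading of "comma separated filter conditions", but it breaks on a filter that itself contains a comma, such as an IN list; a production version would need a quoting rule or a different separator.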

Challenges:

While this process worked smoothly for about 95% of the data, a small fraction, primarily in SQL Server, had ‘comment’ or other free-form text columns for which no delimiter was reliable. For these cases we needed to export and import the data in a format that carries the schema along with the data. We used Sqoop with the Avro format for the extraction in these cases. The rest of the process goes as usual.

Snowball Setup:

Snowball Device

Transferring large volumes of data over the public internet may take days or weeks and be very unreliable. We transferred small tables over the public internet, and for large volumes we ordered Snowball devices as needed. When ordering a Snowball device, the storage capacity (e.g. 50 or 80 TB) and the S3 bucket name need to be provided. The device itself is a rugged box with its own shipping label. Once the device is received, it needs to be connected in the data center, and the Snowball client then has to be configured and used to write data to the device. As the connection is now local to the on-premise data center, writes are very fast. Once writing is done, the device can be detached and shipped back to AWS, who will move the data to the same S3 bucket and keys as written on the Snowball device. This is a straightforward process and is well documented in the ‘AWS Snowball Developer Guide’.
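A rough back-of-the-envelope calculation shows why the internet route stops being practical as volumes grow. The sketch below is illustrative only: the 1 Gbps link speed and the 80% sustained utilization are assumptions, not measurements from this project.

```python
def transfer_days(size_bytes: float, link_mbps: float, utilization: float = 0.8) -> float:
    """Estimate days needed to push size_bytes over a link of link_mbps."""
    if link_mbps <= 0 or not 0 < utilization <= 1:
        raise ValueError("link speed must be positive and utilization in (0, 1]")
    effective_bits_per_second = link_mbps * 1_000_000 * utilization
    seconds = size_bytes * 8 / effective_bits_per_second
    return seconds / 86_400  # seconds in a day


TB = 10**12  # decimal terabyte

# Assumed sizes: one small table, one Snowball's worth, and a full petabyte.
for size_tb in (1, 50, 1000):
    days = transfer_days(size_tb * TB, link_mbps=1000)
    print(f"{size_tb:>5} TB over 1 Gbps: {days:.1f} days")
```

Under these assumptions a single terabyte moves in a few hours, while a full petabyte would keep the link saturated for well over three months, which is the gap that shipping physical devices closes.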

Loading to Redshift:

Now that the data is in S3, it needs to be loaded into the corresponding Redshift tables. A few things need to be considered when loading to Redshift:

  • A corresponding table (DDL) is needed in Redshift, into which the data will be copied.
  • The individual columns need to be named when loading to Redshift, so that columns are explicitly mapped and unintentionally wrong loads or errors are avoided.
  • Need to have the capability to delete and reload data.
  • Log activities, including the count of successfully loaded records.

Redshift table creation was a mix of automated script and manual editing to include some extra columns with default values as needed. Below are the scripts used in this process:

input_list: A list of tables to be loaded.

redshift.conf: Redshift credentials information in JSON format.

s3.conf: Access key and secret access key, in JSON format, for the S3 location the data is read from.

getcols.sh: Reads header information from the extracted data and prepares the column list to be passed to the COPY command.

Redshift_Loader.py: This is the main script. It uses the ‘psycopg2’ library and the files above to iteratively load data into the corresponding tables listed in input_list. It also generates logs which are used for final validation.
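To show what the loader has to produce for each table, here is a hedged sketch of a function that assembles a Redshift COPY statement with an explicit column list. It is not taken from Redshift_Loader.py; the function name, table, columns, bucket path and the choice of key-based authorization and the DELIMITER, GZIP and MANIFEST options are assumptions made for the example.

```python
from typing import Sequence


def build_copy_sql(
    table: str,
    columns: Sequence[str],
    s3_path: str,
    delimiter: str = "|",
    gzip: bool = True,
    manifest: bool = False,
) -> str:
    """Build a COPY statement that maps columns explicitly.

    Credentials are left as %(name)s placeholders so that the database
    driver fills them in and they never appear in logged SQL text.
    """
    if not columns:
        raise ValueError("an explicit column list is required")
    options = [f"DELIMITER '{delimiter}'"]
    if gzip:
        options.append("GZIP")
    if manifest:
        options.append("MANIFEST")
    return (
        f"COPY {table} ({', '.join(columns)})\n"
        f"FROM '{s3_path}'\n"
        "ACCESS_KEY_ID %(access_key)s\n"
        "SECRET_ACCESS_KEY %(secret_key)s\n"
        + "\n".join(options)
        + ";"
    )


# Hypothetical table, columns and bucket.
copy_sql = build_copy_sql(
    "sales.orders",
    ["store_id", "order_ts", "amount"],
    "s3://example-bucket/manifests/orders.manifest",
    manifest=True,
)
print(copy_sql)
```

With psycopg2 the statement would be run as `cursor.execute(copy_sql, {"access_key": ..., "secret_key": ...})`, with the two values read from the credentials file. For a delete-and-reload, a `DELETE FROM` on the same table can be issued in the same transaction before the COPY, so that a failed load rolls back to the previous contents.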

Sometimes we needed to load a particular Redshift table from several different S3 paths. In this case a manifest file listing all the S3 paths was created and used for the load (a Redshift manifest lists full object paths; matching by key prefix applies only when COPY is pointed at a prefix directly). Data to be loaded into Redshift should be distributed across many files instead of sitting in one gigantic file. This lets Redshift load from many files simultaneously and speeds up loading greatly. To use the maximum degree of parallelism, the number of files should be a multiple of the number of slices available in Redshift. The files should not be too small, as this adds overhead, or too big, because of increased processing time. File sizes should be approximately 128 MB, with the data distributed as equally as possible among the files.
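The file-splitting rule and the manifest can both be sketched in a few lines of Python. This is an illustration rather than the project's code: the 16-slice cluster, the 10 GiB extract and the bucket paths are made-up values, and the manifest layout follows the `entries` / `url` / `mandatory` structure that Redshift expects.

```python
import json
import math
from typing import Iterable

TARGET_FILE_BYTES = 128 * 1024 * 1024  # roughly 128 MB per file


def plan_file_count(total_bytes: int, slices: int, target_bytes: int = TARGET_FILE_BYTES) -> int:
    """Pick a file count near the target size, rounded up to a multiple of slices."""
    if total_bytes <= 0 or slices <= 0 or target_bytes <= 0:
        raise ValueError("sizes and slice count must be positive")
    by_size = math.ceil(total_bytes / target_bytes)
    return math.ceil(by_size / slices) * slices


def build_manifest(urls: Iterable[str]) -> str:
    """Return manifest JSON listing each S3 object to load."""
    entries = [{"url": url, "mandatory": True} for url in urls]
    return json.dumps({"entries": entries}, indent=2)


# Assumed example: a 10 GiB extract loaded into a cluster with 16 slices.
total = 10 * 1024**3
files = plan_file_count(total, slices=16)
print(files, "files of about", round(total / files / 1024**2), "MiB each")

manifest_json = build_manifest(
    f"s3://example-bucket/orders/part_{i:03d}.gz" for i in range(3)
)
print(manifest_json)
```

Rounding the count up rather than down keeps every slice busy for the same number of files, at the cost of files slightly smaller than the target.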

Automated Status Reporting & Validation:

There are two major steps where we want a status report to be generated, saved and emailed. There are two because data extraction and load happened at different times (or on different days).

For the first step, some key metrics reported are: table name, partition key, extraction start ts, extraction end ts, extracted file size, extracted row count, S3 file path, file size at S3, etc. A sample report at this stage looks like below:

Extraction Report Sample

For the second step, while loading the same data to Redshift, we need a similar log with basic validation such as a record count match. Some items to be tracked are: table name, S3/manifest path, start and end time, duration, number of records inserted, beginning record count, end record count, etc. A sample report at this stage looks like below.

More specific criteria per table could be implemented as needed, e.g. generating and matching an ‘R’-style table summary for both tables. But we found that it's always better to do a manual validation of individual tables or table sets before the cut-over time, i.e. when we are officially done with the history data migration and switch on the simultaneous run of both the cloud and on-prem data pipelines. Both processes need to be monitored for some time before sunsetting the on-prem process.
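The basic record count match mentioned above can be expressed as a small check that compares the extraction report with the load log. The sketch below assumes both reports have already been parsed into dictionaries keyed by table name; the `LoadLog` fields mirror the logged items (records inserted, beginning and end counts), but the structure, names and sample numbers are hypothetical.

```python
from typing import Dict, List, NamedTuple


class LoadLog(NamedTuple):
    begin_count: int  # rows in the Redshift table before the load
    inserted: int     # rows reported as inserted by the load
    end_count: int    # rows in the Redshift table after the load


def validate_load(extracted: Dict[str, int], loaded: Dict[str, LoadLog]) -> List[str]:
    """Return a list of human-readable problems; empty means all counts match."""
    problems = []
    for table, expected in sorted(extracted.items()):
        log = loaded.get(table)
        if log is None:
            problems.append(f"{table}: extracted but never loaded")
            continue
        if log.inserted != expected:
            problems.append(f"{table}: extracted {expected} rows, inserted {log.inserted}")
        growth = log.end_count - log.begin_count
        if growth != log.inserted:
            problems.append(f"{table}: table grew by {growth} rows, expected {log.inserted}")
    return problems


# Made-up numbers for illustration only.
extracted_counts = {"sales.orders": 1000, "sales.stores": 50, "sales.items": 7}
load_logs = {
    "sales.orders": LoadLog(begin_count=0, inserted=1000, end_count=1000),
    "sales.stores": LoadLog(begin_count=0, inserted=48, end_count=48),
}

issues = validate_load(extracted_counts, load_logs)
for issue in issues:
    print(issue)
```

A check like this only catches missing or duplicated rows. It says nothing about values damaged in transit, which is why the per-table summaries and the manual validation before cut-over still matter.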

How did you do your data platform migration to the cloud: did you use proprietary tools or develop an in-house solution like we did? I would love to hear your experiences.
