“Supercharge Your Data: From MongoDB and RDS to Amazon Redshift — A Migration Journey”

Migrating Data from MongoDB and RDS to Redshift

Ram Ambadas Dandale
Ankercloud Engineering
11 min read · Aug 17, 2023


Architecture

In this article, we embark on a comprehensive exploration of migrating MongoDB databases on EC2 and Amazon RDS to Amazon Redshift. We will provide a step-by-step implementation guide, shed light on potential challenges, and offer effective solutions to ensure a seamless migration process. Our approach involves utilizing AWS Glue for creating an ETL pipeline, leveraging an S3 bucket for intermediate storage, and seamlessly loading data into Redshift using Glue.

Stay tuned as we unveil the intricacies of this migration process and equip you with valuable insights for a successful transition to Amazon Redshift, empowering your organization with scalable, high-performance data warehousing.

Amazon Redshift: Unleashing the Power of Data Warehousing

Amazon Redshift is a powerful and scalable data warehousing solution offered by Amazon Web Services (AWS). Built to handle vast amounts of data, Redshift empowers organizations to efficiently store, process, and analyze their data, enabling data-driven decision-making and unlocking valuable insights.

Advantages

  • Lightning-fast data warehousing.
  • Massive scalability for handling petabytes of data.
  • Seamless integration with other AWS services.
  • Advanced compression techniques for cost-effective storage.
  • Robust security features for data protection.
  • Cost-effective pricing options for optimized budgets.

Networking

The MongoDB database on the EC2 instance and the RDS database are both located within the default VPC. To maintain consistency, we have created the AWS Glue connections and Amazon Redshift cluster in the same default VPC. This ensures a cohesive and integrated network environment for efficient data transfer and communication between the databases and Redshift cluster.

Steps involved in migrating the MongoDB database on EC2 and Amazon RDS to Amazon Redshift

1 Creating IAM Roles and Policies

  1. Access the IAM console by clicking on “Services” in the top menu and selecting “IAM”.
  2. In the console, click on “Roles” in the left navigation pane, then select “Create role” to choose the type, define permissions, provide a name and description, and create the role.

1.1 Roles for Glue

We need to create an IAM role that grants AWS Glue access to the required resources.

Pre-defined Policies
Customized policies

1.2 Roles for Redshift

We need to create an IAM role that grants Amazon Redshift access to the required resources.

Pre-defined Policies
Customized policies
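
For reference, roles like these can also be created programmatically. The sketch below is a minimal boto3 example under the assumption that AWS-managed policies are sufficient; the role names and attached policies are illustrative, not the exact ones used in this migration.

import json
import boto3

iam = boto3.client("iam")

def create_service_role(role_name, service, managed_policy_arns):
    # Trust policy letting the given AWS service assume the role
    trust_policy = {
        "Version": "2012-10-17",
        "Statement": [{
            "Effect": "Allow",
            "Principal": {"Service": service},
            "Action": "sts:AssumeRole",
        }],
    }
    iam.create_role(
        RoleName=role_name,
        AssumeRolePolicyDocument=json.dumps(trust_policy),
    )
    for arn in managed_policy_arns:
        iam.attach_role_policy(RoleName=role_name, PolicyArn=arn)

# Role assumed by Glue crawlers and jobs (illustrative name and policy)
create_service_role(
    "glue-migration-role",
    "glue.amazonaws.com",
    ["arn:aws:iam::aws:policy/service-role/AWSGlueServiceRole"],
)

# Role assumed by the Redshift cluster, e.g. to read from S3 (illustrative)
create_service_role(
    "redshift-migration-role",
    "redshift.amazonaws.com",
    ["arn:aws:iam::aws:policy/AmazonS3ReadOnlyAccess"],
)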

2 Creating Redshift Cluster

  1. Access Amazon Redshift through the “Services” menu, select “Create Cluster,” and configure the cluster settings (a programmatic sketch follows below).
  2. Wait for the cluster creation process to finish, then begin loading data and executing queries using standard SQL clients.
  3. Manage the cluster and its resources as needed, using the provided username and password for access and administration.
Redshift Cluster
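
As an alternative to the console steps above, the cluster can be provisioned with boto3. The identifier, node type, credentials, and IAM role ARN below are placeholders, not the values used in this setup.

import boto3

redshift = boto3.client("redshift")

# Cluster identifier, sizing, and credentials are placeholders
redshift.create_cluster(
    ClusterIdentifier="migration-cluster",
    NodeType="dc2.large",
    NumberOfNodes=2,
    MasterUsername="admin_user",
    MasterUserPassword="REPLACE_WITH_STRONG_PASSWORD",
    DBName="analytics",
    IamRoles=["arn:aws:iam::123456789012:role/redshift-migration-role"],
    PubliclyAccessible=False,
)

# Poll until the cluster becomes available before loading data
waiter = redshift.get_waiter("cluster_available")
waiter.wait(ClusterIdentifier="migration-cluster")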

3 Glue Connection

AWS Glue connections provide a way to access data from different sources such as Amazon S3, MongoDB, JDBC databases, or other AWS services. A connection is defined by its connection type and configuration parameters.

  1. Access the AWS Glue service, navigate to “Connections,” and click “Add connection” to create a new connection.
  2. Select “Database” as the connection type and provide the connection URL.
  3. Enter the username and password for authentication with the database server and save the connection details by clicking “Create.”

3.1 MongoDB Connection

Connection URL → ‘mongodb://<hostname>:<port>/<database>’.

Port No → 27017

3.2 RDS Connection

Connection URL → ‘jdbc:mysql://<hostname>:<port>/<database>’.

Port No → 3306

3.3 Redshift Connection

Connection URL → ‘jdbc:redshift://<cluster-endpoint>:<port>/<database>’.

Port No → 5439
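
The same connections can also be defined through the AWS Glue API. The boto3 sketch below uses placeholder connection names, hosts, and credentials; in practice the VPC settings (PhysicalConnectionRequirements) would be filled in and the credentials referenced from AWS Secrets Manager.

import boto3

glue = boto3.client("glue")

# MongoDB connection (host, database, and credentials are placeholders)
glue.create_connection(
    ConnectionInput={
        "Name": "mongodb-connection",
        "ConnectionType": "MONGODB",
        "ConnectionProperties": {
            "CONNECTION_URL": "mongodb://<hostname>:27017/<database>",
            "USERNAME": "mongo_user",
            "PASSWORD": "REPLACE_ME",
        },
        # PhysicalConnectionRequirements (subnet, security groups) omitted for brevity
    }
)

# JDBC connection for the RDS MySQL database
glue.create_connection(
    ConnectionInput={
        "Name": "rds-connection",
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:mysql://<hostname>:3306/<database>",
            "USERNAME": "rds_user",
            "PASSWORD": "REPLACE_ME",
        },
    }
)

# JDBC connection for the Redshift cluster
glue.create_connection(
    ConnectionInput={
        "Name": "redshift-connection",
        "ConnectionType": "JDBC",
        "ConnectionProperties": {
            "JDBC_CONNECTION_URL": "jdbc:redshift://<cluster-endpoint>:5439/<database>",
            "USERNAME": "redshift_user",
            "PASSWORD": "REPLACE_ME",
        },
    }
)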

Glue Connections

4 Glue Data Catalog Databases

  1. Access the AWS Glue service from the “Services” menu and navigate to “Databases.”
  2. Select “Add database,” provide a name and an optional description for the database, and click “Create” to create it.
Glue Data Catalog Databases

As part of our AWS Glue setup, we created two databases named ‘mongodb’ and ‘rds’ manually. When a Glue crawler runs, it automatically loads the tables and their metadata into these databases.
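
The two catalog databases can equally be created programmatically; a minimal boto3 sketch:

import boto3

glue = boto3.client("glue")

# Catalog databases that the crawlers will populate with table metadata
for database_name in ["mongodb", "rds"]:
    glue.create_database(
        DatabaseInput={
            "Name": database_name,
            "Description": f"Glue Data Catalog database for the {database_name} source",
        }
    )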

5 Glue Crawler

A Glue crawler connects to a data store, progresses through a prioritized list of classifiers to determine the schema of the data, and then creates metadata tables in the AWS Glue Data Catalog.

  1. Configure crawler properties, including name and description. Specify data sources and security settings, such as the AWS (IAM) role for the crawler.
  2. Set output and scheduling options, such as the target database and table name in the AWS Glue Data Catalog, and the frequency and timing of the crawler.
  3. Review the configuration details and create the crawler.

5.1 MongoDB Crawler

Crawling all MongoDB collections with a single AWS Glue crawler was not possible because of the multiple connections the crawler opened against the collections. To overcome this, we used one crawler per collection, each with its own configuration and settings.

To create the MongoDB crawlers in AWS Glue, we followed a naming convention of a “mongo” prefix, followed by a sequential number to identify the table, and ending with the actual table name.

5.2 RDS Crawler

By creating a Glue crawler named “rds-crawler”, we were able to efficiently and accurately scan and catalog all 44 tables within our RDS database.
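
For reference, crawlers like these could also be defined with boto3 as sketched below. The connection names, IAM role, and MongoDB path follow the conventions described above but are assumptions rather than the exact values used.

import boto3

glue = boto3.client("glue")

# One crawler per MongoDB collection, following the "mongo-<n>-<collection>" convention
glue.create_crawler(
    Name="mongo-1-test",
    Role="glue-migration-role",        # IAM role created earlier (illustrative name)
    DatabaseName="mongodb",            # target Glue Data Catalog database
    Targets={
        "MongoDBTargets": [
            {"ConnectionName": "mongodb-connection", "Path": "dev/test"}
        ]
    },
)

# A single crawler for the whole RDS database
glue.create_crawler(
    Name="rds-crawler",
    Role="glue-migration-role",
    DatabaseName="rds",
    Targets={
        "JdbcTargets": [
            {"ConnectionName": "rds-connection", "Path": "<database>/%"}
        ]
    },
)

# Run a crawler on demand
glue.start_crawler(Name="rds-crawler")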

Glue Crawlers

6 Glue Data Catalog Table

In AWS Glue, tables are automatically created when a Glue crawler is run successfully. Therefore, we did not have to manually create Glue tables. The Glue crawler automatically infers the schema of the data and creates tables with metadata, such as column names and data types, based on the crawled data.

7 Glue Job

AWS Glue is a fully-managed ETL service that simplifies the process of moving data between data stores. Glue Jobs are a feature of AWS Glue that allow us to create, schedule, and run ETL scripts using Apache Spark or Python.

7.1 Data Source (MongoDB) to intermediate S3 bucket

  1. Access AWS Glue console and go to “Jobs.”
  2. Create a new job using Spark script editor, import libraries, and initialize GlueContext.
  3. Create DynamicFrames for MongoDB source data, add additional_options for MongoDB database and collection name.
  4. Utilize checkpoint operation for fault tolerance and resume capability.
  5. Perform data transformations, including relationalize operation for nested data.
  6. Write processed data to S3 intermediate location, ensuring data integrity.
  7. Configure job details: description, versions, workers, connections.
  8. Manually execute the job to trigger transformation process.

Code Sample with checkpoint and relationalize operation

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, max
from datetime import datetime
import boto3

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

glue_temp_storage = "s3://artificats/glue_tmp/"
dfc_root_table_name = "root"

# Get the maximum created_at date from the previous run
s3 = boto3.client("s3")
previous_checkpoint_path = "checkpoint_folder/test/test_checkpoint.txt"
bucket_name = "production-data"

# Check if the checkpoint file exists
try:
    response = s3.get_object(Bucket=bucket_name, Key=previous_checkpoint_path)
    max_created_at = response["Body"].read().decode()
    max_created_at = datetime.strptime(max_created_at, "%Y-%m-%d %H:%M:%S.%f")
except s3.exceptions.NoSuchKey:
    # If the checkpoint file doesn't exist, set max_created_at to None
    max_created_at = None

# Script generated for node Data Catalog table
DataCatalogtable_node1 = glueContext.create_dynamic_frame.from_catalog(
    database="mongodb",
    table_name="dev_test",
    additional_options={
        "database": "dev",
        "collection": "test",
    },
    transformation_ctx="DataCatalogtable_node1",
)

# Filter the data based on the maximum created_at date from the previous run
if max_created_at:
    DataCatalogtable_node1 = Filter.apply(
        frame=DataCatalogtable_node1,
        f=lambda x: x["created_at"] > max_created_at,
    )

if DataCatalogtable_node1.count() > 0:
    # Update the max_created_at value
    max_created_at = DataCatalogtable_node1.toDF().select(max(col("created_at"))).collect()[0][0]
    max_created_at_str = max_created_at.strftime("%Y-%m-%d %H:%M:%S.%f")

    # Save the new max_created_at value as the checkpoint
    s3.put_object(Body=max_created_at_str, Bucket=bucket_name, Key=previous_checkpoint_path)

    test_relationalize = Relationalize.apply(
        frame=DataCatalogtable_node1,
        staging_path=glue_temp_storage,
        name=dfc_root_table_name,
        transformation_ctx="test_relationalize",
    )

    test_select = test_relationalize.select(dfc_root_table_name)

    test = test_select.toDF()

    columns = test.columns

    # iterate through column names and replace dot with underscore
    new_columns = [col_name.replace(".", "_") for col_name in columns]

    # assign new column names to the dataframe
    test = test.toDF(*new_columns)

    print("Columns detected in test are :: ", len(test.columns))

    test.printSchema()

    print("Records detected in test are :: ", test.count())

    test.show(n=1, truncate=False)

    # convert it back to glue context
    datasource1 = DynamicFrame.fromDF(test, glueContext, "datasource1")

    # Script generated for node S3 bucket
    S3bucket_node3 = glueContext.write_dynamic_frame.from_options(
        frame=datasource1,
        connection_type="s3",
        format="glueparquet",
        connection_options={
            "path": "s3://production-data/mongodb-data/test/",
            "partitionKeys": [],
        },
        format_options={"compression": "snappy"},
        transformation_ctx="S3bucket_node3",
    )

    print("Data loaded Successfully in S3 bucket ::- ", "s3://production-data/mongodb-data/test/")

else:
    print("No new data available. Skipping data loading.")

job.commit()

Checkpoint Benefit:

  • Checkpoints in a Glue job enable fault tolerance by saving the state of the job, allowing it to resume from the last successfully completed checkpoint in case of failures or interruptions.
  • Checkpoints help ensure data integrity and prevent data duplication by resuming processing from the exact point of failure, eliminating the need to reprocess previously processed data.

Relationalize Operation Benefit:

  • The relationalize operation in Glue simplifies working with complex nested data structures, allowing you to transform and flatten nested data into relational tables.
  • Relationalize operation helps improve query performance and enables easier data analysis by providing a structured, tabular format that is compatible with SQL-based tools and analytics frameworks.

7.2 Data Source (RDS) to intermediate S3 bucket

  1. Access the AWS Glue console and go to “Jobs.”
  2. Create a new job using the Spark script editor, import libraries, and initialize the GlueContext.
  3. Create DynamicFrames for the RDS source data and specify the intermediate S3 location.
  4. Perform schema mapping and data transformations, and save the transformed data to the S3 intermediate location.
  5. Configure job specifics: description, versions, workers, connections.
  6. Enable the job bookmark feature for incremental data processing.
  7. Manually run the job to initiate the transformation process (a minimal script sketch follows below).
RDS Glue Job
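
No script was shown for this job, so the following is only a minimal sketch, assuming the RDS tables are read from the ‘rds’ catalog database and written as Parquet to the intermediate bucket; the catalog table name and S3 path are illustrative. The transformation_ctx values are what the job bookmark uses to track already-processed data once the bookmark feature is enabled in the job properties.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read one RDS table from the Glue Data Catalog; the transformation_ctx is what
# the job bookmark uses to remember which data has already been processed.
orders_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="rds",
    table_name="<database>_orders",        # illustrative catalog table name
    transformation_ctx="orders_source",
)

# Write the data to the intermediate S3 location as snappy-compressed Parquet
glueContext.write_dynamic_frame.from_options(
    frame=orders_dyf,
    connection_type="s3",
    format="glueparquet",
    connection_options={"path": "s3://production-data/rds-data/orders/"},
    format_options={"compression": "snappy"},
    transformation_ctx="orders_sink",
)

job.commit()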

Job Bookmark Benefit:

  • Job bookmarking in AWS Glue allows for incremental data processing by keeping track of the last processed data, enabling efficient updates to the target dataset.
  • By utilizing job bookmarks, subsequent job runs can resume from where they left off, reducing processing time and optimizing resource utilization.

7.3 Intermediate S3 bucket to Amazon Redshift table

  1. Access the AWS Glue console and navigate to “Jobs.”
  2. Create a new job using the visual ETL editor, selecting “Amazon S3” as the source and “Amazon Redshift” as the target, and click “Create.”
  3. Configure the S3 source with the required S3 path, set the file format to Parquet, and click “Infer schema.”
  4. Add the Apply mapping transform so the table schema can be verified.
  5. Select the Amazon Redshift table as the target and provide the connection details, schema name, and table name.
  6. Configure the Redshift merge (upsert) operation for the table and, as the matching key, add the name of the column that holds unique values.
  7. Review and validate the job settings, including the source, target, and transformation steps.
  8. Configure job specifics such as description, versions, workers, and connections.
  9. Save the job configuration and run the job to load the data from the S3 bucket into the Redshift table (a script-based sketch follows below).
Redshift Glue Job
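
The visual editor generates an equivalent script behind the scenes. A hand-written sketch of the same flow is shown below, assuming the Parquet data written earlier and using placeholder schema, table, staging-path, and key-column names; the merge is expressed with preactions/postactions SQL against a staging table, which is one common way to implement the upsert and may differ from the script the editor produces.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Read the Parquet files written by the previous jobs from the intermediate bucket
source_dyf = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    format="parquet",
    connection_options={"paths": ["s3://production-data/mongodb-data/test/"]},
    transformation_ctx="s3_source",
)

# Upsert into Redshift: load into a staging table, then merge on the unique key column.
# Schema, table, and key-column names are placeholders.
pre_sql = "CREATE TABLE IF NOT EXISTS public.test_staging (LIKE public.test);"
post_sql = (
    "BEGIN; "
    "DELETE FROM public.test USING public.test_staging "
    "WHERE public.test.id = public.test_staging.id; "
    "INSERT INTO public.test SELECT * FROM public.test_staging; "
    "DROP TABLE public.test_staging; END;"
)

glueContext.write_dynamic_frame.from_jdbc_conf(
    frame=source_dyf,
    catalog_connection="redshift-connection",   # Glue connection created earlier (illustrative name)
    connection_options={
        "dbtable": "public.test_staging",
        "database": "<database>",
        "preactions": pre_sql,
        "postactions": post_sql,
    },
    redshift_tmp_dir="s3://production-data/redshift_tmp/",
    transformation_ctx="redshift_sink",
)

job.commit()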

Scheduling Glue Jobs

To keep the data in the target Redshift database near real time, the Glue jobs would need to run frequently; however, running them more often increases service costs. To balance freshness against cost, we decided to run the Glue jobs from the data sources (MongoDB and RDS) to the intermediate S3 bucket, as well as the jobs from the intermediate S3 bucket to the Amazon Redshift tables, on a daily basis at 3:00 AM. Scheduling the jobs at this time keeps the associated costs to a minimum.

  1. Access the AWS Glue console and navigate to “Jobs.”
  2. Open the job to be scheduled; under its “Schedules” tab, click “Create schedule.”
  3. Specify the scheduling options for the job, such as frequency and start time.
  4. Review and validate the schedule settings, then save the schedule; the job will run automatically based on the defined schedule.
Glue Job Schedule

Note: If the desired run time is 3:00 AM local time, the schedule should be set to 21:30 UTC due to the time zone difference.
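
Glue schedules are cron expressions evaluated in UTC, so the 21:30 UTC example above can also be set up as a scheduled trigger with boto3; the trigger and job names below are illustrative.

import boto3

glue = boto3.client("glue")

# Daily trigger at 21:30 UTC (3:00 AM at UTC+5:30); trigger and job names are illustrative
glue.create_trigger(
    Name="daily-mongodb-to-s3",
    Type="SCHEDULED",
    Schedule="cron(30 21 * * ? *)",
    Actions=[{"JobName": "mongodb-to-s3-job"}],
    StartOnCreation=True,
)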

Issues faced during Migration

1 Data Retrieval Failure in MongoDB for Large Datasets

Issue: The job was continuously failing while attempting to read data from MongoDB due to excessively large data in certain tables.

Solution: To address this issue effectively, we incorporated a sample size while reading the data, reducing the strain on system resources and ensuring the successful execution of the job (see the sketch below).
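
The article does not show the exact setting used, so the snippet below is only an assumption of how a sample size could be passed to the MongoDB reader through additional_options; the option key follows the MongoDB Spark connector's naming and should be verified against the Glue MongoDB connector version in use, and the catalog table and collection names are hypothetical.

import sys
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job

args = getResolvedOptions(sys.argv, ["JOB_NAME"])
glueContext = GlueContext(SparkContext())
job = Job(glueContext)
job.init(args["JOB_NAME"], args)

# Hypothetical: cap how many documents are sampled when the schema is inferred.
# "sampleSize" follows the MongoDB Spark connector's naming; verify the exact
# option key supported by your Glue MongoDB connector version.
large_collection_dyf = glueContext.create_dynamic_frame.from_catalog(
    database="mongodb",
    table_name="dev_big_table",            # illustrative catalog table name
    additional_options={
        "database": "dev",
        "collection": "big_table",
        "sampleSize": "10000",
    },
    transformation_ctx="large_collection_source",
)

job.commit()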

2 Nested MongoDB Data Integration with Relational Databases

Issue: The nested structure of MongoDB data posed challenges when attempting to incorporate it into a relational database such as Redshift.

Solution: To address this issue, we employed the relationalize operation to unnest the structure of the MongoDB table, resulting in a structured format that was compatible with relational databases.

3 Integration of Array-Structured Data into Redshift

Issue: In the nested structure, the data is presented in an array format, making it incompatible for direct insertion into Redshift.

Solution: To overcome the challenge of inserting data with an array structure into Redshift, we used the SUPER data type. SUPER is a Redshift data type that enables seamless integration of complex, semistructured data, including arrays, into Redshift.

4 Redshift Load Errors Due to String Length Exceeding DDL Limit

Issue: We encountered “Exception thrown in awaitResult” errors while loading data from both MongoDB and RDS (Relational Database Service). A detailed examination of the logs revealed that the errors were caused by string values exceeding the length declared in the DDL (Data Definition Language).

Solution: Handling Redshift load errors and modifying the column data type for string-length compatibility.

To address this issue, we employed the following solution:

  • Analyzing load errors: We ran the query “SELECT * FROM STL_LOAD_ERRORS” in Redshift, which returned the specific columns whose VARCHAR values exceeded the declared length. This step identified the problematic columns causing the string-length failures during loading.
  • Modifying the column data type: In the Glue job responsible for creating the table schema in Redshift, we changed the data type of the identified columns to VARCHAR(MAX), ensuring that values up to the maximum limit could be loaded into the Redshift table (a scripted sketch follows below).
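
Neither step requires leaving Python. As one possible scripted variant (an assumption for illustration, not the approach described above), the diagnosis and the column change could be run through the Redshift Data API; the cluster, database, table, and column names are placeholders.

import boto3

redshift_data = boto3.client("redshift-data")

# Look up recent load errors to find the offending columns (placeholders throughout)
redshift_data.execute_statement(
    ClusterIdentifier="migration-cluster",
    Database="analytics",
    DbUser="admin_user",
    Sql="SELECT colname, err_reason FROM stl_load_errors ORDER BY starttime DESC LIMIT 20;",
)

# Widen an identified column so long strings fit (Redshift allows increasing VARCHAR sizes)
redshift_data.execute_statement(
    ClusterIdentifier="migration-cluster",
    Database="analytics",
    DbUser="admin_user",
    Sql="ALTER TABLE public.test ALTER COLUMN description TYPE VARCHAR(MAX);",
)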
