Database Migration Service on Google Cloud — using REST API with Python

Sneha Choudhary
Google Cloud - Community
5 min read · Sep 23, 2023

This article explores Database Migration Service on Google Cloud using one of the most widely used programming languages: Python!

Unlike its predecessor, this is a true quick read! Have faith in me😇, coz…

For details on sample source database configuration, or to get acquainted with Database Migration Service, refer to my earlier article (linked as "predecessor" above☝️) when you have time … coz that one needs patience to read, and a LOT of it!😎

For those who are already familiar with Database Migration Service on Google Cloud Platform, and assuming you have the Google Cloud SDK installed … here it is👇👇

First, authorise gcloud to access the Cloud Platform from the terminal of your choice:

gcloud auth login
(or)
gcloud auth application-default login

Note: Reference documentation for gcloud initialisation here.

We will be referencing the gcloud login credentials and the default project_id (set by us) throughout this article. The snippets below also rely on the google-auth and google-api-python-client libraries, so make sure those are installed. Refer to the sample code block below to get the credentials sorted:

import google.auth

# Getting the credentials and project details for the GCP project
credentials, project_id = google.auth.default(
    scopes=['https://www.googleapis.com/auth/cloud-platform']
)

Up next, let's create a function to retrieve a Database Migration Service resource. Sample snippet for the same:

from googleapiclient import discovery


def get_service(service_name) -> discovery.Resource:
    """Create a discovery resource for Database Migration Service."""
    dms_service = discovery.build(service_name, 'v1', credentials=credentials)
    return dms_service


# Test the function
print("Database migration service resource:", get_service('datamigration'))

Since the APIs we will invoke here are mostly asynchronous, they return the details of a long-running operation (LRO) when invoked. We need to validate that the operation has completed before moving on to the next dependent step. Let's get that piece figured out:

from googleapiclient import errors
import time


def get_lro_details(lro_detail: str) -> str:
    """Fetches details of a long-running operation by operationId.

    Input lro_detail should be a resource name ending with
    operations/{unique_id}.
    """
    name = lro_detail  # supply sample operationId here
    service = get_service('datamigration')
    request_get_lro_state = (
        service.projects()
        .locations()
        .operations()
        .get(
            name=f'{name}',
        )
    )
    response_get_lro_state = request_get_lro_state.execute()
    # 'done' may be omitted while the operation is still running
    check_lro_state = response_get_lro_state.get('done', False)
    while not check_lro_state:
        time.sleep(5)  # sleep between polls to avoid hammering the API
        # the same request object can be re-executed to poll again
        response_get_lro_state = request_get_lro_state.execute()
        check_lro_state = response_get_lro_state.get('done', False)
    if 'response' in response_get_lro_state:
        return (
            'SUCCESS:'
            + '200'
            + ':'
            + response_get_lro_state['response']['name']
        )
    elif 'error' in response_get_lro_state:
        return (
            'ERROR:'
            + str(response_get_lro_state['error']['code'])
            + ':'
            + response_get_lro_state['error']['message']
        )

Feel free to test the above code with a sample operationId.
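For reference, an operation name follows the pattern projects/{project}/locations/{location}/operations/{operation_id}. A test call (with placeholder values, purely illustrative) might look like this:

# Illustrative only: replace the placeholders with a real operationId
# returned by one of the DMS calls below.
sample_operation = (
    'projects/my-project/locations/us-central1/'
    'operations/operation-1234567890-abcdef'
)
print(get_lro_details(sample_operation))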

If you made it this far … Good Job!

Now that we know how to validate an LRO, let's go ahead and create a connection profile. Remember that the source and target connection profiles for DMS differ only in their JSON configuration. A quick recap of the sample JSON for the source connection profile is here, and that for the target is here. Here is the code for creating a connection profile once you've got that figured out:

def create_connection_profile(profile_name: str) -> str:
    """Creates a connection profile."""

    service = get_service('datamigration')
    gcp_project_id = ''  # add project id here
    location = ''  # add region here
    config = {}  # add the source/target config JSON (as a dict) here

    request_create_connprofile = (
        service.projects()
        .locations()
        .connectionProfiles()
        .create(
            parent=f'projects/{gcp_project_id}/locations/{location}',
            connectionProfileId=profile_name,
            body=config,
        )
    )

    try:
        response_create_connprofile = request_create_connprofile.execute()
    except errors.HttpError as err:
        return (
            'ERROR:'
            + str(err.resp.status)
            + ':'
            + err.resp.reason
        )

    if response_create_connprofile['metadata']['@type'].endswith(
        'OperationMetadata'
    ):
        cp_name = response_create_connprofile['name']
        return get_lro_details(cp_name)
    else:
        return (
            'ERROR:'
            + '404'
            + ':'
            + 'OperationMetadata not found for Connection profile creation'
        )
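If you don't have the earlier article handy, here is a rough, hedged sketch of what the config for a MySQL source might look like, based on the DMS connectionProfiles API; all values are placeholders of my own, and a target Cloud SQL profile would carry a cloudsql block instead of the mysql one:

# A hedged sketch (not a verbatim recap of the earlier article) of a
# MySQL source connection profile body; every value is a placeholder.
source_config = {
    'displayName': 'my-source-profile',
    'mysql': {
        'host': '203.0.113.10',   # source database host (placeholder)
        'port': 3306,
        'username': 'dms-user',
        'password': 'change-me',  # never hard-code real secrets
    },
}

# Plug source_config into the `config` variable inside
# create_connection_profile, then invoke it:
# print(create_connection_profile('my-source-profile'))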

To validate the status of the connection profile creation, use the following snippet:

def get_connection_profile(profile_detail: str) -> str:
    """Get details of a specific database migration connection profile."""
    service = get_service('datamigration')
    request_get_connprofile = (
        service.projects()
        .locations()
        .connectionProfiles()
        .get(
            name=f'{profile_detail}',
        )
    )

    try:
        response_get_connprofile = request_get_connprofile.execute()
    except errors.HttpError as err:
        return (
            'ERROR:'
            + str(err.resp.status)
            + ':'
            + err.resp.reason
        )

    # Note: the 'cloudsql'/'publicIp' fields apply to Cloud SQL (target)
    # profiles; a source profile will not have them.
    return (
        'SUCCESS:'
        + '200'
        + ':'
        + response_get_connprofile['cloudsql']['publicIp']
    )
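The profile detail here is the full resource name. A purely illustrative call (placeholder project, region and profile id) could look like:

# Illustrative only: substitute your own project, region and profile id.
print(get_connection_profile(
    'projects/my-project/locations/us-central1/'
    'connectionProfiles/my-target-profile'
))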

After the connection profiles are successfully created, the next step is creating a migration job. Refer to the sample JSON here and use the code below to accomplish the same:

def create_migration_job() -> str:
    """Creates a migration job."""

    service = get_service('datamigration')
    gcp_project_id = ''  # add project id here
    job_name = ''  # add a job name identifier here
    job_request_id = ''  # add a unique job request id here
    location = ''  # add region here
    config = {}  # add the migration job config JSON (as a dict) here

    request_create_migjob = (
        service.projects()
        .locations()
        .migrationJobs()
        .create(
            parent=f'projects/{gcp_project_id}/locations/{location}',
            migrationJobId=job_name,
            requestId=job_request_id,
            body=config,
        )
    )

    try:
        response_create_migjob = request_create_migjob.execute()
    except errors.HttpError as err:
        return (
            'ERROR:'
            + str(err.resp.status)
            + ':'
            + err.resp.reason
        )

    if response_create_migjob['metadata']['@type'].endswith('OperationMetadata'):
        mj_name = response_create_migjob['name']
        return get_lro_details(mj_name)
    else:
        return (
            'ERROR:'
            + '404'
            + ':'
            + 'OperationMetadata not found for Migration Job creation'
        )
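Again, only as a hedged sketch based on the DMS migrationJobs API (the resource names and values below are placeholders of my own, not the sample JSON linked above), the config might look something along these lines:

# A hedged sketch of a migration job body; all values are placeholders.
migration_job_config = {
    'displayName': 'my-migration-job',
    'type': 'CONTINUOUS',  # or 'ONE_TIME'
    'source': (
        'projects/my-project/locations/us-central1/'
        'connectionProfiles/my-source-profile'
    ),
    'destination': (
        'projects/my-project/locations/us-central1/'
        'connectionProfiles/my-target-profile'
    ),
}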

Once the migration job is created, let's say we want to verify its status before starting it. The code below will help achieve that:

def verify_migration_job(job_name: str) -> str:
    """Verifies the migration job.

    Ensure the configuration at the source is updated to allow incoming
    connections from the target Cloud SQL instance.
    """
    service = get_service('datamigration')
    request_verify_migjob = (
        service.projects()
        .locations()
        .migrationJobs()
        .verify(
            name=f'{job_name}',
        )  # use "start" or "promote" in place of "verify"
    )

    try:
        response_verify_migjob = request_verify_migjob.execute()
    except errors.HttpError as err:
        return (
            'ERROR:'
            + str(err.resp.status)
            + ':'
            + err.resp.reason
        )

    if response_verify_migjob['metadata']['@type'].endswith('OperationMetadata'):
        mj_name = response_verify_migjob['name']
        return get_lro_details(mj_name)
    else:
        return (
            'ERROR:'
            + '404'
            + ':'
            + 'OperationMetadata not found for Migration Job verification'
        )

Once the migration job is verified, the same code as above can be used to start the migration by invoking the migrationJobs().start() method with the job name as the input parameter, like so 👇:

request_start_migjob = (
    service.projects()
    .locations()
    .migrationJobs()
    .start(
        name=f'{job_name}',
    )
)

The same code as for starting the migration job applies to the job's final promotion step, which separates the source and destination Cloud SQL instances and turns the read replica into a primary instance. For that, the request segment needs to be modified as follows:

request_promote_migjob = (
    service.projects()
    .locations()
    .migrationJobs()
    .promote(
        name=f'{job_name}',
    )
)
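In either case, the pattern is the same as in verify_migration_job above: execute the request, then poll the returned operation. A minimal sketch:

# Execute the promote (or start) request, then poll the LRO it returns.
response_promote_migjob = request_promote_migjob.execute()
print(get_lro_details(response_promote_migjob['name']))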

The task of kneading all this together is now yours to explore!😉

As a bonus, however, we can build an orchestration flow on top of the response that each of the functions we've just covered returns. The pseudo-logic for that would resemble this:

def dms() -> str:
    """Invoke DMS REST APIs in sequence."""

    create_sp_response = create_connection_profile('<name_of_source_profile>')

    create_sp_message = create_sp_response[create_sp_response.rfind(':') + 1:]
    create_sp_status_code = create_sp_response[
        create_sp_response.find(':') + 1: create_sp_response.rfind(':')
    ]

    create_tp_response = ''
    create_tp_status_code = ''
    create_tp_message = ''

    if create_sp_status_code == '200':
        create_tp_response = create_connection_profile('<name_of_target_profile>')

        create_tp_message = create_tp_response[create_tp_response.rfind(':') + 1:]
        create_tp_status_code = create_tp_response[
            create_tp_response.find(':') + 1: create_tp_response.rfind(':')
        ]
    else:
        return create_sp_response

    # and so on for calling the other functions in succession
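Continuing the same status-code parsing pattern, the chain could go on inside dms() where the "and so on" comment sits. A sketch, assuming the functions above and relying on the fact that get_lro_details returns the created resource name as the message part on success:

    # A hedged sketch of how the chain could continue inside dms().
    if create_tp_status_code == '200':
        create_mj_response = create_migration_job()
        create_mj_message = create_mj_response[create_mj_response.rfind(':') + 1:]
        create_mj_status_code = create_mj_response[
            create_mj_response.find(':') + 1: create_mj_response.rfind(':')
        ]
        if create_mj_status_code == '200':
            # the message of a successful create is the migration job name
            return verify_migration_job(create_mj_message)
        return create_mj_response
    return create_tp_response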

That’s all there is to it for now!

Happy Learning! 🤓
