Oracle to BigQuery: Migrate Oracle to BigQuery using Vertex AI Notebooks and Dataproc Serverless
Dataproc Templates, in conjunction with Vertex AI Notebooks and Dataproc Serverless, provide a one-stop solution for migrating data directly from an Oracle database to BigQuery.
We have developed a Vertex AI (Jupyter) notebook solution that uses Dataproc Serverless for the migration. You only need to provide the necessary parameters, and the notebook helps you migrate a complete Oracle data warehouse to BigQuery.
This notebook solution is built on top of the Vertex AI Jupyter Notebook and Dataproc tech stack provided by Google Cloud.
Here are some key benefits of this notebook:
- Automatically generates the list of tables from database metadata; alternatively, you can supply your own list of tables.
- Identifies the primary key column and partitioned-read properties for each table.
- Automatically uses partitioned reads when a table's row count exceeds a threshold.
- Divides the migration into batches and migrates multiple tables in parallel.
- Lets you choose the write mode, i.e. append or overwrite.
- The BigQuery load automatically creates the target table if it does not exist.
Prerequisites:
1. Enable the following services in the GCP project from the API Console:
- Compute Engine API
- Dataproc API
- Vertex-AI API
- Vertex Notebooks API
2. Create a User-Managed Notebook in Vertex AI Workbench.
3. Clone the Dataproc Templates GitHub repository using the Git tab, or open a terminal from the launcher window and clone it with git clone:
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
4. From the folder tab, open the OracleToBigQuery_notebook.ipynb notebook located at: dataproc-templates/notebooks/oracle2bq
5. The notebook contains a step-by-step process to help migrate data from Oracle to BigQuery. Run each step one by one; detailed instructions are present in the notebook. The migration steps are:
Step 1: Install required packages
Some packages needed for the migration are not available in the notebook environment and must be installed separately, e.g. SQLAlchemy, cx_Oracle, etc.
! pip3 install SQLAlchemy
! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q
! pip3 install cx-Oracle
Oracle client Installation
%%bash
sudo mkdir -p /opt/oracle
sudo rm -fr /opt/oracle/instantclient*
cd /opt/oracle
sudo wget --no-verbose https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip
sudo unzip instantclient-basic-linuxx64.zip
INSTANT_CLIENT_DIR=$(find /opt/oracle -maxdepth 1 -type d -name "instantclient_[0-9]*_[0-9]*" | sort | tail -1)
test -n "${INSTANT_CLIENT_DIR}" || echo "ERROR: Could not find instant client"
test -n "${INSTANT_CLIENT_DIR}" || exit 1
sudo apt-get install -y libaio1
sudo sh -c "echo ${INSTANT_CLIENT_DIR} > /etc/ld.so.conf.d/oracle-instantclient.conf"
sudo ldconfig
export LD_LIBRARY_PATH=${INSTANT_CLIENT_DIR}:$LD_LIBRARY_PATH
Step 2: Import Libraries. This notebook requires the Oracle JDBC driver and the BigQuery connector JAR; installation information is present in the notebook.
import sqlalchemy
import google.cloud.aiplatform as aiplatform
from kfp import dsl
from kfp.v2 import compiler
from datetime import datetime
import time
import copy
import json
import math
import pandas as pd
from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp
import os
from pathlib import Path
Step 3: Assign Parameters. The following configurations need to be set before running the notebook:
Step 3.1: Common Parameters
- PROJECT : GCP project id
- REGION : GCP region
- GCS_STAGING_LOCATION : GCS staging location used by this notebook to store artifacts
- SUBNET : VPC subnet
- JARS : list of JARs; for this notebook, the Oracle driver and BigQuery connector along with the Dataproc Templates JAR
- MAX_PARALLELISM : number of jobs to run in parallel (set to 5 below)
- SERVICE_ACCOUNT : custom service account email to use for the Vertex AI pipeline and Dataproc jobs, with the permissions mentioned in the notebook
PROJECT = "<project-id>"
REGION = "<region>"
GCS_STAGING_LOCATION = "gs://path"
SUBNET = "projects/{project}/regions/{region}/subnetworks/{subnet}"
MAX_PARALLELISM = 5 # default value is set to 5
SERVICE_ACCOUNT = "<Service-Account>"
# Do not change this parameter unless you want to reference the below JARs from a new location
JARS = [GCS_STAGING_LOCATION + "/jars/ojdbc8-21.7.0.0.jar",
GCS_STAGING_LOCATION + "/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar"]
# If SERVICE_ACCOUNT is not specified, the one attached to the notebook is used
if SERVICE_ACCOUNT == '':
    shell_output = !gcloud auth list 2>/dev/null
    SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
print("Service Account: ", SERVICE_ACCOUNT)
Step 3.2: Oracle Parameters
- ORACLE_HOST : Oracle instance IP address
- ORACLE_PORT : Oracle instance port
- ORACLE_USERNAME : Oracle username
- ORACLE_PASSWORD : Oracle password
- ORACLE_DATABASE : name of the database/service for the Oracle connection
- ORACLETABLE_LIST : list of tables you want to migrate, e.g. ['table1','table2']; provide an empty list to migrate the whole database, e.g. []
ORACLE_HOST = ""
ORACLE_PORT = "1521"
ORACLE_USERNAME = ""
ORACLE_PASSWORD = ""
ORACLE_DATABASE = ""
ORACLETABLE_LIST = [] # leave list empty for migrating complete database else provide tables as ['table1','table2']
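Later steps reference ORACLE_URL (for the SQLAlchemy metadata queries) and JDBC_URL, JDBC_DRIVER, JDBC_FETCH_SIZE, and JDBC_SESSION_INIT_STATEMENT (for the Spark template); they are defined elsewhere in the notebook. A minimal sketch of how they could be assembled from the parameters above, using placeholder values:

```python
# Sketch only: placeholder values; the real values come from the Step 3.2 cell.
ORACLE_HOST = "10.0.0.5"
ORACLE_PORT = "1521"
ORACLE_USERNAME = "scott"
ORACLE_PASSWORD = "tiger"
ORACLE_DATABASE = "ORCLPDB1"

# SQLAlchemy URL used by the metadata queries in Steps 4-6 and 11
ORACLE_URL = "oracle+cx_oracle://{}:{}@{}:{}/?service_name={}".format(
    ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE)
# JDBC URL and driver class passed to the Dataproc template in Step 9
JDBC_URL = "jdbc:oracle:thin:{}/{}@{}:{}/{}".format(
    ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE)
JDBC_DRIVER = "oracle.jdbc.OracleDriver"
JDBC_FETCH_SIZE = 200
JDBC_SESSION_INIT_STATEMENT = ""

print(ORACLE_URL)
```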
Step 3.3: BigQuery Parameters
- BIGQUERY_DATASET : BigQuery target dataset
- BIGQUERY_MODE : write mode at the target, append/overwrite
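These two parameters are assigned in the notebook (the same assignments also appear at the start of Step 9); for example:

```python
BIGQUERY_DATASET = "<bigquery-dataset-name>"  # target dataset
BIGQUERY_MODE = "overwrite"                   # "append" or "overwrite"
print(BIGQUERY_MODE)
```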
Step 4: Generate Oracle Table List
This step generates the list of tables for migration. All tables from the database are selected if the ORACLETABLE_LIST parameter in Step 3.2 is an empty list.
if len(ORACLETABLE_LIST) == 0:
    DB = sqlalchemy.create_engine(ORACLE_URL)
    with DB.connect() as conn:
        print("connected to database")
        results = DB.execute('SELECT table_name FROM user_tables').fetchall()
        print("Total Tables = ", len(results))
        for row in results:
            ORACLETABLE_LIST.append(row[0])
print("list of tables for migration :")
print(ORACLETABLE_LIST)
Step 5: Get Primary Keys for tables from ORACLE source
This step fetches the primary key of each table listed in ORACLETABLE_LIST from ORACLE_DATABASE.
SQL_TABLE_PRIMARY_KEYS = {}  # dict for storing primary keys for each table
DB = sqlalchemy.create_engine(ORACLE_URL)
with DB.connect() as conn:
    for table in ORACLETABLE_LIST:
        primary_keys = []
        results = DB.execute("SELECT cols.column_name FROM sys.all_constraints cons, sys.all_cons_columns cols WHERE cols.table_name = '{}' AND cons.constraint_type = 'P' AND cons.status = 'ENABLED' AND cols.position = 1 AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner".format(table)).fetchall()
        for row in results:
            primary_keys.append(row[0])
        if primary_keys:
            SQL_TABLE_PRIMARY_KEYS[table] = ",".join(primary_keys)
        else:
            SQL_TABLE_PRIMARY_KEYS[table] = ""
pkDF = pd.DataFrame({"table": ORACLETABLE_LIST, "primary_keys": list(SQL_TABLE_PRIMARY_KEYS.values())})
print("Below are the identified primary keys for migrating Oracle tables to BigQuery:")
pkDF
Step 6: Get Row Count of Tables and identify Partition Columns
This step uses the PARTITION_THRESHOLD parameter (default value 1 million); any table with more rows than PARTITION_THRESHOLD will use a partitioned read based on its primary key.
- CHECK_PARTITION_COLUMN_LIST : dict containing each table that exceeds the threshold, with its partition column, bounds, and partition count
PARTITION_THRESHOLD = 1000000
CHECK_PARTITION_COLUMN_LIST = {}
with DB.connect() as conn:
    for table in ORACLETABLE_LIST:
        results = DB.execute("SELECT count(1) FROM {}".format(table)).fetchall()
        if results[0][0] > int(PARTITION_THRESHOLD) and len(SQL_TABLE_PRIMARY_KEYS.get(table).split(",")[0]) > 0:
            column_list = SQL_TABLE_PRIMARY_KEYS.get(table).split(",")
            column = column_list[0]
            results_datatype = DB.execute("select DATA_TYPE from sys.all_tab_columns where TABLE_NAME = '{0}' and COLUMN_NAME = '{1}'".format(table, column)).fetchall()
            if results_datatype[0][0] == "NUMBER" or results_datatype[0][0] == "INTEGER":
                lowerbound = DB.execute("SELECT min({0}) from {1}".format(column, table)).fetchall()
                upperbound = DB.execute("SELECT max({0}) from {1}".format(column, table)).fetchall()
                numberPartitions = math.ceil((upperbound[0][0] - lowerbound[0][0]) / PARTITION_THRESHOLD)
                CHECK_PARTITION_COLUMN_LIST[table] = [column, lowerbound[0][0], upperbound[0][0], numberPartitions]
print(CHECK_PARTITION_COLUMN_LIST)
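A worked example of the partition sizing above: for a table whose numeric primary-key values span 1 to 4,500,000 with the default 1,000,000 threshold, the computation yields 5 partitions.

```python
import math

# Same arithmetic as the cell above, with illustrative bounds
PARTITION_THRESHOLD = 1000000
lowerbound, upperbound = 1, 4500000
numberPartitions = math.ceil((upperbound - lowerbound) / PARTITION_THRESHOLD)
print(numberPartitions)  # 5
```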
Step 7: Calculate Parallel Jobs for Oracle to BigQuery
This step calculates the number of parallel migration jobs based on the MAX_PARALLELISM parameter, i.e. how many Oracle tables will be migrated to BigQuery in parallel.
# calculate parallel jobs:
COMPLETE_LIST = copy.deepcopy(ORACLETABLE_LIST)
PARALLEL_JOBS = len(ORACLETABLE_LIST) // MAX_PARALLELISM
JOB_LIST = []
while len(COMPLETE_LIST) > 0:
    SUB_LIST = []
    for i in range(MAX_PARALLELISM):
        if len(COMPLETE_LIST) > 0:
            SUB_LIST.append(COMPLETE_LIST[0])
            COMPLETE_LIST.pop(0)
        else:
            break
    JOB_LIST.append(SUB_LIST)
print("list of tables for execution : ")
print(JOB_LIST)
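The batching loop above is equivalent to slicing the table list into chunks of size MAX_PARALLELISM; for example, 7 tables with MAX_PARALLELISM = 3 produce batches of 3, 3, and 1:

```python
tables = ["t1", "t2", "t3", "t4", "t5", "t6", "t7"]
MAX_PARALLELISM = 3
# Equivalent chunking using list slicing
JOB_LIST = [tables[i:i + MAX_PARALLELISM] for i in range(0, len(tables), MAX_PARALLELISM)]
print(JOB_LIST)  # [['t1', 't2', 't3'], ['t4', 't5', 't6'], ['t7']]
```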
Step 8: Create JAR files and Upload to GCS (only required to run once)
This step creates the required JAR files and uploads them to the GCS_STAGING_LOCATION defined in prior steps.
%cd $WORKING_DIRECTORY
Download the Oracle JDBC driver and the BigQuery Spark connector JAR files:
%%bash
wget --no-verbose https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/21.7.0.0/ojdbc8-21.7.0.0.jar
wget --no-verbose https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.27.0/spark-bigquery-with-dependencies_2.12-0.27.0.jar
! python3 ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE
Copy the JAR files and dependencies to GCS_STAGING_LOCATION:
! gsutil cp main.py $GCS_STAGING_LOCATION/dependencies/
! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/dependencies/
! gsutil cp spark-bigquery-with-dependencies_2.12-0.27.0.jar $GCS_STAGING_LOCATION/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar
! gsutil cp ojdbc8-21.7.0.0.jar $GCS_STAGING_LOCATION/jars/ojdbc8-21.7.0.0.jar
Step 9: Execute Pipeline to Migrate tables from Oracle to BigQuery
This step starts the Vertex AI pipeline execution for the Oracle table migration. A pipeline link is generated for each job. You can also see the jobs running in the Batches section of the Dataproc UI.
oracle_to_gcs_jobs = []
BIGQUERY_DATASET = "<bigquery-dataset-name>"
BIGQUERY_MODE = "overwrite"  # append/overwrite
TEMP_GCS_BUCKET = "<temp-bucket-name>"
PYTHON_FILE_URIS = [GCS_STAGING_LOCATION + "/dependencies/dataproc_templates_distribution.egg"]
MAIN_PYTHON_CLASS = GCS_STAGING_LOCATION + "/dependencies/main.py"

def migrate_oracle_to_bigquery(EXECUTION_LIST):
    aiplatform.init(project=PROJECT, staging_bucket=TEMP_GCS_BUCKET)

    @dsl.pipeline(
        name="python-oracle-to-bigquery-pyspark",
        description="Pipeline to get data from Oracle to PySpark",
    )
    def pipeline(
        PROJECT_ID: str = PROJECT,
        LOCATION: str = REGION,
        MAIN_PYTHON_CLASS: str = MAIN_PYTHON_CLASS,
        JAR_FILE_URIS: list = JARS,
        SUBNETWORK_URI: str = SUBNET,
        SERVICE_ACCOUNT: str = SERVICE_ACCOUNT,
        PYTHON_FILE_URIS: list = PYTHON_FILE_URIS
    ):
        for table in EXECUTION_LIST:
            BATCH_ID = "oracle2bigquery-{}".format(datetime.now().strftime("%s"))
            oracle_to_gcs_jobs.append(BATCH_ID)
            if table in CHECK_PARTITION_COLUMN_LIST.keys():
                TEMPLATE_SPARK_ARGS = [
                    "--template=JDBCTOBIGQUERY",
                    "--jdbc.bigquery.input.url={}".format(JDBC_URL),
                    "--jdbc.bigquery.input.driver={}".format(JDBC_DRIVER),
                    "--jdbc.bigquery.input.table={}".format(table),
                    "--jdbc.bigquery.input.fetchsize={}".format(JDBC_FETCH_SIZE),
                    "--jdbc.bigquery.output.mode={}".format(BIGQUERY_MODE),
                    "--jdbc.bigquery.output.table={}".format(table),
                    "--jdbc.bigquery.temp.bucket.name={}".format(TEMP_GCS_BUCKET),
                    "--jdbc.bigquery.output.dataset={}".format(BIGQUERY_DATASET),
                    "--jdbc.bigquery.input.partitioncolumn={}".format(CHECK_PARTITION_COLUMN_LIST[table][0]),
                    "--jdbc.bigquery.input.lowerbound={}".format(CHECK_PARTITION_COLUMN_LIST[table][1]),
                    "--jdbc.bigquery.input.upperbound={}".format(CHECK_PARTITION_COLUMN_LIST[table][2]),
                    "--jdbc.bigquery.numpartitions={}".format(CHECK_PARTITION_COLUMN_LIST[table][3])
                ]
            else:
                TEMPLATE_SPARK_ARGS = [
                    "--template=JDBCTOBIGQUERY",
                    "--jdbc.bigquery.input.url={}".format(JDBC_URL),
                    "--jdbc.bigquery.input.driver={}".format(JDBC_DRIVER),
                    "--jdbc.bigquery.input.table={}".format(table),
                    "--jdbc.bigquery.input.fetchsize={}".format(JDBC_FETCH_SIZE),
                    "--jdbc.bigquery.output.mode={}".format(BIGQUERY_MODE),
                    "--jdbc.bigquery.output.table={}".format(table),
                    "--jdbc.bigquery.temp.bucket.name={}".format(TEMP_GCS_BUCKET),
                    "--jdbc.bigquery.output.dataset={}".format(BIGQUERY_DATASET)
                ]
            if JDBC_SESSION_INIT_STATEMENT:
                TEMPLATE_SPARK_ARGS.append("--jdbc.bigquery.input.sessioninitstatement={}".format(JDBC_SESSION_INIT_STATEMENT))
            _ = DataprocPySparkBatchOp(
                project=PROJECT_ID,
                location=LOCATION,
                batch_id=BATCH_ID,
                main_python_file_uri=MAIN_PYTHON_CLASS,
                jar_file_uris=JAR_FILE_URIS,
                python_file_uris=PYTHON_FILE_URIS,
                subnetwork_uri=SUBNETWORK_URI,
                service_account=SERVICE_ACCOUNT,
                runtime_config_version="1.1",  # issue 665
                args=TEMPLATE_SPARK_ARGS
            )
            time.sleep(3)
    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")
    pipeline_job = aiplatform.PipelineJob(
        display_name="pipeline",
        template_path="pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
    )
    pipeline_job.run(service_account=SERVICE_ACCOUNT)

for execution_list in JOB_LIST:
    print(execution_list)
    migrate_oracle_to_bigquery(execution_list)
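A note on the batch ids generated above: strftime("%s") (epoch seconds) is a glibc extension and is not available on every platform; int(time.time()) is a portable equivalent. Dataproc batch ids must be unique, which is one reason the loop sleeps between submissions.

```python
import time

# Portable equivalent of datetime.now().strftime("%s")
BATCH_ID = "oracle2bigquery-{}".format(int(time.time()))
print(BATCH_ID)
```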
Step 10: Get status for tables migrated from Oracle to BigQuery
This step gets the status of the jobs executed in Step 9.
import google.auth
from google.auth.transport import requests as google_auth_requests

def get_bearer_token():
    try:
        # Define the credential scope
        CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]
        # Obtain default credentials and the project value
        credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)
        # Refresh the credentials to populate the token
        credentials.refresh(google_auth_requests.Request())
        token = credentials.token
        if token:
            return (token, 200)
        else:
            return ("Bearer token not generated", 500)
    except Exception as error:
        return ("Bearer token not generated. Error : {}".format(error), 500)

token = get_bearer_token()
if token[1] == 200:
    print("Bearer token generated")
else:
    print(token)
import requests

oracle_to_bq_status = []
job_status_url = "https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}"
for job in oracle_to_gcs_jobs:
    auth = "Bearer " + token[0]
    url = job_status_url.format(PROJECT, REGION, job)
    headers = {
        'Content-Type': 'application/json; charset=UTF-8',
        'Authorization': auth
    }
    response = requests.get(url, headers=headers)
    oracle_to_bq_status.append(response.json()['state'])
statusDF = pd.DataFrame({"table": ORACLETABLE_LIST, "oracle_to_gcs_job": oracle_to_gcs_jobs, "oracle_to_bq_status": oracle_to_bq_status})
statusDF
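The cell above reads each batch's state once. Dataproc batch states include PENDING, RUNNING, SUCCEEDED, FAILED, and CANCELLED; a small helper (a sketch, not part of the notebook) can tell whether a job has reached a terminal state before you rely on the status table:

```python
# Terminal states for a Dataproc Serverless batch
TERMINAL_STATES = {"SUCCEEDED", "FAILED", "CANCELLED"}

def is_finished(state):
    """Return True once a Dataproc batch has reached a terminal state."""
    return state in TERMINAL_STATES

print(is_finished("RUNNING"), is_finished("SUCCEEDED"))  # False True
```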
Step 11: Validate row counts of migrated tables from Oracle to BigQuery
This step helps validate the number of records of each table migrated from Oracle to BigQuery.
oracle_row_count = []
bq_row_count = []
# Get Oracle table counts
DB = sqlalchemy.create_engine(ORACLE_URL)
with DB.connect() as conn:
    for table in ORACLETABLE_LIST:
        results = DB.execute("select count(*) from {}".format(table)).fetchall()
        for row in results:
            oracle_row_count.append(row[0])
from google.cloud import bigquery

# Construct a BigQuery client object and get target table counts
client = bigquery.Client()
for table in ORACLETABLE_LIST:
    results = client.query("SELECT row_count FROM {}.__TABLES__ where table_id = '{}'".format(BIGQUERY_DATASET, table))
    for row in results:
        bq_row_count.append(row[0])
statusDF['oracle_row_count'] = oracle_row_count
statusDF['bq_row_count'] = bq_row_count
statusDF
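As a follow-up to the validation above, you can flag any table whose Oracle and BigQuery row counts disagree (column names as built in this notebook; the sample data here is illustrative only):

```python
import pandas as pd

# Illustrative stand-in for the statusDF built in Steps 10-11
statusDF = pd.DataFrame({
    "table": ["t1", "t2"],
    "oracle_row_count": [100, 250],
    "bq_row_count": [100, 240],
})
# Tables where the source and target counts do not match
mismatches = statusDF[statusDF["oracle_row_count"] != statusDF["bq_row_count"]]
print(mismatches["table"].tolist())  # ['t2']
```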
For any queries/suggestions reach out to: dataproc-templates-support-external@googlegroups.com