Oracle to BigQuery: Migrate Oracle to BigQuery using Vertex AI notebooks and GCP Dataproc Serverless

Neerajshivhare
Google Cloud - Community
8 min read · Apr 11, 2023

Dataproc Templates, in conjunction with Vertex AI notebooks and Dataproc Serverless, provide a one-stop solution for migrating data directly from an Oracle database to Google Cloud BigQuery.

We have developed a Vertex AI notebook (Jupyter notebook) solution which uses Dataproc Serverless for the migration. You just need to provide the necessary parameters, and the notebook will help you migrate a complete Oracle data warehouse to BigQuery.

This notebook solution is built on top of Vertex AI Jupyter notebooks and Google Cloud's Dataproc tech stack.

Here are some key benefits of this notebook:

  • Automatically generates the list of tables from database metadata. Alternatively, the user can supply the list of tables.
  • Identifies the primary key column and partitioned-read properties for each table.
  • Automatically uses partitioned reads when a table's row count exceeds a threshold.
  • Divides the migration into batches and migrates multiple tables in parallel.
  • Allows you to choose the write mode, i.e. append or overwrite.
  • The BigQuery load automatically creates the target table if it does not exist.

Prerequisites:

1. Enable the below services in the GCP project from the API Console:

  • Compute Engine API
  • Dataproc API
  • Vertex-AI API
  • Vertex Notebooks API

2. Create a User-Managed Notebook in Vertex AI Workbench.

3. Clone the Dataproc Templates GitHub repo, either from the Git tab in the JupyterLab interface or by opening a terminal from the launcher window and running git clone:

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git

4. From the folders tab, open the OracleToBigQuery_notebook.ipynb notebook, located at the path: dataproc-templates/notebooks/oracle2bq

5. The notebook contains a step-by-step process to help migrate data from Oracle to BigQuery. Run each step one by one; detailed instructions are present in the notebook. Below are the steps for the migration:

Step 1: Install required packages

Some packages needed for the migration are not pre-installed in the notebook environment and need to be installed separately, e.g. SQLAlchemy, cx_Oracle, and the Oracle client libraries.

! pip3 install SQLAlchemy
! pip3 install --upgrade google-cloud-pipeline-components kfp --user -q
! pip3 install cx-Oracle

Oracle Instant Client installation

%%bash
sudo mkdir -p /opt/oracle
sudo rm -fr /opt/oracle/instantclient*
cd /opt/oracle
sudo wget --no-verbose https://download.oracle.com/otn_software/linux/instantclient/instantclient-basic-linuxx64.zip
sudo unzip instantclient-basic-linuxx64.zip
INSTANT_CLIENT_DIR=$(find /opt/oracle -maxdepth 1 -type d -name "instantclient_[0-9]*_[0-9]*" | sort | tail -1)
test -n "${INSTANT_CLIENT_DIR}" || echo "ERROR: Could not find instant client"
test -n "${INSTANT_CLIENT_DIR}" || exit 1
sudo apt-get install libaio1
sudo sh -c "echo ${INSTANT_CLIENT_DIR} > /etc/ld.so.conf.d/oracle-instantclient.conf"
sudo ldconfig
export LD_LIBRARY_PATH=${INSTANT_CLIENT_DIR}:$LD_LIBRARY_PATH
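
Optionally, you can sanity-check that cx_Oracle can load the Instant Client libraries before moving on (a quick check, not part of the original notebook):

# Quick check (assumes the Instant Client was registered with ldconfig above)
import cx_Oracle

print("cx_Oracle version:", cx_Oracle.version)
print("Oracle Client version:", cx_Oracle.clientversion())  # raises if the client libraries are not found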

Step 2: Import Libraries : This notebook also requires the Oracle JDBC driver and the BigQuery connector JAR. Installation information is present in the notebook (the JARs are downloaded and staged in Step 8).

import sqlalchemy
import google.cloud.aiplatform as aiplatform
from kfp import dsl
from kfp.v2 import compiler
from datetime import datetime
import time
import copy
import json
import math
import pandas as pd
from google_cloud_pipeline_components.experimental.dataproc import DataprocPySparkBatchOp
import os
from pathlib import Path

Step 3: Assign Parameters : The below configurations need to be set before running the notebook:

Step 3.1: Common Parameters

  • PROJECT : GCP project ID
  • REGION : GCP region
  • GCS_STAGING_LOCATION : GCS staging location used by this notebook to store artifacts
  • SUBNET : VPC subnet
  • JARS : list of JAR files; for this notebook the Oracle JDBC driver and the BigQuery connector, in addition to the Dataproc Templates JARs
  • MAX_PARALLELISM : number of jobs to run in parallel; the default value is 5
  • SERVICE_ACCOUNT : custom service account email to use for the Vertex AI pipeline and Dataproc jobs, with the permissions mentioned in the notebook
PROJECT = "<project-id>"
REGION = "<region>"
GCS_STAGING_LOCATION = "gs://path"
SUBNET = "projects/{project}/regions/{region}/subnetworks/{subnet}"
MAX_PARALLELISM = 5 # default value is set to 5
SERVICE_ACCOUNT = "<Service-Account>"

# Do not change this parameter unless you want to refer below JARS from new location
JARS = [GCS_STAGING_LOCATION + "/jars/ojdbc8-21.7.0.0.jar",
GCS_STAGING_LOCATION + "/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar"]
# If SERVICE_ACCOUNT is not specified it will take the one attached to Notebook
if SERVICE_ACCOUNT == '':
shell_output = !gcloud auth list 2>/dev/null
SERVICE_ACCOUNT = shell_output[2].replace("*", "").strip()
print("Service Account: ",SERVICE_ACCOUNT)

Step 3.2: Oracle Parameters

  • ORACLE_HOST : Oracle instance IP address
  • ORACLE_PORT : Oracle instance port
  • ORACLE_USERNAME : Oracle username
  • ORACLE_PASSWORD : Oracle password
  • ORACLE_DATABASE : name of the database/service for the Oracle connection
  • ORACLETABLE_LIST : list of tables you want to migrate, e.g. ['table1','table2']; provide an empty list ([]) to migrate the whole database
ORACLE_HOST = ""
ORACLE_PORT = "1521"
ORACLE_USERNAME = ""
ORACLE_PASSWORD = ""
ORACLE_DATABASE = ""
ORACLETABLE_LIST = []  # leave the list empty to migrate the complete database, else provide tables as ['table1','table2']
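
The later steps connect through SQLAlchemy using an ORACLE_URL connection string defined in the notebook. A minimal sketch of how such a URL is typically assembled with the cx_Oracle dialect (the exact format in the notebook may differ):

# Sketch only: SQLAlchemy connection URL for the cx_Oracle dialect,
# built from the parameters above (assumed format; the notebook defines ORACLE_URL itself).
ORACLE_URL = "oracle+cx_oracle://{}:{}@{}:{}/?service_name={}".format(
    ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE
)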

Step 3.3: BigQuery Parameters

  • BIGQUERY_DATASET : BigQuery Target Dataset
  • BIGQUERY_MODE : write mode at the target, append or overwrite
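
In this notebook these two values are assigned at the top of Step 9; for reference:

BIGQUERY_DATASET = "<bigquery-dataset-name>"
BIGQUERY_MODE = "overwrite"  # append/overwrite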

Step 4: Generate Oracle Table List

This step generates the list of tables for migration. All tables from the database are selected if the ORACLETABLE_LIST parameter in Step 3.2 is an empty list.

if len(ORACLETABLE_LIST) == 0:
    DB = sqlalchemy.create_engine(ORACLE_URL)
    with DB.connect() as conn:
        print("connected to database")
        results = DB.execute('SELECT table_name FROM user_tables').fetchall()
        print("Total Tables = ", len(results))
        for row in results:
            ORACLETABLE_LIST.append(row[0])

print("list of tables for migration :")
print(ORACLETABLE_LIST)

Step 5: Get Primary Keys for tables from ORACLE source

This step fetches the primary keys of the tables listed in ORACLETABLE_LIST from ORACLE_DATABASE.

SQL_TABLE_PRIMARY_KEYS = {}  # dict for storing primary keys for each table
DB = sqlalchemy.create_engine(ORACLE_URL)
with DB.connect() as conn:
    for table in ORACLETABLE_LIST:
        primary_keys = []
        results = DB.execute("SELECT cols.column_name FROM sys.all_constraints cons, sys.all_cons_columns cols WHERE cols.table_name = '{}' AND cons.constraint_type = 'P' AND cons.status = 'ENABLED' AND cols.position = 1 AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner".format(table)).fetchall()
        for row in results:
            primary_keys.append(row[0])
        if primary_keys:
            SQL_TABLE_PRIMARY_KEYS[table] = ",".join(primary_keys)
        else:
            SQL_TABLE_PRIMARY_KEYS[table] = ""

pkDF = pd.DataFrame({"table": ORACLETABLE_LIST, "primary_keys": list(SQL_TABLE_PRIMARY_KEYS.values())})
print("Below are identified primary keys for migrating ORACLE table to Bigquery:")
pkDF

Step 6: Get Row Count of Tables and identify Partition Columns

This step uses the PARTITION_THRESHOLD parameter (default value is 1 million): any table with a row count greater than PARTITION_THRESHOLD will use a partitioned read based on its primary key.

  • CHECK_PARTITION_COLUMN_LIST : dictionary of the tables that exceed the threshold, mapping each to its partition column, lower/upper bounds, and number of partitions
PARTITION_THRESHOLD = 1000000
CHECK_PARTITION_COLUMN_LIST = {}
with DB.connect() as conn:
    for table in ORACLETABLE_LIST:
        results = DB.execute("SELECT count(1) FROM {}".format(table)).fetchall()
        if results[0][0] > int(PARTITION_THRESHOLD) and len(SQL_TABLE_PRIMARY_KEYS.get(table).split(",")[0]) > 0:
            column_list = SQL_TABLE_PRIMARY_KEYS.get(table).split(",")
            column = column_list[0]
            results_datatype = DB.execute("select DATA_TYPE from sys.all_tab_columns where TABLE_NAME = '{0}' and COLUMN_NAME = '{1}'".format(table, column)).fetchall()
            if results_datatype[0][0] == "NUMBER" or results_datatype[0][0] == "INTEGER":
                lowerbound = DB.execute("SELECT min({0}) from {1}".format(column, table)).fetchall()
                upperbound = DB.execute("SELECT max({0}) from {1}".format(column, table)).fetchall()
                numberPartitions = math.ceil((upperbound[0][0] - lowerbound[0][0]) / PARTITION_THRESHOLD)
                CHECK_PARTITION_COLUMN_LIST[table] = [column, lowerbound[0][0], upperbound[0][0], numberPartitions]

print(CHECK_PARTITION_COLUMN_LIST)
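
For context, the four values collected per table above (partition column, lower bound, upper bound, number of partitions) map directly onto Spark's JDBC partitioned-read options. The following is a minimal illustrative sketch, not part of the notebook, of the kind of read the Dataproc template performs; the table name and JDBC URL are placeholders:

# Minimal sketch: how the partition metadata gathered above drives a parallel JDBC read in Spark.
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("partitioned-jdbc-read-sketch").getOrCreate()

table = "EMPLOYEES"  # hypothetical table present in CHECK_PARTITION_COLUMN_LIST
column, lowerbound, upperbound, numpartitions = CHECK_PARTITION_COLUMN_LIST[table]

df = (
    spark.read.format("jdbc")
    .option("url", "jdbc:oracle:thin:@//<host>:1521/<service>")  # placeholder JDBC URL
    .option("driver", "oracle.jdbc.OracleDriver")
    .option("dbtable", table)
    .option("user", ORACLE_USERNAME)
    .option("password", ORACLE_PASSWORD)
    .option("partitionColumn", column)       # numeric primary key column found above
    .option("lowerBound", lowerbound)        # MIN(column)
    .option("upperBound", upperbound)        # MAX(column)
    .option("numPartitions", numpartitions)  # Spark issues one query per partition, read in parallel
    .load()
)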

Step 7: Calculate Parallel Jobs for Oracle to BigQuery

This step calculates the number of parallel jobs for the Oracle to BigQuery migration based on the MAX_PARALLELISM parameter, i.e. the number of Oracle tables that will be migrated to BigQuery in parallel. A worked example follows the code below.

# calculate parallel jobs:
COMPLETE_LIST = copy.deepcopy(ORACLETABLE_LIST)
PARALLEL_JOBS = len(ORACLETABLE_LIST) // MAX_PARALLELISM
JOB_LIST = []
while len(COMPLETE_LIST) > 0:
    SUB_LIST = []
    for i in range(MAX_PARALLELISM):
        if len(COMPLETE_LIST) > 0:
            SUB_LIST.append(COMPLETE_LIST[0])
            COMPLETE_LIST.pop(0)
        else:
            break
    JOB_LIST.append(SUB_LIST)
print("list of tables for execution : ")
print(JOB_LIST)
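
For example (with hypothetical table names), 12 tables and MAX_PARALLELISM = 5 would produce three batches, each of which is submitted as one Vertex AI pipeline run in Step 9:

# Hypothetical illustration of the batching above: 12 tables with MAX_PARALLELISM = 5
tables = ["T{}".format(i) for i in range(1, 13)]
batches = [tables[i:i + 5] for i in range(0, len(tables), 5)]
print(batches)
# [['T1', 'T2', 'T3', 'T4', 'T5'], ['T6', 'T7', 'T8', 'T9', 'T10'], ['T11', 'T12']]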

Step 8: Create JAR files and Upload to GCS (only needs to be run once)

This step downloads the required JAR files, builds the Dataproc Templates distribution egg, and uploads them to the GCS_STAGING_LOCATION defined in the prior steps.

%cd $WORKING_DIRECTORY

Download the Oracle JDBC driver and the Spark BigQuery connector JAR files, and build the Dataproc Templates egg:

%%bash
wget --no-verbose https://repo1.maven.org/maven2/com/oracle/database/jdbc/ojdbc8/21.7.0.0/ojdbc8-21.7.0.0.jar
wget --no-verbose https://repo1.maven.org/maven2/com/google/cloud/spark/spark-bigquery-with-dependencies_2.12/0.27.0/spark-bigquery-with-dependencies_2.12-0.27.0.jar

! python3 ./setup.py bdist_egg --output=$PACKAGE_EGG_FILE

Copy the JAR files and dependencies to GCS_STAGING_LOCATION:

! gsutil cp main.py $GCS_STAGING_LOCATION/dependencies/
! gsutil cp -r $PACKAGE_EGG_FILE $GCS_STAGING_LOCATION/dependencies/
! gsutil cp spark-bigquery-with-dependencies_2.12-0.27.0.jar $GCS_STAGING_LOCATION/jars/spark-bigquery-with-dependencies_2.12-0.27.0.jar
! gsutil cp ojdbc8-21.7.0.0.jar $GCS_STAGING_LOCATION/jars/ojdbc8-21.7.0.0.jar
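
Optionally, verify that the files landed in the staging bucket (a quick check, not part of the original notebook):

! gsutil ls $GCS_STAGING_LOCATION/jars/
! gsutil ls $GCS_STAGING_LOCATION/dependencies/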

Step 9: Execute Pipeline to Migrate tables from Oracle to BigQuery

This step starts the Vertex AI pipeline execution for the Oracle table migration. A pipeline link is generated for each job, and you can also see the jobs running in the Dataproc UI under the Batches section.
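
The code below also references a few variables defined in other cells of the notebook, notably JDBC_URL, JDBC_DRIVER, JDBC_FETCH_SIZE, JDBC_SESSION_INIT_STATEMENT, and PIPELINE_ROOT. A minimal sketch of plausible assignments, assuming the Oracle thin JDBC driver and the parameters from Step 3.2 (the exact formats in the notebook may differ):

# Sketch only -- the notebook defines these elsewhere; the formats below are assumptions.
JDBC_DRIVER = "oracle.jdbc.OracleDriver"
JDBC_URL = "jdbc:oracle:thin:{}/{}@{}:{}/{}".format(
    ORACLE_USERNAME, ORACLE_PASSWORD, ORACLE_HOST, ORACLE_PORT, ORACLE_DATABASE)
JDBC_FETCH_SIZE = 200             # rows fetched per round trip (illustrative value)
JDBC_SESSION_INIT_STATEMENT = ""  # optional session init SQL; leave empty to skip
PIPELINE_ROOT = GCS_STAGING_LOCATION + "/pipeline_root/dataproc_pyspark"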

oracle_to_gcs_jobs = []
BIGQUERY_DATASET = "<bigquery-dataset-name>"
BIGQUERY_MODE = "overwrite"  # append/overwrite
TEMP_GCS_BUCKET = "<temp-bucket-name>"
PYTHON_FILE_URIS = [GCS_STAGING_LOCATION + "/dependencies/dataproc_templates_distribution.egg"]
MAIN_PYTHON_CLASS = GCS_STAGING_LOCATION + "/dependencies/main.py"

def migrate_oracle_to_bigquery(EXECUTION_LIST):
    EXECUTION_LIST = EXECUTION_LIST
    aiplatform.init(project=PROJECT, staging_bucket=TEMP_GCS_BUCKET)

    @dsl.pipeline(
        name="python-oracle-to-bigquery-pyspark",
        description="Pipeline to get data from Oracle to PySpark",
    )
    def pipeline(
        PROJECT_ID: str = PROJECT,
        LOCATION: str = REGION,
        MAIN_PYTHON_CLASS: str = MAIN_PYTHON_CLASS,
        JAR_FILE_URIS: list = JARS,
        SUBNETWORK_URI: str = SUBNET,
        SERVICE_ACCOUNT: str = SERVICE_ACCOUNT,
        PYTHON_FILE_URIS: list = PYTHON_FILE_URIS
    ):
        for table in EXECUTION_LIST:
            BATCH_ID = "oracle2bigquery-{}".format(datetime.now().strftime("%s"))
            oracle_to_gcs_jobs.append(BATCH_ID)
            if table in CHECK_PARTITION_COLUMN_LIST.keys():
                TEMPLATE_SPARK_ARGS = [
                    "--template=JDBCTOBIGQUERY",
                    "--jdbc.bigquery.input.url={}".format(JDBC_URL),
                    "--jdbc.bigquery.input.driver={}".format(JDBC_DRIVER),
                    "--jdbc.bigquery.input.table={}".format(table),
                    "--jdbc.bigquery.input.fetchsize={}".format(JDBC_FETCH_SIZE),
                    "--jdbc.bigquery.output.mode={}".format(BIGQUERY_MODE),
                    "--jdbc.bigquery.output.table={}".format(table),
                    "--jdbc.bigquery.temp.bucket.name={}".format(TEMP_GCS_BUCKET),
                    "--jdbc.bigquery.output.dataset={}".format(BIGQUERY_DATASET),
                    "--jdbc.bigquery.input.partitioncolumn={}".format(CHECK_PARTITION_COLUMN_LIST[table][0]),
                    "--jdbc.bigquery.input.lowerbound={}".format(CHECK_PARTITION_COLUMN_LIST[table][1]),
                    "--jdbc.bigquery.input.upperbound={}".format(CHECK_PARTITION_COLUMN_LIST[table][2]),
                    "--jdbc.bigquery.numpartitions={}".format(CHECK_PARTITION_COLUMN_LIST[table][3])
                ]
            else:
                TEMPLATE_SPARK_ARGS = [
                    "--template=JDBCTOBIGQUERY",
                    "--jdbc.bigquery.input.url={}".format(JDBC_URL),
                    "--jdbc.bigquery.input.driver={}".format(JDBC_DRIVER),
                    "--jdbc.bigquery.input.table={}".format(table),
                    "--jdbc.bigquery.input.fetchsize={}".format(JDBC_FETCH_SIZE),
                    "--jdbc.bigquery.output.mode={}".format(BIGQUERY_MODE),
                    "--jdbc.bigquery.output.table={}".format(table),
                    "--jdbc.bigquery.temp.bucket.name={}".format(TEMP_GCS_BUCKET),
                    "--jdbc.bigquery.output.dataset={}".format(BIGQUERY_DATASET)
                ]
            if JDBC_SESSION_INIT_STATEMENT:
                TEMPLATE_SPARK_ARGS.append("--jdbc.bigquery.input.sessioninitstatement={}".format(JDBC_SESSION_INIT_STATEMENT))

            _ = DataprocPySparkBatchOp(
                project=PROJECT_ID,
                location=LOCATION,
                batch_id=BATCH_ID,
                main_python_file_uri=MAIN_PYTHON_CLASS,
                jar_file_uris=JAR_FILE_URIS,
                python_file_uris=PYTHON_FILE_URIS,
                subnetwork_uri=SUBNETWORK_URI,
                service_account=SERVICE_ACCOUNT,
                runtime_config_version="1.1",  # issue 665
                args=TEMPLATE_SPARK_ARGS
            )
            time.sleep(3)

    compiler.Compiler().compile(pipeline_func=pipeline, package_path="pipeline.json")

    pipeline = aiplatform.PipelineJob(
        display_name="pipeline",
        template_path="pipeline.json",
        pipeline_root=PIPELINE_ROOT,
        enable_caching=False,
    )
    pipeline.run(service_account=SERVICE_ACCOUNT)

for execution_list in JOB_LIST:
    print(execution_list)
    migrate_oracle_to_bigquery(execution_list)

Step 10: Get status for tables migrated from ORACLE to BIGQUERY

This step gets the status of the jobs executed in Step 9.

def get_bearer_token():
    try:
        # Defining scope
        CREDENTIAL_SCOPES = ["https://www.googleapis.com/auth/cloud-platform"]

        # Assigning credentials and project value
        credentials, project_id = google.auth.default(scopes=CREDENTIAL_SCOPES)

        # Refreshing credentials data
        credentials.refresh(requests.Request())

        # Get refreshed token
        token = credentials.token
        if token:
            return (token, 200)
        else:
            return ("Bearer token not generated", 500)
    except Exception as error:
        return ("Bearer token not generated. Error : {}".format(error), 500)

from google.auth.transport import requests
import google.auth

token = get_bearer_token()
if token[1] == 200:
    print("Bearer token generated")
else:
    print(token)

# From here on, 'requests' refers to the requests HTTP library,
# replacing the google.auth.transport.requests import used above.
import requests

oracle_to_bq_status = []
job_status_url = "https://dataproc.googleapis.com/v1/projects/{}/locations/{}/batches/{}"
for job in oracle_to_gcs_jobs:
    auth = "Bearer " + token[0]
    url = job_status_url.format(PROJECT, REGION, job)
    headers = {
        'Content-Type': 'application/json; charset=UTF-8',
        'Authorization': auth
    }
    response = requests.get(url, headers=headers)
    oracle_to_bq_status.append(response.json()['state'])

statusDF = pd.DataFrame({"table": ORACLETABLE_LIST, "oracle_to_gcs_job": oracle_to_gcs_jobs, "oracle_to_bq_status": oracle_to_bq_status})
statusDF
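
As an alternative to the raw REST call above, the same batch state can be fetched with the google-cloud-dataproc client library; a minimal sketch, assuming the google-cloud-dataproc package is installed:

# Sketch: fetch Dataproc Serverless batch states via the client library
# (alternative to the REST call above; requires `pip install google-cloud-dataproc`).
from google.cloud import dataproc_v1

batch_client = dataproc_v1.BatchControllerClient(
    client_options={"api_endpoint": "{}-dataproc.googleapis.com:443".format(REGION)}
)

for job in oracle_to_gcs_jobs:
    batch = batch_client.get_batch(
        name="projects/{}/locations/{}/batches/{}".format(PROJECT, REGION, job)
    )
    print(job, batch.state.name)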

Step 11: Validate row counts of migrated tables from ORACLE to BigQuery

This step helps validate the number of records in each table migrated from Oracle to BigQuery.

oracle_row_count = []
bq_row_count = []

# Get Oracle table counts
DB = sqlalchemy.create_engine(ORACLE_URL)
with DB.connect() as conn:
    for table in ORACLETABLE_LIST:
        results = DB.execute("select count(*) from {}".format(table)).fetchall()
        for row in results:
            oracle_row_count.append(row[0])

from google.cloud import bigquery

# Construct a BigQuery client object.
client = bigquery.Client()

for table in ORACLETABLE_LIST:
    results = client.query("SELECT row_count FROM {}.__TABLES__ where table_id = '{}'".format(BIGQUERY_DATASET, table))
    for row in results:
        bq_row_count.append(row[0])

statusDF['oracle_row_count'] = oracle_row_count
statusDF['bq_row_count'] = bq_row_count
statusDF

For any queries/suggestions reach out to: dataproc-templates-support-external@googlegroups.com
