Migrating Data from One Database to Another (via JDBC) Using Dataproc Serverless
Managing servers while running Spark jobs is always a challenge. Fully managed, on-demand infrastructure for Spark jobs lets developers concentrate on core application logic instead of spending time managing the framework. Dataproc Serverless is one such product provided by Google Cloud Platform.
Most transactional data still resides on relational database servers, which can be reached through a JDBC driver. MySQL, PostgreSQL, and SQL Server are among the most commonly used.
Data storage is also moving towards cloud-based services, which has driven the use of Google Cloud Storage (GCS) buckets.
This article shows how to transfer data from one JDBC database to another using Dataproc Serverless.
Key Benefits
- Use Dataproc Serverless to run Spark batch workloads without managing the Spark framework. The output batch size is also configurable in this template.
- The JDBCTOJDBC template is open source, configuration driven, and ready to use. Only JDBC and GCS credentials are required to run it.
Usage
1. Ensure that the subnet has Private Google Access enabled. Even if you use the “default” VPC network generated by GCP, you still need to enable private access as shown below.
gcloud compute networks subnets update default --region=us-central1 --enable-private-ip-google-access
2. Create a GCS bucket and staging location for jar files.
3. Clone the Git repository in Cloud Shell, which comes pre-installed with various tools. Alternatively, use any machine that has Python and Git installed.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python
4. Obtain authentication credentials (to submit the job).
gcloud auth application-default login
5. Prerequisites
Required JAR files
This template requires the JDBC driver jar file to be available to the Dataproc Serverless job. Download the required jar file and host it in a GCS bucket so that it can be referenced when the job runs.
The wget commands to download the JDBC jar files are as follows:
- MySQL
wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.30.tar.gz
- PostgreSQL
wget https://jdbc.postgresql.org/download/postgresql-42.2.6.jar
- Microsoft SQL Server
wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/6.4.0.jre8/mssql-jdbc-6.4.0.jre8.jar
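Once the jar file is downloaded, upload it to a GCS bucket and export the variable below. The MySQL download above is a .tar.gz archive, so extract the connector jar from it before uploading. The execution examples later in this article reference mysql-connector-java-8.0.29.jar, so make sure the jar you upload matches the file name and driver class you configure.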
export JARS=<gcs-bucket-location-containing-jar-file>
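When more than one driver is needed (for example, MySQL as source and PostgreSQL as target), the JARS value becomes a comma-separated list of GCS paths, as in the execution examples later in this article. A minimal Python sketch for assembling that value follows; the helper name `jars_value` is hypothetical and not part of the template.

```python
def jars_value(bucket_path, jar_names):
    """Join jar file names under one GCS prefix into a comma-separated list.

    Hypothetical helper for illustration; the template only needs the
    resulting string to be exported as the JARS environment variable.
    """
    prefix = bucket_path.rstrip("/")  # tolerate a trailing slash
    return ",".join(f"{prefix}/{name}" for name in jar_names)


# Bucket and jar names taken from the example execution in this article.
print(jars_value(
    "gs://my-gcp-proj/jars",
    ["mysql-connector-java-8.0.29.jar", "postgresql-42.2.6.jar"],
))
```

The printed string can be pasted after `export JARS=`.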
JDBC URL syntax
- MySQL
jdbc:mysql://<hostname>:<port>/<dbname>?user=<username>&password=<password>
- PostgreSQL
jdbc:postgresql://<hostname>:<port>/<dbname>?user=<username>&password=<password>
- Microsoft SQL Server
jdbc:sqlserver://<hostname>:<port>;databaseName=<dbname>;user=<username>;password=<password>
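The three URL shapes above differ mainly in how credentials are attached: MySQL and PostgreSQL use a query string, while SQL Server uses semicolon-separated properties. The sketch below builds each form in Python. The function name `build_jdbc_url` and the default ports (taken from the examples later in this article) are assumptions for illustration, and the SQL Server branch does no escaping, so values containing `;` would need extra care.

```python
from urllib.parse import quote, urlencode

# Ports as used in the example URLs in this article (assumed defaults).
DEFAULT_PORTS = {"mysql": 3306, "postgresql": 5432, "sqlserver": 1433}


def build_jdbc_url(dialect, host, dbname, user, password, port=None):
    """Build a JDBC URL in the syntax shown above (illustrative helper)."""
    if dialect not in DEFAULT_PORTS:
        raise ValueError(f"unsupported dialect: {dialect}")
    port = port if port is not None else DEFAULT_PORTS[dialect]
    if dialect == "sqlserver":
        # Semicolon-separated properties; no escaping is attempted here.
        return (f"jdbc:sqlserver://{host}:{port};databaseName={dbname};"
                f"user={user};password={password}")
    # MySQL and PostgreSQL: percent-encode credentials in the query string.
    query = urlencode({"user": user, "password": password}, quote_via=quote)
    return f"jdbc:{dialect}://{host}:{port}/{dbname}?{query}"


print(build_jdbc_url("mysql", "1.1.1.1", "mydb", "root", "password123"))
print(build_jdbc_url("sqlserver", "1.1.1.1", "mydb", "sqlserver", "password123"))
```

Percent-encoding matters when a password contains characters such as `&` or `@`, which would otherwise be read as URL syntax.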
Other important properties
Driver Class
1. MySQL
jdbctojdbc.input.driver="com.mysql.cj.jdbc.Driver"
2. PostgreSQL
jdbctojdbc.input.driver="org.postgresql.Driver"
3. Microsoft SQL Server
jdbctojdbc.input.driver="com.microsoft.sqlserver.jdbc.SQLServerDriver"
- You can specify either the source table name or a SQL query within double quotes. For example:
jdbctojdbc.input.table="employees"
jdbctojdbc.input.table="(select * from employees where dept_id>10) as employees"
- partitionColumn, lowerBound, upperBound and numPartitions must be used together. If one is specified, then all of them need to be specified.
- For additional execution details, refer to the Spark JDBC documentation.
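To make the partitioning options more concrete, the sketch below first checks that the options are supplied together and then splits the range between the bounds into per-partition filters. It is a simplified illustration of range-stride partitioning, not Spark's actual implementation, so the exact boundaries Spark computes may differ; both function names are hypothetical. The bounds only decide the stride: the first and last partitions are left open-ended, so rows outside the bounds are still read.

```python
def check_partition_options(partition_column=None, lower_bound=None,
                            upper_bound=None):
    """Return True if a partitioned read is requested, False if no option
    is set; raise ValueError if only some of the options are set."""
    given = [v is not None
             for v in (partition_column, lower_bound, upper_bound)]
    if any(given) and not all(given):
        raise ValueError(
            "partitioncolumn, lowerbound and upperbound must be set together")
    return all(given)


def partition_predicates(column, lower, upper, num_partitions):
    """Simplified stride-based split of [lower, upper) into WHERE filters."""
    if num_partitions < 1:
        raise ValueError("num_partitions must be at least 1")
    if num_partitions == 1:
        return [""]  # a single partition reads everything, no filter
    stride = max((upper - lower) // num_partitions, 1)
    predicates = []
    bound = lower
    for i in range(num_partitions):
        lower_clause = f"{column} >= {bound}" if i > 0 else None
        bound += stride
        upper_clause = (f"{column} < {bound}"
                        if i < num_partitions - 1 else None)
        if lower_clause and upper_clause:
            predicates.append(f"{lower_clause} AND {upper_clause}")
        elif upper_clause:
            # First partition is open below and also picks up NULLs.
            predicates.append(f"{upper_clause} OR {column} IS NULL")
        else:
            # Last partition is open above.
            predicates.append(lower_clause)
    return predicates


if check_partition_options("id", 1, 10):
    for predicate in partition_predicates("id", 1, 10, 4):
        print(predicate)
```

Each printed filter corresponds to one parallel read of the source table, which is why an evenly distributed numeric column makes a good partition column.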
6. Usage
$ python main.py --template JDBCTOJDBC --help
usage: main.py --template JDBCTOJDBC \
--jdbctojdbc.input.url JDBCTOJDBC.INPUT.URL \
--jdbctojdbc.input.driver JDBCTOJDBC.INPUT.DRIVER \
--jdbctojdbc.input.table JDBCTOJDBC.INPUT.TABLE \
--jdbctojdbc.output.url JDBCTOJDBC.OUTPUT.URL \
--jdbctojdbc.output.driver JDBCTOJDBC.OUTPUT.DRIVER \
--jdbctojdbc.output.table JDBCTOJDBC.OUTPUT.TABLE \
optional arguments:
-h, --help show this help message and exit
--jdbctojdbc.input.partitioncolumn JDBCTOJDBC.INPUT.PARTITIONCOLUMN \
--jdbctojdbc.input.lowerbound JDBCTOJDBC.INPUT.LOWERBOUND \
--jdbctojdbc.input.upperbound JDBCTOJDBC.INPUT.UPPERBOUND \
--jdbctojdbc.numpartitions JDBCTOJDBC.NUMPARTITIONS \
--jdbctojdbc.output.create_table.option JDBCTOJDBC.OUTPUT.CREATE_TABLE.OPTION \
--jdbctojdbc.output.mode {overwrite,append,ignore,errorifexists} \
--jdbctojdbc.output.batch.size JDBCTOJDBC.OUTPUT.BATCH.SIZE \
Arguments
- jdbctojdbc.input.url: JDBC input URL
- jdbctojdbc.input.driver: JDBC input driver name
- jdbctojdbc.input.table: JDBC input table name
- jdbctojdbc.output.url: JDBC output URL
- jdbctojdbc.output.driver: JDBC output driver name
- jdbctojdbc.output.table: JDBC output table name
- jdbctojdbc.input.partitioncolumn (Optional): JDBC input table partition column name
- jdbctojdbc.input.lowerbound (Optional): JDBC input table partition column lower bound, which is used to decide the partition stride
- jdbctojdbc.input.upperbound (Optional): JDBC input table partition column upper bound, which is used to decide the partition stride
- jdbctojdbc.numpartitions (Optional): The maximum number of partitions that can be used for parallelism in table reading and writing. The same value is used for both the input and output JDBC connections. Defaults to 10
- jdbctojdbc.output.create_table.option (Optional): Allows setting database-specific table and partition options when creating an output table
- jdbctojdbc.output.mode (Optional): Output write mode (one of: append, overwrite, ignore, errorifexists). Defaults to append
- jdbctojdbc.output.batch.size (Optional): JDBC output batch size. Defaults to 1000
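The required and optional arguments, with the defaults listed above, can be mirrored in a small `argparse` parser. This is only a sketch for experimenting with the argument names locally; it is not the template's own parser, and the `dest` names and `build_parser` function are made up for illustration.

```python
import argparse


def build_parser():
    """Illustrative parser mirroring the documented JDBCTOJDBC arguments."""
    parser = argparse.ArgumentParser(prog="jdbctojdbc-sketch")
    # Required arguments.
    for name in ("input.url", "input.driver", "input.table",
                 "output.url", "output.driver", "output.table"):
        parser.add_argument(f"--jdbctojdbc.{name}",
                            dest=name.replace(".", "_"), required=True)
    # Optional partitioned-read arguments (no defaults).
    for name in ("input.partitioncolumn", "input.lowerbound",
                 "input.upperbound"):
        parser.add_argument(f"--jdbctojdbc.{name}",
                            dest=name.replace(".", "_"), default=None)
    # Optional arguments with the defaults documented above.
    parser.add_argument("--jdbctojdbc.numpartitions",
                        dest="numpartitions", type=int, default=10)
    parser.add_argument("--jdbctojdbc.output.create_table.option",
                        dest="create_table_option", default=None)
    parser.add_argument("--jdbctojdbc.output.mode", dest="output_mode",
                        default="append",
                        choices=["overwrite", "append", "ignore",
                                 "errorifexists"])
    parser.add_argument("--jdbctojdbc.output.batch.size",
                        dest="batch_size", type=int, default=1000)
    return parser


args = build_parser().parse_args([
    "--jdbctojdbc.input.url=jdbc:mysql://1.1.1.1:3306/mydb?user=root&password=password123",
    "--jdbctojdbc.input.driver=com.mysql.cj.jdbc.Driver",
    "--jdbctojdbc.input.table=employees",
    "--jdbctojdbc.output.url=jdbc:postgresql://1.1.1.1:5432/postgres?user=postgres&password=password123",
    "--jdbctojdbc.output.driver=org.postgresql.Driver",
    "--jdbctojdbc.output.table=employees_out",
])
print(args.output_mode, args.numpartitions, args.batch_size)
```

Leaving the optional arguments out, as in the call above, falls back to append mode, 10 partitions, and a batch size of 1000.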
Note:
You can specify target table properties, such as the partition column, using the property below. This is useful when the target table does not exist, or when the write mode is overwrite and the target table needs to be created as a partitioned table.
- MySQL
jdbctojdbc.output.create_table.option="PARTITION BY RANGE(id) (PARTITION p0 VALUES LESS THAN (5),PARTITION p1 VALUES LESS THAN (10),PARTITION p2 VALUES LESS THAN (15),PARTITION p3 VALUES LESS THAN MAXVALUE)"
- PostgreSQL
jdbctojdbc.output.create_table.option="PARTITION BY RANGE(id);CREATE TABLE po0 PARTITION OF <table_name> FOR VALUES FROM (MINVALUE) TO (5);CREATE TABLE po1 PARTITION OF <table_name> FOR VALUES FROM (5) TO (10);CREATE TABLE po2 PARTITION OF <table_name> FOR VALUES FROM (10) TO (15);CREATE TABLE po3 PARTITION OF <table_name> FOR VALUES FROM (15) TO (MAXVALUE);"
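Writing these partition clauses by hand gets error-prone as the number of ranges grows. As a rough sketch, the two option strings above can be generated from a list of boundary values. Both function names are hypothetical, and the partition naming (p0, p1, ... and po0, po1, ...) simply follows the examples above.

```python
def mysql_range_partition_option(column, bounds):
    """Build a MySQL PARTITION BY RANGE clause from ascending bounds."""
    parts = [f"PARTITION p{i} VALUES LESS THAN ({bound})"
             for i, bound in enumerate(bounds)]
    # Catch-all partition for values at or above the last bound.
    parts.append(f"PARTITION p{len(bounds)} VALUES LESS THAN MAXVALUE")
    return f"PARTITION BY RANGE({column}) ({','.join(parts)})"


def postgres_range_partition_option(table, column, bounds):
    """Build the PostgreSQL form: a PARTITION BY clause followed by one
    CREATE TABLE ... PARTITION OF statement per range."""
    edges = ["MINVALUE", *[str(bound) for bound in bounds], "MAXVALUE"]
    statements = [f"PARTITION BY RANGE({column})"]
    for i in range(len(edges) - 1):
        statements.append(
            f"CREATE TABLE po{i} PARTITION OF {table} "
            f"FOR VALUES FROM ({edges[i]}) TO ({edges[i + 1]})")
    return ";".join(statements) + ";"


print(mysql_range_partition_option("id", [5, 10, 15]))
print(postgres_range_partition_option("employees_out", "id", [5, 10, 15]))
```

With bounds 5, 10, and 15, the output reproduces the MySQL string above and the PostgreSQL string with `employees_out` as the table name.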
General execution:
export GCP_PROJECT=<gcp-project-id>
export REGION=<region>
export GCS_STAGING_LOCATION=<gcs staging location>
export SUBNET=<subnet>
export JARS="<gcs_path_to_jdbc_jar_files>/mysql-connector-java-8.0.29.jar,<gcs_path_to_jdbc_jar_files>/postgresql-42.2.6.jar,<gcs_path_to_jdbc_jar_files>/mssql-jdbc-6.4.0.jre8.jar"
./bin/start.sh \
-- --template=JDBCTOJDBC \
--jdbctojdbc.input.url="jdbc:mysql://<hostname>:<port>/<dbname>?user=<username>&password=<password>" \
--jdbctojdbc.input.driver=<jdbc-driver-class-name> \
--jdbctojdbc.input.table=<input table name or subquery with where clause filter> \
--jdbctojdbc.input.partitioncolumn=<optional-partition-column-name> \
--jdbctojdbc.input.lowerbound=<optional-partition-start-value> \
--jdbctojdbc.input.upperbound=<optional-partition-end-value> \
--jdbctojdbc.numpartitions=<optional-partition-number> \
--jdbctojdbc.output.url="jdbc:mysql://<hostname>:<port>/<dbname>?user=<username>&password=<password>" \
--jdbctojdbc.output.driver=<jdbc-driver-class-name> \
--jdbctojdbc.output.table=<output-table-name> \
--jdbctojdbc.output.create_table.option=<optional-output-table-properties> \
--jdbctojdbc.output.mode=<optional-write-mode> \
--jdbctojdbc.output.batch.size=<optional-batch-size>
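Because several of the arguments are optional, it can be convenient to assemble the command from a dictionary and skip whatever is unset. The sketch below only builds and prints the argument list; the helper name `build_start_command` is hypothetical, and actually running the command (for example with `subprocess.run`) is assumed to happen from the dataproc-templates/python directory with the environment variables above exported.

```python
import shlex


def build_start_command(options):
    """Build the ./bin/start.sh argument list for the JDBCTOJDBC template.

    `options` maps the suffix after "jdbctojdbc." to its value; entries
    whose value is None are treated as unset optional arguments.
    """
    command = ["./bin/start.sh", "--", "--template=JDBCTOJDBC"]
    for name, value in options.items():
        if value is None:
            continue  # skip optional arguments that were not provided
        command.append(f"--jdbctojdbc.{name}={value}")
    return command


command = build_start_command({
    "input.url": "jdbc:mysql://1.1.1.1:3306/mydb?user=root&password=password123",
    "input.driver": "com.mysql.cj.jdbc.Driver",
    "input.table": "employees",
    "input.partitioncolumn": None,  # not partitioning this read
    "output.url": "jdbc:postgresql://1.1.1.1:5432/postgres?user=postgres&password=password123",
    "output.driver": "org.postgresql.Driver",
    "output.table": "employees_out",
    "output.mode": "overwrite",
})
# shlex.join quotes each argument so the line is safe to paste into a shell.
print(shlex.join(command))
```

Passing the arguments as a list avoids shell-quoting problems with characters such as `&` and `;` in the JDBC URLs.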
Example execution:
export GCP_PROJECT=my-gcp-proj
export REGION=us-central1
export GCS_STAGING_LOCATION=gs://my-gcp-proj/staging
export SUBNET=projects/my-gcp-proj/regions/us-central1/subnetworks/default
export JARS="gs://my-gcp-proj/jars/mysql-connector-java-8.0.29.jar,gs://my-gcp-proj/jars/postgresql-42.2.6.jar,gs://my-gcp-proj/jars/mssql-jdbc-6.4.0.jre8.jar"
- MySQL to MySQL
./bin/start.sh \
-- --template=JDBCTOJDBC \
--jdbctojdbc.input.url="jdbc:mysql://1.1.1.1:3306/mydb?user=root&password=password123" \
--jdbctojdbc.input.driver="com.mysql.cj.jdbc.Driver" \
--jdbctojdbc.input.table="(select * from employees where id <10) as employees" \
--jdbctojdbc.input.partitioncolumn=id \
--jdbctojdbc.input.lowerbound="1" \
--jdbctojdbc.input.upperbound="10" \
--jdbctojdbc.numpartitions="4" \
--jdbctojdbc.output.url="jdbc:mysql://1.1.1.1:3306/mydb?user=root&password=password123" \
--jdbctojdbc.output.driver="com.mysql.cj.jdbc.Driver" \
--jdbctojdbc.output.table="employees_out" \
--jdbctojdbc.output.create_table.option="PARTITION BY RANGE(id) (PARTITION p0 VALUES LESS THAN (5),PARTITION p1 VALUES LESS THAN (10),PARTITION p2 VALUES LESS THAN (15),PARTITION p3 VALUES LESS THAN MAXVALUE)" \
--jdbctojdbc.output.mode="overwrite" \
--jdbctojdbc.output.batch.size="1000"
- PostgreSQL to PostgreSQL
./bin/start.sh \
-- --template=JDBCTOJDBC \
--jdbctojdbc.input.url="jdbc:postgresql://1.1.1.1:5432/postgres?user=postgres&password=password123" \
--jdbctojdbc.input.driver="org.postgresql.Driver" \
--jdbctojdbc.input.table="(select * from employees) as employees" \
--jdbctojdbc.input.partitioncolumn=id \
--jdbctojdbc.input.lowerbound="11" \
--jdbctojdbc.input.upperbound="20" \
--jdbctojdbc.numpartitions="4" \
--jdbctojdbc.output.url="jdbc:postgresql://1.1.1.1:5432/postgres?user=postgres&password=password123" \
--jdbctojdbc.output.driver="org.postgresql.Driver" \
--jdbctojdbc.output.table="employees_out" \
--jdbctojdbc.output.create_table.option="PARTITION BY RANGE(id);CREATE TABLE po0 PARTITION OF employees_out FOR VALUES FROM (MINVALUE) TO (5);CREATE TABLE po1 PARTITION OF employees_out FOR VALUES FROM (5) TO (10);CREATE TABLE po2 PARTITION OF employees_out FOR VALUES FROM (10) TO (15);CREATE TABLE po3 PARTITION OF employees_out FOR VALUES FROM (15) TO (MAXVALUE);" \
--jdbctojdbc.output.mode="overwrite" \
--jdbctojdbc.output.batch.size="1000"
- Microsoft SQL Server to Microsoft SQL Server
./bin/start.sh \
-- --template=JDBCTOJDBC \
--jdbctojdbc.input.url="jdbc:sqlserver://1.1.1.1:1433;databaseName=mydb;user=sqlserver;password=password123" \
--jdbctojdbc.input.driver="com.microsoft.sqlserver.jdbc.SQLServerDriver" \
--jdbctojdbc.input.table="employees" \
--jdbctojdbc.input.partitioncolumn=id \
--jdbctojdbc.input.lowerbound="11" \
--jdbctojdbc.input.upperbound="20" \
--jdbctojdbc.numpartitions="4" \
--jdbctojdbc.output.url="jdbc:sqlserver://1.1.1.1:1433;databaseName=mydb;user=sqlserver;password=password123" \
--jdbctojdbc.output.driver="com.microsoft.sqlserver.jdbc.SQLServerDriver" \
--jdbctojdbc.output.table="employees_out" \
--jdbctojdbc.output.mode="overwrite" \
--jdbctojdbc.output.batch.size="1000"
- MySQL to PostgreSQL
./bin/start.sh \
-- --template=JDBCTOJDBC \
--jdbctojdbc.input.url="jdbc:mysql://1.1.1.1:3306/mydb?user=root&password=password123" \
--jdbctojdbc.input.driver="com.mysql.cj.jdbc.Driver" \
--jdbctojdbc.input.table="employees" \
--jdbctojdbc.input.partitioncolumn=id \
--jdbctojdbc.input.lowerbound="11" \
--jdbctojdbc.input.upperbound="20" \
--jdbctojdbc.numpartitions="4" \
--jdbctojdbc.output.url="jdbc:postgresql://1.1.1.1:5432/postgres?user=postgres&password=password123" \
--jdbctojdbc.output.driver="org.postgresql.Driver" \
--jdbctojdbc.output.table="employees_out" \
--jdbctojdbc.output.mode="overwrite" \
--jdbctojdbc.output.batch.size="1000"
- MySQL to Microsoft SQL Server
./bin/start.sh \
-- --template=JDBCTOJDBC \
--jdbctojdbc.input.url="jdbc:mysql://1.1.1.1:3306/mydb?user=root&password=password123" \
--jdbctojdbc.input.driver="com.mysql.cj.jdbc.Driver" \
--jdbctojdbc.input.table="employees" \
--jdbctojdbc.input.partitioncolumn=id \
--jdbctojdbc.input.lowerbound="11" \
--jdbctojdbc.input.upperbound="20" \
--jdbctojdbc.numpartitions="4" \
--jdbctojdbc.output.url="jdbc:sqlserver://1.1.1.1:1433;databaseName=mydb;user=sqlserver;password=password123" \
--jdbctojdbc.output.driver="com.microsoft.sqlserver.jdbc.SQLServerDriver" \
--jdbctojdbc.output.table="employees_out" \
--jdbctojdbc.output.mode="overwrite" \
--jdbctojdbc.output.batch.size="1000"
NOTE: You will be prompted to enable the Dataproc API if it is not already enabled.
Setting additional Spark properties
If you need to specify Spark properties supported by Dataproc Serverless, for example to adjust the number of drivers, cores, or executors, you can edit the OPT_PROPERTIES values in the start.sh file.
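As a small illustration, Spark properties are commonly passed to gcloud as a single `--properties` flag holding comma-separated key=value pairs. The sketch below formats such a flag from a dictionary. Whether OPT_PROPERTIES in start.sh expects exactly this form is an assumption to verify against the script, the helper name is hypothetical, and the property values shown are placeholders rather than recommendations.

```python
def format_properties_flag(properties):
    """Format Spark properties as one gcloud-style --properties flag."""
    pairs = ",".join(f"{key}={value}" for key, value in properties.items())
    return f"--properties={pairs}"


# Placeholder values; check the Dataproc Serverless documentation for
# the values your workload and quota allow.
print(format_properties_flag({
    "spark.executor.instances": 4,
    "spark.executor.cores": 4,
    "spark.driver.cores": 4,
}))
```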