Importing data from databases into GCS (via JDBC) using Dataproc Serverless
Managing servers while running Spark jobs is always a challenge. Running Spark jobs on fully managed, on-demand infrastructure is the need of today's era: it lets developers concentrate on core application logic instead of spending time managing the framework. Dataproc Serverless is one such product provided by Google Cloud Platform.
Most transactional data still resides on relational database servers, which can be connected to using a JDBC driver. MySQL, PostgreSQL and SQL Server are the databases predominantly used for this.
Today's world is moving towards cloud-based storage services, which has driven the adoption of Google Cloud Storage buckets.
This article describes how to transfer data from JDBC databases to GCS buckets using Dataproc Serverless.
Key Benefits
- Use Dataproc Serverless to run Spark batch workloads without managing the Spark framework. Batch size is also configurable in this template.
- The JDBCTOGCS template is open source, configuration driven, and ready to use. Only JDBC and GCS credentials are required to execute the code.
- Supported file formats are CSV, JSON, Avro and Parquet.
- The GCSTOJDBC template can be used for the reverse direction, i.e. exporting data from GCS to databases via JDBC.
Usage
1. Ensure the subnet you use has Private Google Access enabled. Even if you use the "default" VPC network generated by GCP, you still need to enable Private Google Access as shown below.
gcloud compute networks subnets update default --region=us-central1 --enable-private-ip-google-access
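If you are unsure whether Private Google Access is already enabled, you can check it first (assuming the default subnet in us-central1):
gcloud compute networks subnets describe default --region=us-central1 --format="value(privateIpGoogleAccess)"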
2. Create a GCS bucket and staging location for jar files.
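For example, a staging bucket could be created with gsutil (the bucket name and region below are placeholders):
gsutil mb -l us-central1 gs://<your-staging-bucket>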
3. Clone the git repo in Cloud Shell, which comes pre-installed with the required tools. Alternatively, use any machine with Python and Git installed.
git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/python
4. Obtain authentication credentials (to submit the job).
gcloud auth application-default login
5. Prerequisites
Required JAR files
This template requires the JDBC jar file to be available to the Spark job. Download the required jar file and host it in a GCS bucket, so that it can be referenced during execution.
The wget commands to download the JDBC jar files are as follows:
- MySQL
wget http://dev.mysql.com/get/Downloads/Connector-J/mysql-connector-java-5.1.30.tar.gz
- PostgreSQL
wget https://jdbc.postgresql.org/download/postgresql-42.2.6.jar
- Microsoft SQL Server
wget https://repo1.maven.org/maven2/com/microsoft/sqlserver/mssql-jdbc/6.4.0.jre8/mssql-jdbc-6.4.0.jre8.jar
Once the jar file is downloaded, upload it to a GCS bucket and export the variable below.
export JARS=<gcs-bucket-location-containing-jar-file>
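For example, the PostgreSQL driver downloaded above could be staged like this (the bucket path is a placeholder):
gsutil cp postgresql-42.2.6.jar gs://<your-bucket>/jars/
export JARS=gs://<your-bucket>/jars/postgresql-42.2.6.jar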
JDBC URL syntax
- MySQL
jdbc:mysql://<hostname>:<port>/<dbname>?user=<username>&password=<password>
- PostgreSQL
jdbc:postgresql://<hostname>:<port>/<dbname>?user=<username>&password=<password>
- Microsoft SQL Server
jdbc:sqlserver://<hostname>:<port>;databaseName=<dbname>;user=<username>;password=<password>
Other important properties
Driver Class
1. MySQL
jdbctogcs.input.driver="com.mysql.cj.jdbc.Driver"
2. PostgreSQL
jdbctogcs.input.driver="org.postgresql.Driver"
jdbctogcs.input.driver="org.postgresql.Driver"
3. Microsoft SQL Server
jdbctogcs.input.driver="com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbctogcs.input.driver="com.microsoft.sqlserver.jdbc.SQLServerDriver"
- You can either specify the source table name or provide a SQL query within double quotes. For example:
jdbctogcs.input.table="employees"
jdbctogcs.input.table="(select * from employees where dept_id>10) as employees"
- partitionColumn, lowerBound, upperBound and numPartitions must be used together; if one is specified, all of them need to be specified (see the illustration after this list).
- For additional execution details, refer to the Spark JDBC datasource documentation.
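As an illustration, a partitioned read might combine the four options like this (the table name, column name and bounds below are hypothetical placeholders):
--jdbctogcs.input.table="employees" \
--jdbctogcs.input.partitioncolumn="employee_id" \
--jdbctogcs.input.lowerbound="1" \
--jdbctogcs.input.upperbound="100000" \
--jdbctogcs.numpartitions="10"
With these settings Spark splits the read into 10 parallel queries over ranges of employee_id; the bounds only decide the partition stride and do not filter out rows.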
6. Usage
python main.py --template JDBCTOGCS --help

usage: main.py --template JDBCTOGCS \
    --jdbctogcs.input.url JDBCTOGCS.INPUT.URL \
    --jdbctogcs.input.driver JDBCTOGCS.INPUT.DRIVER \
    --jdbctogcs.input.table JDBCTOGCS.INPUT.TABLE \
    --jdbctogcs.output.location JDBCTOGCS.OUTPUT.LOCATION \
    --jdbctogcs.output.format {avro,parquet,csv,json}

optional arguments:
    -h, --help show this help message and exit
    --jdbctogcs.input.partitioncolumn JDBCTOGCS.INPUT.PARTITIONCOLUMN
    --jdbctogcs.input.lowerbound JDBCTOGCS.INPUT.LOWERBOUND
    --jdbctogcs.input.upperbound JDBCTOGCS.INPUT.UPPERBOUND
    --jdbctogcs.numpartitions JDBCTOGCS.NUMPARTITIONS
    --jdbctogcs.output.mode {overwrite,append,ignore,errorifexists}
    --jdbctogcs.output.partitioncolumn JDBCTOGCS.OUTPUT.PARTITIONCOLUMN
Arguments
- jdbctogcs.input.url: JDBC input URL
- jdbctogcs.input.driver: JDBC input driver name
- jdbctogcs.input.table: JDBC input table name
- jdbctogcs.output.location: GCS location for output files (format: gs://BUCKET/...)
- jdbctogcs.output.format: Output file format (one of: avro, parquet, csv, json)
- jdbctogcs.input.partitioncolumn (Optional): JDBC input table partition column name
- jdbctogcs.input.lowerbound (Optional): JDBC input table partition column lower bound, which is used to decide the partition stride
- jdbctogcs.input.upperbound (Optional): JDBC input table partition column upper bound, which is used to decide the partition stride
- jdbctogcs.numpartitions (Optional): The maximum number of partitions that can be used for parallelism in table reading and writing. The same value is used for both the input and output JDBC connections. Defaults to 10
- jdbctogcs.output.mode (Optional): Output write mode (one of: append, overwrite, ignore, errorifexists). Defaults to append
- jdbctogcs.output.partitioncolumn (Optional): Output partition column name
General execution:
export GCP_PROJECT=<gcp-project-id>
export REGION=<region>
export GCS_STAGING_LOCATION=<gcs staging location>
export SUBNET=<subnet>
export JARS="<gcs_path_to_jdbc_jar_files>/mysql-connector-java-8.0.29.jar,<gcs_path_to_jdbc_jar_files>/postgresql-42.2.6.jar,<gcs_path_to_jdbc_jar_files>/mssql-jdbc-6.4.0.jre8.jar"./bin/start.sh \
-- --template=JDBCTOGCS \
--jdbctogcs.input.url="jdbc:mysql://<hostname>:<port>/<dbname>?user=<username>&password=<password>" \
--jdbctogcs.input.driver=<jdbc-driver-class-name> \
--jdbctogcs.input.table=<input table name or subquery with where clause filter> \
--jdbctogcs.input.partitioncolumn=<optional-partition-column-name> \
--jdbctogcs.input.lowerbound=<optional-partition-start-value> \
--jdbctogcs.input.upperbound=<optional-partition-end-value> \
--jdbctogcs.numpartitions=<optional-partition-number> \
--jdbctogcs.output.location=<gcs-output-location> \
--jdbctogcs.output.mode=<optional-write-mode> \
--jdbctogcs.output.format=<output-write-format> \
--jdbctogcs.output.partitioncolumn=<optional-output-partition-column-name>
Example execution:
export GCP_PROJECT=my-gcp-proj
export REGION=us-central1
export GCS_STAGING_LOCATION=gs://my-gcp-proj/staging
export SUBNET=projects/my-gcp-proj/regions/us-central1/subnetworks/default
export JARS="gs://my-gcp-proj/jars/mysql-connector-java-8.0.29.jar,gs://my-gcp-proj/jars/postgresql-42.2.6.jar,gs://my-gcp-proj/jars/mssql-jdbc-6.4.0.jre8.jar"
- MySQL to GCS
./bin/start.sh \
-- --template=JDBCTOGCS \
--jdbctogcs.input.url="jdbc:mysql://1.1.1.1:3306/mydb?user=root&password=password123" \
--jdbctogcs.input.driver="com.mysql.cj.jdbc.Driver" \
--jdbctogcs.input.table="(select * from employees where id <10) as employees" \
--jdbctogcs.input.partitioncolumn="id" \
--jdbctogcs.input.lowerbound="11" \
--jdbctogcs.input.upperbound="20" \
--jdbctogcs.numpartitions="4" \
--jdbctogcs.output.location="gs://output_bucket/output/" \
--jdbctogcs.output.mode="overwrite" \
--jdbctogcs.output.format="csv" \
--jdbctogcs.output.partitioncolumn="department_id"
- PostgreSQL to GCS
./bin/start.sh \
-- --template=JDBCTOGCS \
--jdbctogcs.input.url="jdbc:postgresql://1.1.1.1:5432/postgres?user=postgres&password=password123" \
--jdbctogcs.input.driver="org.postgresql.Driver" \
--jdbctogcs.input.table="(select * from employees) as employees" \
--jdbctogcs.input.partitioncolumn=id \
--jdbctogcs.input.lowerbound="11" \
--jdbctogcs.input.upperbound="20" \
--jdbctogcs.numpartitions="4" \
--jdbctogcs.output.location="gs://output_bucket/output/" \
--jdbctogcs.output.mode="overwrite" \
--jdbctogcs.output.format="csv" \
--jdbctogcs.output.partitioncolumn="department_id"
- Microsoft SQL Server to GCS
./bin/start.sh \
-- --template=JDBCTOGCS \
--jdbctogcs.input.url="jdbc:sqlserver://1.1.1.1:1433;databaseName=mydb;user=sqlserver;password=password123" \
--jdbctogcs.input.driver="com.microsoft.sqlserver.jdbc.SQLServerDriver" \
--jdbctogcs.input.table="employees" \
--jdbctogcs.input.partitioncolumn=id \
--jdbctogcs.input.lowerbound="11" \
--jdbctogcs.input.upperbound="20" \
--jdbctogcs.numpartitions="4" \
--jdbctogcs.output.location="gs://output_bucket/output/" \
--jdbctogcs.output.mode="overwrite" \
--jdbctogcs.output.format="csv" \
--jdbctogcs.output.partitioncolumn="department_id"
NOTE: You will be asked to enable the Dataproc API if it is not already enabled.
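If you prefer to enable the API up front, you can run:
gcloud services enable dataproc.googleapis.com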
Setting additional Spark properties
In case you need to specify Spark properties supported by Dataproc Serverless, such as adjusting the number of drivers, cores, executors, etc., you can edit the OPT_PROPERTIES values in the start.sh file.
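As a sketch, assuming the version of start.sh you cloned builds the properties flag from OPT_PROPERTIES, the value might look like this (the resource sizes are illustrative, not recommendations):
OPT_PROPERTIES="--properties=spark.driver.cores=4,spark.executor.cores=4,spark.executor.instances=10"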
References