Fast export large database tables — using GCP Serverless Dataproc

Shashank Agarwal
Google Cloud - Community
6 min read · May 26, 2022

Import large tables from MySQL (any JDBC) to Cloud Spanner (or GCS, BigQuery)

Spark to export large tables

Introduction

Are you looking to export/import large tables of hundreds of GBs to TBs, fast?

Looking for an approach that exports/imports data in multiple parallel threads?

Prefer to preserve the schema of the exported data?

Want a robust, proven, open-source and hardened mechanism?

A. The JDBCToGCS template, which runs on GCP’s Serverless Dataproc, helps you export tables in a fast, efficient and multi-threaded fashion.

B. Use the GCSToSpanner template to import the data from GCS into Cloud Spanner.

Key Benefits

  1. Dataproc Serverless is fully managed, serverless and autoscaling.
  2. Both the templates are open source, fully customizable and ready to use for simple jobs.
  3. Spark has built-in JDBC dialects for MySQL, PostgreSQL, DB2, Derby, H2, SQL Server, Oracle and Teradata databases.
  4. Ephemeral: GCP resources are released once the job has ended.
  5. You can export data in Avro, Parquet, CSV and ORC formats.
  6. Supports table partitioning, which lets you read the data in parallel chunks.
  7. The GCSToSpanner template can read multiple files in parallel (one file per thread at a time) and load huge volumes of data into Cloud Spanner at blazing speed.

Simple Usage

This usage works well for small tables (<1 GB in size) as it will not be multi-threaded.

  1. Ensure the subnet has Private Google Access enabled. Even if you use the “default” VPC network generated by GCP, you still need to enable private access as below. (Refer here for details.)
gcloud compute networks subnets update default --region=us-central1 --enable-private-ip-google-access

2. [Preferred] Pause writes to the source database for consistency. This can be done by creating a read replica and pausing replication on it. Alternatively, you can clone the active instance.
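
For Cloud SQL sources, here is a minimal sketch of both options (the instance names are placeholders; verify the flags against the current gcloud reference):

# Create a read replica of the source instance.
gcloud sql instances create my-source-replica \
--master-instance-name=my-source-instance
# Pause replication on the replica once it has caught up.
gcloud sql instances patch my-source-replica --no-enable-database-replication
# Alternatively, clone the active instance.
gcloud sql instances clone my-source-instance my-source-clone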

3. Ensure your source database is reachable from the VPC network.
Note: Cloud SQL instances are reachable using Private IP when peered with the VPC Network.

4. Create a GCS bucket and a staging location for jar files.
Also download the JDBC driver jar for the respective source database and upload it to the GCS bucket.
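
For example (the bucket name and paths below mirror the ones used later in this post and are only illustrative):

# Create the bucket in the same region as the Dataproc Serverless job.
gsutil mb -l us-central1 gs://my-gcp-proj
# Stage the previously downloaded MySQL Connector/J jar in the bucket.
gsutil cp mysql-connector-java-8.0.17.jar gs://my-gcp-proj/mysql-export/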

5. Clone the git repo in Cloud Shell, which comes pre-installed with the required tools. Alternatively, use any machine with JDK 8+, Maven 3+ and Git installed.

git clone https://github.com/GoogleCloudPlatform/dataproc-templates.git
cd dataproc-templates/java

6. Obtain authentication credentials (to submit the job).

gcloud auth application-default login

7. Execute the template; refer to the JDBCToGCS documentation for more details.
Replace environment-specific values (GCP project, region, JDBC URL, path of the driver jar, etc.).
Eg:

export GCP_PROJECT=my-gcp-proj
export REGION=us-central1
export SUBNET=projects/my-gcp-proj/regions/us-central1/subnetworks/default
export GCS_STAGING_LOCATION=gs://my-gcp-proj/mysql-export/staging
export JARS=gs://my-gcp-proj/mysql-export/mysql-connector-java-8.0.17.jar

bin/start.sh \
-- --template JDBCTOGCS \
--templateProperty 'jdbctogcs.jdbc.url=jdbc:mysql://192.168.16.3:3306/MyCloudSQLDB?user=root&password=root' \
--templateProperty jdbctogcs.jdbc.driver.class.name=com.mysql.cj.jdbc.Driver \
--templateProperty jdbctogcs.output.location=gs://my-gcp-proj/mysql-export/export/table1_export \
--templateProperty jdbctogcs.output.format=parquet \
--templateProperty jdbctogcs.write.mode=Overwrite \
--templateProperty 'jdbctogcs.sql=SELECT * FROM MyDB.employee'

NOTE: It will ask you to enable the Dataproc API, if not already enabled.

8. Import the data into Cloud Spanner using the GCSToSpanner template. Each executor/worker loads one file at a time, so having multiple files is key to parallelism. The configuration below uses the wildcard “table1_export/part*.parquet”, which can match one or more files.
Eg:

export GCP_PROJECT=my-gcp-proj
export REGION=us-central1
export SUBNET=projects/my-gcp-proj/regions/us-central1/subnetworks/default
export GCS_STAGING_LOCATION=gs://my-gcp-proj/mysql-export/staging

bin/start.sh \
-- --template GCSTOSPANNER \
--templateProperty project.id=my-gcp-proj \
--templateProperty gcs.spanner.input.format=parquet \
--templateProperty gcs.spanner.input.location=gs://my-gcp-proj/mysql-export/export/table1_export/part*.parquet \
--templateProperty gcs.spanner.output.instance=spanner-inst \
--templateProperty gcs.spanner.output.database=spanner-db \
--templateProperty gcs.spanner.output.table=employee \
--templateProperty gcs.spanner.output.saveMode=Overwrite \
--templateProperty gcs.spanner.output.primaryKey=id

Advanced Usage (multi-threaded export/import)

Assume you have an employee table in a MySQL database with the schema below:

CREATE TABLE `employee` (
`id` bigint(20) unsigned NOT NULL,
`name` varchar(100) NOT NULL,
`email` varchar(100) NOT NULL,
`current_salary` int unsigned DEFAULT NULL,
`account_id` bigint(20) unsigned NOT NULL,
`department` varchar(100) DEFAULT NULL,
`created_at` datetime NOT NULL,
`updated_at` datetime NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;

Assume the maximum employee id is 100 million (used for the upperBound parameter).
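
If you do not already know the maximum id, a quick way to look it up (the host, credentials and table name are placeholders) is:

# Prompts for the password and prints MAX(id) for the source table.
mysql -h 192.168.16.3 -u root -p -e 'SELECT MAX(id) FROM employee;'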

Perform steps 1–6 as described in the previous section.
Change step 7 by specifying the partition properties.

Execute the Spark job along with the partition parameters, for example:

export GCP_PROJECT=my-gcp-proj
export REGION=us-central1
export SUBNET=projects/my-gcp-proj/regions/us-central1/subnetworks/default
export GCS_STAGING_LOCATION=gs://my-gcp-proj/mysql-export/staging
export JARS=gs://my-gcp-proj/mysql-export/mysql-connector-java-8.0.17.jar

bin/start.sh \
-- --template JDBCTOGCS \
--templateProperty 'jdbctogcs.jdbc.url=jdbc:mysql://192.168.16.3:3306/MyCloudSQLDB?user=root&password=root' \
--templateProperty jdbctogcs.jdbc.driver.class.name=com.mysql.cj.jdbc.Driver \
--templateProperty jdbctogcs.output.location=gs://my-gcp-proj/mysql-export/export/table1_export \
--templateProperty jdbctogcs.output.format=parquet \
--templateProperty jdbctogcs.write.mode=OVERWRITE \
--templateProperty 'jdbctogcs.sql=select * FROM employee' \
--templateProperty jdbctogcs.sql.partitionColumn=id \
--templateProperty jdbctogcs.sql.lowerBound=0 \
--templateProperty jdbctogcs.sql.upperBound=100000000 \
--templateProperty jdbctogcs.sql.numPartitions=400

Continue with step 8 (the import into Cloud Spanner) the same as in the simple usage.

Gotchas!

  1. Deciding on the number of partitions

Spark SQL executes the query and loads each partition’s entire result set in memory. Therefore, each partition should fetch only as much data as can comfortably be held in memory. With Dataproc Serverless, each worker gets 8 GB of memory by default. Hence fetching 500K rows per partition, with each row around 1 KB in size, results in roughly 500 MB of memory usage and is likely to work fine.
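
As a rough sizing check (the numbers below are illustrative assumptions based on the example in this post, not output of the template), you can estimate how much data each partition will fetch:

TABLE_ROWS=100000000      # approximate row count / max id of the source table
NUM_PARTITIONS=400        # value passed as jdbctogcs.sql.numPartitions
AVG_ROW_KB=1              # approximate row size in KB
echo "Rows per partition: $(( TABLE_ROWS / NUM_PARTITIONS ))"
echo "Approx. MB fetched per partition: $(( TABLE_ROWS / NUM_PARTITIONS * AVG_ROW_KB / 1024 ))"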

Having more partitions, along with a higher number of executors, can further improve speed by increasing parallelism.
You can specify executors using the configuration below:

export SPARK_PROPERTIES=spark.executor.instances=50,spark.dynamicAllocation.maxExecutors=200

2. Handling of “unsigned bigint” data types

In the above example, some columns, such as “id” and “account_id”, are unsigned bigint. It is common practice for DBAs to create unsigned bigint columns. However, when Spark SQL reads “unsigned bigint” values, it converts them to the “decimal” data type by default. Therefore, when the Parquet files are written to GCS, the generated schema will contain “decimal(20,0)” data types for these columns.

This is often undesirable when importing the data into Spanner, which will automatically infer such columns as the NUMERIC data type.

If the data in such columns is below the signed limit, simply casting them solves the issue. One side effect of casting is that existing indexes will no longer be used, making the SQL queries very slow. To mitigate this, also select the original, uncast column under an alias (id as tbl_id in the example below) and partition on it. Later, once the data is imported into Cloud Spanner, drop this extra column.

Eg:

export GCP_PROJECT=my-gcp-proj
export REGION=us-central1
export SUBNET=projects/my-gcp-proj/regions/us-central1/subnetworks/default
export GCS_STAGING_LOCATION=gs://my-gcp-proj/mysql-export/staging
export JARS=gs://my-gcp-proj/mysql-export/mysql-connector-java-8.0.17.jar

bin/start.sh \
-- --template JDBCTOGCS \
--templateProperty 'jdbctogcs.jdbc.url=jdbc:mysql://192.168.16.3:3306/MyCloudSQLDB?user=root&password=root' \
--templateProperty jdbctogcs.jdbc.driver.class.name=com.mysql.cj.jdbc.Driver \
--templateProperty jdbctogcs.output.location=gs://my-gcp-proj/mysql-export/export/table1_export \
--templateProperty jdbctogcs.output.format=parquet \
--templateProperty jdbctogcs.write.mode=OVERWRITE \
--templateProperty 'jdbctogcs.sql=select CAST(id as SIGNED) id, name, email, current_salary, CAST(account_id as SIGNED) account_id, department, created_at, updated_at, id as tbl_id FROM employee' \
--templateProperty jdbctogcs.sql.partitionColumn=tbl_id \
--templateProperty jdbctogcs.sql.lowerBound=0 \
--templateProperty jdbctogcs.sql.upperBound=100000000 \
--templateProperty jdbctogcs.sql.numPartitions=400
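
After the import completes, the helper column can be removed from the Spanner table, for example (instance and database names follow the earlier examples):

# Drop the extra tbl_id column that was only used for partitioning.
gcloud spanner databases ddl update spanner-db --instance=spanner-inst \
--ddl='ALTER TABLE employee DROP COLUMN tbl_id'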

3. Insert batch size
You can fine-tune the insert batch size for the GCSToSpanner job by specifying the “gcs.spanner.output.batchInsertSize” property as below. Please note that Cloud Spanner limits a write request to 20,000 mutations (as of this writing).

Eg:

export GCP_PROJECT=my-gcp-proj
export REGION=us-central1
export SUBNET=projects/my-gcp-proj/regions/us-central1/subnetworks/default
export GCS_STAGING_LOCATION=gs://my-gcp-proj/mysql-export/staging

bin/start.sh \
-- --template GCSTOSPANNER \
--templateProperty project.id=my-gcp-proj \
--templateProperty gcs.spanner.input.format=parquet \
--templateProperty gcs.spanner.input.location=gs://my-gcp-proj/mysql-export/export/table1_export/part*.parquet \
--templateProperty gcs.spanner.output.instance=spanner-inst \
--templateProperty gcs.spanner.output.database=spanner-db \
--templateProperty gcs.spanner.output.table=employee \
--templateProperty gcs.spanner.output.saveMode=Overwrite \
--templateProperty gcs.spanner.output.primaryKey=id \
--templateProperty gcs.spanner.output.batchInsertSize=2000

4. No Lock Reads
You may choose to instruct MySQL (the source database) to read uncommitted data to make the selects faster. However, dirty reads are not acceptable in many cases, so this is not an ideal approach.
A better approach is to read from a stationary clone instead.
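
If you do decide to allow dirty reads, one possible approach (an assumption on my part, not from the template docs — verify it on your own setup) is to change the isolation level on the source globally for the duration of the export and revert it afterwards:

# Allow dirty reads for new connections (MySQL 5.7.20+ / 8.0 variable name).
mysql -h 192.168.16.3 -u root -p -e "SET GLOBAL transaction_isolation = 'READ-UNCOMMITTED';"
# ... run the JDBCToGCS export ...
# Revert to the MySQL default isolation level.
mysql -h 192.168.16.3 -u root -p -e "SET GLOBAL transaction_isolation = 'REPEATABLE-READ';"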

5. Order of Table Import
Ideally, you should not create constraints, indexes or foreign key references on the target database before loading, in which case the order of import does not cause any issue. Otherwise, you will need to manually take care of relationships (e.g. load referenced tables first) when importing table data.
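
If you do need foreign keys on the target, add them only after all tables are loaded. A hedged example (the account table and constraint name are hypothetical, used purely for illustration):

# Add the foreign key once both employee and account data are in Spanner.
gcloud spanner databases ddl update spanner-db --instance=spanner-inst \
--ddl='ALTER TABLE employee ADD CONSTRAINT fk_employee_account FOREIGN KEY (account_id) REFERENCES account (id)'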

Alternative Targets

  1. BigQuery
    Since the database table is already exported to GCS, you can ingest the data into BigQuery using the GCSToBigQuery template (see the sketch after this list).
  2. Another database
    Spark JDBC natively supports the following databases: MySQL/MariaDB, PostgreSQL, DB2 and Oracle. Using the GCSToJDBC template (blog post), you can ingest the data into any of them.
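
A minimal sketch of the BigQuery route (the template property names below follow the GCSToBigQuery documentation in the dataproc-templates repo at the time of writing — verify them before use; the dataset, table and temp bucket names are placeholders):

export GCP_PROJECT=my-gcp-proj
export REGION=us-central1
export SUBNET=projects/my-gcp-proj/regions/us-central1/subnetworks/default
export GCS_STAGING_LOCATION=gs://my-gcp-proj/mysql-export/staging

bin/start.sh \
-- --template GCSTOBIGQUERY \
--templateProperty project.id=my-gcp-proj \
--templateProperty gcs.bigquery.input.format=parquet \
--templateProperty gcs.bigquery.input.location=gs://my-gcp-proj/mysql-export/export/table1_export \
--templateProperty gcs.bigquery.output.dataset=my_dataset \
--templateProperty gcs.bigquery.output.table=employee \
--templateProperty gcs.bigquery.temp.bucket.name=my-gcp-proj-temp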
