Moving Data with Apache Sqoop in Google Cloud Dataproc

Have a relational database? Want to use Apache Sqoop to export this database into Cloud Storage, BigQuery, or Apache Hive? Want to do this all via Cloud Dataproc so you only pay for what you use?

Hope you answered YES because that’s what this post is about!

Cloud Dataproc is awesome because it quickly creates a Hadoop cluster which you can then use to run your Hadoop jobs (specifically a Sqoop job in this post), and as soon as your jobs finish you can immediately delete the cluster. This is a great example of leveraging Dataproc’s ephemeral, pay-per-use model to cut costs: you can quickly create and delete Hadoop clusters, and never again leave a cluster running idle!

Here’s the quick scoop on using Sqoop (see what I did there 😏) to move data:

  • Sqoop imports data from a relational database system or a mainframe into HDFS (Hadoop Distributed File System).
  • Running Sqoop on a Dataproc Hadoop cluster gives you access to the built-in Cloud Storage connector which lets you use the Cloud Storage gs:// file prefix instead of the Hadoop hdfs:// file prefix.
  • The two previous points mean you can use Sqoop to import data directly into Cloud Storage, skipping HDFS altogether!
  • Once your data is in Cloud Storage you can simply load the data into BigQuery using the Cloud SDK bq command-line tool. Alternatively, you can have Sqoop import data directly into your Dataproc cluster’s Hive warehouse which can be based on Cloud Storage instead of HDFS by pointing hive.metastore.warehouse.dir to a GCS bucket.

You can use two different methods to submit Dataproc jobs to a cluster:

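Method 1.) Manual Dataproc Job Submission

  • Create a Dataproc cluster.
  • Submit a Dataproc Hadoop job which runs the Sqoop import tool.
  • Delete the cluster once the Dataproc job completes.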

Method 2.) Automated Dataproc Job Submission using Workflow Templates

  • Create a workflow template which automatically executes the 3 previous manual steps (create, submit, delete).

It’s easier to troubleshoot job errors using the manual job submission method because you control when to delete the cluster. The automated method, using workflow templates, is ideal once you’re ready to run jobs in production since it takes care of the cluster creation, job submission, and deletion. Both methods are actually very similar to set up and are covered in more detail below.


Wait! Before you read on…

The examples below demonstrate using Sqoop to connect to a MySQL database. You can quickly and easily create your own test MySQL database in GCP by following the online Quickstart for Cloud SQL for MySQL. If you complete the quickstart, don’t forget to create the Dataproc cluster with the Cloud SQL Proxy Initialization Action so that you can easily connect to the MySQL database. Lastly, avoid incurring charges by cleaning up the database when you’re done.


3 Steps for Manual Dataproc Job Submission

1. Create a Dataproc cluster

The gcloud tool’s Dataproc cluster create command will by default create one master node VM (Virtual Machine) and two worker node VMs. All three VMs use the n1-standard-4 machine type. The example commands below use the minimum necessary flags for simplicity. Note: The --properties flag overrides the default on-cluster Hive warehouse directory (hdfs:///user/hive/warehouse) with a Cloud Storage location, gs://<GCS_BUCKET>/hive-warehouse. By pointing the Hive warehouse directory hive.metastore.warehouse.dir to Cloud Storage, you persist all your Hive data even after you delete the Dataproc cluster.

  • Create your cluster using the command below if you’re connecting to a MySQL database hosted on-prem and therefore don’t need Cloud SQL Proxy. Make sure you assign the cluster to the VPC network that’s shared between your GCP environment and your on-prem environment.
gcloud dataproc clusters create <CLUSTER_NAME> --zone=<ZONE> --network=<VPC_NETWORK> --properties=hive:hive.metastore.warehouse.dir=gs://<GCS_BUCKET>/hive-warehouse
  • Create your cluster using the command below if you’re connecting to a Cloud SQL-MySQL database using Cloud SQL Proxy. Note: Since the default MySQL port 3306 is already occupied by Dataproc’s default Hive metadata MySQL server, you need to choose a value other than 3306 for <PORT> (e.g. 3307).
gcloud dataproc clusters create <CLUSTER_NAME> --zone=<ZONE> --scopes=default,sql-admin --initialization-actions=gs://dataproc-initialization-actions/cloud-sql-proxy/cloud-sql-proxy.sh --properties=hive:hive.metastore.warehouse.dir=gs://<GCS_BUCKET>/hive-warehouse --metadata=enable-cloud-sql-hive-metastore=false --metadata=additional-cloud-sql-instances=<PROJECT_ID>:<REGION>:<SQL_INSTANCE_NAME>=tcp:<PORT> 

2. Submit a Dataproc Hadoop job which runs the Sqoop import tool

  • Submit the job command below if you want Sqoop to import your database table as Avro files into Cloud Storage, which you’ll then load into BigQuery using the bq tool.
gcloud dataproc jobs submit hadoop --cluster=<CLUSTER_NAME> --class=org.apache.sqoop.Sqoop --jars=gs://<GCS_BUCKET>/sqoop-1.4.7-hadoop260.jar,gs://<GCS_BUCKET>/avro-tools-1.8.2.jar,file:///usr/share/java/mysql-connector-java-5.1.42.jar -- import -Dmapreduce.job.user.classpath.first=true --connect=jdbc:mysql://<HOSTNAME>:<PORT>/<DATABASE_NAME> --username=<DATABASE_USERNAME> --password-file=gs://<GCS_BUCKET>/passwordFile.txt --target-dir=gs://<GCS_BUCKET>/mysql_output --table=<TABLE> --as-avrodatafile
  • Submit the job command below if you want Sqoop to import your database table into a Hive table. Note: The command below is the same as above except it adds one more jar, file:///usr/lib/hive/lib/hive-exec.jar, to the --jars= argument, replaces --as-avrodatafile with --hive-import, and removes the --target-dir flag.
gcloud dataproc jobs submit hadoop --cluster=<CLUSTER_NAME> --class=org.apache.sqoop.Sqoop --jars=gs://<GCS_BUCKET>/sqoop-1.4.7-hadoop260.jar,gs://<GCS_BUCKET>/avro-tools-1.8.2.jar,file:///usr/share/java/mysql-connector-java-5.1.42.jar,file:///usr/lib/hive/lib/hive-exec.jar -- import -Dmapreduce.job.user.classpath.first=true --connect=jdbc:mysql://<HOSTNAME>:<PORT>/<DATABASE_NAME> --username=<DATABASE_USERNAME> --password-file=gs://<GCS_BUCKET>/passwordFile.txt --table=<TABLE> --hive-import

3. Delete the cluster once the Dataproc job completes

gcloud dataproc clusters delete <CLUSTER_NAME>
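
If you want the three manual steps to run back to back, they can be chained in a small wrapper. The sketch below is only one way to do it and is not part of the original steps: the `run` helper, the `DRY_RUN` variable, and the example cluster and zone names are all hypothetical. With `DRY_RUN=1` (the default here) it only prints the gcloud commands so you can inspect them before running anything. Keep in mind that deleting the cluster automatically gives up the troubleshooting advantage of the manual method.

```shell
#!/bin/sh
# Hypothetical helper: print the command when DRY_RUN=1 (the default here), run it otherwise.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Create a cluster, run one job command on it, then always delete the cluster.
# $1 = cluster name, $2 = zone, remaining arguments = the job submission command.
with_ephemeral_cluster() {
  cluster="$1"
  zone="$2"
  shift 2
  run gcloud dataproc clusters create "$cluster" --zone="$zone" || return 1
  run "$@"
  job_status=$?
  # Delete even when the job failed, so no cluster is left running idle.
  run gcloud dataproc clusters delete "$cluster" --quiet
  return "$job_status"
}

# Example call with placeholder names; add the --jars list and Sqoop arguments from Step 2.
with_ephemeral_cluster sqoop-demo us-central1-a \
  gcloud dataproc jobs submit hadoop --cluster=sqoop-demo --class=org.apache.sqoop.Sqoop
```

The function returns the job's exit status rather than the delete command's, so a failed import is still visible to whatever calls the script.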

Take a Closer Look at Step 2

gcloud dataproc jobs submit hadoop ← This first section of the command is where you invoke the gcloud tool. The gcloud tool has A LOT of functions, so more specifically, you’re calling the hadoop job submission command. This command is nested under several layers of command groups in the gcloud tool: dataproc group -> jobs group -> submit group -> hadoop command.

--cluster ← This gcloud flag specifies the Dataproc cluster to which you’re submitting your job.

--class ← This gcloud flag specifies the main class (org.apache.sqoop.Sqoop) that you want the Dataproc job to run. Behind the scenes, Dataproc is calling the Hadoop tool and running this main class much the same as the Sqoop command line tool does. Note: Make sure you include the path to the jar file of the main class in the --jars flag (e.g. --jars=gs://<GCS_BUCKET>/sqoop-1.4.7-hadoop260.jar).

--jars ←This gcloud flag specifies a comma separated list of jar file paths (paths should point to files in GCS or in the Dataproc cluster). These jar files are provided to the MapReduce and driver classpaths in the Dataproc cluster. To run Sqoop in Dataproc, you need to provide the following jar files:

  • Latest Sqoop release (sqoop-1.4.7-hadoop260.jar) (Maven Site to check for latest)
  • JDBC drivers needed to connect to your database. For the MySQL driver, simply add the file path to the MySQL driver that exists in the Dataproc cluster (file:///usr/share/java/mysql-connector-java-5.1.42.jar). If you’re connecting to another database, DB2 for example, include the DB2 JDBC drivers necessary for that database.
  • Any jar files necessary for serializing data in your specified output format. In this example, you store the imported MySQL data as Avro files, so you include the Avro jar (avro-tools-1.8.2.jar) (Maven Site to check for latest). Note: The Sqoop tool itself requires minimum Avro version 1.8.0 to run properly, so the Avro jar is necessary even if you don’t plan on storing data in Avro format.
  • (Optional) Hive driver that already exists in Dataproc (file:///usr/lib/hive/lib/hive-exec.jar). Note: This Hive jar is only necessary when you use Sqoop to import data directly into Dataproc’s Hive warehouse.
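
Because the --jars value is a single comma-separated string, it can get long and easy to mistype. As a small convenience sketch (the `join_jars` function and the `BUCKET` value are hypothetical names, not part of gcloud or Sqoop), you could assemble the list from one jar path per line:

```shell
#!/bin/sh
# Join all arguments with commas, which is the format the --jars flag expects.
# The body runs in a subshell so the IFS change does not leak into the caller.
join_jars() (
  IFS=,
  printf '%s\n' "$*"
)

BUCKET="my-bucket"   # hypothetical bucket name; use your own

JARS="$(join_jars \
  "gs://${BUCKET}/sqoop-1.4.7-hadoop260.jar" \
  "gs://${BUCKET}/avro-tools-1.8.2.jar" \
  "file:///usr/share/java/mysql-connector-java-5.1.42.jar")"

echo "--jars=${JARS}"
```

For a Hive import you would pass file:///usr/lib/hive/lib/hive-exec.jar as one more argument to `join_jars`.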

-- import ←The single -- gcloud argument separates the gcloud specific flags on the left and the hadoop job args on the right. Since you passed the Sqoop main class to the hadoop tool (via --class flag), you must then also specify the Sqoop tool (import in this example) that you want the main class to run. After import, you specify all the arguments to the Sqoop import tool.

-Dmapreduce.job.user.classpath.first=true ← This Hadoop argument instructs Hadoop to give preference to the user-provided jars (passed via the --jars flag) over the default jars included in the cluster. Sqoop requires Avro 1.8.0 or later, and the native Dataproc version of Avro, as of this writing, is 1.7.7. Setting this argument to true gives Sqoop the correct version of the Avro jar, which you pass in the --jars list.

--connect= ← This Sqoop argument specifies the JDBC connection string: jdbc:mysql://<HOSTNAME>:<PORT>/<DATABASE_NAME>. Note: If you’re connecting to your database using Cloud SQL Proxy, make sure that HOSTNAME is set to ‘localhost’ and that the port you use is the same one the proxy used when you created the Dataproc cluster (e.g. make sure the cluster creation argument --metadata=additional-cloud-sql-instances=<PROJECT_ID>:<REGION>:<SQL_INSTANCE_NAME>=tcp:<PORT> and your Sqoop connect argument --connect=jdbc:mysql://<HOSTNAME>:<PORT>/<DATABASE_NAME> share the same port).
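
One way to keep the two ports from drifting apart is to derive the Sqoop port from the same value you gave the proxy. This is a minimal sketch under that assumption; `jdbc_url`, `proxy_port`, and the example instance string are hypothetical helpers and values for illustration only.

```shell
#!/bin/sh
# Build the JDBC URL passed to Sqoop's --connect argument.
# $1 = hostname, $2 = port, $3 = database name
jdbc_url() {
  printf 'jdbc:mysql://%s:%s/%s\n' "$1" "$2" "$3"
}

# Extract the port from a value shaped like
# <PROJECT_ID>:<REGION>:<SQL_INSTANCE_NAME>=tcp:<PORT>
proxy_port() {
  printf '%s\n' "${1##*=tcp:}"
}

# Hypothetical value; use the one you passed to additional-cloud-sql-instances.
PROXY_INSTANCE="my-project:us-central1:my-instance=tcp:3307"
PORT="$(proxy_port "$PROXY_INSTANCE")"

# 3306 is already taken by the cluster's default Hive metadata MySQL server.
if [ "$PORT" = "3306" ]; then
  echo "Choose a proxy port other than 3306" >&2
fi

jdbc_url localhost "$PORT" my_database
# prints: jdbc:mysql://localhost:3307/my_database
```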

--username= ←This Sqoop argument specifies the username to use when authenticating to the relational database.

--password-file= ←This Sqoop argument specifies the path to a file in GCS (Google Cloud Storage) which contains the password to your database. Warning: Do not use the --password option instead of --password-file as this will leave your password exposed in plain sight in logs.
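
One detail worth knowing when you create the password file: Sqoop uses the entire file content as the password, including any trailing newline that an editor or a plain echo would add. The sketch below writes the file without a trailing newline; the `write_password_file` function, the local path, and the example password are hypothetical.

```shell
#!/bin/sh
# Write a password to a file without a trailing newline.
# $1 = password, $2 = output path
write_password_file() (
  umask 077                 # a newly created file is readable by the owner only
  printf '%s' "$1" > "$2"   # printf '%s' adds no newline, unlike a plain echo
)

PASSWORD_FILE="${TMPDIR:-/tmp}/passwordFile.txt"   # hypothetical local path
write_password_file 'example-password' "$PASSWORD_FILE"
wc -c < "$PASSWORD_FILE"   # byte count should equal the password length

# Upload it afterwards, for example:
#   gsutil cp "$PASSWORD_FILE" gs://<GCS_BUCKET>/passwordFile.txt
```

Remember to restrict access to the uploaded object as well, and to delete the local copy once it is in your bucket.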

--target-dir= ← Traditionally when running Sqoop in Hadoop, you’d set this Sqoop argument to an HDFS (hdfs://) directory path, but since you’re using Dataproc you can take advantage of the built-in GCS connector and set this to a GCS (gs://) directory path. This means Sqoop will store the imported data in the GCS location you specify, skipping HDFS altogether.

--table= ←This Sqoop argument specifies the database table you want to import.

--as-avrodatafile ←Specify this Sqoop flag to store all imported data in Avro file format. Avro format has the advantage of both compressing data in binary format and storing the table schema in the same file.

--hive-import ← Specify this Sqoop flag to store all imported data into a Hive table. Note: When you use the --hive-import flag, make sure your Dataproc cluster was created using the --properties=hive:hive.metastore.warehouse.dir=gs://<GCS_BUCKET>/hive-warehouse flag so you persist your Hive data in GCS.


4 Steps for Automated Dataproc Job Submission using Workflow Templates

1. Create a workflow template

  • Creating a workflow template does not itself create a Cloud Dataproc cluster or create and submit jobs. The template is simply the set of instructions. Clusters and jobs are only created and executed when a workflow template is instantiated (See Step 4).
gcloud beta dataproc workflow-templates create <TEMPLATE_ID>

2. Add workflow cluster creation properties

  • Many cluster properties have been purposely left out for simplicity
gcloud beta dataproc workflow-templates set-managed-cluster <TEMPLATE_ID> --zone=<ZONE> --cluster-name=<CLUSTER_NAME>

3. Add a job(s) to your workflow to run on the cluster

  • Add the job below to your workflow if you want Sqoop to import your database table as Avro files into Cloud Storage, which you’ll then load into BigQuery using the bq tool.
gcloud beta dataproc workflow-templates add-job hadoop --step-id=<STEP_ID> --workflow-template=<TEMPLATE_ID> --class=org.apache.sqoop.Sqoop --jars=gs://<GCS_BUCKET>/sqoop-1.4.7-hadoop260.jar,gs://<GCS_BUCKET>/avro-tools-1.8.2.jar,file:///usr/share/java/mysql-connector-java-5.1.42.jar -- import -Dmapreduce.job.user.classpath.first=true --connect=jdbc:mysql://<HOSTNAME>:<PORT>/<DATABASE_NAME> --username=<DATABASE_USERNAME> --password-file=gs://<GCS_BUCKET>/passwordFile.txt --target-dir=gs://<GCS_BUCKET>/mysql_output --table=<TABLE> --as-avrodatafile
  • Add the job below to your workflow if you want Sqoop to import your database table into a Hive table. Note: The command below is the same as above except it adds one more jar, file:///usr/lib/hive/lib/hive-exec.jar, to the --jars= argument, replaces --as-avrodatafile with --hive-import, and removes the --target-dir flag.
gcloud beta dataproc workflow-templates add-job hadoop --step-id=<STEP_ID> --workflow-template=<TEMPLATE_ID> --class=org.apache.sqoop.Sqoop --jars=gs://<GCS_BUCKET>/sqoop-1.4.7-hadoop260.jar,gs://<GCS_BUCKET>/avro-tools-1.8.2.jar,file:///usr/share/java/mysql-connector-java-5.1.42.jar,file:///usr/lib/hive/lib/hive-exec.jar -- import -Dmapreduce.job.user.classpath.first=true --connect=jdbc:mysql://<HOSTNAME>:<PORT>/<DATABASE_NAME> --username=<DATABASE_USERNAME> --password-file=gs://<GCS_BUCKET>/passwordFile.txt --table=<TABLE> --hive-import

4. Instantiate/Run workflow

  • Multiple (simultaneous) instantiations of a template are supported.
gcloud beta dataproc workflow-templates instantiate <TEMPLATE_ID>
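
The four workflow steps share the same template ID, so it can help to drive them from one function. The following is a rough sketch, not an official recipe: `run`, `DRY_RUN`, `run_sqoop_workflow`, and the example names are hypothetical, and the gcloud commands simply mirror the ones shown in the steps above. By default it only prints the commands.

```shell
#!/bin/sh
# Hypothetical helper: print the command when DRY_RUN=1 (the default here), run it otherwise.
run() {
  if [ "${DRY_RUN:-1}" = "1" ]; then
    printf '%s\n' "$*"
  else
    "$@"
  fi
}

# Build a template with one Hadoop job and instantiate it.
# $1 = template ID, $2 = zone, $3 = cluster name, $4 = step ID,
# remaining arguments = the job flags and Sqoop arguments from Step 3.
run_sqoop_workflow() {
  template="$1"; zone="$2"; cluster="$3"; step="$4"
  shift 4
  run gcloud beta dataproc workflow-templates create "$template" &&
    run gcloud beta dataproc workflow-templates set-managed-cluster "$template" \
      --zone="$zone" --cluster-name="$cluster" &&
    run gcloud beta dataproc workflow-templates add-job hadoop \
      --step-id="$step" --workflow-template="$template" "$@" &&
    run gcloud beta dataproc workflow-templates instantiate "$template"
}

# Example call with placeholder names and a shortened argument list.
run_sqoop_workflow sqoop-template us-central1-a sqoop-cluster sqoop-import \
  --class=org.apache.sqoop.Sqoop -- import --table=my_table
```

Each step runs only if the previous one succeeded, so a failed template creation does not go on to instantiate anything.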

Loading Your Sqoop-Import Data into BigQuery

If you use Sqoop to import your database table into Cloud Storage, you can simply load it into BigQuery using the bq command-line tool:

bq load --source_format=AVRO <YOUR_DATASET>.<YOUR_TABLE> gs://<GCS_BUCKET>/mysql_output/*.avro

Querying Your Hive Tables with Dataproc Hive Jobs

If you use Sqoop to import your database table into Hive in Dataproc, you can run SQL queries on your Hive warehouse by submitting a Hive job to a Dataproc cluster:

gcloud dataproc jobs submit hive --cluster=<CLUSTER_NAME> -e="SELECT * FROM <TABLE>"

Note: Make sure you run these Hive jobs on a Dataproc cluster that has the default Hive warehouse directory pointing to the same GCS bucket that contains your Sqoop-imported data (e.g. your cluster should be created with --properties=hive:hive.metastore.warehouse.dir=gs://<GCS_BUCKET>/hive-warehouse).


Hope this guide serves you well with many scoops of delicious data!