Apache Spark vs. Sqoop: Engineering a better data pipeline

Nikhil Goel
Zaloni Engineering
Mar 3, 2020

For a data engineer building data pipelines in a modern data platform, one of the most common tasks is to extract data from an OLTP database or data warehouse so that it can be further transformed for analytical use cases or used to build reports that answer business questions.

Over the last decade, when Hadoop was the primary compute environment, Apache Sqoop quickly became the de facto tool of choice for ingesting data from these relational databases to HDFS (Hadoop Distributed File System). Once data has been persisted into HDFS, Hive or Spark can be used to transform it for the target use case.

As adoption of Hadoop, Hive and MapReduce slows and Spark usage continues to grow, taking advantage of Spark for consuming data from relational databases becomes more important.

Before we dive into the pros and cons of using Spark over Sqoop, let’s review the basics of each technology:

Apache Sqoop is a MapReduce-based utility that uses the JDBC protocol to connect to a database, then queries and transfers data to mappers spawned by YARN in a Hadoop cluster. When the Sqoop utility is invoked, it fetches the table metadata from the RDBMS. If the table you are trying to import has a primary key, a Sqoop job will attempt to spin up four mappers (this can be controlled by an input argument) and parallelize the ingestion process by splitting the range of the primary key across the mappers.

If the table does not have a primary key, users must specify a column on which Sqoop can split the ingestion tasks. Without a column on which Sqoop can parallelize the ingest process, only a single mapper task will be spawned to ingest the data.

Basic Usage:

sqoop import --connect <jdbc-url> --username <username> --password <password> --table <table-name> --target-dir <destination-hdfs-location>

For example, to import my CustomerProfile table from a MySQL database to HDFS, the command would look like this:

sqoop import --connect jdbc:mysql://db1.zaloni.com/customer --username ngoel --password xxxxx --table CustomerProfile --target-dir /customer/customer_profile

If the table metadata does not specify a primary key, or to change the column used for splitting, simply add the --split-by input argument:

sqoop import --connect jdbc:mysql://db1.zaloni.com/customer --username ngoel --password xxxxx --table CustomerProfile --split-by customer_id --target-dir /customer/customer_profile

For further performance tuning, add the input argument -m or --num-mappers <n>; the default value is 4.
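For example, building on the earlier command, the ingest could be spread across eight mappers (the mapper count of eight here is purely illustrative):

sqoop import --connect jdbc:mysql://db1.zaloni.com/customer --username ngoel --password xxxxx --table CustomerProfile --split-by customer_id --num-mappers 8 --target-dir /customer/customer_profile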

To fetch only a subset of the data, use the --where <condition> argument to specify a WHERE clause expression, for example:

sqoop import --connect jdbc:mysql://db1.zaloni.com/customer --username ngoel --password xxxxx --table CustomerProfile --target-dir /customer/customer_profile --where "state = 'WA'"

For data engineers who want to query or use this ingested data with Hive, the Sqoop utility provides additional options to import into an existing Hive table, or to create a Hive table before importing the data.

sqoop import --connect jdbc:mysql://db1.zaloni.com/customer \
--username ngoel --password xxxxx --table CustomerProfile --target-dir /customer/customer_profile \
--hive-import --create-hive-table --hive-table myHiveDB.CustomerProfile

Apache Spark is a general-purpose distributed data processing and analytics engine. Spark can be used in standalone mode or with external resource managers such as YARN, Kubernetes or Mesos. Spark works on the concept of RDDs (resilient distributed datasets), which represent data as a distributed collection. The Spark engine can apply operations to query and transform the dataset in parallel over multiple Spark executors. Dataframes are an extension of RDDs that imposes a schema on the distributed collection of data. Dataframes can be defined to consume from multiple data sources including files, relational databases, NoSQL databases, streams, etc.

Let's look at a basic example of using Spark dataframes to extract data from a JDBC source:

  • Creating a dataframe

val df = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://db1.zaloni.com/customer")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "CustomerProfile")
  .option("user", "ngoel")
  .option("password", "xxxxxx")
  .load()
  • Performance Options

Similar to Sqoop, Spark also allows you to define a split or partition column so data can be extracted in parallel by different tasks spawned by the Spark executors. partitionColumn is the equivalent of the --split-by option in Sqoop. lowerBound and upperBound define the min and max range of the partition column, which is then used in conjunction with numPartitions so that Spark can parallelize the data extraction by dividing the range into multiple tasks. numPartitions also defines the maximum number of "concurrent" JDBC connections made to the database. The actual number of concurrent JDBC connections might be lower, depending on the number of Spark executors available for the job.

val df = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://db1.zaloni.com/customer")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "CustomerProfile")
  .option("user", "ngoel")
  .option("password", "xxxxxx")
  .option("lowerBound", 0)
  .option("upperBound", 10000)
  .option("numPartitions", 4)
  .option("partitionColumn", "customer_id")
  .load()
  • Filtering data

Instead of specifying the dbtable parameter, you can use a query parameter to specify a subset of the data to be extracted into the dataframe.

val df = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://db1.zaloni.com/customer")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("query", "select * from customer.CustomerProfile where state = 'WA'")
  .option("user", "ngoel")
  .option("password", "xxxxxx")
  .load()
  • Persisting data to FileSystem or database

Once the dataframe is created, you can apply further filtering and transformations on the dataframe, or persist the data to a file system, a Hive table, or another database.

df.write.saveAsTable("customer.customerprofile")

OR

df.write.format("jdbc")
  .option("url", "jdbc:mysql://db2.zaloni.com/customer")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "CustomerProfile")
  .option("user", "ngoel")
  .option("password", "xxxxxx")
  .save()
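
Or, to write Parquet files to the file system (the output path below is only illustrative):

df.write.mode("overwrite").parquet("/customer/customer_profile_parquet")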

Now that we have seen some basic usage of how to extract data using Sqoop and Spark, I want to highlight some of the key advantages and disadvantages of using Spark in such use cases.

  • When using Sqoop to build a data pipeline, users have to persist a dataset into a filesystem like HDFS, regardless of whether they intend to consume it at a future time. With Spark, persisting data is completely optional. Users can write Spark jobs to perform the necessary filtering/transformations or build analytical models on the dataframes created over the JDBC source, and only persist the transformed data to their target system if needed. Based on the use case, users can also choose from an extensible list of target systems to persist the transformed data, including other relational databases, NoSQL databases, streams, and file systems (with out-of-the-box support for common file formats such as Avro, Parquet, JSON, etc.).
  • Data engineers may want to work with the data interactively using Jupyter notebooks or simply the Spark shell. This is a key advantage over Sqoop, which only submits a MapReduce job and does not let users interactively work with the data extracts and decide whether they want to persist them to the filesystem.
  • Apache Spark can be run in standalone mode or with a resource manager such as YARN, Mesos or Kubernetes. This gives data engineers the opportunity to start a lightweight, transient Spark cluster in an environment of their choice and shut it down after the task is done, rather than competing for resources in a typical long-running Hadoop cluster.
  • Many data pipeline use cases require you to join disparate data sources. For example, what if my Customer Profile table is in a relational database but my Customer Transactions table is in S3 or Hive? Using Spark, you can run federated data queries by defining dataframes over both data sources and joining them in memory, instead of first persisting the CustomerProfile table in Hive or S3, as sketched below.
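
As a rough sketch of that federated pattern, the example below reuses the JDBC connection details from the earlier examples; the S3 path, the transactions schema (customer_id and amount columns) and the output path are illustrative assumptions rather than anything from a real environment:

// Customer profile read from the relational database, as in the earlier examples
val profiles = spark.read.format("jdbc")
  .option("url", "jdbc:mysql://db1.zaloni.com/customer")
  .option("driver", "com.mysql.jdbc.Driver")
  .option("dbtable", "CustomerProfile")
  .option("user", "ngoel")
  .option("password", "xxxxxx")
  .load()

// Customer transactions already stored as Parquet in S3 (hypothetical path and columns)
val transactions = spark.read.parquet("s3a://zaloni-datalake/customer/transactions")

// Join the two sources in memory and persist only the aggregated result
val waSpend = profiles.filter("state = 'WA'")
  .join(transactions, Seq("customer_id"))
  .groupBy("customer_id")
  .sum("amount")

waSpend.write.mode("overwrite").parquet("/customer/wa_customer_spend")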

Next, I will highlight some of the challenges we faced when transitioning to unified data processing using Spark.

In the Zaloni Data Platform, Apache Spark now sits at the core of our compute engine. Data engineers can visually design a data transformation, which generates Spark code and submits the job to a Spark cluster. One of the new features, Data Marketplace, enables data engineers and data scientists to search the data catalog for data that they want to use for analytics and provision that data to a managed and governed sandbox environment. ZDP allows extracting data from file systems such as HDFS, S3, ADLS or Azure Blob, as well as relational databases, and provisions the data out to target sandbox environments. Apache Spark drives the end-to-end data pipeline, reading, filtering and transforming data before writing it to the target sandbox.

Some of the challenges we faced include:

  • Data type mapping: Apache Spark provides an abstract implementation of JdbcDialect, which provides basic conversion of SQL data types to Catalyst data types. There are out-of-the-box concrete implementations for many popular databases, but you might need to extend or create a new implementation depending on the use case and database; a sketch follows this list.
  • Performance tuning: As described in the examples above, configuring numPartitions and choosing the right partitionColumn are key to achieving parallelism and performance. When persisting data to a filesystem or relational database, it is also important to use the coalesce or repartition functions to avoid writing small files to the file system, or to reduce the number of JDBC connections used to write to the target database (see the example after this list).
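
To illustrate the first point, a custom dialect can be registered with Spark through JdbcDialects. The URL prefix and type mappings below are only a sketch for a hypothetical legacy database, not a tested implementation:

import java.sql.Types
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._

object LegacyDbDialect extends JdbcDialect {
  // Claim JDBC URLs for the (hypothetical) legacy database
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:legacydb")

  // Read side: surface a CHAR(1) flag column as a Catalyst BooleanType
  override def getCatalystType(sqlType: Int, typeName: String, size: Int,
      md: MetadataBuilder): Option[DataType] =
    if (sqlType == Types.CHAR && size == 1) Some(BooleanType) else None

  // Write side: map a Catalyst BooleanType back to the database's CHAR(1)
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case BooleanType => Some(JdbcType("CHAR(1)", Types.CHAR))
    case _           => None
  }
}

// Register the dialect before creating dataframes against this database
JdbcDialects.registerDialect(LegacyDbDialect)

For the second point, the number of files or JDBC connections used on write can be capped by coalescing the dataframe first, for example (the partition count of four is just an illustration):

df.coalesce(4).write.saveAsTable("customer.customerprofile")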

In conclusion, this post describes the basic usage of Apache Sqoop and Apache Spark for extracting data from relational databases along with key advantages and challenges of using Apache Spark for this use case. In the next post, we will go over how to take advantage of transient compute in a cloud environment.
