Loading database data into Spark using Data Sources API

Sujee
Spark Experts
Published in
3 min readJul 30, 2017

Update: Read this new post for Spark 2.0 example.

With Spark 1.3 release, it is easy to load database data into Spark using Spark SQL data sources API.

Data sources API which provides a unified interface to query external data sources from Spark SQL is introduced in Spark 1.2. However an official JDBC data source API is released only in version 1.3. This release also introduced a very important feature of Spark — The DataFrame API which is a higher level abstraction over RDD and provides a simpler API for data processing.

There are two ways to call data sources API

  1. Programmatically using SQLContext load function
  2. Using SQL — We’ll look at this in another blog post

SQLContext load

The load function is defined in Scala as follows,

def load(source: String, options: java.util.Map[String, String]): DataFrame

First parameter ‘source’ specifies the type of data source API. In our case, it is ‘jdbc’. 2nd parameter is a map of options required by the data source implementation specified in the first parameter. It varies from data source to data source. For JDBC datasource following parameters are required (Reproduced from Spark SQL documentation).

  1. url:
    The JDBC URL to connect to.
  2. dbtable:
    The JDBC table that should be read. Note that anything that is valid in a ‘FROM’ clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses.
  3. driver:
    The class name of the JDBC driver needed to connect to this URL. This class with be loaded on the master and workers before running an JDBC commands to allow the driver to register itself with the JDBC subsystem.
  4. partitionColumn, lowerBound, upperBound, numPartitions:
    These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question.

The example application I wrote to explain the concepts uses MySQL sample Employees database https://dev.mysql.com/doc/employee/en/. First it loads part of MySQL data into Spark using Data sources API and prints it in the log.

  • MySQL connection parameters are defined as constants
private static final String MYSQL_DRIVER = "com.mysql.jdbc.Driver";
private static final String MYSQL_USERNAME = "expertuser";
private static final String MYSQL_PWD = "expertuser123";
private static final String MYSQL_CONNECTION_URL = "jdbc:mysql://localhost:3306/employees?user=" + MYSQL_USERNAME + "&password=" + MYSQL_PWD;
  • Initializing SQLContext from SparkContext
private static final JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("SparkJdbcDs").setMaster("local[*]"));private static final SQLContext sqlContext = new SQLContext(sc);
  • JDBC data source options for MySQL database
//Datasource options
Map<String, String> options = new HashMap<>();
options.put("driver", MYSQL_DRIVER);
options.put("url", MYSQL_CONNECTION_URL);
options.put("dbtable", "(select emp_no, concat_ws(' ', first_name, last_name) as full_name from employees) as employees_name");
options.put("partitionColumn", "emp_no");
options.put("lowerBound", "10001");
options.put("upperBound", "499999");
options.put("numPartitions", "10");

As of writing this, it is not possible to separately specify username and password. So it has been specified as part of connection URL.

The most interesting part here is the ‘dbtable’ option. I have used a derived table created from a subquery. The subquery only selects the required columns from ’employees’ table. It also concatenates first name and last name and returns it as full name. Since this transformation happens inside the MySQL, overall performance of this app will be improved. Please note that similar optimization is also possible in JdbcRDD. Here I’m doing it this way to demontrate that ‘dbtable’ is not just limited to name of an existing table.

As you can see, primary key “emp_no” is used for partitioning “employees” table. Minimum and maximum values of emp_no are used as lowerBound and upperBound respectively. The data will be loaded into 10 partitions which means first partition will have records whose primary keys are within 10001 to roughly 59000.

  • Loading MySQL query result as DataFrame
DataFrame jdbcDF = sqlContext.load("jdbc", options);

Like RDD, DataFrame is also lazily executed. So calling load function will not immediately load the data from MySQL. It will wait until an action is executed on the returned DataFrame.

  • Executing DataFrame by invoking an action
List<Row> employeeFullNameRows = jdbcDF.collectAsList();for (Row employeeFullNameRow : employeeFullNameRows) {
LOGGER.info(employeeFullNameRow);
}

Complete Java application is available at my GitHub repo https://github.com/sujee81/SparkApps.

--

--