Customize Spark JDBC Data Source to work with your dedicated Database Dialect

Huaxin Gao
6 min read · Sep 26, 2017


Currently, Spark supports a JDBC Data Source that works with DB2, Oracle, Derby, MS SQL Server, MySQL, Postgres and Teradata. If your database is not one of these, you will very likely run into problems when using it with the Spark JDBC Data Source. The reason is that there may be data type mapping inconsistencies between your database and Spark, that is, some of the data types Spark uses are not supported by your database, and vice versa. In addition, there may be other database-specific properties that do not match the Spark JDBC Data Source defaults. For example, the Spark JDBC Data Source uses double quotes as the delimited identifier, but some databases use back ticks instead. To make your database work properly with the Spark JDBC Data Source, you may need to implement a dialect specific to your database. This article will show you, step by step, how to do this.

In the Spark code (https://github.com/apache/spark.git), inside the package org.apache.spark.sql.jdbc, there is an abstract class JdbcDialect. It is used to handle the SQL dialect of a certain database or JDBC driver. Currently, DB2Dialect, DerbyDialect, MsSqlServerDialect, MySQLDialect, OracleDialect, PostgresDialect and TeradataDialect have already been implemented in Spark. If your database is not one of them, you may need to implement your own database dialect. I will use SQLite as an example to show how.

Let’s take a look at the JdbcDialect APIs.

1.

/**
 * Check if this dialect instance can handle a certain jdbc url.
 * @param url the jdbc url.
 * @return True if the dialect can be applied on the given jdbc url.
 * @throws NullPointerException if the url is null.
 */
def canHandle(url: String): Boolean

You will need to override this API for SQLite. The SQLite JDBC URL starts with jdbc:sqlite, so in SQLiteDialect you override canHandle as follows:

override def canHandle(url: String): Boolean =
  url.startsWith("jdbc:sqlite")

2.

/**
 * Get the custom datatype mapping for the given jdbc meta information.
 * @param sqlType The sql type (see java.sql.Types)
 * @param typeName The sql type name (e.g. "BIGINT UNSIGNED")
 * @param size The size of the type.
 * @param md Result metadata associated with this type.
 * @return The actual DataType (subclasses of [[org.apache.spark.sql.types.DataType]])
 *         or null if the default type mapping should be used.
 */
def getCatalystType(
    sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = None

`getCatalystType` is used when reading data through the Spark JDBC Data Source. First, check the JdbcUtils.getCatalystType method to see whether its mapping between JDBC types and Spark Catalyst types covers all of your database’s types correctly. Here is the mapping:

/**
 * Maps a JDBC type to a Catalyst type. This function is called
 * only when the JdbcDialect class corresponding to your database
 * driver returns null.
 *
 * @param sqlType - A field of java.sql.Types
 * @return The Catalyst type corresponding to sqlType.
 */
private def getCatalystType(
    sqlType: Int,
    precision: Int,
    scale: Int,
    signed: Boolean): DataType = {
  val answer = sqlType match {
    // scalastyle:off
    case java.sql.Types.ARRAY         => null
    case java.sql.Types.BIGINT        => if (signed) { LongType } else { DecimalType(20,0) }
    case java.sql.Types.BINARY        => BinaryType
    case java.sql.Types.BIT           => BooleanType
    case java.sql.Types.BLOB          => BinaryType
    case java.sql.Types.BOOLEAN       => BooleanType
    case java.sql.Types.CHAR          => StringType
    case java.sql.Types.CLOB          => StringType
    case java.sql.Types.DATALINK      => null
    case java.sql.Types.DATE          => DateType
    case java.sql.Types.DECIMAL
      if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
    case java.sql.Types.DECIMAL       => DecimalType.SYSTEM_DEFAULT
    case java.sql.Types.DISTINCT      => null
    case java.sql.Types.DOUBLE        => DoubleType
    case java.sql.Types.FLOAT         => FloatType
    case java.sql.Types.INTEGER       => if (signed) { IntegerType } else { LongType }
    case java.sql.Types.JAVA_OBJECT   => null
    case java.sql.Types.LONGNVARCHAR  => StringType
    case java.sql.Types.LONGVARBINARY => BinaryType
    case java.sql.Types.LONGVARCHAR   => StringType
    case java.sql.Types.NCHAR         => StringType
    case java.sql.Types.NCLOB         => StringType
    case java.sql.Types.NULL          => null
    case java.sql.Types.NUMERIC
      if precision != 0 || scale != 0 => DecimalType.bounded(precision, scale)
    case java.sql.Types.NUMERIC       => DecimalType.SYSTEM_DEFAULT
    case java.sql.Types.NVARCHAR      => StringType
    case java.sql.Types.OTHER         => null
    case java.sql.Types.REAL          => DoubleType
    case java.sql.Types.REF           => StringType
    case java.sql.Types.REF_CURSOR    => null
    case java.sql.Types.ROWID         => LongType
    case java.sql.Types.SMALLINT      => IntegerType
    case java.sql.Types.SQLXML        => StringType
    case java.sql.Types.STRUCT        => StringType
    case java.sql.Types.TIME          => TimestampType
    case java.sql.Types.TIME_WITH_TIMEZONE      => TimestampType
    case java.sql.Types.TIMESTAMP     => TimestampType
    case java.sql.Types.TIMESTAMP_WITH_TIMEZONE => TimestampType
    case -101                         => TimestampType // Value for Timestamp with Time Zone in Oracle
    case java.sql.Types.TINYINT       => IntegerType
    case java.sql.Types.VARBINARY     => BinaryType
    case java.sql.Types.VARCHAR       => StringType
    case _                            =>
      throw new SQLException("Unrecognized SQL type " + sqlType)
    // scalastyle:on
  }

  if (answer == null) {
    throw new SQLException("Unsupported type " + JDBCType.valueOf(sqlType).getName)
  }
  answer
}

If any of the type mappings in the above code do not work for your specific database, you will need to customize the mapping by overriding the JdbcDialect.getCatalystType API in your dialect class. In SQLite, one of the data type storage classes is NULL. If the data is null, the SQLite JDBC driver returns java.sql.Types.NULL for a few types such as Bit and Byte. This causes an unsupported-type exception in the Spark JDBC Data Source, so we need to map Bit and Byte to the correct Spark Catalyst types by overriding SQLiteDialect.getCatalystType:

override def getCatalystType(
    sqlType: Int,
    typeName: String,
    size: Int,
    md: MetadataBuilder): Option[DataType] = sqlType match {
  case Types.NULL =>
    typeName match {
      case "BIT"  => Option(BooleanType)
      case "BYTE" => Option(IntegerType)
      case _      => None
    }
  case _ => None
}
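
With this override in place, reading a SQLite table that contains such columns no longer fails. Here is a rough sketch of how it might look in a JDBC Data Source program; spark is assumed to be an existing SparkSession, and the database URL and table name are made up for illustration:

import java.util.Properties
import org.apache.spark.sql.jdbc.JdbcDialects

JdbcDialects.registerDialect(SQLiteDialect)

// Hypothetical SQLite database and table, used only for illustration.
val df = spark.read.jdbc("jdbc:sqlite:/tmp/test.db", "TESTDB", new Properties())

// Columns whose metadata comes back as java.sql.Types.NULL with type name
// BIT or BYTE are now mapped to BooleanType / IntegerType instead of
// triggering the unsupported-type exception.
df.printSchema()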

3.

/**
 * Retrieve the jdbc / sql type for a given datatype.
 * @param dt The datatype (e.g. [[org.apache.spark.sql.types.StringType]])
 * @return The new JdbcType if there is an override for this DataType
 */
def getJDBCType(dt: DataType): Option[JdbcType] = None

Check the data types that Spark uses when writing to a JDBC Data Source and make sure they are all supported by your database. If any of them are not, you will need to map them to types that your database does support by overriding the `getJDBCType` method.

First, go to the JdbcUtils class (in the org.apache.spark.sql.execution.datasources.jdbc package), where you will find the getCommonJDBCType method shown below:

def getCommonJDBCType(dt: DataType): Option[JdbcType] = {
  dt match {
    case IntegerType => Option(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Option(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Option(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Option(JdbcType("REAL", java.sql.Types.FLOAT))
    case ShortType => Option(JdbcType("INTEGER", java.sql.Types.SMALLINT))
    case ByteType => Option(JdbcType("BYTE", java.sql.Types.TINYINT))
    case BooleanType => Option(JdbcType("BIT(1)", java.sql.Types.BIT))
    case StringType => Option(JdbcType("TEXT", java.sql.Types.CLOB))
    case BinaryType => Option(JdbcType("BLOB", java.sql.Types.BLOB))
    case TimestampType => Option(JdbcType("TIMESTAMP", java.sql.Types.TIMESTAMP))
    case DateType => Option(JdbcType("DATE", java.sql.Types.DATE))
    case t: DecimalType => Option(
      JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => None
  }
}

By comparing the data types in the above method with those supported by SQLite, you will find that SQLite supports all of them. However, since SQLite also supports SMALLINT, we can map ShortType to SMALLINT instead of INTEGER by overriding the getJDBCType API.

override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
  case ShortType => Option(JdbcType("SMALLINT", Types.SMALLINT))
  case _ => None
}
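
With this override registered, a DataFrame column of ShortType is written out as SMALLINT. A quick sketch of such a write; spark is assumed to be an existing SparkSession, and the URL and table name are placeholders:

import java.util.Properties
import org.apache.spark.sql.jdbc.JdbcDialects
import spark.implicits._

JdbcDialects.registerDialect(SQLiteDialect)

// Hypothetical data set with a ShortType column.
val data = Seq((1.toShort, "a"), (2.toShort, "b")).toDF("small_col", "name")

// With the override above, "small_col" is created as SMALLINT in SQLite
// rather than the default INTEGER.
data.write.jdbc("jdbc:sqlite:/tmp/test.db", "WRITE_TEST", new Properties())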

4.

/**
* Quotes the identifier. This is used to put quotes around the
* identifier in case the column name is a reserved keyword, or in
* case it contains characters that require quotes (e.g. space).
*/

def quoteIdentifier(colName: String): String = {
  s""""$colName""""
}

SQLite can use double quotes, single quotes or back ticks to quote identifiers, so there is no need to override this API.
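
If your database quotes identifiers differently, override this method in your dialect. For example, a dialect for a database that uses back ticks (the way MySQL does) could simply be:

override def quoteIdentifier(colName: String): String = {
  s"`$colName`"
}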

5.

/**
 * Get the SQL query that should be used to find if the given table
 * exists. Dialects can override this method to return a query
 * that works best in a particular database.
 * @param table The name of the table.
 * @return The SQL query to use for checking the table.
 */
def getTableExistsQuery(table: String): String = {
  s"SELECT * FROM $table WHERE 1=0"
}

/**
 * The SQL query that should be used to discover the schema of a
 * table. It only needs to ensure that the result set has the same
 * schema as the table, such as by calling "SELECT * ...".
 * Dialects can override this method to return a query that works
 * best in a particular database.
 * @param table The name of the table.
 * @return The SQL query to use for discovering the schema.
 */
@Since("2.1.0")
def getSchemaQuery(table: String): String = {
  s"SELECT * FROM $table WHERE 1=0"
}

Spark uses "SELECT * FROM $table WHERE 1=0" for both getTableExistsQuery and getSchemaQuery. You will need to check whether this SQL statement works with your database; if not, override these APIs in your dialect implementation. SQLite supports this statement, so there is no need to do anything here.
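
If your database cannot handle the WHERE 1=0 form, the override is straightforward. For example, a dialect could probe the table with a LIMIT-based query instead (a sketch; adjust the SQL to whatever your database accepts):

override def getTableExistsQuery(table: String): String = {
  s"SELECT 1 FROM $table LIMIT 1"
}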

After implementing your own database dialect, i.e. SQLiteDialect, there are two ways to use it:

  1. Open a JIRA and submit a pull request to integrate the new class into Spark. You will need to put it in the org.apache.spark.sql.jdbc package, along with DB2Dialect, OracleDialect, PostgresDialect and the other database dialects, and register SQLiteDialect in JdbcDialects using registerDialect(SQLiteDialect). Once the PR is merged, SQLiteDialect will be part of the Spark code.
  2. If you don’t want to integrate your database dialect into Spark, include it in your Spark JDBC Data Source program instead. In that case, register your database dialect before using it and unregister it afterwards. For example:

JdbcDialects.registerDialect(SQLiteDialect)
val df = spark.read.jdbc(urlWithUserAndPass, "TESTDB", new Properties())
df.filter($"Col1" > 21).show()
......
JdbcDialects.unregisterDialect(SQLiteDialect)
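
For reference, here is what the complete dialect built in the previous steps might look like when put together. This is only a sketch: the object name, imports and formatting are choices you can adapt, and it contains just the overrides discussed above.

import java.sql.Types

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcType}
import org.apache.spark.sql.types._

case object SQLiteDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean =
    url.startsWith("jdbc:sqlite")

  // Map the java.sql.Types.NULL metadata that the SQLite JDBC driver
  // reports for BIT and BYTE columns to usable Catalyst types.
  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] = sqlType match {
    case Types.NULL =>
      typeName match {
        case "BIT"  => Option(BooleanType)
        case "BYTE" => Option(IntegerType)
        case _      => None
      }
    case _ => None
  }

  // Write ShortType columns as SMALLINT instead of the default INTEGER.
  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case ShortType => Option(JdbcType("SMALLINT", Types.SMALLINT))
    case _ => None
  }
}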
