Delta/Update/Insert: Building a Single Framework for Writing into RDBMS in Both Spark & Pure Scala Modes

Niroj Pattnaik
Nov 8, 2020 · 9 min read

An RDBMS is an integral part of most analytical layers built on top of a data warehouse. Spark, in turn, is widely used across organizations to build robust ETL pipelines that do massive in-memory computation and load data to various platforms, including an RDBMS.


We often face issues when doing DB operations while switching between Scala and Spark. When we just need a single value, we may not want Spark to create a data frame for us. Similarly, SQL statements like UPDATE and DELETE are not supported directly in Spark, so we need to switch between the two frequently. In this article, we will address this by discussing an approach to build a single framework that handles both.

For simplicity, I am keeping DB2 as my database layer, but the approach applies to other relational databases as well.

System prerequisites: Java 8, Scala 2.11+, Spark 2.3+, DB2 9.5+

Spark already has a nice built-in JDBC API that can be leveraged here, along with Scala’s usual way of interacting through JDBC. We will use simple dimensional-modeling terms like surrogate id and natural & composite keys.

Please go through the references below before we discuss our custom framework:
https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
https://docs.oracle.com/javase/tutorial/jdbc/basics/index.html

Framework:
I prefer to build this framework programmatically rather than through Spark SQL, as that is easier to work with in an IDE environment.

Step 1: Let’s first define an enum in Scala that tells users which DB operations we support when running in Spark mode:
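A minimal sketch of that enum, with the member names inferred from the operations discussed later in this article:

object DbOperations extends Enumeration {
  type DbOperations = Value
  // DB write operations supported when running in Spark mode
  val INSERT, UPDATE, UPSERT = Value
}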

Step 2: Now let’s define our primary class with the companion object:

Description of all parameters
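Since the original parameter description is shown as an image, here is a hedged sketch of what the class signature could look like, inferred from how the parameters are used later in this article (the exact names and defaults are assumptions):

import java.sql.ResultSet
import org.apache.spark.sql.{DataFrame, SaveMode, SparkSession}

class ExecuteDbQuery(
    val spark: Option[SparkSession] = None,      // run in Spark mode when defined, otherwise pure Scala/JDBC
    val configFilePath: String,                  // DB properties file bundled inside the jar
    val query: String,                           // SQL query, a table name (Spark mode), or "file:<path>" for a batch of SQLs
    val mode: SaveMode = SaveMode.Append,        // Spark save mode for writes
    val dbOperation: DbOperations.Value = DbOperations.INSERT, // INSERT / UPDATE / UPSERT
    val loadReadyDf: DataFrame = null,           // load-ready data frame for Spark write mode
    val joinKeys: Seq[String] = Seq.empty,       // natural/composite keys for the MERGE join and partitioning
    val updateCols: Seq[String] = Seq.empty,     // columns to update when a match is found
    val surrogateKey: String = null,             // surrogate id column, if any
    val generateSurrogateKey: Boolean = false,   // let the framework generate the surrogate id
    val sequenceName: String = null,             // DB sequence object backing the surrogate id
    val commitNbr: Int = 1000,                   // rows per batch/commit
    val readByPartitionColumn: Option[String] = None // partition column for partitioned Spark reads
    // ... plus debug/rejection parameters such as dbPrintOnlyFlag, chkPrintFlg, rejectionHdfsPath
) {
  def execute(): Either[Option[ResultSet], DataFrame] = ??? // discussed below
}

object ExecuteDbQuery {
  // apply(...) overloads shown below add the syntactic sugar around execute
}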

Now that we have all our parameters defined, let’s discuss the crucial function execute:

def execute(): Either[Option[ResultSet], DataFrame]

As you can see above, it uses Scala’s handy Either type, returning an optional ResultSet on the Left and a DataFrame on the Right. If the Spark session is defined, it calls the executeDbQuery function in Spark mode; otherwise, it runs in pure Scala mode.
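A hedged sketch of that dispatch, assuming a Spark-mode overload of executeDbQuery alongside the pure-Scala one described in Step 4:

def execute(): Either[Option[ResultSet], DataFrame] =
  spark match {
    // Spark session defined: go through the Spark JDBC path and return a DataFrame
    case Some(sparkSession) => Right(executeDbQuery(sparkSession, query))
    // No Spark session: fall back to plain JDBC and return an optional ResultSet
    case None               => Left(executeDbQuery(query))
  }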

The companion object adds syntactic sugar to access the private class functions, along with all the valid combinations for calling execute (hidden here to concentrate on our main objective). Two possible overloads are shown for illustration:

// Executes in pure Scala mode and returns the ResultSet, or an empty result,
// based on the SQL query passed to it
def apply(configFilePath: String, query: String): Any =
  Option(execute(new ExecuteDbQuery(configFilePath = configFilePath, query = query)))
    .getOrElse("")

// Executes in Spark mode and returns a DataFrame read from the database
def apply(spark: Option[SparkSession], configFilePath: String, query: String): DataFrame =
  execute(new ExecuteDbQuery(spark, configFilePath, query))
    .asInstanceOf[DataFrame]

Step 3: Load and set the DB properties file placed inside the jar

Basically, it loads the config file and sets up all the required parameters like the DB URL, user id, credentials, etc. for connecting to the database. The query param is passed by the user and can be a SQL query (in Spark or Scala mode) or a table name (in Spark mode). If the query starts with “file:”, the framework treats it as a file and loads all the SQLs inside it, separated by semicolons (;). The file mode is generally meant for executing a batch of SQL queries in pure Scala. Broadcasting the properties variable may not be required here; readers are advised to try skipping that. “batchSize” is the parameter we can pass to the Spark JDBC API (from Spark’s JDBC documentation: the JDBC batch size, which determines how many rows to insert per round trip; this can help performance on JDBC drivers; it applies only to writing and defaults to 1000).
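As a rough sketch of that setup (the property key names db.url, db.user, db.password, connection.type, and batchSize are assumptions; resolveEnv is the implicit String helper mentioned below):

import java.util.Properties
import scala.io.Source

// Load the DB properties file bundled inside the jar
val props = new Properties()
props.load(getClass.getResourceAsStream(s"/$configFilePath"))

val dbUrl          = props.getProperty("db.url").resolveEnv
val dbUser         = props.getProperty("db.user").resolveEnv
val dbPasswd       = props.getProperty("db.password").resolveEnv
val connectionType = props.getProperty("connection.type", "jdbc")
val batchSize      = props.getProperty("batchSize", "1000").toInt

// A "file:" prefix means a batch file of SQLs separated by semicolons
val userQueries =
  if (query.trim.toLowerCase.startsWith("file:"))
    Source.fromFile(query.trim.drop("file:".length)).mkString
      .split(";").map(_.trim).filter(_.nonEmpty)
  else Array(query)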

Ignore “.resolveEnv” for now. It is just an implicit class on String that resolves its value from the environment. There are already several good articles on implicit classes; we will discuss them another day.

Step 4: Plain Scala only mode to read/write in DB:
def executeDbQuery(internalQuery: String): Option[ResultSet] = { …}

Now let’s discuss how we execute the query when the ExecuteDbQuery object is called without the optional spark parameter. Without Spark, it runs the queries the usual pure-Scala way, through JDBC. Let’s define the variables we need and make the DB connection as below:

Running in Scala only batch or single query execution

Above, we check how many queries the user has passed. Say the user sends more than one query separated by semicolons (;): in our framework, we read them together and split them into an array. If the array length equals 1, we simply execute the query; otherwise we submit all the queries together as a batch. We also check whether a query starts with “SELECT” or something else, and run the appropriate execute method based on what the user writes.

We also honour explicit commits placed by the user inside a multi-query string or a batch file. Similarly, we implicitly split the whole batch by commitNbr and execute a commit for each such group, placing a de-facto commit at the end to flush anything remaining. Finally, it returns the ResultSet if it gets one, otherwise None.
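A hedged sketch of how the body of executeDbQuery could implement that flow over a plain JDBC connection (error handling and connection cleanup on the SELECT path omitted for brevity):

import java.sql.{DriverManager, ResultSet}

def executeDbQuery(internalQuery: String): Option[ResultSet] = {
  val connection = DriverManager.getConnection(dbUrl, dbUser, dbPasswd)
  connection.setAutoCommit(false)
  val statement = connection.createStatement()

  val queries = internalQuery.split(";").map(_.trim).filter(_.nonEmpty)

  val result =
    if (queries.length == 1) {
      // Single query: SELECT returns a ResultSet, anything else is an update
      if (queries.head.toUpperCase.startsWith("SELECT"))
        Option(statement.executeQuery(queries.head))
      else { statement.executeUpdate(queries.head); None }
    } else {
      // Batch mode: honour explicit COMMITs and commit every commitNbr statements
      queries.grouped(commitNbr).foreach { group =>
        group.foreach {
          case q if q.equalsIgnoreCase("COMMIT") =>
            statement.executeBatch(); connection.commit()
          case q => statement.addBatch(q)
        }
        statement.executeBatch()
        connection.commit()
      }
      None
    }
  connection.commit() // de-facto commit at the end for anything remaining
  result
}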

One piece of syntactic sugar in the companion object here is to return an ArrayBuffer directly, instead of asking users to loop through the ResultSet and handle closing the DB connection themselves, as shown below:
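For illustration, a hypothetical companion helper along these lines (fetchRows is not part of the original framework, just an assumed name) could drain the ResultSet into an ArrayBuffer and close the connection for the caller:

import scala.collection.mutable.ArrayBuffer

def fetchRows(configFilePath: String, query: String): ArrayBuffer[Seq[String]] = {
  val rows = ArrayBuffer.empty[Seq[String]]
  val dbQuery = new ExecuteDbQuery(configFilePath = configFilePath, query = query)
  // Left(Some(resultSet)) is returned in pure Scala mode for SELECT queries
  dbQuery.execute().left.toOption.flatten.foreach { rs =>
    val colCount = rs.getMetaData.getColumnCount
    while (rs.next())
      rows += (1 to colCount).map(i => rs.getString(i))
    rs.getStatement.getConnection.close() // close the statement/connection for the caller
  }
  rows
}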

Spark READ Mode:

Below, we read from the DB using Spark. When the optional parameter “readByPartitionColumn” is defined, it reads the table or SELECT query in a partitioned way; otherwise it reads it non-partitioned, as shown below. Remember that connection_type is simply set to “jdbc” in our config file.
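A sketch of both read paths, continuing with the connection settings loaded in Step 3 (the bound/partition arguments and their defaults are assumed to come from the config or the caller):

def readFromDb(spark: SparkSession,
               lowerBound: Long = 0L,
               upperBound: Long = 1000000L,
               numPartitions: Int = 10): DataFrame = {
  // A SELECT query is wrapped as a derived table; a plain table name is used as-is
  val dbTable =
    if (query.trim.toUpperCase.startsWith("SELECT")) s"( $query ) AS SRC" else query

  val reader = spark.read.format(connectionType) // connectionType is "jdbc" from our config
    .option("url", dbUrl)
    .option("user", dbUser)
    .option("password", dbPasswd)
    .option("dbtable", dbTable)

  readByPartitionColumn match {
    case Some(partCol) => // partitioned read
      reader.option("partitionColumn", partCol)
        .option("lowerBound", lowerBound)
        .option("upperBound", upperBound)
        .option("numPartitions", numPartitions.toString)
        .load()
    case None => reader.load() // non-partitioned read
  }
}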

Spark WRITE Mode:
In Spark write mode, we apply different settings based on the DbOperations enum set by the user, i.e. DbOperations.INSERT/UPDATE/UPSERT (delta).

Before jumping into the custom write modes, we will first see how to drop unnecessary columns from the load-ready data frame before writing with Spark, and how to auto-increment the table’s surrogate key if the user leaves that to the framework.

Here, if the user enables generateSurrogateKey without passing a sequenceName, the framework generates the sequence by auto-incrementing with a Window function, as shown below. In this case it simply takes the current maximum value of the surrogate id and increments it by 1 per incoming row.
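A hedged sketch of that approach (targetTableDF is assumed to be a Spark JDBC read of the target table, and loadReadyDf the incoming data frame; the exact source of the current max may differ in the original implementation):

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.{col, lit, max, row_number}

// Current maximum surrogate id in the target table (0 when the table is empty)
val maxSurrogateId: Long =
  targetTableDF.agg(max(col(surrogateKey))).head.get(0) match {
    case null      => 0L
    case v: Number => v.longValue()
  }

// max + row_number() over the incoming rows becomes the new surrogate id.
// Note: with no partitionBy, Spark moves all rows into a single partition for this window.
val surrogateWindow = Window.orderBy(joinKeys.map(col): _*)
val dfWithSurrogate = loadReadyDf
  .withColumn(surrogateKey, row_number().over(surrogateWindow) + lit(maxSurrogateId))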

For incoming columns that are absent in the target table, the logic below shows a simple hack: fetch the required columns from the table using an always-false “1=2” WHERE condition, and then drop the incoming columns that do not match. The initial data frame is then reassigned to the new data frame, which has the automatic sequence and a balanced set of columns.
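Continuing the sketch above, the column balancing could look like this (sparkSession is the unwrapped Spark session, the JDBC settings are the ones from Step 3, and query holds the target table name in write mode):

// Fetch only the target table's schema: the 1=2 predicate returns no rows
val targetSchemaDF = sparkSession.read.format(connectionType)
  .option("url", dbUrl)
  .option("user", dbUser)
  .option("password", dbPasswd)
  .option("dbtable", s"( SELECT * FROM $query WHERE 1=2 ) AS TRGT")
  .load()

val targetCols = targetSchemaDF.columns.map(_.toUpperCase).toSet

// Drop incoming columns that do not exist in the target table
val extraCols      = dfWithSurrogate.columns.filterNot(c => targetCols.contains(c.toUpperCase))
val loadReadyDb2DF = dfWithSurrogate.drop(extraCols: _*)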

If generateSurrogateKey is set to true in UPDATE/UPSERT mode but a sequenceName is also passed, the surrogate id is taken from the NEXT VALUE of the sequence object defined in the database instead of max+1. We will discuss this later.

5.1 DbOperations.INSERT mode

When INSERT mode is passed, let’s use the joinKeys to partition the data frame before we write into the target table.
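A minimal sketch of that write, reusing the connection settings from Step 3 (query again holds the target table name):

import java.util.Properties
import org.apache.spark.sql.functions.col

val connProps = new Properties()
connProps.setProperty("user", dbUser)
connProps.setProperty("password", dbPasswd)
connProps.setProperty("batchsize", batchSize.toString) // rows inserted per round trip

// Partition by the join keys, then append into the target table
loadReadyDb2DF
  .repartition(joinKeys.map(col): _*)
  .write
  .mode(mode) // typically SaveMode.Append
  .jdbc(dbUrl, query, connProps)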

5.2 DbOperation.UPSERT/UPDATE (Delta Operation)

Let’s do the delta operation using a MERGE SQL statement. Here, I’m keeping DB2 SQL as the example. The variables are set dynamically from each record and partition, per batch group, while we traverse the load-ready data frame.

5.2.1 The MERGE query template:
MERGE INTO $queries AS trgtTable
USING TABLE( VALUES($bindingVars) )
SRC( $insertColStr)
ON $joinConditions
$updateQuery
$allColsInsrtValuesStr
Ref: https://www.ibm.com/support/knowledgecenter/SSEPGG_10.5.0/com.ibm.db2.luw.sql.ref.doc/doc/r0010873.html

For debugging purposes, when we want to see the exact query we are firing, we need to construct the debug string ourselves, since most DB/JDBC APIs do not provide a direct way to do it, as shown below. We tag each field name with “@” so we can replace it later while browsing through each row of the data frame, which makes the query printable for debugging. Notice that when a sequence object is used for surrogate key generation we simply use NEXT VALUE FOR the sequence, so we skip the surrogate key column by filtering it out.

val humanReadableColStr =
  if (generateSurrogateKey && sequenceName != null)
    allCols.filterNot(_ equalsIgnoreCase surrogateKey)
      .map("@" + _).reduce(_ + "," + _)
  else
    allCols.map("@" + _).reduce(_ + "," + _)

val humanReadableQueryStr =
  s"""|MERGE INTO $queries AS trgtTable
      |USING
      |TABLE( VALUES($humanReadableColStr) )
      |SRC( $insertColStr )
      |ON $joinConditions
      |$updateQuery
      |$allColsInsrtValuesStr
      """.stripMargin

Below is the pseudo-code describing the steps for looping through the data frame partitions and records.

loadReadyDb2DF.repartition(batchPartition).foreachPartition { partition =>
  // ... set up your DB JDBC connection and PreparedStatement here
  partition.grouped(commitNbr).foreach { batch =>
    batch.foreach { record =>
      // Translate the record into your merge query:
      //   bind the variables, mapping each field to the proper SQL data type
      //   via preparedStatement.setObject(...)
      // Add the row to the DB batch: preparedStatement.addBatch()
    }
    // Execute the batch: preparedStatement.executeBatch()
    // Commit your changes: connection.commit()
    // Clear the batch: preparedStatement.clearBatch()
  }
  // Close the prepared statement and connection
}

To set the binding variables in 5.2.1, we just need to count the total columns we want to insert and prepare the string. Similarly, to create the join condition we use the joinKeys passed by the user. Here, instead of a simple “x = y” we use “x IS NOT DISTINCT FROM y” to avoid NULL-comparison issues; this may vary depending on the database you are using. Refer to the example below:

val bindingVars = ("?" * tblTotCols).map(_.toString).reduce(_ + "," + _)
// output -> "?,?, ... ,?"
val joinConditions = joinKeys
  .map(key => s"trgtTable.$key IS NOT DISTINCT FROM SRC.$key")
  .reduce(_ + " AND " + _)

insertColStr (in 5.2.1) can be constructed from all the columns:

val insertColStr = allCols.reduce(_ + "," + _)

updateQuery (in 5.2.1) can be created from the updateCols passed by the user:

val updateQuery1 =
  if (updateCols.isEmpty) ""
  else updateCols.filterNot(_.equalsIgnoreCase(surrogateKey))
    .map(col => s"$col = SRC.$col").reduce(_ + "," + _)

val updateQuery =
  if (updateCols.isEmpty) ""
  else s"""|WHEN MATCHED $extraUpdateSQLConditions THEN
           |UPDATE SET
           | $updateQuery1""".stripMargin

allColsInsrtValuesStr is created from all the columns, substituting the surrogate key’s value with the sequence’s NEXT VALUE when the framework is asked to generate it from a DB sequence object. Also note that we construct the INSERT clause only in UPSERT mode, not in UPDATE mode, since in update mode we are not supposed to insert anything, as shown below:

val allColsInsrtValuesStr1 = allCols.map { x =>
  if (generateSurrogateKey && sequenceName != null &&
      (x equalsIgnoreCase surrogateKey))
    s"NEXT VALUE FOR $sequenceName"
  else "SRC." + x
}.reduce(_ + "," + _)

val allColsInsrtValuesStr =
  if (dbOperation == DbOperations.UPSERT)
    s"""|WHEN NOT MATCHED THEN
        | INSERT ( $insertColStr )
        | VALUES ( $allColsInsrtValuesStr1 )
     """.stripMargin
  else
    ""

Once the query is prepared, we can pass each row’s field values to the binding variables through the JDBC API. While doing so, we make sure to convert each field from its Spark type to the proper DB SQL type, including NULL values, as shown below:

case "long" =>
srcFieldType = "long"; endFieldType = "BIGINT"
val fieldValue = if (record.get(fieldIndex) = null) null
else record.getAs[Long)(fieldIndex)
if (!dbPrintOnlyFlag) preparedStatement.
setObject(fieldIndex + offset + 1, fieldValue,
java.sql.Types.BIGINT)
if (dbPrintStatememntFlag){
val fieldValueStr = if (fieldValue = null)
"null"
else
s"$fieldValue"
humanReadableQuery = humanReadableOuery.replaceAllLiterally(s"@$col", fieldValueStr)
}

**You should also be able to derive the data types dynamically from the database metadata instead of explicitly keeping a field data type mapping as above, as shown below:

val targetColTypeMap = getTargetColType(connection, tableName)
val fieldValue = record.get(fieldIndex)
preparedStatement.setObject(fieldIndex + 1, fieldValue, targetColTypeMap(col))

*Where the getTargetColType function infers the target column types as below:

def getTargetColType(connection: Connection, tableName: String): Map[String, Int] = {
  val rs = connection.prepareStatement(s"SELECT * FROM ${tableName} WHERE 1=2")
  val rsMetaData = rs.getMetaData
  val totCols = rsMetaData.getColumnCount
  (1 to totCols).map { i =>
    rsMetaData.getColumnName(i) -> rsMetaData.getColumnType(i)
  }.toMap
}

Likewise, for null, string, int, etc., a call such as preparedStatement.setObject(fieldIndex + 1, fieldValue, java.sql.Types.NULL) binds the field value for each row. After the looping completes for a batch, we run preparedStatement.executeBatch(), commit per group, follow with preparedStatement.clearBatch(), and we are done.

You can refer to the gist below for the full details:
https://gist.github.com/niroj-office/b9dd49966ee27a9772eedf58491f0063

Step 6: Example of calling the primary object:

As discussed before, calling the object depends on how you set up the companion object. I’m showing one simple, full example here for illustration:

Spark mode:
ExecuteDbQuery(spark = Some(spark),
  configFilePath = "db2.config",
  query = s"${SCHEMA}.TABLE_NM",
  mode = SaveMode.Append,
  dbOperation = DbOperations.UPSERT,
  joinKeys = Seq("key1", "key2", ...),
  updateCols = Seq("field1", "field2", ...),
  surrogateKey = "surrogate_field",
  generateSurrogateKey = true,
  chkPrintFlg = true,
  rejectionHdfsPath = s"<hdfs_path>")

Scala mode:
ExecuteDbQuery(configFilePath = "db2.config",
  query = s"SELECT col FROM TABLE")

There are several ways to build similar frameworks that can save a lot of time and effort on redundant tasks. Hopefully, you found this one helpful.

Thanks for reading !! :)
