How to Easily Test Spark DataFrame Transformations

Eriks Dombrovskis · Published in CodeX · May 19, 2021

As a Data Engineer, I often need to write DataFrame transformations of varying complexity. These manipulations can get so complex that testing them on a larger dataset can take hours or even longer. Once you realize this is not feasible, you can lean on one of Spark's great traits: it runs everywhere. So let us run our tests in our IDE and see how we can test our DataFrame transformations easily.

Task

First, imagine some data that is too big for us to test on directly:

Sample citizen data:

name    | surname  | identification_number
--------|----------|----------------------
Adore   | Shaddock | 19981217-73959
Lorenza | Kiersten | 19621220-14807
Mureil  | Willie   | 19781211-72222

We have a dataset that contains a citizen's name, surname, and identification number. The task is simple: we need to extract the year, month, and day of each citizen's birth date. The birth date sits inside the identification_number column as the first part before the dash sign. For example, the first citizen, Adore Shaddock, has identification number 19981217-73959, so the date of birth is 1998-12-17.

Solution

Let's write some code to accomplish this. First, we need to capture the part of the identification number before the dash symbol. Here a simple regular expression can extract it:

\d+(?=-)
  • \d matches any digit character (0–9).
  • + is a quantifier that matches one or more of the preceding token (digits).
  • (?=-) is a positive lookahead: it requires that a dash follow the matched digits, without including the dash in the result.
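
You can sanity-check the pattern in a plain Scala REPL before involving Spark; this tiny snippet is only an illustration:

// Quick check of the pattern outside Spark
val pattern = """\d+(?=-)""".r
println(pattern.findFirstIn("19981217-73959")) // prints Some(19981217)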

Now for the Spark transformation that will extract this birth date:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, regexp_extract, substring}

def extractBirthDate(inputDf: DataFrame): DataFrame = {
  inputDf
    .withColumn("birth_date", regexp_extract(col("identification_number"), """\d+(?=-)""", 0))
    .withColumn("year", substring(col("birth_date"), 0, 4))
    .withColumn("month", substring(col("birth_date"), 5, 2))
    .withColumn("day", substring(col("birth_date"), 7, 2))
}

We take the identification_number column and, using our regular expression, extract the first part into a new column called birth_date. Then, from this column, we extract the year, month, and day with the substring() function.
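
To eyeball the result, you can run the function on a single row and print it. This sketch assumes the local SparkSession and the spark.implicits._ import that are introduced later in this article:

import spark.implicits._

val df = Seq(("Adore", "Shaddock", "19981217-73959"))
  .toDF("name", "surname", "identification_number")

extractBirthDate(df).show(false)
// prints something like:
// +-----+--------+---------------------+----------+----+-----+---+
// |name |surname |identification_number|birth_date|year|month|day|
// +-----+--------+---------------------+----------+----+-----+---+
// |Adore|Shaddock|19981217-73959       |19981217  |1998|12   |17 |
// +-----+--------+---------------------+----------+----+-----+---+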

Testing solution

Before writing any tests for your DataFrame transformations, always take care that your functions are pure, meaning they do not access network resources, databases, or file systems.

For testing, we will be using ScalaTest, the most popular testing framework for Scala, but you can use any other that you prefer. Before writing tests, we also need Spark running in our IDE. The most convenient way of doing this is to write a trait that our testing class can extend to set up a SparkSession for us:

import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession

trait SparkSessionTestWrapper {

  // Silence Spark's and Akka's verbose logging during test runs
  Logger.getLogger("org").setLevel(Level.ERROR)
  Logger.getLogger("akka").setLevel(Level.ERROR)

  val spark: SparkSession =
    SparkSession
      .builder()
      .master("local[1]")
      .appName("Local Test")
      .getOrCreate()
}

By setting the .master("local[1]") option, we tell Spark to run locally with one thread, which is great for our tests.
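
A test class then simply mixes the trait in. Here is a minimal sketch with ScalaTest's AnyFunSuite; the class name is just an example, and DataFrameTestUtils is the comparison trait defined below:

import org.scalatest.funsuite.AnyFunSuite

// Example test class: mixing in the traits provides a ready SparkSession
// and the DataFrame comparison helpers
class ExtractBirthDateSpec extends AnyFunSuite with SparkSessionTestWrapper with DataFrameTestUtils {
  // the test("...") blocks shown below go here
}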

To test a DataFrame transformation, or a chain of them, we need some idea of what needs to be tested. We have to compare two DataFrames, and to do this we need to compare the following things:

  • DataFrame schema — this includes all the structural information of a DataFrame like column names, data types, and nullability.
  • DataFrame data — these are the values that are stored in our DataFrame.

Let's write some general, reusable functionality that can compare both of these DataFrame properties:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.StructType

trait DataFrameTestUtils {

  def assertSchema(schema1: StructType, schema2: StructType, checkNullable: Boolean = true): Boolean = {
    val s1 = schema1.fields.map(f => (f.name, f.dataType, f.nullable))
    val s2 = schema2.fields.map(f => (f.name, f.dataType, f.nullable))
    if (checkNullable) {
      s1.diff(s2).isEmpty
    } else {
      // Drop the nullability flag and compare only column names and data types
      s1.map(s => (s._1, s._2)).diff(s2.map(s => (s._1, s._2))).isEmpty
    }
  }

  def assertData(df1: DataFrame, df2: DataFrame): Boolean = {
    // collect() pulls every row to the driver, so keep the test data small;
    // the one-directional diff assumes both DataFrames have the same row count
    val data1 = df1.collect()
    val data2 = df2.collect()
    data1.diff(data2).isEmpty
  }
}

Again, to keep this reusable for our tests, we have written it as a trait with two methods: assertSchema() and assertData().

assertSchema() takes two StructTypes (DataFrame schemas) and a Boolean (whether to check nullability) and returns a Boolean indicating whether the two schemas match. Skipping the nullability check is useful when you specify your own schema for the data rather than inferring it automatically on read, and you do not care about nullability differences.
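
In that case, the call could look like this (resDf and expectedDf are the DataFrames compared in the tests below):

// Ignore nullability; compare only column names and data types
assertSchema(resDf.schema, expectedDf.schema, checkNullable = false)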

assertData() simply takes two DataFrames and returns a Boolean indicating whether their data is the same. It does this by collecting all elements of each DataFrame into an array on the driver and comparing them. collect() is an action and can get very costly performance-wise, so take care not to overshoot with the size of your test mock-up data.
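
As a tiny standalone illustration, assuming the spark session from our wrapper trait is in scope:

import spark.implicits._

val a = Seq((1, "x"), (2, "y")).toDF("id", "value")
val b = Seq((1, "x"), (2, "y")).toDF("id", "value")

assert(assertData(a, b)) // passes: identical rows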

Now that we have our functionality for creating a local SparkSession and comparing DataFrames, let's write the tests themselves. First, we need some test data. Usually you can just take a couple of records from your dataset, but be sure that you are allowed to do so and that no sensitive data ends up in your code. You can either keep the test data in a separate file (like a .csv or .json) and read it while running tests, or create it programmatically in your code:

import spark.implicits._

val sourceDf = Seq(
  ("Adore", "Shaddock", "19981217-73959"),
  ("Lorenza", "Kiersten", "19621220-14807"),
  ("Mureil", "Willie", "19781211-72222")
).toDF("name", "surname", "identification_number")
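
Alternatively, if you keep the test data in a separate file, reading it could look like this (the path and options are hypothetical and depend on your project layout):

// Hypothetical file under the standard test resources directory
val sourceDf = spark.read
  .option("header", "true")
  .csv("src/test/resources/citizens.csv")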

Now that we have our source data, let's create an expected DataFrame showing how the data should look after our transformations:

val expectedDf = Seq(
  ("Adore", "Shaddock", "19981217-73959", "19981217", "1998", "12", "17"),
  ("Lorenza", "Kiersten", "19621220-14807", "19621220", "1962", "12", "20"),
  ("Mureil", "Willie", "19781211-72222", "19781211", "1978", "12", "11")
).toDF("name", "surname", "identification_number", "birth_date", "year", "month", "day")

Putting it all together into one test:

test("DataFrame Schema Test") {

val sourceDf = Seq(
("Adore", "Shaddock", "19981217-73959"),
("Lorenza", "Kiersten", "19621220-14807"),
("Mureil", "Willie", "19781211-72222")
).toDF("name", "surname", "identification_number")

val resDf = extractBirthDate(sourceDf)

val expectedDf = Seq(
("Adore", "Shaddock", "19981217-73959", "19981217", "1998", "12", "17"),
("Lorenza", "Kiersten", "19621220-14807", "19621220", "1962", "12", "20"),
("Mureil", "Willie", "19781211-72222", "19781211", "1978", "12", "11")
).toDF("name", "surname", "identification_number", "birth_date", "year", "month", "day")

assert(assertSchema(resDf.schema, expectedDf.schema))
}

Similarly for the DataFrame data test:

test("DataFrame Data Test") {
val sourceDf = Seq(
("Jackie", "Ax", "19861126-29967"),
("Vanessa", "Campball", "19881021-86591"),
("Willetta", "Reneta", "19991125-38555")
).toDF("name", "surname", "identification_number")

val resDf = extractBirthDate(sourceDf)

val expectedDf = Seq(
("Jackie", "Ax", "19861126-29967", "19861126", "1986", "11", "26"),
("Vanessa", "Campball", "19881021-86591", "19881021", "1988", "10", "21"),
("Willetta", "Reneta", "19991125-38555", "19991125", "1999", "11", "25")
).toDF("name", "surname", "identification_number", "birth_date", "year", "month", "day")

assert(assertData(resDf, expectedDf))
}

You could reuse the same sourceDf and expectedDf for both tests and save some lines of code, but I think it's better to have separate mock-up data for each test. Now let's run our tests in IntelliJ to see if our function extractBirthDate() does what it's supposed to:

Running the DataFrame transformation tests in IntelliJ

Success! We have a tested DataFrame transformation, run in our IDE in less than 7 seconds. If you have not yet started testing your Spark transformations, then I hope you now have a basic setup for writing your own tests.

The project is available on GitHub.

Thank you and good luck in testing!
