Playing with Spark in Scala: Warm-up Game

Alec Lebedev
Oct 16 · 15 min read

Apache Spark is undeniably one of the most popular tools used for data processing these days. Cloud platforms such as AWS package Spark in their serverless (AWS Glue) and managed (AWS EMR) offerings, lowering the overhead of running a Spark cluster on your own. The good news is that this reduces operational complexity and lowers the barrier to entry for new users. At the same time, it presents Spark as a shiny shrink-wrapped black box in the cloud that just works, without us having to understand its inner workings. But what if we try to bring it down from the cloud, pop it open and play with it at home?

Spark is a distributed system: it horizontally distributes the work to be done across multiple executors. For example, Spark can split the input data into partitions and assign those partitions to executors, instructing each executor to process the data in its assigned partitions. However, this doesn’t mean that multiple physical machines are mandatory for every Spark deployment. In fact, a well-designed distributed system can run on your laptop, allowing you to test its functionality locally. Moreover, such systems let you run them in your favorite IDE and take advantage of all its nice features, such as code completion, on-the-fly compilation and debugging.
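For example, here is a small, self-contained sketch (not part of this project’s code) that runs Spark locally and shows an in-memory collection being split into partitions:

import org.apache.spark.sql.SparkSession

object LocalPartitionDemo extends App {
  // "local[*]" runs Spark inside the current JVM, using all available cores for executor threads
  val spark = SparkSession.builder()
    .appName("local partition demo")
    .master("local[*]")
    .getOrCreate()

  // Spark splits this in-memory collection into 8 partitions that can be processed in parallel
  val rdd = spark.sparkContext.parallelize(1 to 1000, numSlices = 8)
  println(s"number of partitions: ${rdd.getNumPartitions}")

  spark.stop()
}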

In this article I present a software engineer’s attempt to build a simple data transformation in Spark and test it the way we usually test software.

The code developed in this article is available at https://gitlab.com/softminded/transformer-spark-scala

Data Processing with Apache Spark

Apache Spark is used to perform data transformations and execute queries against data. An example of a data transformation is converting data from a row-oriented format, such as CSV, to a columnar format, such as Parquet. An example of a query is searching an employee directory stored in CSV files on a local hard drive for employees with a given last name. Keep in mind that Spark is a processing engine and does not implement its own persistent storage. Instead, Spark reads data from external storage, performs transformations and queries, and writes the result to external storage, potentially different from the one that provided the input data. Here is a data flow diagram for a Spark transformer job that converts data from CSV to Parquet format.
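To make this concrete, a minimal CSV-to-Parquet conversion using the DataFrame API might look like the following sketch; the file paths are hypothetical and the snippet is not part of the project:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("csv-to-parquet")
  .master("local[*]")
  .getOrCreate()

// read the row-oriented CSV input, inferring the schema from a header row
val employeesDF = spark.read
  .option("header", "true")
  .option("inferSchema", "true")
  .csv("/tmp/employees.csv")

// write the same data back out in the columnar Parquet format
employeesDF.write.parquet("/tmp/employees.parquet")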

In this article we put in place the development workflow for a data transformer that converts data from one format to another. The main focus is to explore how we can use software engineering tools, techniques and processes when developing code that runs on Apache Spark.

It is important to use the right tools for any job. Software engineers use Integrated Development Environments (IDEs) to write code. IDEs come with features such as code autocompletion, on-the-fly compilation and debugging support (breakpoints, expression watches, resource utilization monitoring, etc.). For this project I used the IntelliJ IDEA IDE. Although I chose Scala as the programming language for this exercise, the practices described in this article apply to other languages supported by Spark, such as Python.

Designing the Object Model

One of the use cases for the Spark job we are building in this article is to convert data from a row-based to a columnar format. This means that the job needs to be able to read the initial data, transform it and write the transformed data to its target destination. Spark provides several programming abstractions, such as RDD, DataFrame and Dataset, to represent the data it operates on. Unlike RDDs, DataFrames and Datasets have schemas associated with them. However, a Dataset is strongly typed, whereas a DataFrame is more generic and, in fact, is just a type alias for Dataset[Row]. Since the job will be producing a columnar format it will have a schema, so we would want to use either DataFrame or Dataset to represent the output data. Since the job should be able to transform different kinds of data, e.g. customer, order, line item, the schema may not be known at compile time and may need to be read from a configuration file. For these reasons, I find DataFrame an adequate abstraction for this project.
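As a quick illustration of the difference (a hypothetical snippet, not part of the project), a Dataset carries a compile-time type while a DataFrame carries only a runtime schema:

import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class Customer(id: Long, name: String)

val spark = SparkSession.builder().master("local[*]").getOrCreate()
import spark.implicits._

// strongly typed: the compiler knows every element is a Customer
val customerDS: Dataset[Customer] = Seq(Customer(1, "Alice"), Customer(2, "Bob")).toDS()

// untyped: the schema travels as runtime metadata, which suits schemas loaded from configuration
val customerDF: DataFrame = customerDS.toDF()
customerDF.printSchema()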

The following is a list of the main object interfaces:

  • DataFrameReader — a trait with read():DataFrame method which reads data into a DataFrame; the source of data and data format are an implementation detail of an extending class;
  • DataFrameTransformer — a trait with transform(inputDF: DataFrame): DataFrame method which transforms the data from the given DataFrame and returns the result;
  • DataFrameWriter — a trait with write(DataFrame):Unit method which writes the given DataFrame to its destination; the destination and data format are an implementation detail of an extending class;
  • TableTransformerJob — a class with run():Unit method which orchestrates reading, transforming and writing transformed data as follows:
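Here is a rough sketch of these interfaces and of the orchestration; the full TableTransformerJob implementation, including how the reader, transformer and writer are created, appears later in the article:

import org.apache.spark.sql.DataFrame

trait DataFrameReader {
  def read(): DataFrame
}

trait DataFrameTransformer {
  def transform(inputDF: DataFrame): DataFrame
}

trait DataFrameWriter {
  def write(df: DataFrame): Unit
}

// TableTransformerJob#run() chains the three together:
// writer.write(transformer.transform(reader.read()))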

At this point we can start thinking about how exactly we are going to read, transform and write data using Spark.

Development Workflow

As we are getting ready to write our first line of code, it’s a good idea to think about the code organization and the build process. I used the spark-sbt.g8 template to create the folder structure, sample code and build configuration for this project. The project uses SBT as the Scala build tool, and you can review the SBT configuration in the Git repo. This is what the project organizational structure looks like in the IDE:
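For reference, the core of an SBT build for a project like this might look as follows; this is a hypothetical sketch with assumed versions, and the authoritative build.sbt is the one in the Git repo:

name := "transformer-spark-scala"
scalaVersion := "2.12.10" // assumed Scala version

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql"     % "2.4.3"  % "provided",
  "org.scalatest"    %% "scalatest"     % "3.0.8"  % "test",
  "org.mockito"      %% "mockito-scala" % "1.5.11" % "test"
)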

This project uses GitLab for source code and CI/CD pipeline management. The goal here is to use a single service for hosting the source code, running tests, displaying the status of the project and sending notifications when things break. The GitLab CI/CD pipeline configuration file, .gitlab-ci.yml, resides in the root of the source repository, allowing us to manage configuration changes using the standard Git workflow. The file contains the name of the Docker image that should be used for provisioning the build container and the scripts that install the necessary dependencies, run tests, report code coverage, etc.

It’s important to assess the quality of the code we write. To do this, we will write unit and integration tests which verify that the code behaves as expected and produces the desired result. Unit tests are white-box tests which attempt to exercise each line of the code without testing its integration with external APIs. “White-box test” means that the test writer knows the exact implementation and is able to target its various code branches. The effectiveness of unit tests can be measured by the portion of the code they exercise. Unit tests need to run fast, allowing us to verify the quality of our code and iterate quickly. To do this, unit tests mock out calls to external data sources and other third-party dependencies that could drastically slow down a unit test. For example, in this project I use the excellent Mockito Scala library to mock out SparkSession. You can find more on using mocks in tests in Martin Fowler’s Mocks Aren’t Stubs article.

On the other hand, integration tests are black-box tests which check integration points and exercise the code from the interface level, allowing the code to make the external calls it requires, such as reading data from a file. I use the term “integration tests” to refer to the “narrow integration tests” as described in Martin Fowler’s IntegrationTest article. I refer to the “broad integration tests” as “end-to-end tests”.

Unit and integration tests are run as part of the build process. In the .gitlab-ci.yml file I configured GitLab’s CI/CD pipeline to push the code coverage results to the CodeCov.io service. This requires that an authentication token be generated on CodeCov.io and that the CODECOV_TOKEN environment variable be set to the token value. This can be done easily by creating a CODECOV_TOKEN variable with the token value on the Settings->CI/CD->Variables page in GitLab. The GitLab project used for this article is the repository linked at the beginning of the article.

Reading and Writing Data in Spark

Spark integrates with different data sources and provides several APIs for reading data in different formats, such as DataFrameReader#textFile(String):Dataset<String> to read data from a text file. Spark also allows a DataFrame to be registered as a virtual table or view with a Spark session. Once registered, the view can be used in SQL queries executed by calling SparkSession#sql(String):Dataset<Row>.
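For example, a DataFrame loaded from the employee directory mentioned earlier could be registered and queried like this (a hedged sketch; the file path, view name and last_name column are assumptions made for illustration):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().master("local[*]").getOrCreate()

// hypothetical input: an employee directory stored as CSV on the local file system
val employeesDF = spark.read.option("header", "true").csv("/tmp/employees.csv")

// register the DataFrame as a temporary view that Spark SQL can reference by name
employeesDF.createOrReplaceTempView("employees")

// query the view; the result comes back as a Dataset[Row], i.e. a DataFrame
val smithsDF = spark.sql("select * from employees where last_name = 'Smith'")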

Let’s assume that the source of our data has already been registered as a table with SparkSession by the caller. With this assumption, we implement DataFrameTableReader as follows:

class DataFrameTableReader(spark: SparkSession, config: DataFrameTableReader.Config) extends DataFrameReader {

  override def read(): DataFrame = {
    spark.sql(s"select * from ${config.tableName}")
  }
}

The DataFrameTableReader class takes a SparkSession and a DataFrameTableReader.Config as constructor parameters and uses them to run a Spark SQL query that selects data from the table named in the config. We take a similar approach when implementing the DataFrameTableWriter class, as shown below:

class DataFrameTableWriter(spark: SparkSession, config: DataFrameTableWriter.Config) extends DataFrameWriter {

  override def write(df: DataFrame): Unit = {
    require(df != null, "df must be specified")
    df.write
      .format(config.outputFormat)
      .insertInto(config.tableName)
  }
}

We also implement TableTransformerJob, the main entry point for the conversion, which orchestrates the ETL by reading, transforming and writing data as follows:

/**
 * This class reads data from one Spark table, transforms it and writes the output to another Spark table.
 *
 * @param spark  the Spark session to use for the transformations
 * @param config the config describing the transformations to perform
 */
class TableTransformerJob(spark: SparkSession, config: TableTransformerConfig) {

  def run(): Unit = {
    // read data
    val inputDF = newReader().read()

    // transform data
    val outputDF = newTransformer().transform(inputDF)

    // write transformed data out
    newWriter().write(outputDF)
  }

  @VisibleForTesting
  protected def newTransformer(): DataFrameTransformer = {
    DataFrameTransformerFactory.getTransformer(config.transformerClass)
  }

  @VisibleForTesting
  protected def newReader(): DataFrameReader = {
    new DataFrameTableReader(spark, config.readerConfig)
  }

  @VisibleForTesting
  protected def newWriter(): DataFrameWriter = {
    new DataFrameTableWriter(spark, config.writerConfig)
  }
}

Unit Testing Spark Job

Let’s write a unit test that verifies that the transformer job calls the reader, transformer and writer with the correct parameters. To do this we create a TableTransformerJobSpec class in the test source tree with the following class signature:

class TableTransformerJobSpec
  extends FunSuite
    with MockitoSugar
    with SparkSessionLender

FunSuite is a class from the ScalaTest library which provides support for writing tests in Scala. ScalaTest supports different testing styles; I chose a simple but descriptive style which lets me provide a description and register a test with a test("description") { ... } block.

MockitoSugar is a trait from the Mockito Scala library which provides abstractions on top of the Mockito Java API, adding support for a more Scala-like syntax, such as mock[DataFrameReader] used in the code below. This also lets us use the traditional stub-and-verify style of Mockito tests with when(something).thenReturn(result) and verify(mock).method() calls.

SparkSessionLender is a trait where I implemented the Loan Pattern, offloading the SparkSession lifecycle management responsibility from the test code into this trait. The following method creates a local SparkSession and hands it to the test function passed as a parameter. Extracting this code into a separate method lets us adjust the SparkSession settings in one place, e.g. to enable the Hive integration we will need for the integration tests below.

def withLocalSparkContext(testFunction: SparkSession => Any): Unit = {
  val spark = SparkSession.builder()
    .appName("spark testing")
    .master("local")
    .getOrCreate()

  try {
    testFunction(spark)
  }
  finally spark.stop()
}

Now, let’s add a unit test in TableTransformerJobSpec which mocks out the SparkSession, reader, transformer and writer and focuses on testing that the data is properly handed off from reader to transformer and from transformer to writer. I like to use the given-when-then commenting style to demarcate the part of the code that sets up the test fixture, the part that invokes the code under test and the part that verifies the post-conditions.

test("TableTransformerJob calls reader, transformer and writer") {

// given: mocked SparkSession, reader, writer and transformer
val spark = mock[SparkSession]
val mockReader = mock[DataFrameReader]
val mockWriter = mock[DataFrameWriter]
val mockTransformer = mock[DataFrameTransformer]

val mockInputDF = mock[DataFrame]
when(mockReader.read()).thenReturn(mockInputDF)

val mockOutputDF = mock[DataFrame]
when(mockTransformer.transform(mockInputDF)).thenReturn(mockOutputDF)

// when: the job is run
newTableTransformerJob(spark, mockReader, mockWriter, mockTransformer).run()

// then: mocked reader, transformer and writer are called with the correct data frames
verify(mockReader).read()
verify(mockTransformer).transform(mockInputDF)
verify(mockWriter).write(mockOutputDF)
}

Let’s push these changes to GitLab, which will trigger the build pipeline and update the status of the project as shown below.

Understandably, the code coverage is pretty low because we mocked out most of our code’s functionality. The desired level of code coverage is application specific, and different engineering teams have different code coverage standards, e.g. for the percentage of classes, methods and lines covered. In his Test Coverage article, Martin Fowler describes the value of code coverage and provides some rationale for establishing such standards.

Before we add more tests to increase the code coverage, let’s make our transformation process a little more flexible. The purpose of our transformer job is to transform data from one format to another. The job should be able to handle different types of data, such as customer data or data from an order processing system. We introduce a DataFrameTransformerFactory which creates an instance of a transformer given a fully qualified transformer class name. We also change TableTransformerJob to delegate transformer instantiation to DataFrameTransformerFactory, driven by the job’s config.transformerClass setting.

object DataFrameTransformerFactory {

  /**
   * @param transformerClass the fully qualified class name of the transformer to instantiate
   * @return an instance of a transformer of the given class
   */
  def getTransformer(transformerClass: String): DataFrameTransformer = {
    getClass.getClassLoader.loadClass(transformerClass).newInstance()
      .asInstanceOf[DataFrameTransformer]
  }
}
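For example, given a concrete transformer class with a no-argument constructor, such as the IdentityDataFrameTransformer introduced later in this article, the factory can be used like this:

// hypothetical usage; IdentityDataFrameTransformer is introduced later in this article
val transformer: DataFrameTransformer =
  DataFrameTransformerFactory.getTransformer(classOf[IdentityDataFrameTransformer].getName)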

Let’s write a unit test to exercise this new, configuration-driven transformer instantiation. But first, let’s create a FakeDataFrameTransformer class; the test will configure the job to delegate to it and then verify that the data returned by the fake transformer was passed to the writer. We implement the fake transformer to produce a fixed result (outputDF) that the test can compare against:

object FakeDataFrameTransformer extends MockitoSugar {
  val outputDF: DataFrame = mock[DataFrame]
}

class FakeDataFrameTransformer extends DataFrameTransformer {
  override def transform(inputDF: DataFrame): DataFrame = {
    FakeDataFrameTransformer.outputDF
  }
}

And now we are ready to implement a unit test which verifies that the job correctly instantiates and delegates to FakeDataFrameTransformer.

test("TableTransformerJob delegates to the fake transformer") {

// given: mocked reader and writer and fake transformerClass set in the job's configuration
val spark = mock[SparkSession]
val mockReader = mock[DataFrameReader]
val mockWriter = mock[DataFrameWriter]

val mockInputDF = mock[DataFrame]
when(mockReader.read()).thenReturn(mockInputDF)

// when: the job is run
newTableTransformerJob(spark, mockReader, mockWriter, classOf[FakeDataFrameTransformer].getName).run()

// then: mocked reader, transformer and writer are called with the correct data frames
verify(mockWriter).write(FakeDataFrameTransformer.outputDF)
}
private def newTableTransformerJob(spark: SparkSession, mockReader: DataFrameReader, mockWriter: DataFrameWriter, transformerClass: String) = {
val readerConfig = DataFrameTableReader.Config("inputTable")
val writerConfig = DataFrameTableWriter.Config("outputName", "orc")
val config = TableTransformerConfig(transformerClass, readerConfig, writerConfig)
new TableTransformerJob(spark, config) {

override protected def newReader(): DataFrameReader = mockReader

override protected def newWriter(): DataFrameWriter = mockWriter
}
}

The test passes, but let’s double-check that it really covered the lines we expected it to exercise. IntelliJ allows us to run the test with test coverage enabled by clicking on the right-most icon in the screenshot below.

Upon test completion, we can inspect the transformer job and factory classes and observe the green markers to the left of each line covered by the test.

That’s pretty cool: the test passes and exercises the lines of code we expected. So, let’s run all our tests and confirm that they are still passing. We can do this in IntelliJ or by executing the sbt clean coverage test coverageReport command, which is exactly what our build process runs. Now we can commit our change to Git with a descriptive comment, such as “added a unit test to verify that TableTransformerJob delegates to the transformer class set in the config”, and push it to GitLab, triggering the build pipeline. Upon its completion, our project page in GitLab shows that the test coverage went up, as shown below.

OK, that’s good, so let’s explore which code has not yet been covered by tests by clicking on the codecov badge. This shows that neither DataFrameTableReader nor DataFrameTableWriter has been covered, because all our tests have mocked them out so far.

Let’s add a test which generates some test data in memory, registers it as a table with SparkSession and asks TableTransformerJob to orchestrate the identity transformation of this data.

From Wikipedia: The identity transform is a data transformation that copies the source data into the destination data without change.

Identity transformations are useful in cases that don’t depend on the semantics of the input data. Such cases include compressing input data stored in raw text format, or splitting large input files into smaller, more numerous files so that they can be processed with greater parallelism. The following shows an implementation of an identity transformer.

class IdentityDataFrameTransformer extends DataFrameTransformer {
  override def transform(inputDF: DataFrame): DataFrame = {
    inputDF
  }
}
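As an aside, the file-splitting use case mentioned above can be covered by a small variation on this idea; the class below is a hypothetical sketch and is not part of the project:

import org.apache.spark.sql.DataFrame

// the rows are unchanged, but repartitioning changes how many partitions (and therefore
// how many output files) the writer produces
class SplittingIdentityTransformer extends DataFrameTransformer {
  override def transform(inputDF: DataFrame): DataFrame = {
    inputDF.repartition(16) // the partition count here is an arbitrary example value
  }
}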

In the following test we do not use mocks, making it the transformer job’s responsibility to instantiate the reader, transformer and writer the way it would in production. We generate some fake input data and register it as a table with SparkSession. The input data should be read by DataFrameTableReader and passed to the IdentityDataFrameTransformer, which will return the data without any modifications. The data will then be given to DataFrameTableWriter to store in the output table registered with SparkSession. However, DataFrameTableWriter assumes that the output table already exists, and thus requires the test to create this table with an appropriate schema. Remember that Spark does not have its own persistent storage and delegates persistent storage management to other technologies. One such technology is Apache Hive, which supports persistent storage on the local file system. To enable Hive support we need to do two things:

  • Add spark-hive dependency in build.sbt:
    libraryDependencies += "org.apache.spark" %% "spark-hive" % "2.4.3" % "provided" // for integration testing
  • Enable Hive support in SparkSession configured inside SparkSessionLender#withLocalSparkContext:
    val spark = SparkSession.builder()
      .appName("spark testing")
      .master("local")
      .enableHiveSupport()
      .getOrCreate()

Let’s use the Loan Pattern to implement the code that creates a table given the table name and schema, executes the test function and cleans up after itself by dropping the table.

def withSparkTable(spark: SparkSession, tableName: String, tableSchema: StructType, testFunction: () => Any): Unit = {
  try {
    spark.sql(s"create table $tableName(${tableSchema.toDDL})")
    testFunction()
  }
  finally {
    spark.sql(s"drop table if exists $tableName")
  }
}

Now we are ready to write an integration test which exercises the code in DataFrameTableReader, DataFrameTableWriter and IdentityDataFrameTransformer. The test verifies that the identity transformation works correctly by asserting that the output table contains the same data as the input table, using assertSmallDataFrameEquality(actualDF, expectedDF) from the spark-fast-tests library. We use import spark.implicits._ to bring in some Spark helper functions which make working with data frames a little easier, e.g. calling RDD#toDF to create a data frame containing the input data for the test.

test("TableTransformerJob performs identity transformation correctly") {

withLocalSparkContext(spark => {
import spark.implicits._

// given: some input data registered as a Spark table and a transformer configured to transform the input data to ORC format
val inputDF = spark.sparkContext.parallelize(Seq((1, "2019-10-05", "00", "A"), (2, "2019-10-05", "01", "B"))).toDF("id", "date", "hour", "content")
inputDF.createOrReplaceTempView("input_table")

val readerConfig = DataFrameTableReader.Config("input_table")
val writerConfig = DataFrameTableWriter.Config("output_table", "orc")
val config = TableTransformerConfig(classOf[IdentityDataFrameTransformer].getName, readerConfig, writerConfig)

// identity transformation should preserve input schema in the output
val outputTableSchema = inputDF.schema

withSparkTable(spark, writerConfig.tableName, outputTableSchema, () => {

// when: the job is run
new TableTransformerJob(spark, config).run()

// then: identity transformation returns correct data
val outputDF = spark.sql("select * from output_table")
assertSmallDataFrameEquality(outputDF, inputDF, ignoreNullable = true)
})
})
}

Let’s verify that all our tests are still passing by running sbt clean coverage test coverageReport locally. After that, we can commit our change to Git with a descriptive comment and push it to GitLab, triggering the build pipeline. Upon completion of the build process, the project status in GitLab looks as follows.

This looks pretty good: all our tests are passing and we achieved 100% code coverage. The following screenshots from CodeCov.io show how our code coverage changed over time and the current line-by-line coverage statistics.

Code coverage trend (CodeCov.io)
Summary of lines covered in the project’s top-level package (CodeCov.io)
A break-down of lines covered in each class (CodeCov.io)

Our code is in pretty good shape, so let’s tag it in Git in case we want to check out this state of the code in the future.

git tag -a warmup-0.1 -m "identity transformation with tests and code coverage"
git push --tags

Summary

In this article we started exploring Spark development in Scala from a software engineering perspective. We created a source code repository in Git and configured a CI/CD pipeline for it in GitLab. We integrated the pipeline with CodeCov.io to push code coverage metrics, and implemented unit and integration tests to achieve a high level of coverage. In our unit tests we experimented with object mocking techniques. In the integration test we generated a sample data set and registered it as a table with SparkSession. We enabled Spark integration with Hive in order to allow the test to write transformed data to a Hive table backed by the local file system. In the next article we will continue this exploration by implementing a data conversion for a practical use case.

