Transforming Data with Embedded Spark

Alec Lebedev
Nov 27 · 12 min read

In this article we continue the exploration of Spark jobs written in Scala by building on the foundation established in the previous article in this series, “Playing with Spark Scala: Warmup Game”. The goal of this article is to enhance our transformer job to handle a practical use case and to get some insight into the operational details of Spark job execution. The article is organized as follows:

1. Generating Test TPC-H Data
2. Transforming LineItem Data
2.1 Choosing Table Schema Representation
2.2 Defining LineItem Table Schema
2.3 Creating LineItem Table in Hive
2.4 Reading Raw LineItem Data
2.5 Transforming LineItem Data from CSV to ORC
3. Gleaning insights into Spark job performance
4. Finalizing feature implementation
5. Summary

Generating Test TPC-H Data

In this article, we will implement the transformation of TPC-H benchmark data from its raw text format to a columnar format. From the TPC-H website:

The TPC Benchmark™H (TPC-H) is a decision support benchmark. It consists of a suite of business oriented ad-hoc queries and concurrent data modifications.

The goal here is not to benchmark the Spark transformation job itself, for which the TPC-DI benchmark would be more appropriate, but to generate some data which we can use to benchmark the query performance of Spark and other query engines against the transformed data.

Let’s generate some TPC-H sample data with the tpch-kit tool. The following will create a benchmark dataset of about 100KB, which is small enough to be checked in to the project repo and used in the integration tests run by our GitLab CI/CD pipeline. Alternatively, we could have uploaded the sample dataset to a shared location, such as a bucket in S3, and configured the build process to download the file before running the tests. (You may need to adjust the build command if you are not running on Mac OS.)
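The exact commands are not reproduced here, but a rough sketch looks like the following, assuming the gregrahn/tpch-kit fork of the toolkit and a Mac OS build; the 0.001 scale factor matches the sample data used below.

> git clone https://github.com/gregrahn/tpch-kit.git
> cd tpch-kit/dbgen
> make MACHINE=MACOS DATABASE=POSTGRESQL
> ./dbgen -v -s 0.001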

The generated TPC-H dataset consists of the following files: customer.tbl, lineitem.tbl, nation.tbl, orders.tbl, part.tbl, partsupp.tbl, region.tbl, supplier.tbl. These files contain data simulated for a fictional parts-ordering system where customers place orders for parts and suppliers fill those orders.

Let’s also generate the TPC-H queries which can be run against the TPC-H sample data. The following will generate tpch-q${i}.sql files in the folder with the qgen executable.
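A sketch of the query generation, assuming the query templates sit in the queries subfolder of dbgen (qgen locates them through the DSS_QUERY environment variable):

> cd tpch-kit/dbgen
> for i in $(seq 1 22); do DSS_QUERY=queries ./qgen ${i} > tpch-q${i}.sql; done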

The generated queries allow us to answer some business questions about the simulated data. For example, here is the description of query #3 from the TPC-H specification:

Shipping Priority Query (Q3)
This query retrieves the 10 unshipped orders with the highest value.

This should give you some idea about TPC-H benchmark data and queries.

Transforming LineItem Data

The TPC-H benchmark dataset consists of several tables, and in this section we will implement a transformer for the LineItem table. The schemas of all the tables are described in the full TPC-H specification, which can be downloaded in PDF format from the TPC Downloads page.

Our goal here is to read LineItem data from the lineitem.tbl file generated in the previous section, transform it to the desired format, such as ORC or Parquet, and store the transformed data for further analysis. Here is the content of the lineitem.tbl file with 6 line items, which we’ll be using in the tests below.

lineitem.tbl generated by TPC-H with scale 0.001

The first order of business is to create a new Git branch which will host our changes.

> git branch feature/lineitem-transform
> git checkout feature/lineitem-transform

Choosing Table Schema Representation

Before we dive into code, let’s think about the schema of the data that we would want the transformer to produce.

First, let’s decide how to represent the schema of the LineItem table. One option would be to use SQL DDL and write a “CREATE TABLE AS” statement that would create a table with the right schema for us. SQL has been around for a long time and was originally designed to work with relational databases (RDBMs). However, the standard, ANSI SQL, does not have support for expressing certain properties of the table schema, such as the ones pertaining to the format, partitioning and data layout of the underlying physical storage. Instead, the standard ANSI SQL defers to the RDBMs implementing the standard for making those decisions.

Another option is to use the Apache Hive SQL dialect to express our table schema. Hive is data warehouse software originally developed as part of the Apache Hadoop framework. One of the core components of Hive is the metastore. The Hive metastore is a registry that maintains information about databases, table schemas and other metadata. Its role is similar to that of the Information Schema in relational databases. Popular data technologies, such as Apache Spark and Presto, integrate with the Hive metastore.

Hive DDL seems like a good choice for representing our table schema. However, it would be nice if we could represent a Hive table schema as a Scala object and avoid having to parse Hive DDL commands. To do this we will borrow from another technology, AWS Glue. The Hive metastore is so popular that AWS implemented its own fully managed metadata repository, called the AWS Glue Data Catalog, to be compatible with it. AWS Glue provides a Create Table Web API which we can use as a template for expressing a table schema in our code. Converting the AWS Glue Create Table API request payload to Scala results in the following simple representation of a Hive table schema:
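The original listing is not reproduced here; the following is a minimal sketch of what such a representation could look like, loosely modeled on the fields of the Glue CreateTable request (the names TableSchema, Column, SerDeInfo and StorageDescriptor are illustrative, not necessarily the ones used in the project):

// A simplified, Glue-inspired representation of a Hive table schema.
case class Column(name: String, dataType: String, comment: Option[String] = None)

case class SerDeInfo(serializationLibrary: String, parameters: Map[String, String] = Map.empty)

case class StorageDescriptor(
  columns: Seq[Column],   // column names and Hive types
  location: String,       // where the table data lives, e.g. an HDFS or local path
  inputFormat: String,    // Hadoop InputFormat used to read the data
  outputFormat: String,   // Hadoop OutputFormat used to write the data
  serdeInfo: SerDeInfo    // SerDe used to (de)serialize rows
)

case class TableSchema(
  databaseName: String,
  tableName: String,
  storageDescriptor: StorageDescriptor,
  partitionKeys: Seq[Column] = Seq.empty
)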

Defining LineItem Table Schema

The definition of a table schema in Hive consists of the traditional column names and types as well as the description of the physical data format and layout. We can define the schema for the table that will hold the LineItem ORC data produced by LineItemDataFrameTransformer as follows:
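The schema definition itself is not shown here; a sketch built on the illustrative case classes above might look as follows (the column list is abbreviated and the location is a placeholder; the full column set follows the TPC-H LineItem definition):

// Sketch of the LineItem ORC table schema (abbreviated column list).
val lineItemOrcSchema = TableSchema(
  databaseName = "tpch",
  tableName = "lineitem",
  storageDescriptor = StorageDescriptor(
    columns = Seq(
      Column("l_orderkey", "bigint"),
      Column("l_partkey", "bigint"),
      Column("l_suppkey", "bigint"),
      Column("l_linenumber", "int"),
      Column("l_quantity", "decimal(12,2)"),
      Column("l_extendedprice", "decimal(12,2)"),
      Column("l_shipdate", "date")
      // ... remaining LineItem columns elided
    ),
    location = "/warehouse/tpch/lineitem",  // placeholder path
    inputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcInputFormat",
    outputFormat = "org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat",
    serdeInfo = SerDeInfo("org.apache.hadoop.hive.ql.io.orc.OrcSerde")
  )
)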

Note that in addition to the column types, the schema defines the SerDe (serializer/deserializer) library org.apache.hadoop.hive.ql.io.orc.OrcSerde which will read data from the underlying storage, such as HDFS, using org.apache.hadoop.hive.ql.io.orc.OrcInputFormat and will write to the underlying storage using org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat.

Creating LineItem Table in Hive

Spark integrates with the Hive metastore, which can be deployed in embedded, local and remote modes. In embedded mode, Hive uses the Derby database for persistent storage of metadata, which works well for our integration tests but may not scale to production-level workloads. The following diagram shows the data and metadata flows in our integration tests. The Hive metastore maintains information about where the input CSV files are stored and where the transformed data should be written, and Spark uses this information to perform the actual reading and writing of data.

Embedded Spark and Hive metastore

In order to store transformed data in a Hive table, we first need to create the table with an appropriate schema. We can introduce the following helper method in the SparkTestHelper class which will create a Hive table given a schema representation argument tableSchema:
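The original helper is not reproduced here; a minimal sketch, assuming the illustrative TableSchema model above and an embedded SparkSession created with enableHiveSupport(), could simply render the schema as a Hive CREATE TABLE statement and run it through spark.sql:

import org.apache.spark.sql.SparkSession

// Sketch: create a Hive table from our schema representation.
def createTable(spark: SparkSession, tableSchema: TableSchema): Unit = {
  val sd = tableSchema.storageDescriptor
  val columnsDdl = sd.columns.map(c => s"${c.name} ${c.dataType}").mkString(",\n  ")
  spark.sql(s"CREATE DATABASE IF NOT EXISTS ${tableSchema.databaseName}")
  spark.sql(
    s"""CREATE TABLE IF NOT EXISTS ${tableSchema.databaseName}.${tableSchema.tableName} (
       |  $columnsDdl
       |)
       |ROW FORMAT SERDE '${sd.serdeInfo.serializationLibrary}'
       |STORED AS INPUTFORMAT '${sd.inputFormat}'
       |OUTPUTFORMAT '${sd.outputFormat}'
       |LOCATION '${sd.location}'""".stripMargin)
}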

Let’s now add a test in LineItemDataFrameTransformerSpec that verifies that we can create a Hive table with the LineItem schema defined in LineItemDataFrameTransformer.
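A sketch of what such a test might look like, assuming a FlatSpec-style spec with Matchers that mixes in SparkTestHelper and a shared Hive-enabled spark session (assertions and names are illustrative):

"LineItemDataFrameTransformer" should "create a Hive table with the LineItem schema" in {
  val schema = LineItemDataFrameTransformer.getSchema()
  createTable(spark, schema)

  // The table should now be visible in the embedded metastore with all declared columns.
  val table = spark.catalog.getTable(schema.databaseName, schema.tableName)
  table.name shouldBe schema.tableName
  spark.table(s"${schema.databaseName}.${schema.tableName}")
    .schema.fields.length shouldBe schema.storageDescriptor.columns.length
}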

Reading Raw LineItem Data

In the previous article, I showed how to create a DataFrame in memory and register it as a table with SparkSession. Here, we can extend this approach by reading the data into a DataFrame from a file.

The following code snippet shows the use of the DataFrameReader#csv Spark API for reading a CSV file without a header and with | as a custom delimiter into a data frame.
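A minimal sketch of that call, assuming spark is an active SparkSession and path points at the staged lineitem.tbl file:

// Read a headerless, pipe-delimited CSV file into a data frame.
// With no schema supplied, every column is read as a string (_c0, _c1, ...).
val rawLineItems = spark.read
  .option("header", "false")
  .option("delimiter", "|")
  .csv(path)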

Following the pattern established in the previous article, we create a DataFrameCsvFileReader class for reading CSV files.
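The class itself is not shown here; under the same assumptions it might be little more than a thin wrapper around the call above (class and parameter names are illustrative):

import org.apache.spark.sql.{DataFrame, SparkSession}

// Sketch of a reusable reader for headerless, delimiter-separated CSV files.
class DataFrameCsvFileReader(spark: SparkSession, delimiter: String = "|") {
  def read(path: String): DataFrame =
    spark.read
      .option("header", "false")
      .option("delimiter", delimiter)
      .csv(path)
}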

Next, we implement an integration test which checks that the data read from the CSV file is correct. Note that, although possible, we don’t specify a schema for the data frame being read, because we will be transforming this data into a different schema anyway. So, at this point, having all columns read as strings is sufficient for our needs.
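A sketch of such a test, assuming the 6-row sample file is available on the test classpath (paths and expectations are illustrative):

"DataFrameCsvFileReader" should "read the sample lineitem.tbl file" in {
  val reader = new DataFrameCsvFileReader(spark)
  val df = reader.read(getClass.getResource("/tpch/lineitem.tbl").getPath)

  df.count() shouldBe 6
  // Without an explicit schema every column is read as a string.
  df.schema.fields.foreach(_.dataType shouldBe org.apache.spark.sql.types.StringType)
}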

Although this approach works, it only operates on one file at a time. When running at scale, we would want our job to read multiple files at once. One way to implement this is to use the DataFrameReader#csv API overload which accepts multiple paths. This works, but requires us to find all the CSV files that need to be read at runtime, which can be done by either listing files in directories or watching directories for file modifications. Frequently, however, data in storage is partitioned by the time when it is written, e.g. the storage structure is organized as a date/hour hierarchy. The advantage of this organization is that it guarantees that once data is written, it is immutable. The disadvantage is that it makes it difficult to group relevant data by other attributes, such as shipping date, without creating a large number of folder/partition combinations inside the storage. Data immutability is a very important property because it simplifies incremental processing of the data and reasoning about its state. That’s why it is common to organize raw data by the time when it’s written and subsequently transform that data into a more appropriate format, such as ORC or Parquet, and partitioning structure, e.g. by shipping date.

One option for reading all files in a folder is to create a Hive table with the location pointing to the folder and then use our DataFrameTableReader to read data from the Hive table. The following shows a test which does just that.
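The test is not reproduced here; a sketch of the idea, using the hypothetical helpers above (rawLineItemCsvSchema is an assumed helper that builds a CSV-backed TableSchema for a given location, and DataFrameTableReader’s API is assumed as well), might look like this:

"DataFrameTableReader" should "read all CSV files in a folder through a Hive table" in {
  // Stage the sample file in a temporary folder that will back the Hive table.
  val stagingDir = java.nio.file.Files.createTempDirectory("lineitem_raw")
  java.nio.file.Files.copy(
    getClass.getResourceAsStream("/tpch/lineitem.tbl"),
    stagingDir.resolve("lineitem.tbl"))

  // Create a CSV-backed Hive table whose location points at the folder,
  // then read it back through the table reader.
  createTable(spark, rawLineItemCsvSchema(stagingDir.toString))
  val df = new DataFrameTableReader(spark).read("tpch.lineitem_raw")

  df.count() shouldBe 6
}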

Transforming LineItem Data from CSV to ORC

At this point we have the code to read line item data from CSV files into a data frame and also to write a data frame to a Hive table. Note that the data is read from CSV into a data frame with all columns of type String, so one transformation we want to perform is casting each column to an appropriate type. The following shows an implementation of such casting in the LineItemDataFrameTransformer class. The transformation casts the input String columns to the types declared in the output table schema definition from LineItemDataFrameTransformer.getSchema():
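A sketch of the casting logic, assuming the illustrative schema model above and that the raw columns line up positionally with the columns of the target schema:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.col

// Sketch: cast the string-typed input columns (_c0, _c1, ...) to the types
// declared in the target table schema, renaming them along the way.
def transform(raw: DataFrame): DataFrame = {
  val targetColumns = LineItemDataFrameTransformer.getSchema().storageDescriptor.columns
  val casted = raw.columns.zip(targetColumns).map { case (rawName, target) =>
    col(rawName).cast(target.dataType).as(target.name)
  }
  raw.select(casted: _*)
}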

The following test stages a CSV file in a folder, creates a Hive table on top of that folder, reads the Hive table into a data frame, passes the data frame to the transformer and verifies that the transformer produced a data frame with the right number of rows with properly typed schema.
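A compressed sketch of that test’s shape, under the same assumptions as the earlier tests:

"LineItemDataFrameTransformer" should "transform raw CSV line items into typed rows" in {
  val rawDF = spark.table("tpch.lineitem_raw")  // CSV-backed table staged as before
  val transformedDF = new LineItemDataFrameTransformer().transform(rawDF)

  transformedDF.count() shouldBe 6
  // Spot-check that columns were cast to the declared types.
  transformedDF.schema("l_orderkey").dataType shouldBe org.apache.spark.sql.types.LongType
  transformedDF.schema("l_quantity").dataType.typeName should startWith("decimal")
}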

Gleaning insights into Spark job performance

Having tests around our Spark jobs is great, but can we tell what Spark is actually doing when these tests are executed in the embedded mode? How can we get any insights into what’s happening behind the scenes?

It turns out that, just like in the standalone, local and remote modes, Spark can run a UI server in embedded mode as well. The Spark UI is a web interface which provides visibility into the performance characteristics of Spark jobs, including how the jobs are broken down into stages and how Spark executors are utilized by each job.

In embedded mode, the Spark UI runs in the same JVM as the test and gets shut down before the test finishes. So, in order to be able to access the UI while the test is running, we put a breakpoint right after the Spark job completes. In the test above, the Spark job is started by the transformedDF.count() call because, in Spark DAG terminology, count() is an action, as opposed to a transformation, and it is the action that triggers the execution of the DAG in our test. Please refer to the Apache Spark documentation for details on the transformations and actions supported by Spark.

One caveat here is that we want the breakpoint to pause the thread executing the test but allow the thread serving the Spark UI to continue its execution. In IntelliJ IDEA, this can be done by right-clicking on the breakpoint and selecting Thread in the Suspend section, as shown below.

Test execution paused to allow access to the Spark UI

Now we can run the test in debug mode and access the Spark UI when the breakpoint is hit. Note that the Spark UI server is started by default but can be disabled by setting the spark.ui.enabled config setting to false. If the Spark UI is enabled, then you will see the following line in the log generated during the test:

19/11/24 11:12:16 INFO Utils: Successfully started service 'SparkUI' on port 4040.

This means that the Spark UI can be accessed by opening http://localhost:4040/ in the browser while the test is paused on the breakpoint. The UI port can be changed by setting the spark.ui.port Spark configuration to the desired port, e.g. when creating the Spark session as in SparkSession.builder().config("spark.ui.port", "4041").getOrCreate().

This screenshot shows the Spark UI with the job and its stages completed before the test hit the breakpoint. It’s interesting that a simple count() action caused a data shuffle, resulting in two job stages. The reason for this is that Dataset#count() is implemented using groupBy().count() with an empty key. This causes partial counts to be calculated in each partition and all of those counts to be sent to a single partition mapped to the empty key. That partition then produces the final aggregate count. Thus, in this case, the shuffle moves only a small amount of data around and is not a point of concern.

It’s interesting that switching from Dataset#count() to RDD#count() API removes this extra shuffle because the latter does not use groupBy and instead adds up partial counts received from each partition directly.
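As a tiny illustration (not a claim about the project’s code), the two calls below compute the same number but produce different physical plans:

// Dataset API: planned as a partial count per partition plus a final
// aggregation on an empty grouping key, i.e. two stages with a shuffle.
val datasetCount = transformedDF.count()

// RDD API: per-partition sizes are summed on the driver, so no shuffle stage.
val rddCount = transformedDF.rdd.count()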

There are lots of little nuances like this which we can uncover while using Spark UI. The topic of Spark job optimization is beyond the scope of this article and has an abundance of supporting literature online.

One last thing I’d like to point out is the SQL tab in the UI, which shows Spark SQL queries run by jobs and their execution plans. Note that the same execution plans can be acquired programmatically by prefixing the query with EXPLAIN or EXPLAIN EXTENDED.
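For example, something along these lines (the query itself is illustrative):

// Print the parsed, analyzed, optimized and physical plans for a query.
spark.sql("EXPLAIN EXTENDED SELECT count(*) FROM tpch.lineitem").show(truncate = false)

// The DataFrame equivalent: explain(true) prints the extended plan to stdout.
transformedDF.explain(true)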

Finalizing feature implementation

At this point, the line item transformation has been implemented, and we need to make sure that all our tests are still passing and that we have good code coverage. One way to do this is by running tests with coverage in the IDE, as we did in the previous article. Another way is to run the following commands to generate a coverage report:

> sbt clean coverage test
> sbt coverageReport

The last command will produce target/scala-2.11/scoverage-report/index.html in the project folder. Open this file in a web browser and review the coverage report.

Once we are happy with the code coverage, we can push the feature branch to the Git server which will trigger the CI/CD pipeline.

> git push --set-upstream origin feature/lineitem-transform

Once satisfied with the passing tests and the code coverage produced by CI/CD, let’s create a pull request (called a “merge request” in GitLab) with the changes we’ve made in the feature/lineitem-transform branch for other developers to review. When creating such a request, use feature/lineitem-transform as the source branch and master as the target branch, provide a clear description of the changes, and assign it to the users who should review the request. Here is an example of what such a pull request looks like in GitLab:

Pull/merge request in GitLab

Once the request is approved, the changes can be merged into the master branch by clicking the Merge button in the UI or by using the git merge command. Note that if changes were made in the master branch after we forked the feature branch from it, then we would want to apply those changes to the feature branch before merging the pull request into the master branch. This can be done with the git rebase command, which temporarily rolls back the changes made in the feature branch, replays the changes that had been made in the master branch, and then reapplies the feature branch changes on top of them. Sometimes this may result in merge conflicts, which you would have to resolve in the feature branch before merging it into master.

Summary

In this article we introduced the TPC-H benchmark and briefly discussed the dataset and queries it is comprised of. We evaluated different options for representing a table schema in Spark using a Scala object model and settled on the format used by the AWS Glue Create Table Web API. We reviewed different Spark APIs for reading from and writing to files and used them to implement a Spark job for converting TPC-H LineItem data from its raw format to ORC. Next, we discussed the trade-offs of different ways to organize data in the underlying physical storage. Finally, we peeked behind the scenes of the Spark API to look at job execution details in the Spark UI while running Spark jobs in embedded mode. All along, we’ve been following best software engineering practices, ensuring that we produce clean code of high and measurable quality.

