Apache Spark as a Distributed SQL Engine

Nicolas A Perez
Jan 6, 2016

SQL has been around for a while and people like it. However, the engines that power SQL have changed over time in order to solve new problems and keep up with demands from consumers.

Traditional engines such as Microsoft SQL Server had scalability problems that they have solved over time, in part with cloud-based solutions. Others have been built from the ground up to work in distributed environments, so they can put performance at the top of their priority list.

No single tool fits every use case. In fact, we believe that tools are built with specific use cases in mind, to solve a specific problem, and then evolve to a more mature stage where they can be used to solve many other problems.

In a traditional SQL environment, the data is represented by tables and the relationships between them, but this representation is sometimes not enough, so new tools are born to fill the gap. We can find organizations everywhere that don’t use relational databases; instead, they prefer NoSQL ones.

Hadoop

In the Hadoop world, we find different query engines. Each of them has its own particularities, and together they solve a wide variety of problems.

In any Hadoop distribution we can find Apache Hive, a SQL-like tool that offers data warehouse infrastructure and capabilities for big data queries and analysis.

Depending on the Hadoop distribution, we can also find Apache Impala and Apache Drill. All of them offer more or less the same capabilities and share a common goal: we can use SQL or SQL-like languages to query data stored in Hadoop. They also have their own limitations and advantages that you should be aware of. Here is a link with more details about these technologies.

Apache Spark

Apache Spark is a lightning-fast cluster computing engine that can be deployed in a Hadoop cluster or in standalone mode. It can also be used as an SQL engine like the others we mentioned. Spark, however, offers some advantages over the previous ones that cannot be easily ignored.

Spark exposes APIs for different languages such as Scala, Java, Python and, most recently, R. This makes it accessible to people from many different areas of expertise: developers, data scientists, and business people with statistics experience (I have seen business guys using R).

Iterative algorithms are easily implemented in Spark, especially Machine Learning ones.

Let’s walk through an example of how to use Spark as an SQL engine.

Exploring Our Data Source

Our data set is a simple folder with a few terabytes of CSV-formatted files, each about 40 MB. The size of the files does not affect our performance because they are stored in a MapR cluster. MapR takes care of the Hadoop small file problem, as I explain in this post.

Because we are using MapR, copying files to the cluster is quite easy since we have mounted a volume to our local file system.

In order to mount the MapR volume we run this command:

sudo mount_nfs -o "hard,nolock" 10.21.112.209:/mapr/mapr.domain.com/datalake /Users/anicolaspp/mapr/

Now, if we run POSIX commands against our local folder, they will in fact be executed in the MapR cluster.

Preparing the Environment for Auto Schema Discovery

We are going to create a Spark application using Scala that will allow us to execute SQL statements over our data stored in MapR, the Hadoop distribution we are using for this post.

In this post I explained how to create an application in Spark and the steps we need to follow beforehand.

Our app class will look as follows:

/**
 * Created by anicolaspp.
 */
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.Row
import org.apache.spark.sql.hive.HiveContext
import org.apache.spark.sql.hive.thriftserver.HiveThriftServer2
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object app {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("testing")
    val sc = new SparkContext(conf)
    val sql = new HiveContext(sc)

    // Port the Thrift (JDBC/ODBC) server will listen on
    sql.setConf("hive.server2.thrift.port", "10001")

    val delimiter = "\t"
    val data = sc.textFile("datalake/myTestDataFolder/")

    // Build the schema from the header line; every column is a string
    val headers = data.first.split(delimiter)
    val schema = StructType(headers.map(h => StructField(h, StringType)))
    val rowRDD = data.map(p => Row.fromSeq(p.split(delimiter)))

    val dataFrame = sql.createDataFrame(rowRDD, schema)

    // Expose the data frame as a table and start serving SQL clients
    dataFrame.registerTempTable("someTableName")
    HiveThriftServer2.startWithContext(sql)

    // Keep the application alive
    while (true) { Thread.`yield`() }
  }
}

Let’s review our code.

First, we create the Spark context from a configuration object, and a HiveContext on top of it.

val conf = new SparkConf().setAppName("testing")
val sc = new SparkContext(conf)
val sql = new HiveContext(sc)

Then, we set the Thrift port to avoid conflicts with other components such as Hive.

sql.setConf("hive.server2.thrift.port", "10001")

Now, we set our CSV delimiter, which in this case is the tab character. We also set the location of our data set by creating a Resilient Distributed Dataset (RDD) using the Spark context (sc).

val delimiter = "\t"
val data = sc.textFile("datalake/myTestDataFolder/")

At this point, we want to be able to serve our data without worrying about the schema of our file; we want a Self Service BI Environment as I explained here. Using the headers from our data files, we can create the schema, automatically, so we don’t have to worry about schema changes in the future. Once we have the schema, we create a Data Frame that we are going to expose to be queried using SQL.

val headers = data.first.split(delimiter)
val schema = StructType(headers.map(h => StructField(h, StringType)))
val rowRDD = data.map(p => Row.fromSeq(p.split(delimiter)))
val dataFrame = sql.createDataFrame(rowRDD, schema)
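One detail worth checking with this approach is how each line gets split. The sketch below uses plain Scala collections in place of an RDD (an assumption, so that it runs without a cluster) and two hypothetical helpers, splitLine and toRecords. It illustrates that String.split drops trailing empty fields unless a negative limit is passed, and that the header line is itself part of the data unless it is filtered out.

```scala
object HeaderSchemaSketch {
  // Split a delimited line, keeping trailing empty fields (limit = -1).
  // String.split takes a regex: "\t" is safe, a delimiter like "|" would need escaping.
  def splitLine(line: String, delimiter: String): Seq[String] =
    line.split(delimiter, -1).toSeq

  // Pair every data line with the column names taken from the first line.
  // Lines equal to the header are skipped so they do not show up as records.
  def toRecords(lines: Seq[String], delimiter: String): Seq[Map[String, String]] = {
    val headerLine = lines.head
    val headers = splitLine(headerLine, delimiter)
    lines
      .filter(_ != headerLine)
      .map(line => headers.zip(splitLine(line, delimiter)).toMap)
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq("id\tname\tcity", "1\tAna\t", "2\tLuis\tMiami")

    // The default split loses the empty trailing "city" field of the first record.
    println("1\tAna\t".split("\t").length)      // 2
    println(splitLine("1\tAna\t", "\t").length) // 3

    toRecords(lines, "\t").foreach(println)
  }
}
```

In the Spark application, the equivalent change would presumably be to call p.split(delimiter, -1) when building each Row and to filter out lines equal to the header before mapping; whether that is needed depends on how the files were produced.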

The only part missing is the one that registers our data set as a temporary table in the Hive context and exposes it through the Thrift server. We do that with:

dataFrame.registerTempTable("someTableName")
HiveThriftServer2.startWithContext(sql)

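The full listing ends with a loop whose only job is to keep our app alive. Note that RDD transformations are lazy, so they are only executed when a query is submitted. The one exception in our code is data.first, an action that runs at startup to read the header line.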
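Plain Scala iterators behave in a loosely similar way, which makes the idea easy to see without a cluster. The sketch below is only an analogy, not Spark code: map on an Iterator merely describes the work, and nothing runs until a terminal operation (the counterpart of a Spark action, or here of an incoming query) pulls the elements. The object and method names are made up for illustration.

```scala
object LazySketch {
  // Returns how many lines had been parsed before and after forcing the iterator.
  def run(input: Seq[String]): (Int, Int) = {
    var parsed = 0 // counts how many lines were actually split

    // Like an RDD transformation, this only describes the work to be done.
    val rows = input.iterator.map { line =>
      parsed += 1
      line.split("\t").toSeq
    }
    val before = parsed

    // Like an action, consuming the iterator forces the computation.
    rows.foreach(_ => ())
    (before, parsed)
  }

  def main(args: Array[String]): Unit = {
    val (before, after) = run(Seq("a\tb", "c\td", "e\tf"))
    println(s"parsed before forcing: $before, after forcing: $after")
  }
}
```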

Deploying Our Application

We build and test our app using SBT and the resulting .jar can be copied to the cluster in the same way we copy files in our local file system.

cp pathToOurJar/app.jar /Users/anicolaspp/mapr/testing

Remember this is possible because we have previously mounted a MapR volume in our local file system.

Now, we need to submit our application to the cluster, and we do that by using the spark-submit command. Detailed documentation about submitting Spark applications can be found on the Spark website.

In our cluster, we run:

/spark-submit --master yarn /mapr/mapr.domain.com/datalake/testing/testing_2.10-1.0.jar

Our application should start running on YARN, as we indicated when submitting it.

Our SQL engine is ready to be queried, so let’s move forward and test it out.

SQL Clients

An easy way to test our SQL engine is to run beeline, a command line tool that works as an SQL client.

We can find beeline in the Spark bin folder. To start it, we run ./beeline.

Within beeline, we need to connect to the endpoint we have defined in our application, so we run:

!connect jdbc:hive2://localhost:10001

We should be ready to run SQL statements, but let’s verify we can see the table we registered.

show tables;

Spark SQL will return a table with the registered tables including the one we registered in our application (someTableName).
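The same check can be scripted from any JDBC client. The sketch below assumes that the Hive JDBC driver (org.apache.hive.jdbc.HiveDriver) is on the classpath and that the server started by our application is listening on localhost:10001 without authentication. The object name and the jdbcUrl helper are made up for illustration.

```scala
import java.sql.DriverManager

object ListTablesSketch {
  // Build the HiveServer2 JDBC URL, the same one passed to !connect in beeline.
  def jdbcUrl(host: String, port: Int): String = s"jdbc:hive2://$host:$port"

  def main(args: Array[String]): Unit = {
    // Assumption: the Hive JDBC driver jar is on the classpath.
    Class.forName("org.apache.hive.jdbc.HiveDriver")

    // Assumption: no authentication is configured, so empty credentials are accepted.
    val connection = DriverManager.getConnection(jdbcUrl("localhost", 10001), "", "")
    try {
      val statement = connection.createStatement()
      val tables = statement.executeQuery("show tables")
      // Print the first column of every row rather than relying on a column name.
      while (tables.next()) println(tables.getString(1))
    } finally {
      connection.close()
    }
  }
}
```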

In the same way, we can connect using other clients such as MicroStrategy or Tableau. We have tried both, and both can build and execute queries on tables registered by Spark applications. We can also combine different sources (Spark SQL, MS SQL Server, Hive, Impala, etc.), which gives us the flexibility of combining relational sources with non-relational data.

Spark SQL performs quite well and often better than the other providers in Hadoop, but be aware that performance can be degraded under certain conditions and use cases.

Why Apache Spark

Certainly, Spark SQL offers some of the functionality that other tools within Hadoop have. However, the possibility of exploring complex data sets is fairly unique to Spark, since we can code custom serialization / deserialization processes in our application. Using Spark SQL, we can connect to any data source and present it as tables to be consumed by SQL clients. This is as easy as changing how we read the data in those sources by changing the serializer in our application.
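As a rough illustration of what changing the serializer could look like, the sketch below hides the line format behind a small trait. LineParser, DelimitedParser and FixedWidthParser are hypothetical names, not Spark APIs. In the application, the output of parse would be what gets passed to Row.fromSeq.

```scala
import java.util.regex.Pattern

// Hypothetical abstraction: turns one raw line into the fields of a row.
trait LineParser {
  def parse(line: String): Seq[String]
}

// Delimiter-separated values; the delimiter is quoted so "|" is not read as a regex.
class DelimitedParser(delimiter: String) extends LineParser {
  private val pattern = Pattern.quote(delimiter)
  def parse(line: String): Seq[String] = line.split(pattern, -1).toSeq
}

// Fixed-width records: each field occupies a known number of characters.
class FixedWidthParser(widths: Seq[Int]) extends LineParser {
  def parse(line: String): Seq[String] = {
    // Running sum of the widths gives the start offset of every field.
    val starts = widths.scanLeft(0)(_ + _)
    starts.zip(widths).map { case (start, width) =>
      line.slice(start, start + width).trim
    }
  }
}

object ParserSketch {
  def main(args: Array[String]): Unit = {
    val samples: Seq[(LineParser, String)] = Seq(
      (new DelimitedParser("|"), "1|Ana|Miami"),
      (new FixedWidthParser(Seq(3, 5, 5)), "1  Ana  Miami")
    )
    // Two different source formats end up as the same sequence of fields.
    samples.foreach { case (parser, line) => println(parser.parse(line)) }
  }
}
```

With a design like this, supporting a new source format would mean adding one more LineParser implementation while the schema and table registration code stay untouched.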

Endings

There are very useful tools that we can use within Hadoop to query data in an SQL fashion and all of them have their advantages. The Spark SQL module from Apache Spark offers some flexibility that others lack while keeping performance as one of the main priorities.

Spark is not the only tool we can use, but we strongly advise including it in big data solutions where SQL statements are to be executed. The final solution might be a mix of different tools, but Spark will surely be an important part of the system we are building.

Read next:

Extending Our Spark SQL Query Engine

How MapR improves our productivity and simplifies our design

This blog doesn’t represent the thoughts, plans or strategies of my employer.
