GridDB Connector for Apache Spark
Apache Spark now has support to fully integrate GridDB into its workflow. For those unaware, Spark is FOSS which saw its initial release in 2014. Since then, it has very quickly established itself as an important piece of Big Data processing and analyzing. This blog post is meant give instructions on how to install Spark on your GridDB machine and will also go over some brief queries to provide a tangible look at its usage.
As briefly explained before, Apache Spark is a parallel data processing framework meant to provide fast data analytics. Using the GridDB connector allows a GridDB database to be used as an input source for Spark queries and analytics. Its interactive shell can be used to quickly and easily perform ad-hoc queries by data scientists/developers or can be built into user-facing business applications. Installation is a fairly simple process.
This blog assumes your machine already has a GridDB server, the GridDB Java Client, and the GridDB Hadoop Mapreduce Connector. These items all also each have their own sets of dependencies, so I will post a full list below. And please note, if you have any sorts of issues installing any of these items, please leave a comment below or post on the forums for help.
Full list of dependencies:
- OS: CentOS6.7(x64)
- Maven: apache-maven-3.3.9
- Java: JDK 1.8.0_101
- Apache Hadoop: Version 2.6.5
- Apache Spark: Version 2.1.0
- Scala: Version 2.11.8
- GridDB server and Java client: 3.0 CE
- GridDB connector for Apache Hadoop MapReduce: 1.0
If beginning from scratch, I recommend ensuring all of these items are installed and configured. This tutorial also assumes that your Hadoop, Spark, and Connector are all installed in the
[INSTALL_FOLDER] directory (I used
Once verified, please proceed with the steps outlined below:
We start this process off with adding the following environment variables to
$ nano ~/.bashrc
export JAVA_HOME=/usr/lib/jvm/[JDK folder]
export HADOOP_OPTS="$HADOOP_OPTS -Djava.library.path=$HADOOP_HOME/lib/native"$ source ~/.bashrc
Once those are added, modify the
$ cd $GRIDDB_SPARK
$ nano gd-config.xml<!-- GridDB properties -->
<value>[GridDB user] </value>
<value>[GridDB password] </value>
<value>[GridDB cluster name] </value>
<!-- Define address and port for multicast method, leave it blank if using other method -->
<value>[GridDB notification address(default is 18.104.22.168)] </value>
<value>[GridDB notification port(default is 31999)] </value>
Build The Connector + An Example
Next up, refer to this configuration page for a quick definition of each of the GridDB properties.
To build a GridDB Java client and a GridDB connector for Hadoop MapReduce, place the following files under the
.jar files should have been created when you built your GridDB client and the GridDB Mapreduce Connector. You can find
/usr/griddb-X.X.X/bin, for example)
Once that’s complete, add the SPARK_CLASSPATH to “spark-env.sh”
$ cd $SPARK_HOME
$ nano conf/spark-env.shSPARK_CLASSPATH=.:$GRIDDB_SPARK/gs-spark-datasource/target/gs-spark-datasource.jar:$GRIDDB_SPARK/gs-spark-datasource/lib/gridstore.jar:$GRIDDB_SPARK/gs-spark-datasource/lib/gs-hadoop-mapreduce-client-1.0.0.jar
Now that we’ve got the prerequisites out of the way, we can continue on to build the connector and an example to ensure everything is working properly.
To begin, we will need to edit our
Init.java file to add the correct authentication credientials.
$ cd $SPARK_HOME/gs-spark-datasource-example/src/
$ nano Init.java
And add in your credentials:
Properties props = new Properties();
GridStore store = GridStoreFactory.getInstance().getGridStore(props);
And now we can run the mvn command like so:
$ cd $GRIDDB_SPARK
$ mvn package
which will create the following
Now proceed with running the example program. First start your GridDB cluster. And then:
Put some data into the server with the GridDB Java client
$ cd $GRIDDB_SPARK
$ java -cp ./gs-spark-datasource-example/target/example.jar:gs-spark-datasource/lib/gridstore.jar Init
Now you can run queries with your GridDB connector for Spark:
$ spark-submit --class Query ./gs-spark-datasource-example/target/example.jar
We will go over some brief examples of Apache Spark’s API. Examples are pulled from the official page.
Spark’s defining feature is its RDD (Resilient Distributed Datasets) and the accompanying API. RDDs are immutable data structures that can be run in parallel on commodity hardware — essentially it is exactly what allows Spark to run its queries in parallel and outperform MapReduce. Here’s a very basic example; it will showcase how to build an RDD of the numbers 1–5
List data = Arrays.asList(1, 2, 3, 4, 5);
JavaRDD distData = sc.parallelize(data);
With this, you can now run that small array in parallel. Pretty cool, huh?
Command Line Query
A “must-run” query in the Big Data scene is running a word count, so here’s what it looks like on Spark. For this example, let’s try using the shell (example taken from: here). To run this, please be sure you place a text file
input.txt into your
$GRIDDB_SPARK directory. Fill it with whatever text you like; I used the opening chapter of Moby Dick . Now fire up the spark shell:
scala> val inputfile = sc.textFile ("input.txt")
inputfile: org.apache.spark.rdd.RDD[String] = input.txt MapPartitionsRDD at textFile at :24
scala> val counts = inputfile.flatMap (line => line.split (" " )).map (word => (word, 1)).reduceByKey(_+_)
counts: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD at reduceByKey at :26
scala> counts.saveAsTextFile ("output")
And now if you head back into
$GRIDDB_SPARK, you should find the
output dir. Now just run a simple
cat on the file in there to retrieve the word count results of your text file.
$ cd $GRIDDB_SPARK
$ cd output
$ cat part-00000
Of course, Spark is also capable of handling much more complex queries. Because GridDB ideally deals mostly in TimeSeries (TS) data, how about we take a look into a TS query? Here’s a sample query taken from here:
val tsRdd: TimeSeriesRDD = ...
// Find a sub-slice between two dates
val zone = ZoneId.systemDefault()
val subslice = tsRdd.slice(
// Fill in missing values based on linear interpolation
val filled = subslice.fill("linear")
// Use an AR(1) model to remove serial correlations
val residuals = filled.mapSeries(series => ar(series, 1).removeTimeDependentEffects(series))
Using Spark with GridDB as an input source is very easy and will prove to be a very useful implementation. We hope the marriage of both services yields lots of productive analysis work.
Originally published at griddb.net.