Spark + Scala Dataset Tutorial
The Dataset API has been available in Spark since January 2016 (Spark version 1.6). It provides an efficient programming interface for structured data in Spark. Several blog posts explain why to use Datasets and what their benefits are compared with RDDs and DataFrames. Two examples:
“Dataset API combines object-oriented programming style and compile-time type-safety but with the performance benefits of the Catalyst query optimizer. Datasets also use the same efficient off-heap storage mechanism as the DataFrame API.”
See more here: http://www.agildata.com/apache-spark-rdd-vs-dataframe-vs-dataset/
“If you want higher degree of type-safety at compile time, want typed JVM objects, take advantage of Catalyst optimization, and benefit from Tungsten’s efficient code generation, use Dataset.”
See more here:
https://databricks.com/blog/2016/07/14/a-tale-of-three-apache-spark-apis-rdds-dataframes-and-datasets.html
After reading some posts I decided to start using Datasets. But working with them wasn’t as easy as I expected, to say the least. I couldn’t find a detailed tutorial with example code on how to use Datasets, and even the most basic data transformations took a couple of hours of googling, trying and thinking. Apart from the new syntax, Datasets have three peculiarities that make them cumbersome compared to pandas or R data frames:
- the types of the columns disappear when running untyped transformations
- the names of the columns sometimes disappear partially or completely when a typed transformation is used
- missing value handling
Before we go into the details it is important to know that Datasets are DataFrames with the types of the columns defined. Converting a Dataset to a DataFrame loses only the type information; otherwise the object is the same. It works the other way round too: you can convert a DataFrame to a Dataset by defining the types of the columns in a case class.
The first difficulty is related to untyped transformations, which give a DataFrame result instead of a Dataset. For example, if you add a new column to your Dataset, the result will be a DataFrame, even if you define the type of the new column. If you want to work within a Dataset environment, then the steps for untyped transformations are the following:
- Have a Dataset
- Apply the function to it
- The result is a DataFrame
- Convert the result to Dataset by defining the types of the columns in a case class
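The four steps above can be sketched as follows. This is only a minimal illustration under my own assumptions: the case classes Person and PersonWithUnit and the local SparkSession named spark are made up for the sketch (the notebook in this tutorial uses sqlContext instead).

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
import org.apache.spark.sql.functions.lit

// Hypothetical case classes for this sketch (not part of the tutorial's data).
case class Person(name: String, height: Int)
case class PersonWithUnit(name: String, height: Int, unit: String)

// Assumption: a local SparkSession; the tutorial's notebook uses sqlContext instead.
val spark = SparkSession.builder().master("local[*]").appName("untyped-sketch").getOrCreate()
import spark.implicits._

// 1. Have a Dataset
val people: Dataset[Person] = Seq(Person("Yoda", 66), Person("Han Solo", 180)).toDS()

// 2-3. withColumn is an untyped transformation, so the result is a DataFrame
val withUnit: DataFrame = people.withColumn("unit", lit("cm"))

// 4. Convert back to a Dataset by naming a case class that matches the columns
val typed: Dataset[PersonWithUnit] = withUnit.as[PersonWithUnit]
```

The field names of PersonWithUnit have to match the column names of the DataFrame, otherwise the conversion in step 4 fails.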
The second group of functions requiring attention are the typed transformations. When we use a typed transformation, the output is a Dataset with proper types. But if there is a change in the columns (fewer or more columns, or new columns created), then the column names shown in the display are valid only in the DataFrame sense (“columnName”). The Dataset reference _.columnName won’t work in these cases, but you can refer to the columns with ._1, ._2 etc. If you would like to have proper column names, use a case class again.
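A small sketch of this behaviour, using map as the typed transformation. The case classes Hero and NameLength and the SparkSession named spark are assumptions made for the illustration only.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical case classes for this sketch.
case class Hero(name: String, height: Int)
case class NameLength(name: String, length: Int)

// Assumption: a local SparkSession; the tutorial's notebook uses sqlContext instead.
val spark = SparkSession.builder().master("local[*]").appName("typed-sketch").getOrCreate()
import spark.implicits._

val heroes: Dataset[Hero] = Seq(Hero("Yoda", 66), Hero("Chewbacca", 228)).toDS()

// Mapping to a tuple keeps the types, but the fields are now called _1 and _2
val asTuples: Dataset[(String, Int)] = heroes.map(h => (h.name, h.name.length))
val longTuples = asTuples.filter(t => t._2 > 4)

// Mapping to a case class gives the fields proper names again
val named: Dataset[NameLength] = heroes.map(h => NameLength(h.name, h.name.length))
val longNamed = named.filter(_.length > 4)
```

Both filters keep the same rows; the second one is simply easier to read because the field has a name.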
The third pain point is missing value handling, which demands very special care. You have to specify explicitly, along with the type of the columns, whether there might be missing values in that column. During the data transformation steps you have to handle Option type columns differently from the fields where no missing values are allowed, as you have to define exactly what should happen with missing values.
Note that this tutorial is based on my experience, and
- there might be other Dataset functions or solutions, with better performance for these tasks, which I could not find,
- in some cases using Datasets could be inefficient and RDD (or another API) might perform better.
I use Spark as a data scientist, not as an engineer or a developer, so my main focus is on data analysis and data preparation for machine learning. What I wanted to show in this tutorial are examples on how to do basic data transformations on Datasets.
Hopefully my experience will be useful for others.
In this tutorial I use a small Dataset and show the main data transformation functions:
1. Creating Datasets
- Manually
- From file
2. Joining Datasets
3. Selecting columns
4. Renaming columns
5. Adding new columns
- Constant column
- Derived column
6. Filtering rows
7. Groupby and aggregating
8. Sorting
9. Appending Datasets
10. Other useful functions
11. Example
The version numbers of my tools:
- jupyter-scala 0.4.0-RC1
- ammonium 0.8.0
- Scala 2.11.8
- Java 1.8.0_111
In Spark Notebook everything should work similarly (only the environment setup is different):
- Spark: sparkVersion-2.0.2
- Spark Notebook: sparkNotebookVersion-0.8.0-SNAPSHOT
- Scala: scalaVersion-2.11.8
- SBT: sbtVersion-0.13.8
With sparkVersion-2.0.0 the results were almost the same.
Let’s get started!
Set up the environment.
In [1]:
import $exclude.`org.slf4j:slf4j-log4j12`, $ivy.`org.slf4j:slf4j-nop:1.7.21`, $ivy.`org.slf4j:log4j-over-slf4j:1.7.21` // for cleaner logs
import $ivy.`org.apache.spark::spark-sql:2.0.2` // adjust spark version - spark >= 1.6 should be fine, possibly >= 1.3 too
import $ivy.`org.jupyter-scala::spark:0.4.0-RC1` // allows to create SparkContext-s aware of the jupyter-scala kernel
import jupyter.spark._
import sqlContext.implicits._
sparkInit()
sparkConf.
setAppName("SBTB").
setMaster("local")
Out[1]:
import $exclude.$ , $ivy.$ , $ivy.$ // for cleaner logs
import $ivy.$ // adjust spark version - spark >= 1.6 should be fine, possibly >= 1.3 too
import $ivy.$ // allows to create SparkContext-s aware of the jupyter-scala kernel
import jupyter.spark._
import sqlContext.implicits._
res0_6: org.apache.spark.SparkConf = org.apache.spark.SparkConf@19a8b5e3
In [2]:
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
Out[2]:
import org.apache.spark.sql.{Dataset, DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql._
1. Creating Dataset
Manually
- Define the data as a sequence.
- Convert the sequence to DataFrame and define the column names
- Define the type of the columns by a case class (using proper column names is a must)
- Convert to Dataset
In this example we create a small Dataset with two columns: the first column contains the names of Star Wars characters and the second one lists the names of their friends. I used spaces to align the fields visually, but this is not necessary; one space is enough between the items.
In [3]:
//1-2.
val df = Seq(("Yoda", "Obi-Wan Kenobi"),
("Anakin Skywalker", "Sheev Palpatine"),
("Luke Skywalker", "Han Solo, Leia Skywalker"),
("Leia Skywalker", "Obi-Wan Kenobi"),
("Sheev Palpatine", "Anakin Skywalker"),
("Han Solo", "Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi, Chewbacca"),
("Obi-Wan Kenobi", "Yoda, Qui-Gon Jinn"),
("R2-D2", "C-3PO"),
("C-3PO", "R2-D2"),
("Darth Maul", "Sheev Palpatine"),
("Chewbacca", "Han Solo"),
("Lando Calrissian", "Han Solo"),
("Jabba", "Boba Fett")
)
.toDF("name", "friends")
Creating SparkContext
Adding session JARs to SparkContext
SparkContext initialized
Out[3]:
df: DataFrame = [name: string, friends: string]
The output message shows that the df object is a DataFrame. In the following two steps we convert it into Dataset.
In [4]:
//3.
case class Friends(name: String, friends: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[4]:
defined class Friends
The second line in step 3 is needed only in the Jupyter notebook, to ensure that the case class is available in the other cells as well. In Spark Notebook, for example, you don’t need the second line, but it does no harm there.
In [5]:
//4.
val friends_ds = df.as[Friends]
Out[5]:
friends_ds: Dataset[Friends] = [name: string, friends: string]
Now the output message shows that the friends_ds object is a Dataset. Finally display the content of the Dataset:
In [6]:
friends_ds.show()
+----------------+--------------------+
| name| friends|
+----------------+--------------------+
| Yoda| Obi-Wan Kenobi|
|Anakin Skywalker| Sheev Palpatine|
| Luke Skywalker|Han Solo, Leia Sk...|
| Leia Skywalker| Obi-Wan Kenobi|
| Sheev Palpatine| Anakin Skywalker|
| Han Solo|Leia Skywalker, L...|
| Obi-Wan Kenobi| Yoda, Qui-Gon Jinn|
| R2-D2| C-3PO|
| C-3PO| R2-D2|
| Darth Maul| Sheev Palpatine|
| Chewbacca| Han Solo|
|Lando Calrissian| Han Solo|
| Jabba| Boba Fett|
+----------------+--------------------+
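As an aside, the steps can be shortened when the rows are built as case class instances from the start. The sketch below is a possible alternative rather than the approach used in this tutorial; the case class FriendsSketch and the SparkSession named spark are assumptions for the illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical copy of the Friends case class, renamed to avoid a clash.
case class FriendsSketch(name: String, friends: String)

// Assumption: a local SparkSession; the tutorial's notebook uses sqlContext instead.
val spark = SparkSession.builder().master("local[*]").appName("toDS-sketch").getOrCreate()
import spark.implicits._

// toDS() builds the Dataset directly; column names come from the case class fields
val direct: Dataset[FriendsSketch] = Seq(
  FriendsSketch("Yoda", "Obi-Wan Kenobi"),
  FriendsSketch("R2-D2", "C-3PO")
).toDS()
```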
Missing values
Real data is usually not that perfect: missing values and other data quality problems can emerge. To demonstrate missing value handling we create a similar but smaller Dataset where the friends column contains missing values.
Two changes are needed:
- use Option[Type] when defining the type of the column in the case class
- use Some(value) for defined values and None for missing values when creating the sequence
In [7]:
case class Friends_Missing(Who: String, friends: Option[String])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[7]:
defined class Friends_Missing
In [8]:
val ds_missing = Seq(
("Yoda", Some("Obi-Wan Kenobi")),
("Anakin Skywalker", Some("Sheev Palpatine")),
("Luke Skywalker", None),
("Leia Skywalker", Some("Obi-Wan Kenobi")),
("Sheev Palpatine", Some("Anakin Skywalker")),
("Han Solo", Some("Leia Skywalker, Luke Skywalker, Obi-Wan Kenobi")))
.toDF("Who", "friends")
.as[Friends_Missing]
Out[8]:
ds_missing: Dataset[Friends_Missing] = [Who: string, friends: string]
In [9]:
ds_missing.show()
+----------------+--------------------+
| Who| friends|
+----------------+--------------------+
| Yoda| Obi-Wan Kenobi|
|Anakin Skywalker| Sheev Palpatine|
| Luke Skywalker| null|
| Leia Skywalker| Obi-Wan Kenobi|
| Sheev Palpatine| Anakin Skywalker|
| Han Solo|Leia Skywalker, L...|
+----------------+--------------------+
Luke Skywalker has no friends in this example so there is a null value in the friends column.
Reading from csv
Spark can read several data formats, from Parquet to JSON. For the sake of simplicity I show how to read in a csv file. Earlier the csv reader was a separate package, but since Spark 2.0 it has been integrated into the mainline.
You can download my example file from this link: /wp-content/uploads/2016/12/StarWars.csv. (The origin of the csv is here, but I made some minor modification on it, to better serve my purposes.)
The steps of the csv reading:
- Define the names and the types of the columns in a case class. Note that the names of the columns must be identical to the column names in the header of the file!
- Read the csv into a DataFrame
- Convert into Dataset
The result of the read is a DataFrame and as we have seen earlier, the .as[Characters] at the end of the expressions converts it to Dataset.
In [10]:
//1.
case class Characters(name: String,
height: Integer,
weight: Option[Integer],
eyecolor: Option[String],
haircolor: Option[String],
jedi: String,
species: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[10]:
defined class Characters
Columns weight, eyecolor and haircolor are Option types, so they are allowed to contain missing values. All the other fields must have values from the type defined in the case class.
In [11]:
//2-3.
val characters_ds: Dataset[Characters] = sqlContext
.read
.option("header", "true")
.option("delimiter", ";")
.option("inferSchema", "true")
.csv("StarWars.csv")
.as[Characters]
Out[11]:
characters_ds: Dataset[Characters] = [name: string, height: int ... 5 more fields]
Some explanation of the read function:
- option("header", "true"): the column names are defined in the first row of the file
- option("delimiter", ";"): the delimiter is the ;
- option("inferSchema", "true"): detect the column types automatically. The schema could also be given manually (see the subsidiary comment below).
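Besides inferring the schema or writing it by hand, the schema can also be derived from a case class. I did not use this in the tutorial, so take it as a sketch only: CharactersSketch is a hypothetical variant of the Characters case class (with Int instead of Integer), and the commented usage lines assume a SparkSession named spark.

```scala
import org.apache.spark.sql.Encoders
import org.apache.spark.sql.types.StructType

// Hypothetical variant of the Characters case class, using Int instead of Integer.
case class CharactersSketch(name: String,
                            height: Int,
                            weight: Option[Int],
                            eyecolor: Option[String],
                            haircolor: Option[String],
                            jedi: String,
                            species: String)

// Derive the schema (names, types, nullability) from the case class fields
val derivedSchema: StructType = Encoders.product[CharactersSketch].schema
derivedSchema.printTreeString()

// Possible usage (assumes a SparkSession named spark and the tutorial's StarWars.csv):
// spark.read.option("header", "true").option("delimiter", ";")
//   .schema(derivedSchema).csv("StarWars.csv").as[CharactersSketch]
```

In the derived schema the Option fields are nullable, while a plain Int field such as height is not.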
In [12]:
characters_ds.show()
+----------------+------+------+--------+---------+-------+-----------+
| name|height|weight|eyecolor|haircolor| jedi| species|
+----------------+------+------+--------+---------+-------+-----------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human|
| Padme Amidala| 165| 45| brown| brown|no_jedi| human|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|
| Qui-Gon Jinn| 193| 89| blue| brown| jedi| human|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human|
| Han Solo| 180| 80| brown| brown|no_jedi| human|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human|
| R2-D2| 96| 32| null| null|no_jedi| droid|
| C-3PO| 167| 75| null| null|no_jedi| droid|
| Yoda| 66| 17| brown| brown| jedi| yoda|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|
| Dooku| 193| 86| brown| brown| jedi| human|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|
| Jabba| 390| null| yellow| none|no_jedi| hutt|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|
| Boba Fett| 183| 78| brown| black|no_jedi| human|
| Jango Fett| 183| 79| brown| black|no_jedi| human|
+----------------+------+------+--------+---------+-------+-----------+
Here is the Dataset containing the name, height, eyecolor, etc. of Star Wars characters. Some values are missing, like Jabba’s weight or R2-D2’s haircolor.
Remember that when the Dataset was created manually, we had to define the values of columns with missing values as Some(value1), Some(value2) etc. Of course, in the csv this is not the case, but the reading function does this conversion for the columns with Option types. For example, if you wanted to filter the characters with brown eyes, you would have to use the expression filter(x => x.eyecolor == Some("brown")) instead of filter(x => x.eyecolor == "brown") because of the Option type.
Although we don’t see which columns are Option types, we have to keep it in mind when working with them later.
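The function passed to a typed filter is an ordinary Scala lambda, so the Option comparison can be tried on a plain Seq without Spark. The sketch below uses a made-up case class Eyes; Option.contains is a standard library alternative to comparing with Some(...).

```scala
// Plain Scala, no Spark needed. Eyes is a hypothetical case class for this sketch.
case class Eyes(name: String, eyecolor: Option[String])

val rows = Seq(
  Eyes("Yoda", Some("brown")),
  Eyes("R2-D2", None),                 // missing eye colour
  Eyes("Luke Skywalker", Some("blue"))
)

// Comparing with Some("brown"), as described in the text
val viaSome = rows.filter(x => x.eyecolor == Some("brown"))

// Equivalent alternative: Option.contains (available since Scala 2.11)
val viaContains = rows.filter(_.eyecolor.contains("brown"))
```

The same two lambdas can be passed unchanged to the filter of a Dataset whose case class has an Option[String] field.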
What happens when non-expected missing values appear in your data?
This was the most painful lesson for me. I read a Dataset from a csv where some values were missing, but I did not know about them, so I did not use Option in the case class definition. Some data transformation steps were successful, and I could even display the Dataset. But at a later point, for the umpteenth data transformation function, I got a NullPointerException, and I was struggling to find the reason why that transformation failed after a number of successful steps. So let’s see this behavior. First define a bad case class without Options, and then read the csv into a Dataset using this bad case class definition.
In [13]:
case class Characters_BadType(name: String,
height: Integer,
weight: Integer,
eyecolor: String,
haircolor: String,
jedi: String,
species: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[13]:
defined class Characters_BadType
In [14]:
val characters_BadType_ds: Dataset[Characters_BadType] = sqlContext
.read
.option("header", "true")
.option("delimiter", ";")
.option("inferSchema", "true")
.csv("StarWars.csv")
.as[Characters_BadType]
Out[14]:
characters_BadType_ds: Dataset[Characters_BadType] = [name: string, height: int ... 5 more fields]
In [15]:
characters_BadType_ds.show()
+----------------+------+------+--------+---------+-------+-----------+
| name|height|weight|eyecolor|haircolor| jedi| species|
+----------------+------+------+--------+---------+-------+-----------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human|
| Padme Amidala| 165| 45| brown| brown|no_jedi| human|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|
| Qui-Gon Jinn| 193| 89| blue| brown| jedi| human|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human|
| Han Solo| 180| 80| brown| brown|no_jedi| human|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human|
| R2-D2| 96| 32| null| null|no_jedi| droid|
| C-3PO| 167| 75| null| null|no_jedi| droid|
| Yoda| 66| 17| brown| brown| jedi| yoda|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|
| Dooku| 193| 86| brown| brown| jedi| human|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|
| Jabba| 390| null| yellow| none|no_jedi| hutt|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|
| Boba Fett| 183| 78| brown| black|no_jedi| human|
| Jango Fett| 183| 79| brown| black|no_jedi| human|
+----------------+------+------+--------+---------+-------+-----------+
As you can see there is no error: we can see the Dataset and even the null values in it. Next, filter this Dataset by a field without missing values.
In [16]:
val characters_BadType_ds2 = characters_BadType_ds.filter(x=> x.jedi=="no_jedi")
Out[16]:
characters_BadType_ds2: Dataset[Characters_BadType] = [name: string, height: int ... 5 more fields]
In [17]:
characters_BadType_ds2.show()
+----------------+------+------+--------+---------+-------+-----------+
| name|height|weight|eyecolor|haircolor| jedi| species|
+----------------+------+------+--------+---------+-------+-----------+
| Padme Amidala| 165| 45| brown| brown|no_jedi| human|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|
| Han Solo| 180| 80| brown| brown|no_jedi| human|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human|
| R2-D2| 96| 32| null| null|no_jedi| droid|
| C-3PO| 167| 75| null| null|no_jedi| droid|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|
| Jabba| 390| null| yellow| none|no_jedi| hutt|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|
| Boba Fett| 183| 78| brown| black|no_jedi| human|
| Jango Fett| 183| 79| brown| black|no_jedi| human|
+----------------+------+------+--------+---------+-------+-----------+
Filtering the Dataset further by a string field containing missing values also succeeds: a missing string stays null (as the null entries in the output show), and Scala’s == is null-safe, so comparing null with "brown" simply gives false.
In [18]:
characters_BadType_ds2.filter(x=> x.haircolor=="brown").show()
+--------------+------+------+--------+---------+-------+-------+
| name|height|weight|eyecolor|haircolor| jedi|species|
+--------------+------+------+--------+---------+-------+-------+
| Padme Amidala| 165| 45| brown| brown|no_jedi| human|
|Leia Skywalker| 150| 49| brown| brown|no_jedi| human|
| Han Solo| 180| 80| brown| brown|no_jedi| human|
| Chewbacca| 228| 112| blue| brown|no_jedi|wookiee|
+--------------+------+------+--------+---------+-------+-------+
But if we filter by weight which is a numeric type with missing values, then we get the java.lang.NullPointerException.
In [19]:
characters_BadType_ds2.filter(x=> x.weight>79).show()
org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 10.0 failed 1 times, most recent failure: Lost task 0.0 in stage 10.0 (TID 10, localhost): java.lang.NullPointerException
at scala.Predef$.Integer2int(Predef.scala:362)
at $sess.cmd18Wrapper$Helper$$anonfun$1.apply(cmd18.sc:1)
at $sess.cmd18Wrapper$Helper$$anonfun$1.apply(cmd18.sc:1)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
at org.apache.spark.scheduler.Task.run(Task.scala:86)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Driver stacktrace:
org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1454)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1442)
org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1441)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1441)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:811)
scala.Option.foreach(Option.scala:257)
org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:811)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1667)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1622)
org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1611)
org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:632)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1873)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1886)
org.apache.spark.SparkContext.runJob(SparkContext.scala:1899)
org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:347)
org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:39)
org.apache.spark.sql.Dataset$$anonfun$org$apache$spark$sql$Dataset$$execute$1$1.apply(Dataset.scala:2193)
org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2546)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$execute$1(Dataset.scala:2192)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$collect(Dataset.scala:2199)
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1935)
org.apache.spark.sql.Dataset$$anonfun$head$1.apply(Dataset.scala:1934)
org.apache.spark.sql.Dataset.withTypedCallback(Dataset.scala:2576)
org.apache.spark.sql.Dataset.head(Dataset.scala:1934)
org.apache.spark.sql.Dataset.take(Dataset.scala:2149)
org.apache.spark.sql.Dataset.showString(Dataset.scala:239)
org.apache.spark.sql.Dataset.show(Dataset.scala:526)
org.apache.spark.sql.Dataset.show(Dataset.scala:486)
org.apache.spark.sql.Dataset.show(Dataset.scala:495)
$sess.cmd18Wrapper$Helper.<init>(cmd18.sc:1)
$sess.cmd18Wrapper.<init>(cmd18.sc:493)
$sess.cmd18$.<init>(cmd18.sc:406)
$sess.cmd18$.<clinit>(cmd18.sc:-1)
java.lang.NullPointerException
scala.Predef$.Integer2int(Predef.scala:362)
$sess.cmd18Wrapper$Helper$$anonfun$1.apply(cmd18.sc:1)
$sess.cmd18Wrapper$Helper$$anonfun$1.apply(cmd18.sc:1)
org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIterator.processNext(Unknown Source)
org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$8$$anon$1.hasNext(WholeStageCodegenExec.scala:370)
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:246)
org.apache.spark.sql.execution.SparkPlan$$anonfun$4.apply(SparkPlan.scala:240)
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:803)
org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:319)
org.apache.spark.rdd.RDD.iterator(RDD.scala:283)
org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:70)
org.apache.spark.scheduler.Task.run(Task.scala:86)
org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:274)
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
java.lang.Thread.run(Thread.java:745)
The reason behind this failure is that weight is declared as Integer in the case class, but the missing value arrives as null at run time. The comparison x.weight>79 has to unbox the Integer to an Int (the Integer2int call in the stack trace), and unboxing null throws the NullPointerException.
A quick fix here could be to check whether the data is non-missing, as shown in the following expression, but using Options in case class definitions is a safer way to handle missing values.
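The unboxing problem can be reproduced in plain Scala without Spark. In this sketch an explicit intValue call stands in for the implicit Integer to Int conversion reported in the stack trace, so that the behaviour does not depend on the Scala version; the variable names are made up for the illustration.

```scala
import scala.util.Try

// A boxed java.lang.Integer can hold null, unlike Scala's Int.
val missingWeight: java.lang.Integer = null

// Explicit unboxing stands in for the implicit conversion seen in the
// stack trace (scala.Predef$.Integer2int); unboxing null fails.
val comparison = Try(missingWeight.intValue > 79)

// Wrapping the value in an Option turns null into None, so nothing is unboxed.
val safeComparison = Option(missingWeight).exists(_.intValue > 79)
```

Here comparison is a Failure holding a NullPointerException, while safeComparison is simply false.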
In [20]:
characters_BadType_ds2.filter(x=> x.weight!=null && x.weight>79).show()
+----------+------+------+--------+---------+-------+-----------+
| name|height|weight|eyecolor|haircolor| jedi| species|
+----------+------+------+--------+---------+-------+-----------+
| Han Solo| 180| 80| brown| brown|no_jedi| human|
|Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|
+----------+------+------+--------+---------+-------+-----------+
The conclusion here is that if you cannot be sure that a column has all its values defined, it is safer to use Option in the case class to handle missing values. Use types without Option[] only for columns where it is 100% sure that no missing values can appear (this applies to numeric, string and all other types as well).
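A minimal sketch of the Option-based approach on a small, manually created Dataset. The case class Weighted (with Option[Int] rather than Option[Integer]) and the SparkSession named spark are assumptions for the illustration; the three rows are taken from the table above.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical two-field case class with an Option for the weight.
case class Weighted(name: String, weight: Option[Int])

// Assumption: a local SparkSession; the tutorial's notebook uses sqlContext instead.
val spark = SparkSession.builder().master("local[*]").appName("option-sketch").getOrCreate()
import spark.implicits._

val weighted: Dataset[Weighted] = Seq(
  Weighted("Chewbacca", Some(112)),
  Weighted("Jabba", None),            // missing weight
  Weighted("Yoda", Some(17))
).toDS()

// exists is false for None, so the missing weight cannot raise an exception
val heavy = weighted.filter(_.weight.exists(_ > 79))

// getOrElse states explicitly what a missing value should become
val weightsOrMinusOne = weighted.map(_.weight.getOrElse(-1))
```

Whether -1, 0 or dropping the row is the right replacement depends on the analysis; the point is that the Option type forces this decision to be written down.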
Subsidiary comment — defining the schema of the DataFrame manually
This part is related to DataFrames rather than Datasets.
When reading the csv file into a DataFrame, we can define the schema manually. One might expect that using nullable=false in the schema lets us control (or detect) missing values during the reading process. Let’s try this. The first step is to create the schema manually by defining the column names, types and whether nullable is true or false. Before creating the schema, import some types.
In [21]:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
Out[21]:
import org.apache.spark.sql.types.{StructType, StructField, StringType, IntegerType}
In [22]:
val DF_schema = StructType(Array(
StructField("name", StringType, false),
StructField("height", IntegerType, false),
StructField("weight", IntegerType, false),
StructField("eyecolor", StringType, false),
StructField("haircolor", StringType, false),
StructField("jedi", StringType, false),
StructField("species", StringType, false)))
Out[22]:
DF_schema: StructType = StructType(
StructField(name,StringType,false),
StructField(height,IntegerType,false),
StructField(weight,IntegerType,false),
StructField(eyecolor,StringType,false),
StructField(haircolor,StringType,false),
StructField(jedi,StringType,false),
StructField(species,StringType,false)
)
In [23]:
DF_schema.printTreeString
root
|-- name: string (nullable = false)
|-- height: integer (nullable = false)
|-- weight: integer (nullable = false)
|-- eyecolor: string (nullable = false)
|-- haircolor: string (nullable = false)
|-- jedi: string (nullable = false)
|-- species: string (nullable = false)
As we see, nullable is false for all the columns in the schema. Read the data from csv and use this schema.
In [24]:
val characters1_df = sqlContext
.read
.format("com.databricks.spark.csv")
.option("header", "true")
.option("delimiter", ";")
.schema(DF_schema)
.csv("StarWars.csv")
Out[24]:
characters1_df: DataFrame = [name: string, height: int ... 5 more fields]
Surprisingly it worked without error, even with the missing values in the data. Displaying the data we see that there is a null value in the column weight.
In [25]:
characters1_df.show()
+----------------+------+------+--------+---------+-------+-----------+
| name|height|weight|eyecolor|haircolor| jedi| species|
+----------------+------+------+--------+---------+-------+-----------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human|
| Padme Amidala| 165| 45| brown| brown|no_jedi| human|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human|
| Qui-Gon Jinn| 193| 89| blue| brown| jedi| human|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human|
| Han Solo| 180| 80| brown| brown|no_jedi| human|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human|
| R2-D2| 96| 32| null| null|no_jedi| droid|
| C-3PO| 167| 75| null| null|no_jedi| droid|
| Yoda| 66| 17| brown| brown| jedi| yoda|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian|
| Dooku| 193| 86| brown| brown| jedi| human|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee|
| Jabba| 390| null| yellow| none|no_jedi| hutt|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|
| Boba Fett| 183| 78| brown| black|no_jedi| human|
| Jango Fett| 183| 79| brown| black|no_jedi| human|
+----------------+------+------+--------+---------+-------+-----------+
When we print the schema of the DataFrame, we see that nullable = true for all the columns, although we defined them to be false. So unfortunately the manual schema definition did not help in missing values handling.
In [26]:
characters1_df.printSchema
root
|-- name: string (nullable = true)
|-- height: integer (nullable = true)
|-- weight: integer (nullable = true)
|-- eyecolor: string (nullable = true)
|-- haircolor: string (nullable = true)
|-- jedi: string (nullable = true)
|-- species: string (nullable = true)
But at least missing values don’t cause an error in the following filter (as they did when we used a Dataset with wrong types).
In [27]:
characters1_df.filter($"weight"<75).show()
+--------------+------+------+--------+---------+-------+-------+
| name|height|weight|eyecolor|haircolor| jedi|species|
+--------------+------+------+--------+---------+-------+-------+
| Padme Amidala| 165| 45| brown| brown|no_jedi| human|
|Leia Skywalker| 150| 49| brown| brown|no_jedi| human|
| R2-D2| 96| 32| null| null|no_jedi| droid|
| Yoda| 66| 17| brown| brown| jedi| yoda|
+--------------+------+------+--------+---------+-------+-------+
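Since nullable = false did not catch the missing values during reading, one possible way to detect them afterwards is to count the nulls per column. The sketch below is my own assumption of how this could be done, on a small stand-in DataFrame named sample (three rows copied from the table above) and a SparkSession named spark.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.col

// Assumption: a local SparkSession; the tutorial's notebook uses sqlContext instead.
val spark = SparkSession.builder().master("local[*]").appName("null-count-sketch").getOrCreate()
import spark.implicits._

// Small stand-in for characters1_df; None becomes null in the DataFrame
val sample: DataFrame = Seq[(String, Option[Int], Option[String])](
  ("Yoda", Some(17), Some("brown")),
  ("Jabba", None, Some("yellow")),
  ("R2-D2", Some(32), None)
).toDF("name", "weight", "eyecolor")

// Count the nulls in every column (one Spark job per column, fine for small data)
val nullCounts: Map[String, Long] =
  sample.columns.map(c => c -> sample.filter(col(c).isNull).count()).toMap
```

Columns with a count above zero are the ones that need an Option type in the case class.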
Let’s go back to Datasets.
2. Joining Datasets
We work further with the following two Datasets: the first one called friends_ds created manually and a second one called characters_ds which was read in from the csv file. Let’s join them by the name of the characters.
Inner join
If we use inner join, then the result table will contain the keys included in both Datasets.
Unfortunately the default join syntax in Spark keeps the key fields from both Datasets. Thus having a “name” column in both Datasets results in a DataFrame with two identical columns with identical names, and it is impossible to refer to them later on, as we get the following error: Reference ‘name’ is ambiguous
In [28]:
val bad_join_df = characters_ds.join(friends_ds, characters_ds.col("name") === friends_ds.col("name"))
Out[28]:
bad_join_df: DataFrame = [name: string, height: int ... 7 more fields]
In [29]:
bad_join_df.show()
+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| name| friends|
+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human|Anakin Skywalker| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human| Luke Skywalker|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Leia Skywalker| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Obi-Wan Kenobi| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human| Han Solo|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Sheev Palpatine| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| R2-D2| C-3PO|
| C-3PO| 167| 75| null| null|no_jedi| droid| C-3PO| R2-D2|
| Yoda| 66| 17| brown| brown| jedi| yoda| Yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Darth Maul| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Chewbacca| Han Solo|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Jabba| Boba Fett|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human|Lando Calrissian| Han Solo|
+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
In [30]:
bad_join_df.select($"name")
org.apache.spark.sql.AnalysisException: Reference 'name' is ambiguous, could be: name#34, name#5.;
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolve(LogicalPlan.scala:264)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveChildren(LogicalPlan.scala:148)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5$$anonfun$31.apply(Analyzer.scala:609)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5$$anonfun$31.apply(Analyzer.scala:609)
org.apache.spark.sql.catalyst.analysis.package$.withPosition(package.scala:48)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:609)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9$$anonfun$applyOrElse$5.applyOrElse(Analyzer.scala:605)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$transformUp$1.apply(TreeNode.scala:308)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:307)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$4.apply(TreeNode.scala:305)
org.apache.spark.sql.catalyst.trees.TreeNode$$anonfun$5.apply(TreeNode.scala:328)
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
org.apache.spark.sql.catalyst.trees.TreeNode.transformChildren(TreeNode.scala:326)
org.apache.spark.sql.catalyst.trees.TreeNode.transformUp(TreeNode.scala:305)
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionUp$1(QueryPlan.scala:269)
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:279)
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2$1.apply(QueryPlan.scala:283)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
scala.collection.AbstractTraversable.map(Traversable.scala:104)
org.apache.spark.sql.catalyst.plans.QueryPlan.org$apache$spark$sql$catalyst$plans$QueryPlan$$recursiveTransform$2(QueryPlan.scala:283)
org.apache.spark.sql.catalyst.plans.QueryPlan$$anonfun$8.apply(QueryPlan.scala:288)
org.apache.spark.sql.catalyst.trees.TreeNode.mapProductIterator(TreeNode.scala:186)
org.apache.spark.sql.catalyst.plans.QueryPlan.transformExpressionsUp(QueryPlan.scala:288)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:605)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$$anonfun$apply$9.applyOrElse(Analyzer.scala:547)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan$$anonfun$resolveOperators$1.apply(LogicalPlan.scala:61)
org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(TreeNode.scala:69)
org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperators(LogicalPlan.scala:60)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:547)
org.apache.spark.sql.catalyst.analysis.Analyzer$ResolveReferences$.apply(Analyzer.scala:484)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:85)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1$$anonfun$apply$1.apply(RuleExecutor.scala:82)
scala.collection.LinearSeqOptimized$class.foldLeft(LinearSeqOptimized.scala:124)
scala.collection.immutable.List.foldLeft(List.scala:84)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:82)
org.apache.spark.sql.catalyst.rules.RuleExecutor$$anonfun$execute$1.apply(RuleExecutor.scala:74)
scala.collection.immutable.List.foreach(List.scala:381)
org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:74)
org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:65)
org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:63)
org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:51)
org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:64)
org.apache.spark.sql.Dataset.org$apache$spark$sql$Dataset$$withPlan(Dataset.scala:2603)
org.apache.spark.sql.Dataset.select(Dataset.scala:969)
$sess.cmd29Wrapper$Helper.<init>(cmd29.sc:1)
$sess.cmd29Wrapper.<init>(cmd29.sc:571)
$sess.cmd29$.<init>(cmd29.sc:448)
$sess.cmd29$.<clinit>(cmd29.sc:-1)
The solution to the problem above is to pass the key as Seq("name") when the key columns have the same name in both Datasets. The result then contains the key column only once.
In [31]:
val sw_df = characters_ds.join(friends_ds, Seq("name"))
Out[31]:
sw_df: DataFrame = [name: string, height: int ... 6 more fields]
In [32]:
sw_df.show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
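Seq("name") only helps when the key has the same name on both sides. A minimal sketch of one alternative, assuming Spark 2.0 or later: join on the column expression and then drop the right-hand key by passing the Column itself (not its name) to drop. The case classes SwCharacter and SwFriend and their rows are hypothetical, cut-down stand-ins for characters_ds and friends_ds.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

// Hypothetical, cut-down stand-ins for the rows of characters_ds and friends_ds.
case class SwCharacter(name: String, height: Int)
case class SwFriend(name: String, friends: String)

object DropDuplicateKeySketch {
  def run(spark: SparkSession): DataFrame = {
    import spark.implicits._
    val characters = Seq(SwCharacter("Yoda", 66), SwCharacter("Jabba", 390)).toDS()
    val friends = Seq(SwFriend("Yoda", "Obi-Wan Kenobi"), SwFriend("Jabba", "Boba Fett")).toDS()

    characters
      .join(friends, characters("name") === friends("name"))
      // Dropping by Column reference removes only the right-hand "name" column.
      .drop(friends("name"))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("drop-key-sketch").getOrCreate()
    run(spark).select("name", "friends").show() // "name" is no longer ambiguous
    spark.stop()
  }
}
```

The result is still a DataFrame, so the conversion to a Dataset through a case class is needed here as well.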
Although we inner-joined two Datasets, whose column types were all defined, the result of the join is a DataFrame.
To get a Dataset again, create a case class with the names and types of the joined columns and convert the DataFrame to a Dataset.
In [33]:
case class SW(name: String,
height: Integer,
weight: Option[Integer],
eyecolor: Option[String],
haircolor: Option[String],
jedi: String,
species: String,
friends: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[33]:
defined class SW
In [34]:
val sw_ds = sw_df.as[SW]
Out[34]:
sw_ds: Dataset[SW] = [name: string, height: int ... 6 more fields]
In [35]:
sw_ds.show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
This sw_ds Dataset will be used mainly to demonstrate further Dataset transformation functions (apart from cases where I show very specific tasks using the ds_missing Dataset).
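As an aside to the inner join above, Spark also offers joinWith, which keeps both sides typed and returns a Dataset of pairs. The sketch below is not the route used in this tutorial; it only illustrates how a typed map can flatten the pairs into one case class without passing through a DataFrame. CharacterRow, FriendRow and JoinedRow are hypothetical names with a reduced set of fields.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical, simplified records; the real Datasets have more columns.
case class CharacterRow(name: String, height: Int, weight: Option[Int])
case class FriendRow(name: String, friends: String)
case class JoinedRow(name: String, height: Int, weight: Option[Int], friends: String)

object TypedJoinSketch {
  def run(spark: SparkSession): Dataset[JoinedRow] = {
    import spark.implicits._
    val characters = Seq(
      CharacterRow("Yoda", 66, Some(17)),
      CharacterRow("Jabba", 390, None),
      CharacterRow("Dooku", 193, Some(86)) // has no row in friends, so the inner join drops it
    ).toDS()
    val friends = Seq(FriendRow("Yoda", "Obi-Wan Kenobi"), FriendRow("Jabba", "Boba Fett")).toDS()

    // joinWith keeps both sides as typed objects: Dataset[(CharacterRow, FriendRow)].
    val pairs: Dataset[(CharacterRow, FriendRow)] =
      characters.joinWith(friends, characters("name") === friends("name"), "inner")

    // A typed map flattens each pair into a single record.
    pairs.map { case (c, f) => JoinedRow(c.name, c.height, c.weight, f.friends) }
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("typed-join-sketch").getOrCreate()
    run(spark).show()
    spark.stop()
  }
}
```

The price of staying typed is that the flattening has to be written by hand, field by field.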
Other joins
If we have to keep all the keys from the left or the right Dataset, we can use “left_outer” or “right_outer”, respectively.
In [36]:
characters_ds.join(ds_missing, characters_ds.col("name") === ds_missing.col("Who"), "left_outer").show()
+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| Who| friends|
+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human|Anakin Skywalker| Sheev Palpatine|
| Padme Amidala| 165| 45| brown| brown|no_jedi| human| null| null|
| Luke Skywalker| 172| 77| blue| blond| jedi| human| Luke Skywalker| null|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Leia Skywalker| Obi-Wan Kenobi|
| Qui-Gon Jinn| 193| 89| blue| brown| jedi| human| null| null|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| null| null|
| Han Solo| 180| 80| brown| brown|no_jedi| human| Han Solo|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Sheev Palpatine| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| null| null|
| C-3PO| 167| 75| null| null|no_jedi| droid| null| null|
| Yoda| 66| 17| brown| brown| jedi| yoda| Yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| null| null|
| Dooku| 193| 86| brown| brown| jedi| human| null| null|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| null| null|
| Jabba| 390| null| yellow| none|no_jedi| hutt| null| null|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| null| null|
| Boba Fett| 183| 78| brown| black|no_jedi| human| null| null|
| Jango Fett| 183| 79| brown| black|no_jedi| human| null| null|
+----------------+------+------+--------+---------+-------+-----------+----------------+--------------------+
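The null values in the output above matter once the result is converted back to a Dataset: every column that an outer join can leave empty should be an Option in the case class. A minimal sketch under that assumption, using the Seq-of-keys form of join together with a join type. Hero, Ally and HeroWithAlly are hypothetical names and the data is made up for the illustration.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical records for the illustration.
case class Hero(name: String, height: Int)
case class Ally(name: String, friends: String)
// friends is an Option because unmatched left rows get null there after a left outer join.
case class HeroWithAlly(name: String, height: Int, friends: Option[String])

object LeftOuterJoinSketch {
  def run(spark: SparkSession): Dataset[HeroWithAlly] = {
    import spark.implicits._
    val heroes = Seq(Hero("Yoda", 66), Hero("Dooku", 193)).toDS()
    val allies = Seq(Ally("Yoda", "Obi-Wan Kenobi")).toDS()

    // Passing the key as Seq("name") keeps a single "name" column in the result.
    heroes.join(allies, Seq("name"), "left_outer").as[HeroWithAlly]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("left-outer-sketch").getOrCreate()
    run(spark).show()
    spark.stop()
  }
}
```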
3. Selecting columns
The first surprise was how complicated it is to select a few columns from a Dataset. We already know the names and the types of the columns, yet to take a subset of them we have to define either the names or the types again. Let’s see the possibilities:
- If we use map, the result is a Dataset, so the column types are kept but the column names are lost.
- If we use select with the column names, the result is a DataFrame, so the column types are lost.
- If we use select and provide the column names AND the column types, the result is a Dataset with seemingly proper column names and proper types.
In [37]:
//1.
sw_ds.map(x => (x.name, x.weight))
Out[37]:
res36: Dataset[(String, Option[Integer])] = [_1: string, _2: int]
In [38]:
sw_ds.map(x => (x.name, x.weight)).show()
+----------------+----+
| _1| _2|
+----------------+----+
|Anakin Skywalker| 84|
| Luke Skywalker| 77|
| Leia Skywalker| 49|
| Obi-Wan Kenobi| 77|
| Han Solo| 80|
| Sheev Palpatine| 75|
| R2-D2| 32|
| C-3PO| 75|
| Yoda| 17|
| Darth Maul| 80|
| Chewbacca| 112|
| Jabba|null|
|Lando Calrissian| 79|
+----------------+----+
In [39]:
//2.
sw_ds.select("name", "weight")
Out[39]:
res38: DataFrame = [name: string, weight: int]
In [40]:
sw_ds.select("name", "weight").show()
+----------------+------+
| name|weight|
+----------------+------+
|Anakin Skywalker| 84|
| Luke Skywalker| 77|
| Leia Skywalker| 49|
| Obi-Wan Kenobi| 77|
| Han Solo| 80|
| Sheev Palpatine| 75|
| R2-D2| 32|
| C-3PO| 75|
| Yoda| 17|
| Darth Maul| 80|
| Chewbacca| 112|
| Jabba| null|
|Lando Calrissian| 79|
+----------------+------+
In [41]:
//3.
sw_ds.select($"name".as[String], $"weight".as[Integer])
Out[41]:
res40: Dataset[(String, Integer)] = [name: string, weight: int]
In [42]:
sw_ds.select($"name".as[String], $"weight".as[Integer]).show()
+----------------+------+
| name|weight|
+----------------+------+
|Anakin Skywalker| 84|
| Luke Skywalker| 77|
| Leia Skywalker| 49|
| Obi-Wan Kenobi| 77|
| Han Solo| 80|
| Sheev Palpatine| 75|
| R2-D2| 32|
| C-3PO| 75|
| Yoda| 17|
| Darth Maul| 80|
| Chewbacca| 112|
| Jabba| null|
|Lando Calrissian| 79|
+----------------+------+
This last solution seems to work well but it has two problems:
- The result is a Dataset[(String, Integer)]. The column names appear in the display, but they are valid only when we use the Dataset as a DataFrame. We can refer to a column as "weight" in untyped expressions (for example .select("weight")), but we cannot use the column names in typed expressions where _.weight is needed. For example, using groupByKey(_.weight) or .map(x => x.weight) after this selection step results in the following error:
error: value weight is not a member of (String, Integer)
In typed operations we have to refer to the columns as ._1 and ._2 instead. So although the names are kept in the DataFrame sense, they are lost in the Dataset sense.
- When defining $"weight".as[Integer] we cannot use $"weight".as[Option[Integer]]. Because that column contains a missing value, this can lead to a NullPointerException, for example when using filter(x => x._2 > 79).
Whichever way the select is executed, you end up needing a proper case class. We can correct all three ways easily by using a new case class:
In [43]:
case class NameWeight(name: String, weight: Option[Integer])
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[43]:
defined class NameWeight
In [44]:
//1. corrected
sw_ds.map(x => NameWeight(x.name, x.weight))
Out[44]:
res43: Dataset[NameWeight] = [name: string, weight: int]
In [45]:
sw_ds.map(x => NameWeight(x.name, x.weight)).show()
+----------------+------+
| name|weight|
+----------------+------+
|Anakin Skywalker| 84|
| Luke Skywalker| 77|
| Leia Skywalker| 49|
| Obi-Wan Kenobi| 77|
| Han Solo| 80|
| Sheev Palpatine| 75|
| R2-D2| 32|
| C-3PO| 75|
| Yoda| 17|
| Darth Maul| 80|
| Chewbacca| 112|
| Jabba| null|
|Lando Calrissian| 79|
+----------------+------+
In [46]:
//2. corrected
sw_ds.select("name", "weight").as[NameWeight]
Out[46]:
res45: Dataset[NameWeight] = [name: string, weight: int]
In [47]:
sw_ds.select("name", "weight").as[NameWeight].show()
+----------------+------+
| name|weight|
+----------------+------+
|Anakin Skywalker| 84|
| Luke Skywalker| 77|
| Leia Skywalker| 49|
| Obi-Wan Kenobi| 77|
| Han Solo| 80|
| Sheev Palpatine| 75|
| R2-D2| 32|
| C-3PO| 75|
| Yoda| 17|
| Darth Maul| 80|
| Chewbacca| 112|
| Jabba| null|
|Lando Calrissian| 79|
+----------------+------+
In [48]:
//3. corrected
sw_ds.select($"name".as[String], $"weight".as[Integer]).as[NameWeight]
Out[48]:
res47: Dataset[NameWeight] = [name: string, weight: int]
In [49]:
sw_ds.select($"name".as[String], $"weight".as[Integer]).as[NameWeight].show()
+----------------+------+
| name|weight|
+----------------+------+
|Anakin Skywalker| 84|
| Luke Skywalker| 77|
| Leia Skywalker| 49|
| Obi-Wan Kenobi| 77|
| Han Solo| 80|
| Sheev Palpatine| 75|
| R2-D2| 32|
| C-3PO| 75|
| Yoda| 17|
| Darth Maul| 80|
| Chewbacca| 112|
| Jabba| null|
|Lando Calrissian| 79|
+----------------+------+
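The payoff of the case class with an Option field is that typed operations can handle the missing weight explicitly. The following sketch shows this on a made-up three-row sample. NameWeightOpt is a hypothetical variant of NameWeight that uses Scala's Int, and the data is built inline only to keep the example self-contained.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

// Hypothetical variant of the NameWeight case class, using Option[Int] for the nullable column.
case class NameWeightOpt(name: String, weight: Option[Int])

object NullSafeSelectSketch {
  def run(spark: SparkSession): Dataset[NameWeightOpt] = {
    import spark.implicits._
    val df = Seq[(String, Option[Int])](
      ("Han Solo", Some(80)),
      ("Jabba", None), // missing weight, stored as null in the column
      ("Yoda", Some(17))
    ).toDF("name", "weight")

    df.select("name", "weight")
      .as[NameWeightOpt]
      // exists returns false for None, so the missing weight is skipped instead of throwing.
      .filter(_.weight.exists(_ > 79))
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("null-safe-select").getOrCreate()
    run(spark).show()
    spark.stop()
  }
}
```

Field access by name (_.weight) works here because the Dataset is typed by the case class rather than by a tuple.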
4. Renaming columns
Renaming columns gives us a DataFrame. (At least I could not find a column-renaming function that produces a Dataset.)
- If we use withColumnRenamed, we can rename the columns one by one; the result is a DataFrame.
- We can convert the Dataset to a DataFrame and define all the new column names in one step; the result is obviously a DataFrame.
Whichever way we rename the columns, the result is a DataFrame. Finally we have to create a case class for the new column names and types and convert the DataFrame to a Dataset.
In [50]:
sw_ds.withColumnRenamed("name", "Name")
Out[50]:
res49: DataFrame = [Name: string, height: int ... 6 more fields]
In [51]:
sw_ds.withColumnRenamed("name", "Name").show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| Name|height|weight|eyecolor|haircolor| jedi| species| friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
The output shows that the result is a DataFrame.
We can rename more columns by chaining this function.
In [52]:
sw_ds.withColumnRenamed("name", "Who").withColumnRenamed("jedi", "Religion").show()
+----------------+------+------+--------+---------+--------+-----------+--------------------+
| Who|height|weight|eyecolor|haircolor|Religion| species| friends|
+----------------+------+------+--------+---------+--------+-----------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown| no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown| no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red| no_jedi| human| Anakin Skywalker|
| R2-D2| 96| 32| null| null| no_jedi| droid| C-3PO|
| C-3PO| 167| 75| null| null| no_jedi| droid| R2-D2|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none| no_jedi|dathomirian| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown| no_jedi| wookiee| Han Solo|
| Jabba| 390| null| yellow| none| no_jedi| hutt| Boba Fett|
|Lando Calrissian| 178| 79| brown| blank| no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+--------+-----------+--------------------+
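When more than a couple of columns are renamed, the chain of withColumnRenamed calls can be generated from a Map. The helper below, renameAll, is a hypothetical convenience function and not part of Spark; it is a sketch of the chaining shown above, followed by the usual conversion to a Dataset. Original and Renamed are made-up case classes with a reduced set of columns.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

// Hypothetical records before and after the rename.
case class Original(name: String, height: Int, jedi: String)
case class Renamed(Who: String, height: Int, Religion: String)

object RenameSketch {
  // Hypothetical helper: applies every (old -> new) pair with withColumnRenamed, one after the other.
  def renameAll(df: DataFrame, mapping: Map[String, String]): DataFrame =
    mapping.foldLeft(df) { case (acc, (from, to)) => acc.withColumnRenamed(from, to) }

  def run(spark: SparkSession): Dataset[Renamed] = {
    import spark.implicits._
    val ds = Seq(Original("Yoda", 66, "jedi"), Original("Han Solo", 180, "no_jedi")).toDS()
    // The renamed DataFrame is converted back with a case class that uses the new names.
    renameAll(ds.toDF(), Map("name" -> "Who", "jedi" -> "Religion")).as[Renamed]
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("rename-sketch").getOrCreate()
    run(spark).show()
    spark.stop()
  }
}
```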
If we would like to rename all the columns, a shorter way is to convert the Dataset into a DataFrame with .toDF and pass it the new column names.
By default Spark matches column names to case class fields case-insensitively. If the new column names differ from the old ones only in upper or lower case, your original case class still works. But if a new column name differs in at least one letter, a new case class definition with the proper field names is needed.
In [53]:
sw_ds.toDF(Seq("Name", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[SW]
Out[53]:
res52: Dataset[SW] = [Name: string, Height: int ... 6 more fields]
In [54]:
sw_ds.toDF(Seq("Name", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[SW].show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| Name|Height|Weight|Eyecolor|Haircolor| Jedi| Species| Friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
In [54]:
// sw_ds.toDF(Seq("WHO", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[S
In [55]:
case class SW2(WHO: String,
height: Integer,
weight: Option[Integer],
eyecolor: Option[String],
haircolor: Option[String],
jedi: String,
species: String,
friends: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[55]:
defined class SW2
In [56]:
sw_ds.toDF(Seq("WHO", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[SW2]
Out[56]:
res55: Dataset[SW2] = [WHO: string, Height: int ... 6 more fields]
In [57]:
sw_ds.toDF(Seq("WHO", "Height", "Weight", "Eyecolor", "Haircolor", "Jedi", "Species", "Friends"): _*).as[SW2].show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| WHO|Height|Weight|Eyecolor|Haircolor| Jedi| Species| Friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
5. Adding new columns
There are several ways to add new columns to your Dataset, depending on what kind of column is created. I show the main types with examples. Regardless of how you add the new column and whether its type is defined or not, the result is a DataFrame. So if you need a Dataset output, define a proper case class and convert the DataFrame into a Dataset.
constant column
Adding a constant column is easy. Use the withColumn function and provide the name of the new column and lit() with the value inside the parentheses. The result is a DataFrame even if you define the type of the new column with sw_ds.withColumn("count", lit(1).as[Integer]).
In [58]:
sw_ds.withColumn("count", lit(1))
Out[58]:
res57: DataFrame = [name: string, height: int ... 7 more fields]
In [59]:
sw_ds.withColumn("count", lit(1)).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|count|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 1|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 1|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 1|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 1|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 1|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 1|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 1|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 1|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 1|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 1|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 1|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 1|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 1|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
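To get from the DataFrame above back to a Dataset, the new column has to appear in a case class. The sketch below shows two routes on a made-up two-row sample: the untyped withColumn followed by .as[...], and a typed map that builds the extended record directly. Weighed and WeighedCounted are hypothetical case classes with a reduced set of columns.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}
import org.apache.spark.sql.functions.lit

// Hypothetical records without and with the constant column.
case class Weighed(name: String, weight: Option[Int])
case class WeighedCounted(name: String, weight: Option[Int], count: Int)

object ConstantColumnSketch {
  def run(spark: SparkSession): (Dataset[WeighedCounted], Dataset[WeighedCounted]) = {
    import spark.implicits._
    val ds = Seq(Weighed("Yoda", Some(17)), Weighed("Jabba", None)).toDS()

    // Untyped route: withColumn yields a DataFrame, .as[...] turns it back into a Dataset.
    val viaWithColumn = ds.withColumn("count", lit(1)).as[WeighedCounted]
    // Typed route: map builds the new records directly and never leaves the Dataset API.
    val viaMap = ds.map(w => WeighedCounted(w.name, w.weight, 1))
    (viaWithColumn, viaMap)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("constant-column").getOrCreate()
    val (a, b) = run(spark)
    a.show()
    b.show()
    spark.stop()
  }
}
```

Both routes need the extended case class; they differ only in whether the intermediate result is a DataFrame.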
expression1 — type1
In these expressions the function accepts the column name as a single string, so we can simply pass "column_name". In the example I calculate the natural logarithm of the weight of each character.
In [60]:
sw_ds.withColumn("log_weight", log("weight"))
Out[60]:
res59: DataFrame = [name: string, height: int ... 7 more fields]
In [61]:
sw_ds.withColumn("log_weight", log("weight")).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends| log_weight|
+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 4.430816798843313|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 4.343805421853684|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|3.8918202981106265|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 4.343805421853684|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 4.382026634673881|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 4.31748811353631|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|3.4657359027997265|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 4.31748811353631|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 2.833213344056216|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 4.382026634673881|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 4.718498871295094|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| null|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|4.3694478524670215|
+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
expression2 — type2
When the transformation needs more than a column name as a string, we have to use dataset_name("column_name") to refer to a column of the original Dataset.
For example, we can calculate the body mass index (BMI) of the characters.
In [62]:
sw_ds.withColumn("BMI", sw_ds("weight")/(sw_ds("height")*sw_ds("height")/10000))
Out[62]:
res61: DataFrame = [name: string, height: int ... 7 more fields]
In [63]:
sw_ds.withColumn("BMI", sw_ds("weight")/(sw_ds("height")*sw_ds("height")/10000)).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends| BMI|
+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 23.76641014033499|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 26.0275824770146|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 21.77777777777778|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|23.245984784446325|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|24.691358024691358|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 25.05930702662969|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 34.72222222222222|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 26.89232313815483|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 39.02662993572085|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|26.122448979591837|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|21.545090797168356|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| null|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|24.933720489837143|
+----------------+------+------+--------+---------+-------+-----------+--------------------+------------------+
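The column expression above divides integer columns and still yields doubles, because Spark's / on columns performs floating-point division. In plain Scala, Int / Int truncates, so a typed version has to convert explicitly. The sketch below writes the same formula as an ordinary function; bmi is a hypothetical helper that could be called inside a typed map, and it propagates a missing weight as None.

```scala
object BmiSketch {
  // BMI = weight in kg divided by the square of the height in metres.
  // The height is given in cm, hence the division by 10000 (= 100 * 100).
  def bmi(weightKg: Option[Int], heightCm: Int): Option[Double] =
    weightKg.map(w => w.toDouble / (heightCm.toDouble * heightCm / 10000))

  def main(args: Array[String]): Unit = {
    println(bmi(Some(49), 150)) // Leia Skywalker: 49 / 2.25
    println(bmi(None, 390))     // Jabba: no weight, so the result is None
  }
}
```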
User defined function
Finally, if we want to use functions that are defined for plain Scala types like Integer or String rather than for columns, we have to wrap them in user defined functions, aka UDFs.
The example below creates a column containing a tuple made from two columns of the Dataset. Tuples can be created from plain values, so we define a UDF and then apply it to the columns.
In [64]:
import scala.reflect.runtime.universe.TypeTag
def createTuple2[Type_x: TypeTag, Type_y: TypeTag] = udf[(Type_x, Type_y), Type_x, Type_y]((x: Type_x, y: Type_y) => (x, y))
Out[64]:
import scala.reflect.runtime.universe.TypeTag
defined function createTuple2
In [65]:
sw_ds.withColumn("Jedi_Species", createTuple2[String, String].apply(sw_ds("jedi"), sw_ds("species")))
Out[65]:
res64: DataFrame = [name: string, height: int ... 7 more fields]
In [66]:
sw_ds.withColumn("Jedi_Species", createTuple2[String, String].apply(sw_ds("jedi"), sw_ds("species"))).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends| Jedi_Species|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| [jedi,human]|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| [jedi,human]|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| [no_jedi,human]|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| [jedi,human]|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| [no_jedi,human]|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| [no_jedi,human]|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| [no_jedi,droid]|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| [no_jedi,droid]|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| [jedi,yoda]|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|[no_jedi,dathomir...|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| [no_jedi,wookiee]|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| [no_jedi,hutt]|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| [no_jedi,human]|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
We can create a tuple column from columns with missing values as well.
In [67]:
sw_ds.withColumn("Name_Weight", createTuple2[String, Option[Integer]].apply(sw_ds("name"), sw_ds("weight")))
Out[67]:
res66: DataFrame = [name: string, height: int ... 7 more fields]
In [68]:
sw_ds.withColumn("Name_Weight", createTuple2[String, Option[Integer]].apply(sw_ds("name"), sw_ds("weight"))).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends| Name_Weight|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|[Anakin Skywalker...|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| [Luke Skywalker,77]|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| [Leia Skywalker,49]|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| [Obi-Wan Kenobi,77]|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| [Han Solo,80]|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|[Sheev Palpatine,75]|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| [R2-D2,32]|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| [C-3PO,75]|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| [Yoda,17]|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| [Darth Maul,80]|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| [Chewbacca,112]|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| [Jabba,null]|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|[Lando Calrissian...|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
6. Filtering rows
Filtering returns a subset of the rows of the Dataset. The good news is that the names and the types of the columns do not change at all, so the result of a filter is always a Dataset with proper column names. But we have to be very careful when working with columns that contain missing values: in the filter function we have to define both how to filter the defined values and how to treat the missing ones. Let’s see some examples.
Filter a string column with no missing values: select the humans in the Dataset.
In [69]:
sw_ds.filter(x => x.species=="human")
Out[69]:
res68: Dataset[SW] = [name: string, height: int ... 6 more fields]
In [70]:
sw_ds.filter(x => x.species=="human").show()
+----------------+------+------+--------+---------+-------+-------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi|species| friends|
+----------------+------+------+--------+---------+-------+-------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-------+--------------------+
The same syntax can be used for eyecolor (which contains missing values) without getting an error or a warning. But the result is empty, although there are characters with brown eyes.
In [71]:
sw_ds.filter(x => x.eyecolor== "brown").show()
+----+------+------+--------+---------+----+-------+-------+
|name|height|weight|eyecolor|haircolor|jedi|species|friends|
+----+------+------+--------+---------+----+-------+-------+
+----+------+------+--------+---------+----+-------+-------+
The reason is that == compiles for operands of different types, but a String is never equal to an Option: the defined values of an Option[String] column are represented as Some(value). There are two ways to handle the situation:
- use Some(value) in the filter
- use the .getOrElse() function and define what should be returned in case of missing values. In the example I use .getOrElse(""), which returns the value if it is defined and an empty string if the value is missing in the record
So let’s see both ways:
In [72]:
sw_ds.filter(x => x.eyecolor == Some("brown")).show()
+----------------+------+------+--------+---------+-------+-------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi|species| friends|
+----------------+------+------+--------+---------+-------+-------+--------------------+
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-------+--------------------+
In [73]:
sw_ds.filter(x => x.eyecolor.getOrElse("") == "brown").show()
+----------------+------+------+--------+---------+-------+-------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi|species| friends|
+----------------+------+------+--------+---------+-------+-------+--------------------+
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-------+--------------------+
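The comparison rules behind these results can be checked without Spark. The following is a minimal sketch on a plain Scala list; the Hero case class and its three rows are made up for illustration and are not the SW class of this tutorial. It also shows Option.contains, which expresses the same test as the two ways above.

```scala
// Hypothetical stand-in for the tutorial's case class; only two fields are kept.
case class Hero(name: String, eyecolor: Option[String])

val heroes = List(
  Hero("Leia Skywalker", Some("brown")),
  Hero("Luke Skywalker", Some("blue")),
  Hero("R2-D2", None)
)

// Comparing an Option with a bare String is always false.
// The upcast to Any is only there to keep the compiler quiet about it.
val naive = heroes.filter(h => (h.eyecolor: Any) == "brown").map(_.name)

// Way 1: wrap the value in Some(...)
val withSome = heroes.filter(h => h.eyecolor == Some("brown")).map(_.name)

// Way 2: unwrap with a default for missing values
val withDefault = heroes.filter(h => h.eyecolor.getOrElse("") == "brown").map(_.name)

// Option.contains is true only for Some("brown")
val withContains = heroes.filter(h => h.eyecolor.contains("brown")).map(_.name)
```

All three working variants keep only Leia, while the naive comparison keeps nothing.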
Filtering numeric columns without missing values works as expected: filter the characters whose height is less than 100 cm.
In [74]:
sw_ds.filter(x => x.height<100).show()
+-----+------+------+--------+---------+-------+-------+--------------+
| name|height|weight|eyecolor|haircolor| jedi|species| friends|
+-----+------+------+--------+---------+-------+-------+--------------+
|R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
| Yoda| 66| 17| brown| brown| jedi| yoda|Obi-Wan Kenobi|
+-----+------+------+--------+---------+-------+-------+--------------+
If there might be missing values in a numeric column (for example the type is Option[Integer]) then the syntax above gives an error.
sw_ds.filter(x => x.weight >=79)
would end in
…error: value >= is not a member of Option[Integer] …
The solution is to use pattern matching and define explicitly the filter for Some() values and for None (missing) values.
In [75]:
sw_ds.filter(x => x.weight match {
  case Some(y) => y >= 79
  case None => false
}).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
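As a side note, the pattern match can be written more compactly with Option.exists, which returns false for None. The sketch below compares the two spellings on a plain Scala list. The Hero class and its rows are hypothetical, and I use Option[Int] here instead of the Option[Integer] of the tutorial's case class.

```scala
// Hypothetical stand-in for the tutorial's case class.
case class Hero(name: String, weight: Option[Int])

val heroes = List(
  Hero("Anakin Skywalker", Some(84)),
  Hero("Luke Skywalker", Some(77)),
  Hero("Jabba", None)
)

// Explicit pattern match: rows with a missing weight are rejected.
val byMatch = heroes.filter(h => h.weight match {
  case Some(w) => w >= 79
  case None    => false
}).map(_.name)

// Option.exists applies the predicate to Some(w) and yields false for None.
val byExists = heroes.filter(_.weight.exists(_ >= 79)).map(_.name)
```

Use Option.forall instead of exists if the rows with missing values should be kept rather than dropped.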
7. GroupBy and Aggregating
Calculate a function (mean, min, max etc.) of numeric columns by groups defined in a key column. The syntax is as expected, but we have to define the type of the result columns in the aggregation function.
- the key for the grouping is given in groupByKey(_.columnname)
- the aggregation functions are given in .agg( function_name1($"columnName1").as[new_type1], function_name2($"columnName2").as[new_type2] )
We can define several aggregation functions for different columns within one aggregation.
In [76]:
sw_ds.groupByKey(_.species).agg(max($"height").as[Integer], min($"height").as[Integer], mean($"weight").as[Double], count($"species").as[Long] )
Out[76]:
res75: Dataset[(String, Integer, Integer, Double, Long)] = [value: string, max(height): int ... 3 more fields]
In [77]:
sw_ds.groupByKey(_.species).agg(max($"height").as[Integer], min($"height").as[Integer], mean($"weight").as[Double], count($"species").as[Long] ).show()
+-----------+-----------+-----------+-----------------+--------------+
| value|max(height)|min(height)| avg(weight)|count(species)|
+-----------+-----------+-----------+-----------------+--------------+
| hutt| 390| 390| null| 1|
| human| 188| 150|74.42857142857143| 7|
|dathomirian| 175| 175| 80.0| 1|
| yoda| 66| 66| 17.0| 1|
| wookiee| 228| 228| 112.0| 1|
| droid| 167| 96| 53.5| 2|
+-----------+-----------+-----------+-----------------+--------------+
The same syntax works when the key column contains missing values. Jabba is left out of the mean calculation because his weight is not known.
In [78]:
sw_ds.groupByKey(_.eyecolor).agg(mean($"weight").as[Double])
Out[78]:
res77: Dataset[(Option[String], Double)] = [key: struct<value: string>, avg(weight): double]
In [79]:
sw_ds.groupByKey(_.eyecolor).agg(mean($"weight").as[Double]).show()
+----------+-----------+
| key|avg(weight)|
+----------+-----------+
| [yellow]| 80.0|
| [null]| 53.5|
|[bluegray]| 77.0|
| [brown]| 56.25|
| [blue]| 87.0|
+----------+-----------+
The key can contain missing values, and the missing values form a separate group in groupByKey. The columns in the aggregation function might also contain missing values; these are ignored in the numerical computations.
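To make the treatment of missing values concrete, here is a small sketch of the same group-wise mean on a plain Scala list, without Spark. The Hero class and the four rows are assumptions for illustration. A group whose weights are all missing gets no mean, which corresponds to the null shown for the hutt group above.

```scala
// Hypothetical stand-in for the tutorial's case class.
case class Hero(name: String, species: String, weight: Option[Int])

val heroes = List(
  Hero("R2-D2", "droid", Some(32)),
  Hero("C-3PO", "droid", Some(75)),
  Hero("Yoda", "yoda", Some(17)),
  Hero("Jabba", "hutt", None)
)

// Mean of the defined weights per species.
val meanWeight: Map[String, Option[Double]] =
  heroes.groupBy(_.species).map { case (species, rows) =>
    val known = rows.flatMap(_.weight) // keeps only the defined weights
    val mean = if (known.isEmpty) None else Some(known.sum.toDouble / known.size)
    species -> mean
  }
```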
Please note that the output is a Dataset with proper column types, but the column names can be used only in untyped, DataFrame-style references ($"columnName"). In typed transformations the columns have to be referred to by position (._1, ._2, etc.). For example .map(x => x.value) won’t work.
GroupBy multiple keys
To use multiple keys in groupByKey, create a tuple from the key columns.
In [80]:
sw_ds.groupByKey(x=>(x.species, x.jedi, x.haircolor)).agg(mean($"weight").as[Double], count($"species").as[Long])
Out[80]:
res79: Dataset[((String, String, Option[String]), Double, Long)] = [key: struct<_1: string, _2: string ... 1 more field>, avg(weight): double ... 1 more field]
In [81]:
sw_ds.groupByKey(x=>(x.species, x.jedi, x.haircolor)).agg(mean($"weight").as[Double], count($"species").as[Long] ).show()
+--------------------+-----------+--------------+
| key|avg(weight)|count(species)|
+--------------------+-----------+--------------+
| [hutt,no_jedi,none]| null| 1|
|[human,no_jedi,bl...| 79.0| 1|
|[dathomirian,no_j...| 80.0| 1|
| [human,no_jedi,red]| 75.0| 1|
|[droid,no_jedi,null]| 53.5| 2|
|[wookiee,no_jedi,...| 112.0| 1|
|[human,no_jedi,br...| 64.5| 2|
| [yoda,jedi,brown]| 17.0| 1|
| [human,jedi,auburn]| 77.0| 1|
| [human,jedi,blond]| 80.5| 2|
+--------------------+-----------+--------------+
8. Sorting by rows
Sorting is easy; there is no surprise in the syntax.
In [82]:
sw_ds.orderBy($"species".desc, $"weight")
Out[82]:
res81: Dataset[SW] = [name: string, height: int ... 6 more fields]
In [83]:
sw_ds.orderBy($"species".desc, $"weight").show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
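For comparison, the same two-level ordering can be sketched on a plain Scala list. The Hero class and its rows are hypothetical. The built-in ordering of Option puts None before every defined value; as far as I know this matches Spark's default for ascending sorts, which places nulls first.

```scala
// Hypothetical stand-in for the tutorial's case class.
case class Hero(name: String, species: String, weight: Option[Int])

val heroes = List(
  Hero("Luke Skywalker", "human", Some(77)),
  Hero("Yoda", "yoda", Some(17)),
  Hero("Leia Skywalker", "human", Some(49)),
  Hero("Jabba", "hutt", None)
)

// species descending, then weight ascending (None sorts before Some)
val ordering: Ordering[(String, Option[Int])] =
  Ordering.Tuple2(Ordering[String].reverse, Ordering[Option[Int]])

val sortedNames = heroes.sortBy(h => (h.species, h.weight))(ordering).map(_.name)
```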
9. Appending Datasets
Adding two Datasets with the same case class definition is a cake-walk.
In [84]:
sw_ds.union(sw_ds)
Out[84]:
res83: Dataset[SW] = [name: string, height: int ... 6 more fields]
In [85]:
sw_ds.union(sw_ds).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo|
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO|
+----------------+------+------+--------+---------+-------+-----------+--------------------+
only showing top 20 rows
10. Other useful functions
Describe the Dataset
- get the number of records by using count()
- get the number of columns by using .columns.size
- get the schema by using printSchema or by dtypes
In [86]:
sw_ds.count()
Out[86]:
res85: Long = 13L
In [87]:
sw_ds.columns.size
Out[87]:
res86: Int = 8
In [88]:
sw_ds.printSchema
root
|-- name: string (nullable = true)
|-- height: integer (nullable = true)
|-- weight: integer (nullable = true)
|-- eyecolor: string (nullable = true)
|-- haircolor: string (nullable = true)
|-- jedi: string (nullable = true)
|-- species: string (nullable = true)
|-- friends: string (nullable = true)
In [89]:
sw_ds.dtypes
Out[89]:
res88: Array[(String, String)] = Array(
("name", "StringType"),
("height", "IntegerType"),
("weight", "IntegerType"),
("eyecolor", "StringType"),
("haircolor", "StringType"),
("jedi", "StringType"),
("species", "StringType"),
("friends", "StringType")
)
Some more aggregation functions
Calculate correlation between columns (optionally by groups)
In [90]:
sw_ds.agg(corr($"height", $"weight").as[Double])
Out[90]:
res89: DataFrame = [corr(height, weight): double]
In [91]:
sw_ds.agg(corr($"height", $"weight").as[Double]).show()
+--------------------+
|corr(height, weight)|
+--------------------+
| 0.9823964963433257|
+--------------------+
In [92]:
sw_ds.groupByKey(_.jedi).agg(corr($"height", $"weight").as[Double]).show()
+-------+--------------------+
| value|corr(height, weight)|
+-------+--------------------+
|no_jedi| 0.9749158985081434|
| jedi| 0.9973783324232722|
+-------+--------------------+
Get the first value by group
In [93]:
sw_ds.groupByKey(_.species).agg(first($"name").as[String]).show()
+-----------+------------------+
| value|first(name, false)|
+-----------+------------------+
| hutt| Jabba|
| human| Anakin Skywalker|
|dathomirian| Darth Maul|
| yoda| Yoda|
| wookiee| Chewbacca|
| droid| R2-D2|
+-----------+------------------+
Other useful functions for creating new columns
In the following examples we add new columns to the Dataset, so the result is a DataFrame. To get a Dataset again, create a proper case class and convert the result into a Dataset.
The first example is the hash function of a column.
In [94]:
sw_ds.withColumn("hashed_hair", hash(sw_ds("haircolor"))).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|hashed_hair|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| -519935767|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| -519935767|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 1075090752|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 1156710799|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 1075090752|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 1862204291|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 42|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 42|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 1075090752|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| -209080169|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 1075090752|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| -209080169|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 539099867|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------+
The next example calculates the size of a collection, for example the number of friends listed in the friends column. We need two steps:
- split the friends column at the string ", ". This way we get an Array of Strings. Note that map drops the column names, so after this step we have to refer to the split column as _2
- use size() to get the number of items in that Array
In [95]:
sw_ds
.map(x => (x.name, x.friends.split(", ")) )
.withColumn("NrOfFriendsListed", size($"_2")).show()
+----------------+--------------------+-----------------+
| _1| _2|NrOfFriendsListed|
+----------------+--------------------+-----------------+
|Anakin Skywalker| [Sheev Palpatine]| 1|
| Luke Skywalker|[Han Solo, Leia S...| 2|
| Leia Skywalker| [Obi-Wan Kenobi]| 1|
| Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]| 2|
| Han Solo|[Leia Skywalker, ...| 4|
| Sheev Palpatine| [Anakin Skywalker]| 1|
| R2-D2| [C-3PO]| 1|
| C-3PO| [R2-D2]| 1|
| Yoda| [Obi-Wan Kenobi]| 1|
| Darth Maul| [Sheev Palpatine]| 1|
| Chewbacca| [Han Solo]| 1|
| Jabba| [Boba Fett]| 1|
|Lando Calrissian| [Han Solo]| 1|
+----------------+--------------------+-----------------+
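The split-and-count logic can be tried on plain strings as well. The sketch below uses a hypothetical helper, nrOfFriends, which is not part of Spark. It also shows a caveat worth keeping in mind: an empty string still splits into one (empty) element, so an empty friends value would be counted as one friend.

```scala
// Hypothetical helper: counts the names in a ", "-separated string.
def nrOfFriends(friends: String): Int = friends.split(", ").length

val two = nrOfFriends("Yoda, Qui-Gon Jinn")
val one = nrOfFriends("C-3PO")

// Caveat: splitting an empty string yields an array with one empty element.
val emptyCase = nrOfFriends("")
```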
Add a monotonically increasing id into a new column using the function monotonically_increasing_id.
In [96]:
sw_ds.withColumn("id", monotonically_increasing_id).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+---+
| name|height|weight|eyecolor|haircolor| jedi| species| friends| id|
+----------------+------+------+--------+---------+-------+-----------+--------------------+---+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 0|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 1|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 2|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 3|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 4|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 5|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 6|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 7|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 8|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 9|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 10|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 11|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 12|
+----------------+------+------+--------+---------+-------+-----------+--------------------+---+
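The ids are consecutive here, but that should not be relied on. As far as I know, Spark only guarantees that the ids are increasing and unique. Its documentation describes the current implementation as putting the partition ID in the upper 31 bits and the record number within the partition in the lower 33 bits. The sketch below reproduces only that arithmetic in plain Scala; sketchId is a hypothetical helper, not a Spark function.

```scala
// Hypothetical helper mirroring the documented bit layout; not part of Spark.
def sketchId(partitionId: Int, rowInPartition: Long): Long =
  (partitionId.toLong << 33) + rowInPartition

// Three rows in partition 0 and two rows in partition 1
val firstPartition  = (0L to 2L).map(sketchId(0, _))
val secondPartition = (0L to 1L).map(sketchId(1, _))
```

Under this layout the ids of the second partition start at 2^33 = 8589934592, so a Dataset spread over several partitions would show large gaps between ids.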
Create a column containing random numbers.
In [97]:
sw_ds.withColumn("random",rand).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends| random|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 0.48438915958199347|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 0.7600297009795522|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 0.8104677720561386|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 0.4435893397387436|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 0.8700910546745284|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker|0.012168341985767994|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 0.6960843307761542|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 0.47676267860146915|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 0.6218918737597758|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 0.1761098443681477|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 0.5723497914126543|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 0.04547363864142695|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 0.5367659290577105|
+----------------+------+------+--------+---------+-------+-----------+--------------------+--------------------+
Calculate the length of the strings in a column. For example, get the length of the character names in our Dataset.
In [98]:
sw_ds.withColumn("name_lenth", length(sw_ds("name"))).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+----------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|name_lenth|
+----------------+------+------+--------+---------+-------+-----------+--------------------+----------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 16|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 14|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 14|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 14|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 8|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 15|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 5|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 5|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 4|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 10|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 9|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 5|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 16|
+----------------+------+------+--------+---------+-------+-----------+--------------------+----------+
We can also get the Levenshtein distance between two string columns:
In [99]:
sw_ds.withColumn("name_species_diff", levenshtein(sw_ds("name"), sw_ds("species"))).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------------+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|name_species_diff|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------------+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 15|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 12|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 13|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 12|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 8|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 12|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 5|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 5|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 1|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 9|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 9|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 5|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 14|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----------------+
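The Levenshtein distance is the minimum number of single-character insertions, deletions and substitutions needed to turn one string into the other. For readers who want to check a few values from the table by hand, here is a minimal sketch of the textbook dynamic-programming algorithm in plain Scala. It is my own illustration (the name levenshteinSketch is made up), not Spark's implementation. Note that the comparison is case-sensitive, which is why "Yoda" and "yoda" are at distance 1.

```scala
// Textbook dynamic-programming Levenshtein distance; an illustration only.
def levenshteinSketch(a: String, b: String): Int = {
  // prev(j) = distance between the first i-1 characters of a and the first j characters of b
  var prev = Array.range(0, b.length + 1)
  for (i <- 1 to a.length) {
    val curr = new Array[Int](b.length + 1)
    curr(0) = i // deleting the first i characters of a
    for (j <- 1 to b.length) {
      val substitution = if (a(i - 1) == b(j - 1)) 0 else 1
      curr(j) = math.min(
        math.min(curr(j - 1) + 1, prev(j) + 1), // insertion, deletion
        prev(j - 1) + substitution              // match or substitution
      )
    }
    prev = curr
  }
  prev(b.length)
}

val yoda  = levenshteinSketch("Yoda", "yoda")
val jabba = levenshteinSketch("Jabba", "hutt")
val r2d2  = levenshteinSketch("R2-D2", "droid")
```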
Finally we can find the location of a substring within a string by using locate. In the example we look for the first occurrence of letter “S” in the name of the characters.
In [100]:
sw_ds.withColumn("Loc_y", locate("S", sw_ds("name"))).show()
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
| name|height|weight|eyecolor|haircolor| jedi| species| friends|Loc_y|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
|Anakin Skywalker| 188| 84| blue| blond| jedi| human| Sheev Palpatine| 8|
| Luke Skywalker| 172| 77| blue| blond| jedi| human|Han Solo, Leia Sk...| 6|
| Leia Skywalker| 150| 49| brown| brown|no_jedi| human| Obi-Wan Kenobi| 6|
| Obi-Wan Kenobi| 182| 77|bluegray| auburn| jedi| human| Yoda, Qui-Gon Jinn| 0|
| Han Solo| 180| 80| brown| brown|no_jedi| human|Leia Skywalker, L...| 5|
| Sheev Palpatine| 173| 75| blue| red|no_jedi| human| Anakin Skywalker| 1|
| R2-D2| 96| 32| null| null|no_jedi| droid| C-3PO| 0|
| C-3PO| 167| 75| null| null|no_jedi| droid| R2-D2| 0|
| Yoda| 66| 17| brown| brown| jedi| yoda| Obi-Wan Kenobi| 0|
| Darth Maul| 175| 80| yellow| none|no_jedi|dathomirian| Sheev Palpatine| 0|
| Chewbacca| 228| 112| blue| brown|no_jedi| wookiee| Han Solo| 0|
| Jabba| 390| null| yellow| none|no_jedi| hutt| Boba Fett| 0|
|Lando Calrissian| 178| 79| brown| blank|no_jedi| human| Han Solo| 0|
+----------------+------+------+--------+---------+-------+-----------+--------------------+-----+
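As the table shows, the positions returned by locate are 1-based, the search is case-sensitive, and 0 means that the substring was not found. On a plain Scala string the same convention can be sketched with indexOf, which is 0-based and returns -1 when there is no match. The helper name locateSketch is hypothetical.

```scala
// 1-based position of the first occurrence, 0 when the substring is absent.
def locateSketch(substr: String, str: String): Int = str.indexOf(substr) + 1

val anakin = locateSketch("S", "Anakin Skywalker")
val han    = locateSketch("S", "Han Solo")
val yoda   = locateSketch("S", "Yoda")
```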
11. Friendcount example
There is no Spark tutorial without the beloved wordcount example 😉
I prepared a slightly modified version of the wordcount task. Let’s calculate how many times a character was referred to as a friend in the friends column.
I solve this problem in two ways.
First solution
In the first solution I use only the friends column and do the following steps:
- map: select the column friends
- flatMap and split: split the strings in the friends column at ", ", so that every full name ends up in a new row
- groupByKey: the key is the new (split) column
- count: get the counts
So the result is how many times a character was mentioned as a friend.
If you wanted to run a classic wordcount, you would split the text at spaces by using split(" ").
In [101]:
sw_ds
.map(x => x.friends)
.flatMap(_.split(", "))
.groupByKey(_.toString)
.count()
.show()
+----------------+--------+
| value|count(1)|
+----------------+--------+
| C-3PO| 1|
| Han Solo| 3|
| Sheev Palpatine| 2|
| Leia Skywalker| 2|
| Boba Fett| 1|
| Qui-Gon Jinn| 1|
| Yoda| 1|
| Luke Skywalker| 1|
| Obi-Wan Kenobi| 3|
|Anakin Skywalker| 1|
| Chewbacca| 1|
| R2-D2| 1|
+----------------+--------+
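The same chain of steps works on ordinary Scala collections, which can be handy for trying out the logic before running it on a cluster. The following is a sketch on a made-up four-row friends column, not on the sw_ds Dataset.

```scala
// Hypothetical friends column with four rows
val friendsColumn = List(
  "Sheev Palpatine",
  "Han Solo, Leia Skywalker",
  "Obi-Wan Kenobi",
  "Han Solo"
)

val counts: Map[String, Int] =
  friendsColumn
    .flatMap(_.split(", ").toList) // one element per referred friend
    .groupBy(identity)             // group equal names together
    .map { case (friend, hits) => friend -> hits.size }
```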
If the friends column has missing values, i.e. its type is Option[String], then we have to use .getOrElse("") to handle the missing values.
In [102]:
ds_missing
.map(x => x.friends)
.flatMap(_.getOrElse("").split(", "))
.groupByKey(_.toString)
.count()
.show()
+----------------+--------+
| value|count(1)|
+----------------+--------+
| Sheev Palpatine| 1|
| Leia Skywalker| 1|
| Luke Skywalker| 1|
| Obi-Wan Kenobi| 3|
|Anakin Skywalker| 1|
| | 1|
+----------------+--------+
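Notice the blank row in the result: .getOrElse("") turns a missing value into an empty string, and splitting an empty string still yields one (empty) element, so the missing values are counted as a "friend" with an empty name. If that is not wanted, the missing values can be dropped before the split. The sketch below shows the difference on a plain Scala list with made-up rows; the same idea should carry over to a Dataset, but I have not included a Spark version here.

```scala
// Hypothetical friends column with one missing value
val friendsColumn: List[Option[String]] = List(
  Some("Obi-Wan Kenobi"),
  None,
  Some("Han Solo, Leia Skywalker")
)

// getOrElse("") keeps the missing row as one empty name
val withBlank = friendsColumn.flatMap(_.getOrElse("").split(", ").toList)

// flatten drops the None entries before the split
val withoutBlank = friendsColumn.flatten.flatMap(_.split(", ").toList)
```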
Second solution
In the second solution I keep the name column from the original Dataset as well. Thus we will see the name-friend pairs, with every referred friend in a new row. This could be useful in case of a more complex question (for example how many friends of a character have the letter “S” in their names). We could also count the number of friends listed by each character and the number of times a character was referred to as a friend from the same Dataset.
To get the name-friend pair Dataset, do the following steps:
- use map to select the columns name and friends, with friends split at the string ", "
- use withColumn to create a new column containing the exploded split friends. The explode function creates a new row for every item in the split friends column. The first argument of withColumn is the name of the newly created column. If we write _2 here, then we overwrite the split friends column.
Let’s see the code in action:
In [103]:
import org.apache.spark.sql.functions.explode
Out[103]:
import org.apache.spark.sql.functions.explode
In [104]:
sw_ds
.map(x => (x.name, x.friends.split(", ")) )
.withColumn("friend", explode($"_2"))
.show()
+----------------+--------------------+----------------+
| _1| _2| friend|
+----------------+--------------------+----------------+
|Anakin Skywalker| [Sheev Palpatine]| Sheev Palpatine|
| Luke Skywalker|[Han Solo, Leia S...| Han Solo|
| Luke Skywalker|[Han Solo, Leia S...| Leia Skywalker|
| Leia Skywalker| [Obi-Wan Kenobi]| Obi-Wan Kenobi|
| Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]| Yoda|
| Obi-Wan Kenobi|[Yoda, Qui-Gon Jinn]| Qui-Gon Jinn|
| Han Solo|[Leia Skywalker, ...| Leia Skywalker|
| Han Solo|[Leia Skywalker, ...| Luke Skywalker|
| Han Solo|[Leia Skywalker, ...| Obi-Wan Kenobi|
| Han Solo|[Leia Skywalker, ...| Chewbacca|
| Sheev Palpatine| [Anakin Skywalker]|Anakin Skywalker|
| R2-D2| [C-3PO]| C-3PO|
| C-3PO| [R2-D2]| R2-D2|
| Yoda| [Obi-Wan Kenobi]| Obi-Wan Kenobi|
| Darth Maul| [Sheev Palpatine]| Sheev Palpatine|
| Chewbacca| [Han Solo]| Han Solo|
| Jabba| [Boba Fett]| Boba Fett|
|Lando Calrissian| [Han Solo]| Han Solo|
+----------------+--------------------+----------------+
In the example above there is a _2 column containing the result of the split and a third column, friend, with one row for every item in the _2 column. In the next example I use _2 as the name of the new column, thus overwriting the split friends column, and rename the columns. Then a new case class is defined and the result is converted to a Dataset. We will do more transformations on this Dataset.
In [105]:
case class NameFriend(name: String, friend: String)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[105]:
defined class NameFriend
In [106]:
val NameFriend_df =sw_ds
.map(x => (x.name, x.friends.split(", ")) )
.withColumn("_2", explode($"_2"))
.toDF(Seq("name", "friend"): _*)
Out[106]:
NameFriend_df: DataFrame = [name: string, friend: string]
In [107]:
val NameFriend_ds = NameFriend_df.as[NameFriend]
Out[107]:
NameFriend_ds: Dataset[NameFriend] = [name: string, friend: string]
In [108]:
NameFriend_ds.show()
+----------------+----------------+
| name| friend|
+----------------+----------------+
|Anakin Skywalker| Sheev Palpatine|
| Luke Skywalker| Han Solo|
| Luke Skywalker| Leia Skywalker|
| Leia Skywalker| Obi-Wan Kenobi|
| Obi-Wan Kenobi| Yoda|
| Obi-Wan Kenobi| Qui-Gon Jinn|
| Han Solo| Leia Skywalker|
| Han Solo| Luke Skywalker|
| Han Solo| Obi-Wan Kenobi|
| Han Solo| Chewbacca|
| Sheev Palpatine|Anakin Skywalker|
| R2-D2| C-3PO|
| C-3PO| R2-D2|
| Yoda| Obi-Wan Kenobi|
| Darth Maul| Sheev Palpatine|
| Chewbacca| Han Solo|
| Jabba| Boba Fett|
|Lando Calrissian| Han Solo|
+----------------+----------------+
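On ordinary Scala collections, split followed by explode corresponds to a single flatMap. The sketch below builds the same kind of name-friend pairs from two made-up rows. It reuses the NameFriend case class definition from above so that the block is self-contained.

```scala
case class NameFriend(name: String, friend: String)

// Hypothetical (name, friends) rows
val rows = List(
  ("Luke Skywalker", "Han Solo, Leia Skywalker"),
  ("R2-D2", "C-3PO")
)

// flatMap plays the role of split + explode: one NameFriend per listed friend
val pairs: List[NameFriend] = rows.flatMap { case (name, friends) =>
  friends.split(", ").toList.map(friend => NameFriend(name, friend))
}
```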
Finally we will answer three different questions using the NameFriend_ds Dataset:
A. How many times was each character referred to as a friend?
Solution:
- groupByKey where the key is the splitted and exploded referred friends column
- count — calculate the number of occurrence of refrerred friends
- orderBy — sort the values by decreasing popularity
In [109]:
//A.
NameFriend_ds
  .groupByKey(_.friend)
  .count()
  .orderBy($"count(1)".desc)
  .show()
+----------------+--------+
| value|count(1)|
+----------------+--------+
| Han Solo| 3|
| Obi-Wan Kenobi| 3|
| Leia Skywalker| 2|
| Sheev Palpatine| 2|
| C-3PO| 1|
| Boba Fett| 1|
| Qui-Gon Jinn| 1|
| Luke Skywalker| 1|
|Anakin Skywalker| 1|
| R2-D2| 1|
| Yoda| 1|
| Chewbacca| 1|
+----------------+--------+
Han Solo and Obi-Wan Kenobi were the most popular: each was mentioned as a friend by 3 other characters.
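The typed count leaves generated column names (value and count(1) in the output above), and these names may differ between Spark versions. One way to avoid depending on them is to rename the columns by position with toDF before sorting. A minimal sketch under stated assumptions: friendCounts, the times_listed column name and the sample rows are hypothetical, and the result is a DataFrame rather than a Dataset.

```scala
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

case class NameFriend(name: String, friend: String)

object FriendCountSketch {
  // Counts how often each friend is listed, with explicit column names.
  def friendCounts(spark: SparkSession, ds: Dataset[NameFriend]): DataFrame = {
    import spark.implicits._
    ds.groupByKey(_.friend)
      .count()                          // Dataset[(String, Long)] with generated names
      .toDF("friend", "times_listed")   // positional rename (names are illustrative)
      .orderBy($"times_listed".desc)
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("friend-count-sketch")
      .getOrCreate()
    import spark.implicits._

    // Illustrative rows standing in for NameFriend_ds.
    val sample = Seq(
      NameFriend("Luke Skywalker", "Han Solo"),
      NameFriend("Chewbacca", "Han Solo"),
      NameFriend("R2-D2", "C-3PO")
    ).toDS()

    friendCounts(spark, sample).show()
    spark.stop()
  }
}
```

Renaming by position keeps the orderBy independent of whatever names the aggregation generates.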
B. How many friends were listed by each character?
Solution:
- groupByKey: the key is the name of the character
- count: calculate the number of occurrences of each name
- orderBy: sort the values by decreasing number of listed friends
In [110]:
//B.
NameFriend_ds
  .groupByKey(_.name)
  .count()
  .orderBy($"count(1)".desc)
  .show()
+----------------+--------+
| value|count(1)|
+----------------+--------+
| Han Solo| 4|
| Luke Skywalker| 2|
| Obi-Wan Kenobi| 2|
| C-3PO| 1|
| Leia Skywalker| 1|
| Sheev Palpatine| 1|
| Darth Maul| 1|
| Jabba| 1|
| Yoda| 1|
|Lando Calrissian| 1|
|Anakin Skywalker| 1|
| Chewbacca| 1|
| R2-D2| 1|
+----------------+--------+
Han Solo listed 4 friends, Luke listed 2, etc.
C. How many of the friends listed by each character have the letter “S” in their names?
Solution:
- create a case class containing a new Integer column
- withColumn: add the new column with the position of the letter “S”
- convert the result into a Dataset
- filter the rows where the position of “S” is greater than 0 (the remaining rows contain friends with the letter “S”)
- groupByKey: the key is the name of the character
- count: calculate the number of rows per character in the filtered Dataset
- orderBy: sort the values by decreasing number of friends with the letter “S” in their names
In [111]:
case class NameFriendS_ds(name: String, friend: String, S_in_friend:Integer)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
Out[111]:
defined class NameFriendS_ds
In [112]:
//C.
NameFriend_ds
  .withColumn("S_in_friend", locate("S", NameFriend_ds("friend")))
  .as[NameFriendS_ds]
  .filter(x => x.S_in_friend > 0)
  .groupByKey(_.name)
  .count()
  .orderBy($"count(1)".desc)
  .show()
+----------------+--------+
| value|count(1)|
+----------------+--------+
| Han Solo| 2|
| Luke Skywalker| 2|
| Sheev Palpatine| 1|
| Darth Maul| 1|
|Anakin Skywalker| 1|
|Lando Calrissian| 1|
| Chewbacca| 1|
+----------------+--------+
We can see, for example, that Han Solo and Luke Skywalker each have two friends whose names contain the letter “S”. Characters not listed in the output have no friends with the letter “S”.
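If the position of the letter is not needed later, the same question can probably be answered without the extra case class by filtering on the typed friend field directly. This is a minimal sketch, not the approach used above: friendsWithS and the sample rows are hypothetical, it assumes friend is never null, and String.contains is case-sensitive, so only an uppercase “S” matches.

```scala
import org.apache.spark.sql.{Dataset, SparkSession}

case class NameFriend(name: String, friend: String)

object FriendsWithSSketch {
  // Per character, counts the listed friends whose name contains an uppercase "S".
  // Assumes friend is never null; a null value would throw inside the lambda.
  def friendsWithS(spark: SparkSession, ds: Dataset[NameFriend]): Dataset[(String, Long)] = {
    import spark.implicits._
    ds.filter(nf => nf.friend.contains("S"))
      .groupByKey(_.name)
      .count()
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("friends-with-s-sketch")
      .getOrCreate()
    import spark.implicits._

    // Illustrative rows standing in for NameFriend_ds.
    val sample = Seq(
      NameFriend("Han Solo", "Leia Skywalker"),
      NameFriend("Han Solo", "Chewbacca"),
      NameFriend("R2-D2", "C-3PO")
    ).toDS()

    friendsWithS(spark, sample).show()
    spark.stop()
  }
}
```

The pipeline stays typed from start to finish, at the cost of not keeping the position of the match as a column.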
Link to the official documentation
More functions can be found in the spark.apache.org documentation: http://spark.apache.org/docs/latest/api/scala/index.html#org.apache.spark.sql.Dataset
Originally published at www.balabit.com on December 12, 2016 by Eszter Windhager-Pokol.