Spark Tutorial — Using Filter and Count

Luck Charoenwatana
LuckSpark
Mar 20, 2018

Raw data can be huge, so one of the first things to do when processing it is filtering. Data that is not relevant to the analysis should be discarded as early as possible, ideally in the first few steps. This makes the code less prone to errors, consume fewer resources, and run faster.

This tutorial shows you how to perform basic filtering on your data using RDD transformations.

Objectives

What you will learn:

  • How to read data from file to an RDD using .textFile()
  • How to sample and print elements of an RDD using .take()
  • How to filter data using .filter() with .contains()
  • How to use NOT with .contains()
  • How to count the number of RDD elements using .count()

The Spark setup and environment used in this tutorial are described in this Spark Installation article (another version in Thai here).

For absolute beginners, please also read Hello World programming on Spark if you have not already.

Ok. Let’s get started.

The data

The first dataset that I will use throughout a number of my Spark tutorials is the results of football matches, which can be found here.

For this tutorial, let’s download and take a look at the English Premier League season 2016/2017. The data is a .csv file. It looks like this when opened in a text editor (Sublime in this case).

Match results of English Premier League season 2016/2017 (E0.csv), opened in Sublime.
  • The data contains a number of rows, 381 to be exact, and each row contains several fields separated by commas.
  • The first line is the header row. It is not actual data but rather a description of the data. The full description can be found here.
  • Below I list the field descriptions together with the values from the first data row (the second line in the file).
Here is part of the first data row (the second line in the file):
E0,13/08/16,Burnley,Swansea,0,1,A,0,0,D,J Moss,...
Here are the descriptions (with the value from that row in parentheses):
Div = League Division (E0)
Date = Match Date (dd/mm/yy) (13/08/16)
HomeTeam = Home Team (Burnley)
AwayTeam = Away Team (Swansea)
FTHG and HG = Full Time Home Team Goals (0)
FTAG and AG = Full Time Away Team Goals (1)
FTR and Res = Full Time Result (H=Home Win, D=Draw, A=Away Win) (A)
HTHG = Half Time Home Team Goals (0)
HTAG = Half Time Away Team Goals (0)
HTR = Half Time Result (H=Home Win, D=Draw, A=Away Win) (D)
Referee = Match Referee (J Moss)
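
To make the field layout concrete, here is a minimal sketch in plain Scala (no Spark needed) that splits the sample row on commas and pairs each value with its column name. The object name FieldsSketch and the helper parse are made up for this illustration, and only the first 11 fields listed above are covered; the rest of the row is left out.

```scala
object FieldsSketch {
  // Names of the first 11 columns, as listed in the descriptions above.
  val names: Seq[String] = Seq(
    "Div", "Date", "HomeTeam", "AwayTeam", "FTHG", "FTAG",
    "FTR", "HTHG", "HTAG", "HTR", "Referee")

  // Split one comma-separated row and pair each value with its column name.
  // (Hypothetical helper; any fields beyond the listed names are ignored.)
  def parse(row: String): Map[String, String] =
    names.zip(row.split(",").toSeq).toMap

  def main(args: Array[String]): Unit = {
    // The sample row from above, truncated to its first 11 fields.
    val row = "E0,13/08/16,Burnley,Swansea,0,1,A,0,0,D,J Moss"
    val fields = parse(row)
    println(fields("HomeTeam") + " vs " + fields("AwayTeam"))
    println("Full-time result: " + fields("FTR"))
  }
}
```

Splitting on a bare comma is good enough here because none of the fields shown contain commas themselves.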

The file looks more or less like this when you open it in a spreadsheet program.

  • Notice that the program automatically uses the description line as a header row.
  • The program, however, mistakenly separates the first column (Div) from the other columns.
Match results of English Premier League season 2016/2017 (E0.csv), opened in a spreadsheet application.
  • I renamed the file (from E0.csv) to E02016.csv. Throughout the rest of this article, I will refer to the file by this new name.

Directory Structure

I named this project 002_filtering, but you can name it anything you wish. My path for this project is ~/scalaSpark/002_filtering/. I created a separate directory, ~/scalaSpark/input/, specifically for all raw data to be read by Spark. Here is a summary of the relevant directories:

  • project directory : ~/scalaSpark/002_filtering/
  • .sbt file : ~/scalaSpark/002_filtering/002filtering.sbt
  • .scala file : ~/scalaSpark/002_filtering/src/main/scala/002filtering.scala
  • E02016.csv : ~/scalaSpark/input/E02016.csv

Set up the .sbt file

Here is what you have to put in the .sbt file.

name := "002filtering"
version := "1.0"
scalaVersion := "2.11.8"
libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.3.0"
  • The last line, libraryDependencies += "org.apache.spark" % "spark-core_2.11" % "2.3.0", is the Spark dependency.
  • In this tutorial we will use only basic RDD functions, thus only spark-core is needed.
  • The number 2.11 refers to the version of Scala, which is 2.11.x. The number 2.3.0 is the Spark version.
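
As a side note, sbt can append the Scala version suffix for you. With the %% operator, the artifact name is written without _2.11 and sbt derives the suffix from scalaVersion. Here is a sketch of the same build file written that way, assuming the same versions as above:

```scala
name := "002filtering"
version := "1.0"
scalaVersion := "2.11.8"
// %% tells sbt to add the Scala binary version (_2.11) to the artifact name
libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.0"
```

Both forms should resolve to the same spark-core_2.11 artifact; the %% form just avoids repeating the Scala version by hand.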

Write the Scala code

Now it is time to write our code to process the E02016.csv file. We will start with a short piece of code and run it, then add some more code and run it again, and so on. I will explain the code along the way.

1. Set up SparkContext

  • Create the 002filtering.scala file and add these lines to it.
  • Then save the file.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object filtering002 {
  def main(args: Array[String]): Unit = {
    println("\n\n>>>>> START OF PROGRAM <<<<<\n\n")
    // Configuration for the SparkContext; only the application name is set
    val conf = new SparkConf().setAppName("002filtering")
    // Create the SparkContext from the configuration
    val sc = new SparkContext(conf)
    println("\n\n>>>>> END OF PROGRAM <<<<<\n\n")
  }
}
  • In line 3 I named the object filtering002. This is simply because an object name cannot start with a digit.
  • The code begins by importing SparkContext and SparkConf. SparkConf holds the configuration parameters of SparkContext.
  • SparkContext is the entry point to the Spark world, where your code is managed and run. You can refer to Spark’s main API page or the SparkContext API page for more information.
  • The val conf = new SparkConf() line creates the configuration for SparkContext. In this case, only one parameter, the application name, is set (using setAppName).
  • The val sc = new SparkContext(conf) line creates a SparkContext object using the SparkConf defined.
  • val is a Scala reserved word used to declare a new variable. A variable declared with val is immutable, i.e., its value cannot be changed later. This is similar to a constant in some other languages. It is common to use immutable variables in Spark. If we need to alter an immutable variable, we commonly create a new, modified immutable one rather than modifying the existing one directly. You will gradually learn why this is Spark’s way of coding as you proceed.
  • The two println lines just print out strings. These are what we expect to see in the output after running the code.
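
The "create a new value instead of modifying the old one" habit can be tried in plain Scala without Spark. The following is only a small sketch; the object ValSketch and the helper addMatch are names invented for this illustration.

```scala
object ValSketch {
  // Return a modified copy instead of changing the input in place.
  def addMatch(results: List[String], result: String): List[String] =
    results :+ result

  def main(args: Array[String]): Unit = {
    val results = List("A", "D")
    // results = List("H")   // would not compile: reassignment to val
    val updated = addMatch(results, "H")
    println(results) // the original list is unchanged
    println(updated) // the new list has the extra element
  }
}
```

RDD transformations such as .filter() follow the same pattern: they return a new RDD and leave the original one untouched.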

Run

Although we have not done much yet, let’s run the code to make sure things are going well so far.

cd ~/scalaSpark/002_filtering       # go to the project home
sbt package                         # compile the project
### Wait for the compilation.
### The compilation should end with "Success".
### Then run the spark-submit command below.
spark-submit ./target/scala-2.11/002filtering_2.11-1.0.jar
  • Output of sbt package
The first output lines from the sbt package command.
The final output lines from the sbt package command.
  • Output of spark-submit
The first output lines from the spark-submit command.
The final output lines from the spark-submit command.
  • From the spark-submit command, you will see lines of output.
  • You should clearly see the START OF PROGRAM and END OF PROGRAM lines in the output. This indicates that you have done well so far.

Now, let’s read the E02016.csv file into Spark and do some interesting things to it.

2. Read .csv file into Spark

  • Spark allows you to read several file formats, e.g., text and csv (other formats such as xls generally need additional libraries), and turn them into an RDD. We then apply a series of operations, such as filter, count, or merge, on RDDs to obtain the final outcome.
  • To read the file E02016.csv into an RDD, add these lines to the scala file.
val logfile = "./../input/E02016.csv"
//OR val logfile = "/Users/luckspark/scalaSpark/input/E02016.csv"
val logrdd = sc.textFile(logfile)
  • Note that I use the relative path "./../input/E02016.csv" to refer to the data file. This path is relative to the location from which you run the spark-submit command. For example, I commonly execute the spark-submit command in the project directory, which is ~/scalaSpark/002_filtering in this case. That is, my location is ~/scalaSpark/002_filtering and the file location is ~/scalaSpark/input/E02016.csv. Therefore, the relative path from my location to the file is ./../input/E02016.csv. If I ran the spark-submit command from any other location, a "file does not exist" error would be thrown at runtime.
  • Alternatively, you can use an absolute path instead, as shown in the //OR comment line.
  • Next, sc.textFile() reads the file at logfile and converts it into an RDD named logrdd.
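
If you are unsure where a relative path points, you can resolve it against the current working directory before handing it to Spark. The sketch below uses only the standard java.nio.file API; the object PathCheckSketch and the helper resolve are hypothetical names, and the check applies to local files only.

```scala
import java.nio.file.{Files, Path, Paths}

object PathCheckSketch {
  // Resolve a possibly relative path against the current working directory.
  def resolve(path: String): Path =
    Paths.get(path).toAbsolutePath.normalize

  def main(args: Array[String]): Unit = {
    val logfile = "./../input/E02016.csv"
    val resolved = resolve(logfile)
    println("Resolved path: " + resolved)
    println("File exists:   " + Files.exists(resolved))
  }
}
```

Run from ~/scalaSpark/002_filtering, the resolved path should end in scalaSpark/input/E02016.csv; run from elsewhere, it will point somewhere else, which is the situation that leads to the runtime error described above.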

3. Print elements of an RDD

  • One of the main reasons for printing out RDD elements is to check whether the data format or pattern is what we expected. Raw data is generally huge, so it is rare to print ALL elements of an RDD.
  • Rather than printing out all elements, it is a good idea to print only a few of them using .take(). .take(n) returns the first n elements of the RDD, so every time the code is executed, the elements taken are generally the same. This is not guaranteed, however: if the order of the elements in the RDD is not deterministic, both the elements returned and their order could differ between runs.
  • Now that we have read all lines of the E02016.csv file into the logrdd RDD, let’s print it and see whether the data was read correctly, using a combination of take, foreach, and println, like this:
logrdd.take(10).foreach(println)
  • The dot (.) is how you connect the methods together. Methods are processed from left to right. The output of the method on the left becomes the input of the next method on the right. Here the output of .take(10) becomes the input to the .foreach(println) method.
  • .take(10) takes the first 10 rows from the logrdd RDD, and .foreach(println) prints each of them out in turn.
  • Now, let’s recap and run the code. Below is the entire code with the new lines added.
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object filtering002 {
  def main(args: Array[String]): Unit = {
    println("\n\n>>>>> START OF PROGRAM <<<<<\n\n")
    val conf = new SparkConf().setAppName("002filtering")
    val sc = new SparkContext(conf)
    // Read the file into an RDD, one element per line
    val logfile = "./../input/E02016.csv"
    val logrdd = sc.textFile(logfile)
    // Print the first 10 elements
    logrdd.take(10).foreach(println)
    println("\n\n>>>>> END OF PROGRAM <<<<<\n\n")
  }
}
  • Re-compile and run the code using the same commands: sbt package and then spark-submit ./target/scala-2.11/002filtering_2.11-1.0.jar
  • The output is shown below.
  • At the very end, above the END OF PROGRAM string, you should see the contents of the E02016.csv file displayed.
Read E02016.csv and print 10 sample rows.
  • Let’s compare it with the data displayed in text editor.
E02016.csv opened using Sublime.
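
The take-then-foreach chain behaves the same way on an ordinary Scala collection, which makes it easy to try without Spark. The sketch below uses a plain Seq as a stand-in for the RDD; the object TakeSketch, the helper firstLines, and the placeholder lines are invented for illustration.

```scala
object TakeSketch {
  // Return the first n lines, as take(n) does.
  def firstLines(lines: Seq[String], n: Int): Seq[String] =
    lines.take(n)

  def main(args: Array[String]): Unit = {
    // Placeholder lines standing in for the rows of the file.
    val lines = Seq("line 1", "line 2", "line 3", "line 4")
    // take(2) runs first; its result is passed on to foreach(println).
    firstLines(lines, 2).foreach(println)
  }
}
```

Asking for more elements than exist is not an error: take simply returns everything that is there.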

4. Data selection by row using .filter( )

Now, as you can see, there is too much data. Let’s filter something out to make things more meaningful.

First, let’s filter out the header row. It is mixed in with the data and would distort the results of any analysis. For example, if we wish to count the total number of matches played in the season, since the data is one match per line, simply counting the number of lines would give us the answer. This would, however, mistakenly include the header line, giving 381 rather than the 380 matches actually played in the season. This is clearly undesirable, so let’s discard the header row now.

  • You can select rows by a criterion using .filter(): elements for which the criterion is true are kept, and the rest are dropped.
  • Let’s try this code.
val f1 = logrdd.filter(s => s.contains("E0"))
f1.take(10).foreach(println)
  • In human language, val f1 = logrdd.filter(s => s.contains("E0")) would read, “copy every element of the logrdd RDD that contains the string "E0" into a new RDD named f1”.
  • The second line simply prints the first 10 elements of the new f1 RDD (which only contains lines with E0 in them).
  • In detail, the code first defines a new RDD named f1.
  • Then it reads all elements of the logrdd RDD and applies the filter criterion to each of them.
  • The s in s => s.contains() represents each element of the logrdd RDD.
  • You can replace both occurrences of s with any other letter or name you wish. The most commonly seen in the Spark world is line, i.e., .filter(line => line.contains("E0")).
  • I chose the string "E0" because the header line is the only line that does not contain it. Try it yourself with other strings.
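
The predicate passed to .filter() can be tested on a plain Scala Seq, whose filter method takes the same kind of function. This is only a sketch: the header below is shortened to the column names described earlier, the data row is the sample row truncated to 11 fields, and FilterSketch and keepContaining are made-up names.

```scala
object FilterSketch {
  // Keep only the lines that contain the given marker string.
  def keepContaining(lines: Seq[String], marker: String): Seq[String] =
    lines.filter(line => line.contains(marker))

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      // Shortened header, built from the column names described earlier.
      "Div,Date,HomeTeam,AwayTeam,FTHG,FTAG,FTR,HTHG,HTAG,HTR,Referee",
      // First data row from the file, truncated to 11 fields.
      "E0,13/08/16,Burnley,Swansea,0,1,A,0,0,D,J Moss")
    // Only the data row contains "E0", so the header is dropped.
    keepContaining(lines, "E0").foreach(println)
  }
}
```

Note that contains matches the string anywhere in the line, not just in the first field, so this approach relies on the marker never appearing in a line you want to discard.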

Let’s recap and run the code.

import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object filtering002 {
  def main(args: Array[String]): Unit = {
    println("\n\n>>>>> START OF PROGRAM <<<<<\n\n")
    val conf = new SparkConf().setAppName("002filtering")
    val sc = new SparkContext(conf)
    val logfile = "./../input/E02016.csv"
    val logrdd = sc.textFile(logfile)
    logrdd.take(10).foreach(println)
    // Keep only the lines that contain "E0" (drops the header line)
    val f1 = logrdd.filter(s => s.contains("E0"))
    f1.take(10).foreach(println)
    println("\n\n>>>>> END OF PROGRAM <<<<<\n\n")
  }
}
  • From the output, you will notice 2 sets of data printed out, one from logrdd and the other one from f1. Notice f1 does not contain the header line any more.

5. Using Not with .contains( )

In the previous topic we covered .filter(s => s.contains()), in which we chose to keep every line containing "E0". What if we wish to keep lines that do not contain something?

To do this, we add NOT, which is written !, to the statement. Where to put the ! sign is a bit tricky, so I think it is worth showing here.

  • Below is a statement to keep lines that do not contain a string “FTHG”.
val f2 = logrdd.filter(s => !(s.contains("FTHG")))
  • Notice the position of the ! so that next time you can use it correctly.
  • Try this line of code and you should get the same result as in the previous topic.
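
As a quick check of the negated predicate, the sketch below applies both filters to a plain Scala Seq and compares the results. The two lines are a shortened header and the truncated sample row; NotContainsSketch and dropContaining are hypothetical names. The ! is written here without the extra parentheses, which is equivalent because the method call is evaluated before the negation.

```scala
object NotContainsSketch {
  // Keep only the lines that do NOT contain the given string.
  def dropContaining(lines: Seq[String], unwanted: String): Seq[String] =
    lines.filter(s => !s.contains(unwanted))

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      // Shortened header and truncated sample row, for illustration.
      "Div,Date,HomeTeam,AwayTeam,FTHG,FTAG,FTR,HTHG,HTAG,HTR,Referee",
      "E0,13/08/16,Burnley,Swansea,0,1,A,0,0,D,J Moss")
    val withoutHeader = dropContaining(lines, "FTHG")
    val onlyE0 = lines.filter(s => s.contains("E0"))
    // Both filters keep exactly the data row.
    println(withoutHeader == onlyE0) // prints true
  }
}
```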

6. Count the number of elements in an RDD

In the previous section, we discarded the header line, and no header line was printed by the f1.take(10).foreach(println) line. However, take(10) only shows 10 elements of the RDD, so is it possible that the header line is still there but simply was not shown? Let’s count the lines to help verify this.

  • .count() is a Spark action. It counts the number of elements of an RDD.
  • It returns a Long. Therefore, we can simply print it out.
println(logrdd.count() + " " + f1.count())
  • Here I print the count of the logrdd RDD first, add a space, and then follow it with the count of the f1 RDD.
  • The entire code is shown again here (with just 1 line added from the previous one).
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
object filtering002 {
  def main(args: Array[String]): Unit = {
    println("\n\n>>>>> START OF PROGRAM <<<<<\n\n")
    val conf = new SparkConf().setAppName("002filtering")
    val sc = new SparkContext(conf)
    val logfile = "./../input/E02016.csv"
    val logrdd = sc.textFile(logfile)
    logrdd.take(10).foreach(println)
    val f1 = logrdd.filter(s => s.contains("E0"))
    f1.take(10).foreach(println)
    // Print the number of lines before and after filtering
    println(logrdd.count() + " " + f1.count())
    println("\n\n>>>>> END OF PROGRAM <<<<<\n\n")
  }
}
  • After you run it, you should see the "381 380" line just above the END OF PROGRAM string, as shown below.
Output of the .count() method

There you go, now you know that 1 line of data is discarded (presumably, the header line).
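
The reasoning behind this check is simple arithmetic: the number of lines a filter removed is the total count minus the filtered count. Here is a sketch of that idea on a plain Scala Seq, with CountSketch and removedBy as invented names and a shortened header plus the truncated sample row as input.

```scala
object CountSketch {
  // Number of lines removed by a filter = total count - kept count.
  def removedBy(lines: Seq[String], keep: String => Boolean): Long = {
    val total = lines.size.toLong
    val kept = lines.count(keep).toLong
    total - kept
  }

  def main(args: Array[String]): Unit = {
    val lines = Seq(
      // Shortened header and truncated sample row, for illustration.
      "Div,Date,HomeTeam,AwayTeam,FTHG,FTAG,FTR,HTHG,HTAG,HTR,Referee",
      "E0,13/08/16,Burnley,Swansea,0,1,A,0,0,D,J Moss")
    // Exactly one line (the header) lacks "E0".
    println(removedBy(lines, s => s.contains("E0")))
  }
}
```

A difference of exactly 1 is consistent with only the header being dropped, but it does not say which line was removed; that is what the next section looks at.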

7. Print all elements of an RDD

For some, counting the number of lines of the f1 RDD may not be enough to prove that the header line has really been removed. How about printing out all elements of the f1 RDD?

  • Printing out all elements of an RDD is a bit tricky.
  • At this stage, you can think of an RDD as an Array, with one important exception: it is distributed. Spark is designed to run on a large number of machines, where data are divided and distributed among them. The data could even be divided into several partitions on one machine. For example, if you have 100 rows of data, perhaps the first 10 are given to the first machine, the next 10 are given to another machine, and so on.
  • When printing, Spark uses each machine’s stdout. Different machines mean different stdouts.
  • In local mode, it is OK to print using f1.foreach(println) OR f1.take(f1.count().toInt).foreach(println)
  • take() expects an Int as a parameter but count() returns a Long. Thus, the .toInt method is needed here.
  • If you wish to print a specific element of the RDD, e.g., the value at index i as you would with an Array, you have to convert the RDD to a local array using the .collect() method. The array returned is stored on one machine (the driver machine), meaning that it is not a distributed dataset any more. This .collect() could cause errors if the driver machine has insufficient memory to hold the entire set of data.
  • In cluster mode, where the driver and the executor machines are different, the output of a print command that runs on the executors (e.g., f1.foreach(println)) is displayed on each executor machine’s stdout. Therefore, if you try to print all elements of an RDD this way, you will not see all of the output at the driver node. Using collect() fixes the problem if you have sufficient memory at the driver node.
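
The collect-then-print approach can be sketched as follows. This is an illustration only: it needs spark-core on the classpath (as in this tutorial's sbt project), it builds a tiny RDD from three placeholder strings instead of reading the file, and the names CollectSketch and collectLocally are made up. Setting the master to local[*] in code is an assumption for trying it on one machine; leave setMaster out if the master is supplied by spark-submit.

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CollectSketch {
  // Bring all elements of a small data set to the driver as a local Array.
  def collectLocally(sc: SparkContext, data: Seq[String]): Array[String] =
    sc.parallelize(data).collect()

  def main(args: Array[String]): Unit = {
    // local[*] is an assumption for running on a single machine.
    val conf = new SparkConf().setAppName("collectSketch").setMaster("local[*]")
    val sc = new SparkContext(conf)
    val local = collectLocally(sc, Seq("row 0", "row 1", "row 2"))
    println(local(1))      // access by index works on the local array
    local.foreach(println) // runs on the driver, so all output appears here
    sc.stop()
  }
}
```

Because local is an ordinary Array on the driver, both the indexed access and the foreach print to the driver's stdout, regardless of how many executors held the data.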

8. Try it yourself

Let’s recap what you have learnt:

  • read from file to RDD using .textFile()
  • sample and print elements of RDD using .take()
  • filter data using .filter() and .contains()
  • using NOT with .contains()
  • count the number of RDD elements using .count()

Here are some interesting things you are able to do using methods you have learnt. Try it out yourself.

  • Print and count only the Liverpool matches (i.e., 38 matches: 19 home and 19 away).
  • Print and count only the Liverpool vs Chelsea matches (i.e., only 2 matches).
  • Print and count only matches that Liverpool won.
  • Print only matches played in November.
  • Count the number of matches that Liverpool lost before the new year (of that season).
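
As a hint for the exercises that need more than one condition, several .contains() checks can be combined with && (AND) inside the same predicate. The sketch below shows the idea on a plain Scala Seq. The rows are made up and shortened for illustration (they are not taken from E02016.csv), and CombineSketch and bothTeams are hypothetical names.

```scala
object CombineSketch {
  // True when both team names appear somewhere in the line.
  def bothTeams(line: String, teamA: String, teamB: String): Boolean =
    line.contains(teamA) && line.contains(teamB)

  def main(args: Array[String]): Unit = {
    // Made-up, shortened rows for illustration only.
    val lines = Seq(
      "E0,dd/mm/yy,Liverpool,Chelsea",
      "E0,dd/mm/yy,Chelsea,Liverpool",
      "E0,dd/mm/yy,Liverpool,Burnley")
    val picked = lines.filter(s => bothTeams(s, "Liverpool", "Chelsea"))
    picked.foreach(println)
    println(picked.size)
  }
}
```

The same predicate can be passed to an RDD's .filter(). Exercises that depend on who played at home, who won, or the date will need more specific strings, since contains does not know which field it matched.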

That’s all for this tutorial. Peace.
