PySpark for Data Noobs Part 2 — Getting Shakespearean
The code for this article is available on GitHub.
Introduction
In Part 1 of this series, we got a PySpark Docker container up and running. In this article, we’ll do a hello world type of data analysis. A popular example to start out with seems to be getting word counts from a tome of text. From my high school days, I can remember Shakespeare containing a lot of words. So let’s see what the most frequently used words are in his works. This exercise is quite easy thanks to PySpark’s SQL functions library. If you’re familiar with SQL then you’ll feel right at home. You basically create a SQL query using Python constructs. A DataFrame is a PySpark analog to a SQL table or a DataSet for those familiar with .NET.
Fun fact: a DataFrame is an abstraction over a Resilient Distributed Dataset (RDD). An RDD is how PySpark scales out operations on huge datasets by distributing the data across multiple compute nodes.
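You will not need to touch the RDD directly in this article, but if you want to see the abstraction for yourself, every DataFrame exposes its RDD through an rdd attribute. A minimal sketch, assuming the textDf DataFrame we create further down:
# peek at the RDD behind a DataFrame and how many partitions the data was split into
print(textDf.rdd.getNumPartitions())
print(textDf.rdd.take(2))  # first two rows as Row objects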
Let’s get started! Fire up your Jupyter Docker container and create a new notebook. Refer to this article if you need help with this.
Here are the steps we are going to perform programmatically:
- Initialize a Spark session.
- Import a text file containing the complete works of William Shakespeare.
- Put the text into a DataFrame.
- Clean up the text, i.e. lower-case all words, remove punctuation marks, and remove whitespace.
- Run a query that gets a count of each word.
- Show the top 20 words.
Initialize Spark Session
Run the following code to initialize our Spark session. The convention is to use spark as the name of the Spark session.
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
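As a quick sanity check that the session is up (optional, but handy in a fresh notebook), print the version of Spark it is running:
# confirm the session is alive and report the Spark version
print(spark.version)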
Import Text File
Importing text from the web is easy using the urllib.request module. The code below downloads the text file and saves it as shakespeare.txt in the container's file system, where Spark can read it.
import urllib.request
urllib.request.urlretrieve("http://www.gutenberg.org/files/100/100-0.txt", "shakespeare.txt")
After importing the file, you can run the ls command to verify the file exists in the container's file system.
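In a Jupyter notebook you can run shell commands straight from a cell by prefixing them with an exclamation mark, so the check looks like this:
# list the downloaded file from within the notebook
!ls shakespeare.txt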
Import the Text Into a DataFrame
In order to work with data in PySpark, the data needs to be in a DataFrame. We will use a method on the spark session to do this.
textDf = spark.read.text("./shakespeare.txt")
Fun fact: When operating on DataFrames you may be curious as to what the DataFrame schema looks like. This is easily achieved by running the DataFrame's printSchema() method. At this point each row in the DataFrame contains a single string:
root
|-- value: string (nullable = true)
You can run the DataFrame's show() method to visually see the rows in the DataFrame.
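To see both in action right away (show() accepts optional arguments; here 5 limits the output to five rows and truncate=False keeps long lines intact):
# inspect the schema and the first few rows of the raw text
textDf.printSchema()
textDf.show(5, truncate=False)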
Clean Up Text
Now the hard work begins. We need to get a count of words in the DataFrame. My strategy is to do the following:
- Convert all words to lower case.
- Split all words on the space character so that I get the individual words.
- Remove empty words.
- Remove punctuation marks (if any) at the end of the word.
This is where we can use the power of the pyspark.sql.functions module. Import it first. The convention is to alias the module as F.
from pyspark.sql import functions as F
Fun fact: Run help(F) to verify the module has been loaded. As an added bonus you will get the module's documentation.
Convert All Words to Lower Case
The code below converts all words to lower case. The result is saved to a new DataFrame named textLowerDf. Creating intermediate DataFrames like this isn't necessary and is definitely not memory efficient, but when prototyping I find it easier to work with them.
textLowerDf = textDf.select(F.lower(F.col("value")).alias("words_lower"))
The above statement demonstrates the pattern we are going to use for the other cleanup tasks. You first select() a column in the DataFrame. Then you apply a transformation (lower()) to the column. Column values are obtained using the col() function, passing in the name of the column.
How do you get the name of the column in the DataFrame? By running textDf.printSchema(). This DataFrame contains one column named value.
Finally, the resulting column is aliased with a known name. If the alias is omitted PySpark will come up with a default name that isn't as friendly.
Run textLowerDf.show() and observe the text has been lowercased and the column's name is words_lower.
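If you are curious what the default name looks like, drop the alias and inspect the schema. The generated name is derived from the expression (something like lower(value)), so treat the exact value as an implementation detail:
# same transformation without an alias; Spark derives a column name from the expression
textDf.select(F.lower(F.col("value"))).printSchema()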
Fun fact: If you’re used to running traditional database queries you might be wondering why the show() method needs to be executed in order to show results. While this may look like a pain it is in fact a virtue of PySpark. In PySpark, query execution is deferred. That means nothing will be run unless you actually want to see the data. In a production PySpark application you would chain multiple transformation statements together. Since execution is deferred, PySpark can optimize the order of operations in your query. This is why you need to run show() to see results.
For prototyping purposes you can turn on eager evaluation, which avoids having to run show(), by specifying a custom configuration when creating the Spark session:
from pyspark.sql import SparkSession
spark = SparkSession.builder.config("spark.sql.repl.eagerEval.enabled", "True").getOrCreate()
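With that setting enabled, a DataFrame that is the last expression in a notebook cell renders its first rows automatically, so during prototyping you can simply evaluate it:
textLowerDf  # rendered eagerly in the notebook; show() and count() still behave as before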
Get Individual Words
Now we want each row in the DataFrame to contain an array of strings, one element for each word. This is done using the split() transformation.
textSplitDf = textLowerDf.select(F.split(F.col("words_lower"), " ").alias("words_split"))
Can you follow what the above code does? It splits the words in each row on the space character. The DataFrame's column will be named words_split.
Again run textSplitDf.show() and observe each row is now a string array. Run textSplitDf.printSchema() and observe the same:
root
|-- words_split: array (nullable = true)
| |-- element: string (containsNull = true)
Grouping the words in order to get their counts would be easier if each row contained one word. Fortunately, there is a transformation for this! It is called explode(). Let's create a new DataFrame with a word column.
textExplodedDf = textSplitDf.select(F.explode(F.col("words_split")).alias("word"))
Run textExplodedDf.printSchema() and observe the DataFrame contains a single string column instead of an array of strings.
root
|-- word: string (nullable = true)
Run textExplodedDf.show() and observe each row contains one word. Also, observe that some words have trailing punctuation marks and some rows contain whitespace. Let's get rid of the empty rows first.
Remove Empty Words
Removing data can be done using a filter transformation. Let's use the where() transformation.
textExplodedDf = textExplodedDf.where(F.ltrim(F.col("word")) != "")
Run textExplodedDf.show() and observe the rows containing only whitespace have been removed.
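If you want a quick sanity check that the filter did something, count the remaining rows; count() is an action, so it triggers execution (the exact number depends on the edition of the text you downloaded):
# number of non-empty words left after the filter
print(textExplodedDf.count())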
Now let’s get rid of the trailing punctuation marks.
Remove Trailing Punctuation Marks
An easy but imperfect way to remove trailing punctuation marks is via regular expressions. Fortunately, PySpark SQL contains a transformation that will allow us to extract text that matches a regular expression.
textExplodedDf = textExplodedDf.select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word"))
The above code extracts the first run of lower case letters from each word and replaces the original value with it.
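Be aware that this simple pattern also drops apostrophes and hyphens, so a word like don't becomes don. If you want to see regexp_extract() in isolation, a small throwaway DataFrame works well (the sample words here are made up for illustration):
# build a tiny DataFrame to watch the transformation on known inputs
sampleDf = spark.createDataFrame([("alas,",), ("don't",)], ["word"])
sampleDf.select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word")).show()
# the extracted values should be "alas" and "don"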
Get a Count of Each Word
SQL aficionados would get a count of each word by executing a GROUP BY followed by a COUNT and an ORDER BY. This is also what you would do in PySpark!
textWordCounts = textExplodedDf.groupBy("word").count().orderBy(F.col("count").desc())
Run textWordCounts.show() and observe the top 20 words by frequency.
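If you are more comfortable writing the aggregation as literal SQL, the same query can be run through a temporary view; this is purely an alternative to the DataFrame version above:
# register the exploded words as a temp view and run the equivalent SQL
textExplodedDf.createOrReplaceTempView("words")
spark.sql("SELECT word, COUNT(*) AS count FROM words GROUP BY word ORDER BY count DESC").show()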
Refinement
The top 20 list contains several stop words which are just noise. It would be nice to filter these out based on a list of stop words. Ideally, we would tell PySpark to do a left anti join against a stop words DataFrame, which removes the stop words from the textWordCounts DataFrame. This is easy to do.
Before proceeding, download stopwords.txt from the GitHub repository and import it into PySpark.
Import the Stop Words into a DataFrame
This time let’s import the file and do the cleanup in a single code block.
stopWordsDf = (spark.read.text("./stopwords.txt")
.select(F.lower(F.col("value")).alias("words_lower"))
.select(F.rtrim(F.col("words_lower")).alias("word")))
Fun fact: Notice the parentheses surrounding the code block? This is a neat Python trick to split code on multiple lines. This makes the Python code more readable.
Run stopWordsDf.show() and verify the stop words have been imported. The column name is word. The DataFrame that stopWordsDf will be joined to, textWordCounts, has a column with the same name. This is not by accident: joining on a column name, the way we do below, requires a column with that name in both DataFrames.
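If the columns had been named differently you could still join by passing an explicit expression instead of a column name; a hypothetical sketch (stop_word is an invented column name, not one that exists in this article):
# hypothetical: join on an expression instead of a shared column name
# textWordCounts.join(stopWordsDf, textWordCounts.word == stopWordsDf.stop_word, "leftanti")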
Join the DataFrames
A left anti join on the DataFrames removes the stop words.
textWordCounts = textWordCounts.join(stopWordsDf, "word", "leftanti")
Query for a Stop Word
Run a SELECT/WHERE query and verify the word the is no longer in the DataFrame.
textWordCounts.where(F.col("word") == "the").count()
The result should be zero.
Fun fact: count() is one of those actions that executes the query, so you don't need to run show() to see the result.
Wine, Ale, or Beer?
Which of these drinks appears most often?
(textWordCounts
.where((F.col("word") == "ale") |
(F.col("word") == "wine") |
(F.col("word") == "beer")).show())
Operationalize
We’ve run quite a bit of code and your notebook is pretty long. If you want to operationalize the notebook, i.e. run it on a real Spark cluster, you should tighten up the code. The whole analysis can be written as a single Python program. The DataFrame names have been snake-cased to conform to Python convention.
import urllib.request

from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

file_name = "shakespeare.txt"
urllib.request.urlretrieve("http://www.gutenberg.org/files/100/100-0.txt", file_name)

stop_words_df = (spark.read.text("./stopwords.txt")
    .select(F.lower(F.col("value")).alias("words_lower"))
    .select(F.rtrim(F.col("words_lower")).alias("word")))

word_counts_df = (spark.read.text(f"./{file_name}")
    .select(F.lower(F.col("value")).alias("words_lower"))
    .select(F.split(F.col("words_lower"), " ").alias("words_split"))
    .select(F.explode(F.col("words_split")).alias("word"))
    .select(F.regexp_extract(F.col("word"), "[a-z]+", 0).alias("word"))
    .where(F.ltrim(F.col("word")) != "")
    .join(stop_words_df, "word", "leftanti")
    .groupBy("word")
    .count()
    .orderBy(F.col("count").desc()))
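If you run this as a standalone script rather than in a notebook, finish with an action so the query actually executes and you can see the output:
# print the 20 most frequent non-stop words
word_counts_df.show(20)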