Apache Spark With PySpark — A Step-By-Step Approach

Amine Bellamkaddem
The Startup
Published in
9 min readJul 14, 2020

In a recent machine learning (ML) project at Carnegie Mellon, I’ve had to process relatively large streams of movie data. One of the objectives of the project has been predicting movies’ popularity based on some inputs (budget, revenue, vote count, average …)

Working on ML-based projects means — in most cases — that you need to care about ingestion of a huge amount of data in near real-time but also as streaming and/or batch jobs. In this article we’ll follow a step-by-step approach using Apache Spark with PySpark and Python, to explore and clean the data needed for the movies’ popularity ML project.

The objective of the project is to use machine learning to predict the popularity of movies for a movie-streaming business, based on historical data including budget, movie revenue, user vote count, just to name a few.

In order to enable the learning process of our ML module, we first need to feed it quality data that it will be able to learn from. To achieve this, it’s necessary for us to first clean the raw data we get from a Kafka broker, which involves actions such as removing duplicates, splitting strings, extracting substrings etc.

Apache Spark is a large-scale data processing framework that can help us achieve these objectives by performing filtering and transformation operations on both unstructured and structured data. In the rest of this post, we dive into the architecture, workflow, and implementation details of our scenario.

Apache Spark Architecture

Apache Spark is an open source distributed framework for large-scale data processing. It was developed by the University of California, Berkeley’s AMPLab in 2009 and donated to the Apache Software Foundation in 2013.

From a high-level perspective, a Spark application is a driver program that executes the user’s main function as well as different parallel tasks on a cluster. The driver program handles scheduling, dispatching, and input/output operations through APIs, in Python as well as other programming languages such as Java and Scala.

Spark has 2 API interfaces: Resilient Distributed Datasets (RDD), which is part of the “core” layer in figure 1, and Spark SQL which is more powerful than RDD. We’ll use both APIs in this use-case.

apache spark architecture

This article focuses on RDD and Spark SQL APIs. The 3 other layers (Spark Streaming, MLlib, and GraphX) will be the subject of future articles. But just a quick glance:

  • Spark Streaming: Performs high-throughput, scalable, and fault-tolerant stream processing of live data streams using Spark Core scheduling capability.
  • Spark MLlib: is a distributed machine-learning library.
  • GraphX: is a distributed graph-processing library for graphs and graph-parallel computation.

The movie use-case

In this use-case, we’ll work with both RDD and Spark SQL APIs. Figures 2 depicts the different steps we need to follow until we produce the CSV file containing the data we need for the learning process.

apache spark step by step
Figure 2

As shown in Figure 2, data comes from a Kafka broker and saved into text files. Spark loads a text file and performs some data cleaning using RDD operations and then saves the result into another text file. A Python script retrieves more data about movies from an external API and saves them into a Json file. Spark loads the json file, performs some Spark SQL operations, and then saves the final and clean version into a CSV file. We’ll be focusing mainly on Spark operations.

RDD will be used in phase 1 of Figure 2, while Spark SQL will be the choice of phase 2.

Resilient Distributed Datasets (RDD)

An RDD is the basic abstraction in Spark, which consists of a collection of elements that can be operated on in parallel. It is a read-only collection of data distributed across the nodes of the cluster. RDDs have two fundamental types of operations:

  • Actions: return values, such as the count of lines in a file, the number of words in a string …
  • Transformations: returns pointers to new RDDs

The data coming from Kafka has the following format:

kafka with spark
Figure 3

For illustration purposes, we’ll be using a file containing 1,746,485 lines, and after the cleaning process we’ll have just 4,359 lines.

We are interested in the second part of each line. E.g. GET /data/m/roxanne+1987/1.mpg. This string contains the movie ID and the user ID. Here are some examples:

  • 2020–04–07T08:28:27,532383,GET /data/m/roxanne+1987/0.mpg (movie id = roxanne+1987, user id = 1987)
  • 2020–04–07T08:28:57,877811,GET /data/m/terminator+2+judgment+day+1991/0.mpg (movie id = terminator+2+judgment+day+1991, user id = 1991)

Note that we have lots of Movie-user ID duplicates that we need to remove. We also have malformed lines that don’t follow the pattern “GET /data/m/movieId-userId/….”

Our mission in phase 1 consists of parsing these lines, removing duplicates, and retrieving movie id and user id for each line (e.g. roxanne+1987)

Let’s get started and play with the data we have using the PySpark interactive shell.

spark rdd pyspark
Figure 4

First, we created a pointer to our text file using SparkContext’s “textFile” method. Then we executed 2 actions: “count” (number of lines in the file) and “first” (first line). Note that these actions didn’t update the movieFile RDD.

Now, let’s filter the lines containing the word “titanic”.

apache spark rdd operations
Figure 5

The “filter” action performs a transformation, meaning that it creates a new RDD containing the filtered items. We displayed the count as well as the top 3 lines of the new created RDD (sorted in descending order). Similarly, we can filter lines not having a given substring:

>>> movieOtherThanTitanic = movieFile.filter(lambda line: “titanic” not in line)

Now let’s retrieve movie and user IDs. Here are the steps:

  • Removing malformed lines (not containing the substring “/data/m/”)
  • Split the lines of the file and get the second part of each line (e.g. /data/m/roxanne+1987/0.mpg)
  • Split again to get just movie and user IDs (e.g. roxanne+1987)
  • Remove duplicates

Note that we can merge operations in a single command, but we go step by step for illustration purposes.

spark actions transformations
Figure 6

The ‘filter” transformation in Figure 6 removed malformed lines. Then we split lines and fetched the second part (index [1]). The “TakeSample” action shows a random sample of the RDD data. We split again to fetch movie and user Ids, and then removed duplicates using the “distinct” transformation. As you can see, our RDD has now 4,359 lines.

Now let’s save movieRdd3 into a text file:

apache spark rdd
Figure 7

coalesce” reduced the number of partitions to 1, meaning that we pulled all the data into a single executor. In this use-case, it is fine because we don’t have a lot of data (4,359 short lines). Otherwise, we should increase the parameter passed to “coalesce” or completely remove it, which will result in multiple files, 1 file per partition (part-00000, part-00001, part-00002 etc). “saveAsTextFile” saved movieRdd3 into the file “files/movie_ids/part-00000”. Finally, we loaded the generated file to check its content.

At this point, we completed Phase 1 (Figure 2), which results in a text file containing movie and user IDs, without duplicates nor other useless data (which we considered useless in this context)

Figure 8 is a Python program that does the same job we’ve done so far, and that can be executed as an application.

spark python
Figure 8

Getting Movie Data

This step consists of calling an external API to get movies (and users) data using the IDs we saved in phase 1. This was done using a Python script and is out of scope of this use case. So at this point, we already have a Json file containing all the movie data we need for our model’s learning process. Figure 9 is an example of this file’s content.

spark with python
Figure 9

Spark SQL

Spark SQL API provides more structured data processing capabilities than the RDD API. Spark SQL gives more information about the structure of the data being used.

Let’s begin by analyzing the Json file created in the previous section. As shown in Figure 10, we created a DataFrame containing the Json content using SparkSession’s method “read.json”. Then we printed a tree-like schema of our data using “printSchema()”. This is very useful to gain a better understanding of the data at hand. We can easily see the list of fields we have as well as their datatypes

spark sql in practice
Figure 10

Now that we have an idea of the structure of the data, let’s execute some SQL queries.

apache spark sql api
Figure 11

As you can see, we can easily “select” data by fields and then “show” the result. Here, we specified a limit of 10 items. Then, we showed the count of movies grouped by “status”.

Figure 12 shows an example where we applied a filter (popularity > 15)

spark pyspark python
Figure 12

Spark SQL also allows running SQL queries programmatically which gives more flexibility. First, we need to register the DataFrame as an SQL temporary view using “createOrReplaceTempView”, and then we can execute queries as shown in Figure 13

spark sql api
Figure 13

Data exploration using Spark SQL API is straightforward. For example, based on the output of Figure 11, we clearly see that the vast majority of movies are “Released”, so decisions such as removing the status field or not from our dataset could be taken during this process.

Now, let’s remove and rename some fields.

apache spark pyspark
Figure 14

We deleted some fields using the “drop” method and then renamed the column “homepage” to “website” using the method “withColumnRenamed”. “printSchema” prints the schema of the new DataFrame.

Note: we just tried different methods for illustration purposes, but more in-depth analysis of data is required as part of the feature engineering process, which is out of the scope of this post.

Once we are done with the necessary transformations, let’s save the DataFrame into a CSV file. Figure 14 shows the method used for saving a CSV file. The “header” attribute is by default set to false, so we need to explicitly set it to true if we need column headers. In this example, the CSV file is saved under the path “files/clean_movies/part-00000.csv”. Similarly, we can save a Json file using the method write.json(“file_name”).

pyspark spark in practice
Figure 15

Figure 16 is a Python program that does the same job we’ve done using Spark SQL API, and that can be executed as an application.

pyspark in practice
Figure 16

Apache Spark Limitations

As we’ve seen so far, Apache Spark is very useful when it comes to ingestion and exploration of a huge amount of data. It’s widely used by industries, especially in AI and machine learning fields. However, Spark has some limitations, and the main of which are:

  • No support for real-time data processing: data stream is partitioned into batches, transformed using map, reduce, filter etc, and then treated as micro-batch jobs. So when we have hard-deadline constraints, Apache Spark might not be the right tool.
  • No file management system: Spark doesn’t have its own file management system and therefore needs to be integrated with other tools such as Hadoop.
  • Spark is costly in terms of memory consumption: Spark requires lots of RAM to enable in-memory data stream processing.

That’s all for this part. We started with non-structured data containing only IDs, we performed some cleaning and parsing using the RDD API, then we applied more advanced Spark SQL operations to clean the data, and finally, we exported the final and clean version into a CSV, which is ready for the learning process. In the next part, we’ll look at Spark Streaming, MLlib, and GraphX.

--

--