Running PySpark with Cassandra using spark-cassandra-connector in Jupyter Notebook

Ashok Tankala
Coinmonks
8 min read · Sep 6, 2018


We were facing several out-of-memory issues when doing operations on the big data stored in our Cassandra cluster, so we decided it was better to use Spark to solve this problem.

It became a tough and interesting journey for us because of one decision we took as a team: we wanted to use at most two languages on the backend of this project, Node.js and Python. In our view, these are the right tools for the job: data collection and transformation are handled easily by Node.js, while everything else, such as big data operations and Artificial Intelligence/Machine Learning problems, is solved with Python. You may wonder why we put this restriction in place in the era of polyglot microservices architecture.

It's fun to work in multiple languages. I used to be a Java developer, then moved to Node.js, and then to Python as project requirements changed, and polyglot development makes sense while you are building. But it is very tough to maintain such systems or to find people who are truly polyglot, and sometimes it doesn't make sense to recruit a developer with expertise in a particular language just to maintain one module.

A lot of teams and companies face issues due to polyglot programming, and a couple of them have expressed their concerns as well.

Once I started working on PySpark everything went smoothly, until I thought of using Cassandra. Very little documentation and very few examples are available, so I pieced together a couple of examples related to PySpark and a couple related to Scala, and started my journey with those.

If you haven't installed PySpark & Jupyter yet, you can refer to my previous article. Without wasting much time, let's get our hands dirty.

We need some good data to work with, so I chose the MovieLens data for this. You can get the latest data here. I chose ml-latest.zip instead of ml-latest-small.zip so that we can play with reasonably large data.

Let's load this data into our Cassandra DB first. Open your cqlsh shell and create a keyspace for this. I created the movie_lens keyspace and started using it with the commands below.
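
The exact statements aren't embedded in this post; a minimal sketch for a single-node local setup (the replication settings here are an assumption) looks like this:

    CREATE KEYSPACE movie_lens
      WITH replication = {'class': 'SimpleStrategy', 'replication_factor': 1};

    USE movie_lens;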

Then I created the movies and ratings tables using the commands below.
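
The column names and types here are assumptions based on the standard MovieLens CSV layout (movies.csv: movieId,title,genres; ratings.csv: userId,movieId,rating,timestamp); note that Cassandra lowercases unquoted identifiers:

    CREATE TABLE movies (
      movieid int PRIMARY KEY,
      title text,
      genres text
    );

    CREATE TABLE ratings (
      userid int,
      movieid int,
      rating float,
      timestamp bigint,
      PRIMARY KEY (userid, movieid)
    );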

I loaded the data using the commands below.
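
cqlsh's COPY command can bulk-load the CSV files; the paths below assume the archive was extracted to an ml-latest directory next to where cqlsh was started:

    COPY movies (movieid, title, genres)
      FROM 'ml-latest/movies.csv' WITH HEADER = TRUE;

    COPY ratings (userid, movieid, rating, timestamp)
      FROM 'ml-latest/ratings.csv' WITH HEADER = TRUE;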

Then you will get outputs like these

Everything is set. It's time to start coding.

Start your Jupyter Notebook using the command below.
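
Assuming Jupyter is already installed as described in the previous article, this is simply:

    jupyter notebook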

Create a new notebook.

First, we need to set some arguments and configuration to make sure PySpark connects to our Cassandra cluster.
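
One common way to do this is to set PYSPARK_SUBMIT_ARGS before creating the context; the connector coordinates and version below (a Spark 2.x / Scala 2.11 build) are assumptions, so adjust them to match your Spark installation:

    import os

    # Pull in the spark-cassandra-connector package and point it at the local Cassandra node.
    os.environ['PYSPARK_SUBMIT_ARGS'] = (
        '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 '
        '--conf spark.cassandra.connection.host=127.0.0.1 '
        'pyspark-shell'
    )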

Here we are saying: use spark-cassandra-connector to connect to our Cassandra cluster, whose hostname is 127.0.0.1. That's it. If you have multiple nodes in your Cassandra cluster, then in the host configuration we need to give all of their IPs as a comma-separated list. For example,
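
with placeholder IP addresses (chosen purely for illustration), the same setting would become:

    os.environ['PYSPARK_SUBMIT_ARGS'] = (
        '--packages com.datastax.spark:spark-cassandra-connector_2.11:2.3.0 '
        '--conf spark.cassandra.connection.host=192.168.1.10,192.168.1.11,192.168.1.12 '
        'pyspark-shell'
    )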

The first time you run this it will take a while, because Spark needs to download the connector jar and connect to our Cassandra cluster. You will get output like this

Then we need to create the Spark Context.
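
A minimal way to do that (the local master and application name here are my own choices):

    from pyspark import SparkContext

    # The PYSPARK_SUBMIT_ARGS set above are picked up when the context starts.
    sc = SparkContext("local[*]", "movie_lens")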

After this, we need to create an SQLContext to do SQL operations on our data.
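
For the Spark 2.x style used here:

    from pyspark.sql import SQLContext

    sqlContext = SQLContext(sc)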

We are going to work on multiple tables, so we need their data frames. To save some lines of code, I created a function which loads the data frame for a given table and keyspace.
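
A sketch of such a helper (the function name is my own); org.apache.spark.sql.cassandra is the data source the connector registers:

    def load_and_get_table_df(keyspace, table_name):
        # Read a Cassandra table into a Spark DataFrame via the connector's data source.
        return sqlContext.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(table=table_name, keyspace=keyspace) \
            .load()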

Let’s load the movies, ratings data frames.
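
Using the helper above:

    movies = load_and_get_table_df("movie_lens", "movies")
    ratings = load_and_get_table_df("movie_lens", "ratings")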

Now, let's understand the data. First, let's see what the movies data looks like.
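
A quick look with show():

    movies.show()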

Then you will get data like this

Then let's see what the ratings data looks like.
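
And the same for ratings:

    ratings.show()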

Then you will get data like this

Now, we want to see the top 20 reviewers in terms of the number of ratings given.
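
One way to write this is to group by user, count, and sort in descending order; show() prints the first 20 rows by default (I assume the column is stored as userid, since Cassandra lowercases identifiers):

    from pyspark.sql.functions import desc

    ratings.groupBy("userid").count().orderBy(desc("count")).show()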

You will get data like this

While the above query is running and crunching the data, you will see activity in the command prompt from which you started Jupyter Notebook.

It means it is streaming data from the DB (Cassandra cluster) and doing operations on top of it.

Here I want to select 2 users and see how many movies both of them rated and how many unique movies either one of them reviewed.

To do all of this, let's first see what the schema of the ratings data frame looks like.
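
A quick check with printSchema():

    ratings.printSchema()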

Now let’s select the movies each user rated.
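
A sketch, filtering on two arbitrary user ids (123 and 456, chosen purely for illustration):

    first_user_df = ratings.filter(ratings.userid == 123).select("movieid")
    second_user_df = ratings.filter(ratings.userid == 456).select("movieid")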

At this point Spark doesn't query any data. Spark works in a lazy manner: only when you want to do something with the data does it actually fetch it. Without caching, it fetches the data for every operation; to avoid this we can cache the data frames. The first time Spark fetches the data it caches it, and from then on it uses the cache instead of going back to the DB.

Let’s cache the data frames.
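
For the two per-user data frames created above:

    first_user_df.cache()
    second_user_df.cache()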

Now that everything is set, we need to get the movies both users reviewed.
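
One way is an inner join on the movie id column (intersect() would work just as well):

    both_reviewed_df = first_user_df.join(second_user_df, "movieid")
    both_reviewed_df.show()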

It will give data like this

I wanted to verify, or just make sure, whether I did it correctly or not. So, choose a movie and check it. In this case, I chose the movie with id 3918 and checked.

To check whether the first user reviewed it or not:
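
Filtering the first user's movie list for that id:

    first_user_df.filter(first_user_df.movieid == 3918).show()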

Output:

To check whether the second user reviewed it or not:
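
And likewise for the second user:

    second_user_df.filter(second_user_df.movieid == 3918).show()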

Output:

To find out the number of movies both users reviewed:
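
Counting the rows of the joined data frame from above:

    both_reviewed_df.count()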

Output:

To find out how many unique movies they reviewed in total:
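
Take the union of both movie lists and de-duplicate it:

    first_user_df.union(second_user_df).distinct().count()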

Output:

Once you have finished playing around and no longer want that data to be retained in the cache:
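
Calling unpersist() on the cached data frames:

    first_user_df.unpersist()
    second_user_df.unpersist()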

If you want to shut down the PySpark context:
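
Simply stop the SparkContext created earlier:

    sc.stop()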

You will find this Jupyter Notebook at my GitHub Repository.

Peace. Happy Coding.
See my original article here.
