Distributed Computing with Spark

Getting Started with PySpark

Connect to Spark Cluster using Python

Shashvat G
Analytics Vidhya


Photo by Warren Wong on Unsplash

In the last decade, there has been unprecedented demand for fast and reliable tools to handle and process streaming big data. One way to tackle this is MapReduce; however, it is not only slow but also unable to handle real-time data. Spark comes to the rescue! Spark was developed at UC Berkeley's AMPLab and open-sourced in 2010. It offers a solution that is both fast and general-purpose. One major advantage of Spark is that it runs computations in memory. Another is lazy evaluation, which simply means nothing is computed until an action explicitly asks for a result. Together, these make data processing significantly faster.

Spark’s magic lies in its RDDs (Resilient Distributed Datasets). If you are familiar with the pandas DataFrame, this will probably be easier for you: the Spark DataFrame is a higher-level abstraction built on top of RDDs that is simpler to work with and automatically optimized.

First things first, What is Spark?

Spark is a cluster computing framework that divides a task among a cluster of computers called nodes for fast and efficient processing. This splitting of data makes it easier to work with huge datasets. Each node works on its assigned computation in parallel, and the partial results are later combined into the final result.

What’s PySpark then?

I’m glad you asked. It’s an interface for Apache Spark in Python that allows you to write Spark applications using Python APIs. In simple terms, PySpark is a way to connect to a Spark cluster using Python.

Why do you need Spark Data Frame if you have Pandas Data Frame?

Good question: if you have to ask whether you need Spark, you probably don't need it at all. Honestly, unless you are dealing with really large datasets, pandas works just fine. The primary difference is that a pandas DataFrame is not distributed and runs on a single node, while a Spark DataFrame is spread across the cluster. Another advantage of the Spark DataFrame interface is that you can run SQL queries on the tables in your Spark cluster.

Pandas DataFrame Columns vs Spark DataFrame Columns

Notably, updating a Spark DataFrame is somewhat different than working in pandas because the Spark DataFrame is immutable. This means that it can't be changed, and so columns can't be updated in place.

Installation

You can install Spark here. It’s worth mentioning that Spark is written in Scala and runs on the JVM; PySpark talks to it through Py4J. Hence, to run PySpark, you need Java installed along with Python and Apache Spark.

To get familiar with the Spark ecosystem, I would recommend running Spark locally in standalone mode before configuring it to run on a cluster.

Now that we have seen how to get Spark, we can connect to a cluster and go through some basics. Without further ado, let’s crack on.

  • Connecting to a Spark Cluster using PySpark — First of all, we need a connection to the cluster. This is done by creating an instance of the SparkContext class. To verify which version of Spark you’re running, try print(sc.version), where sc is your SparkContext instance.
  • Working with Spark DataFrames — Once we have a connection to the cluster, we can use it to create a SparkSession object from the SparkContext. You can think of the SparkContext as your connection to the cluster and the SparkSession as your interface to that connection. To avoid creating multiple connections and sessions, it is better to use SparkSession.builder.getOrCreate()
#Import SparkSession
from pyspark.sql import SparkSession
#Create Session
spark = SparkSession.builder.getOrCreate()
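If you want to double-check the context and version behind the session, here is a minimal sketch that reuses the spark session created above:
# Grab the SparkContext that backs the session
sc = spark.sparkContext
# Verify which version of Spark you're running
print(sc.version)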
  • You can use spark.catalog.listTables() to see all the tables in your cluster. To list a table's contents, .show() can be used; for example, table1.show() will display the first 20 rows and all the columns of table1.
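A quick sketch of both calls (table1 is a hypothetical table name used only for illustration):
# List the tables registered in the catalog of this session
print(spark.catalog.listTables())
# Load a catalog table as a DataFrame and display its first 20 rows
spark.table("table1").show()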
  • The .createDataFrame() method takes a pandas DataFrame and returns a Spark DataFrame. Please note that the created DataFrame will not persist in the session catalog, as it is stored locally. This means that you can use all the Spark DataFrame methods on it, but you can't access the data in other contexts (for example, SQL queries) until you register it as a view.
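For instance, here is a minimal sketch with a toy pandas DataFrame (the column names are made up for illustration):
#Import pandas
import pandas as pd
# A tiny pandas DataFrame used purely for illustration
pdf = pd.DataFrame({"name": ["Alice", "Bob"], "salary": [4000, 5200]})
# Convert it into a Spark DataFrame; it lives only in this session
df = spark.createDataFrame(pdf)
df.show()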
  • .select() method — Used to select columns from a Spark DataFrame. Also, when you select a column using the df.colName notation, you can perform any column operation on it, and .select() will return the transformed column. For instance, see below.
# Weekly salary when you have a monthly salary column
week_salary = df.select((df.salary / 4).alias("weekly_salary"))
  • .filter() method — As you might have guessed, this is the Spark equivalent of SQL’s WHERE clause. It takes either a string expression that would follow the WHERE clause of a SQL statement, or a Spark Column of boolean (True/False) values.
#SQL
SELECT * FROM table WHERE column > 120
#PySpark
table.filter("column > 120").show()
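The same filter can also be written with a boolean column expression instead of a SQL string (same placeholder names as above):
# Equivalent filter using a boolean Column instead of a SQL string
table.filter(table.column > 120).show()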
  • The .createTempView() Spark DataFrame method can help you build a temporary table; it accepts as an argument the name under which you'd like to register the temporary table. This method registers the DataFrame as a table in the catalog, but because the table is temporary, it can only be accessed from the specific SparkSession used to create the Spark DataFrame.
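A minimal sketch, reusing the df created above and a made-up view name:
# Register df under a temporary view name, then query it with SQL
df.createTempView("people")
spark.sql("SELECT name, salary FROM people WHERE salary > 4500").show()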
  • spark.read.csv(file_path) can be used to read a CSV file in PySpark. If your file has a header, which it often will, you can pass an argument that tells Spark to treat the first line as the header, as shown below.
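A short sketch with a placeholder file path:
# Placeholder path used for illustration
file_path = "/path/to/data.csv"
# Read the CSV, treating the first line as the header
df_csv = spark.read.csv(file_path, header=True)
df_csv.show(5)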
  • The .withColumn() method takes two arguments and can be used to create a new column in the dataset: first, a string with the name of your new column, and second, the new column itself. For example, to increase price by 100, see the sketch below.
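A short sketch, assuming a DataFrame df with a price column:
#Import col to build column expressions
from pyspark.sql.functions import col
# Replace the "price" column with price + 100
df.withColumn("price", col("price") + 100).show()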
  • Sometimes it makes sense to take a table and work with it locally using a tool like pandas. Spark DataFrames make that easy with the .toPandas() method.
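For example, reusing the hypothetical people view from above (keep the result small, since .toPandas() pulls everything onto the driver):
# Collect a small query result back into a pandas DataFrame on the driver
local_df = spark.sql("SELECT * FROM people LIMIT 100").toPandas()
print(local_df.head())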
  • Another common database task is aggregation, that is, reducing your data by breaking it into groups and summarizing each group. It is also possible to GROUP BY (.groupBy()) more than one column; if you group by two columns, each unique combination of the two will appear in the output, as in the sketch below.
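A minimal sketch, assuming hypothetical dept, title, and salary columns:
# Average salary per (dept, title) pair; one output row per unique combination
df.groupBy("dept", "title").avg("salary").show()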

That’s it for now folks!

Takeaways

In this post, we walked through some of the frequently used basic queries to get acquainted with PySpark. Essentially, PySpark is a way to get Python to talk to a Spark cluster. If you have a bit of background in SQL and Python, you can jump onto the PySpark ship 🚢 pretty quickly.

Spark has been widely adopted and is used extensively in the Data Science and Machine Learning community as a large-scale analytical processing engine. The stack includes support for real-time as well as batch data processing with Spark Streaming, Spark’s machine learning library (MLlib), and Spark SQL. Read more about them here!

Depending on your preference and problem, it might be worth having a look at Dask. Any questions, please give me a shout!

I’m glad you came this far! Thanks for your time. Whatever you are up to, Have a good one 😄



Data Scientist | Analyst who aspires to continuously learn and grow in Data Science Space. Find him on LinkedIn https://www.linkedin.com/in/shashvat-gupta/