SparkSQL and DataFrame (High Level API) Basics using Pyspark

In the previous article, we looked at Spark RDDs, the fundamental (unstructured) building block of Spark core. In this article we will look at the structured side of Spark: SparkSQL and DataFrames. SparkSQL is the Spark module for processing structured data, and DataFrames are its main abstraction.

DataFrames

A DataFrame is a structured collection of rows distributed across the worker nodes (executors) of a Spark cluster. Conceptually, a DataFrame is like a table in a relational database, with its own schema and column headers.

DataFrames can be built from many sources: files in formats such as text, CSV, JSON and Parquet, or from Spark's own RDDs.

In this article, I will walk through the most important querying operations on Spark DataFrames using the PySpark API.
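The examples below assume a SparkSession named spark and a SparkContext named sc are already available (they are created for you in the pyspark shell and in most notebooks). If you are writing a standalone script, a minimal setup sketch looks like this:

from pyspark.sql import SparkSession

# build (or reuse) a SparkSession; the app name is just an example
spark = SparkSession.builder \
    .appName("dataframe-basics") \
    .getOrCreate()

sc = spark.sparkContext  # used below for creating RDDs

So let's start by creating a DataFrame.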

Creating DataFrames

DataFrames can be created from txt, csv, json and parquet files, or from existing RDDs. Let's start by creating a DataFrame from an RDD:

from pyspark.sql.types import Row

list_rdd = sc.parallelize([('Messi', 33), ('Ronaldo', 35), ('Salah', 28), ('Neymar', 28), ('Mbappe', 21)])

players = list_rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
players_df = spark.createDataFrame(players)
players_df.show()
+-------+---+
| name|age|
+-------+---+
| Messi| 33|
|Ronaldo| 35|
| Salah| 28|
| Neymar| 28|
| Mbappe| 21|
+-------+---+
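
By the way, createDataFrame can also take a plain Python list of tuples together with a list of column names, so the intermediate RDD of Row objects is not strictly required; a small sketch with the same toy data:

# column names passed directly as a list, no Row objects needed
players_df2 = spark.createDataFrame(
    [('Messi', 33), ('Ronaldo', 35), ('Salah', 28), ('Neymar', 28), ('Mbappe', 21)],
    ['name', 'age'])
players_df2.show()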

Now let's create DataFrames from different file formats:

dataframe_json = spark.read.json('data/json_data.json')
dataframe_txt = spark.read.text('data/text_data.txt')
dataframe_csv = spark.read.csv('data/csv_data.csv')
dataframe_parquet = spark.read.load('data/parquet_data.parquet')

Now that we've learned how to create a DataFrame, we can dive into some important DataFrame methods using example datasets.

First, we will look at the methods used for inspecting the data in a DataFrame.

df = spark.read \
    .option('header', 'true') \
    .option('inferSchema', 'true') \
    .csv('data/customers.csv')
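
If you prefer not to rely on inferSchema (which requires an extra pass over the data to guess the column types), you can pass an explicit schema instead. A small sketch, assuming the column layout of customers.csv used below:

from pyspark.sql.types import StructType, StructField, IntegerType, StringType

# explicit schema matching the customers.csv columns shown in this article
customer_schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("firstname", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("city", StringType(), True),
    StructField("profession", StringType(), True),
    StructField("salary", IntegerType(), True)
])

df = spark.read.option("header", "true").schema(customer_schema).csv("data/customers.csv")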

show()

The show() function displays the content of the DataFrame, showing the first 20 rows by default. show(n) displays the first n rows instead.

df.show(5)
+---+---------+---+------------+----------+------+
| id|firstname|age| city|profession|salary|
+---+---------+---+------------+----------+------+
|100| Brianna| 32|San Fernando| doctor| 1540|
|101| Mariele| 18| Cairo| developer| 5800|
|102| Marnia| 30| Baku| developer| 7136|
|103| Riannon| 56|Philadelphia| doctor| 4986|
|104| Paule| 25| Taipei| engineer| 6553|
+---+---------+---+------------+----------+------+
only showing top 5 rows

printSchema()

The printSchema() function prints the schema of the DataFrame in a tree format.

df.printSchema()
root
|-- id: integer (nullable = true)
|-- firstname: string (nullable = true)
|-- age: integer (nullable = true)
|-- city: string (nullable = true)
|-- profession: string (nullable = true)
|-- salary: integer (nullable = true)

count()

The count() function counts the number of rows in the DataFrame.

df.count()
100

columns

The columns attribute returns the column names of the DataFrame as a list.

df.columns
['id', 'firstname', 'age', 'city', 'profession', 'salary']

describe()

The describe() function computes summary statistics (count, mean, stddev, min, max) for the specified columns of the DataFrame, or for all numeric columns if none are given.

df.describe("age","salary").show()
+-------+------------------+------------------+
|summary| age| salary|
+-------+------------------+------------------+
| count| 100| 100|
| mean| 41.87| 4788.18|
| stddev|14.444358585603409|2042.7282577351048|
| min| 17| 1051|
| max| 65| 7985|
+-------+------------------+------------------+

So far we've covered a handful of functions used for inspecting the data. Now we can move on to querying operations.

select()

The select() function returns a new DataFrame containing only the selected column or columns.

df.select("firstname","age").show(5)
+---------+---+
|firstname|age|
+---------+---+
| Brianna| 32|
| Mariele| 18|
| Marnia| 30|
| Riannon| 56|
| Paule| 25|
+---------+---+

when()

The when() function evaluates a condition on a column and returns one value when it holds and another (via otherwise()) when it does not. It comes from pyspark.sql.functions, so it needs to be imported first.

from pyspark.sql.functions import when

df.select("firstName",when(df.age > 30, 1).otherwise(0)).show(5)
+---------+--------------------------------------+
|firstName|CASE WHEN (age > 30) THEN 1 ELSE 0 END|
+---------+--------------------------------------+
| Brianna| 1|
| Mariele| 0|
| Marnia| 0|
| Riannon| 1|
| Paule| 0|
+---------+--------------------------------------+

like()

The like() function checks a column against a SQL LIKE pattern and returns a boolean column of true/false values.

df.select("firstName",df.city.like("Baku")).show(5)
+---------+--------------+
|firstName|city LIKE Baku|
+---------+--------------+
| Brianna| false|
| Mariele| false|
| Marnia| true|
| Riannon| false|
| Paule| false|
+---------+--------------+

startswith()

The startswith() function checks whether the values in the specified column begin with the given string and returns a boolean column.

df.select("firstName",df.profession.startswith("dev")).show(5)
+---------+---------------------------+
|firstName|startswith(profession, dev)|
+---------+---------------------------+
| Brianna| false|
| Mariele| true|
| Marnia| true|
| Riannon| false|
| Paule| false|
+---------+---------------------------+

endswith()

The endswith() function checks whether the values in the specified column end with the given string and returns a boolean column.

df.select("firstName",df.profession.endswith("er")).show(5)
+---------+------------------------+
|firstName|endswith(profession, er)|
+---------+------------------------+
| Brianna| false|
| Mariele| true|
| Marnia| true|
| Riannon| false|
| Paule| true|
+---------+------------------------+

substr()

The substr() function extracts a substring from a column, given a start position and the length of the substring to extract.

df.select(df["firstName"].substr(1, 3).alias("name")).show(5)
+----+
|name|
+----+
| Bri|
| Mar|
| Mar|
| Ria|
| Pau|
+----+

between()

The between() function checks whether the values of a column fall within the given lower and upper bounds (inclusive) and returns a boolean column.

df.select("firstName",df.age.between(22, 28)).show(5)
+---------+-----------------------------+
|firstName|((age >= 22) AND (age <= 28))|
+---------+-----------------------------+
| Brianna| false|
| Mariele| false|
| Marnia| false|
| Riannon| false|
| Paule| true|
+---------+-----------------------------+
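
All of these column expressions return boolean columns, so they can also be passed to filter() (covered below) to actually subset the rows instead of just flagging them; a small sketch:

# keep only customers aged 22-28 whose profession starts with "dev"
df.filter(df.age.between(22, 28) & df.profession.startswith("dev")).show(5)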

The Spark high-level API also provides functions for adding, updating and removing columns. Let's take a look at some of them below.

withColumn()

The withColumn() function adds a column to, or replaces a column in, an existing DataFrame. It can also be used to change the data type of an existing column.

df.withColumn("yearly_salary",df.salary*12).show(5)
+---+---------+---+------------+----------+------+-------------+
| id|firstname|age| city|profession|salary|yearly_salary|
+---+---------+---+------------+----------+------+-------------+
|100| Brianna| 32|San Fernando| doctor| 1540| 18480|
|101| Mariele| 18| Cairo| developer| 5800| 69600|
|102| Marnia| 30| Baku| developer| 7136| 85632|
|103| Riannon| 56|Philadelphia| doctor| 4986| 59832|
|104| Paule| 25| Taipei| engineer| 6553| 78636|
+---+---------+---+------------+----------+------+-------------+

from pyspark.sql.functions import col

df.withColumn("yearly_salary", col("yearly_salary").cast("float")) \
    .show(5)
+---+---------+---+------------+----------+------+-------------+
| id|firstname|age| city|profession|salary|yearly_salary|
+---+---------+---+------------+----------+------+-------------+
|100| Brianna| 32|San Fernando| doctor| 1540| 18480.0|
|101| Mariele| 18| Cairo| developer| 5800| 69600.0|
|102| Marnia| 30| Baku| developer| 7136| 85632.0|
|103| Riannon| 56|Philadelphia| doctor| 4986| 59832.0|
|104| Paule| 25| Taipei| engineer| 6553| 78636.0|
+---+---------+---+------------+----------+------+-------------+

withColumnRenamed()

The withColumnRenamed() function, as the name suggests, renames an existing column of a DataFrame.

df.withColumnRenamed("firstname","name").show(5)
+---+-------+---+------------+----------+------+-------------+
| id| name|age| city|profession|salary|yearly_salary|
+---+-------+---+------------+----------+------+-------------+
|100|Brianna| 32|San Fernando| doctor| 1540| 18480|
|101|Mariele| 18| Cairo| developer| 5800| 69600|
|102| Marnia| 30| Baku| developer| 7136| 85632|
|103|Riannon| 56|Philadelphia| doctor| 4986| 59832|
|104| Paule| 25| Taipei| engineer| 6553| 78636|
+---+-------+---+------------+----------+------+-------------+

toDF()

The toDF() function returns a new DataFrame with all of its columns renamed to the given names.

people.toDF("id_no", "name", "years_old","address","job","wage","yearly_wage").show(5)
+-----+-------+---------+------------+---------+----+-----------+
|id_no| name|years_old| address| job|wage|yearly_wage|
+-----+-------+---------+------------+---------+----+-----------+
| 100|Brianna| 32|San Fernando| doctor|1540| 18480|
| 101|Mariele| 18| Cairo|developer|5800| 69600|
| 102| Marnia| 30| Baku|developer|7136| 85632|
| 103|Riannon| 56|Philadelphia| doctor|4986| 59832|
| 104| Paule| 25| Taipei| engineer|6553| 78636|
+-----+-------+---------+------------+---------+----+-----------+

drop()

The drop() function drops an existing column from the DataFrame.

df.drop("yearly_salary").show(5)
+---+---------+---+------------+----------+------+
| id|firstname|age| city|profession|salary|
+---+---------+---+------------+----------+------+
|100| Brianna| 32|San Fernando| doctor| 1540|
|101| Mariele| 18| Cairo| developer| 5800|
|102| Marnia| 30| Baku| developer| 7136|
|103| Riannon| 56|Philadelphia| doctor| 4986|
|104| Paule| 25| Taipei| engineer| 6553|
+---+---------+---+------------+----------+------+

Finally, we can move on to some other functions that are very useful for data analysis. Here we will touch on some popular ones that we commonly use in PySpark.

groupBy()

The groupBy() function groups the rows that share the same value in the given column, so that aggregations can be applied per group.

df.groupBy("age").count().show(5)
+---+-----+
|age|count|
+---+-----+
| 31| 1|
| 65| 2|
| 34| 1|
| 28| 1|
| 27| 1|
+---+-----+
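
groupBy() is usually combined with per-group aggregations beyond count(). A small sketch computing the average salary per profession (avg and round come from pyspark.sql.functions):

from pyspark.sql.functions import avg, round

# average salary per profession, rounded to two decimals
df.groupBy("profession") \
    .agg(round(avg("salary"), 2).alias("avg_salary")) \
    .show(5)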

filter()

The filter() function keeps only the rows that satisfy the given condition.

df.filter((df.age > 30) & (df.firstname.startswith("B"))).show(5)
+---+----------+---+------------+----------+------+-------------+
| id| firstname|age| city|profession|salary|yearly_salary|
+---+----------+---+------------+----------+------+-------------+
|100| Brianna| 32|San Fernando| doctor| 1540| 18480|
|173|Bernardine| 54| Chennai| engineer| 5399| 64788|
|198| Babita| 41| Chişinău| pilot| 7158| 85896|
+---+----------+---+------------+----------+------+-------------+

agg()

The agg() function computes aggregates over the whole DataFrame and returns the result as a new DataFrame, using functions such as sum, avg, max, min and count from pyspark.sql.functions.

from pyspark.sql.functions import sum, avg, max, min, count

df.agg(sum("salary"),avg("salary"),max("salary"),min("salary"),count("salary")).show()
+-----------+-----------+-----------+-----------+-------------+
|sum(salary)|avg(salary)|max(salary)|min(salary)|count(salary)|
+-----------+-----------+-----------+-----------+-------------+
| 478818| 4788.18| 7985| 1051| 100|
+-----------+-----------+-----------+-----------+-------------+

join()

The join() function joins a DataFrame with another DataFrame on the specified common column, using the chosen join type.

df.show()
+---+---------+---+------------+----------+------+
| id|firstname|age| city|profession|salary|
+---+---------+---+------------+----------+------+
|100| Brianna| 32|San Fernando| doctor| 1540|
|101| Mariele| 18| Cairo| developer| 5800|
|102| Marnia| 30| Baku| developer| 7136|
|103| Riannon| 56|Philadelphia| doctor| 4986|
|104| Paule| 25| Taipei| engineer| 6553|
+---+---------+---+------------+----------+------+
df2.show()
+---+------+----------+--------------+
| id|gender| country|marital_status|
+---+------+----------+--------------+
|100|female| Spain| single|
|101|female| Eygpt| single|
|102|female|Azerbaijan| married|
|103|female| USA| divorced|
|104|female| Taiwan| married|
+---+------+----------+--------------+
df_join = df.join(df2, "id", "inner")
df_join.show()
+---+---------+---+------------+----------+------+------+----------+--------------+
| id|firstname|age| city|profession|salary|gender| country|marital_status|
+---+---------+---+------------+----------+------+------+----------+--------------+
|100| Brianna| 32|San Fernando| doctor| 1540|female| Spain| single|
|101| Mariele| 18| Cairo| developer| 5800|female| Eygpt| single|
|102| Marnia| 30| Baku| developer| 7136|female|Azerbaijan| married|
|103| Riannon| 56|Philadelphia| doctor| 4986|female| USA| divorced|
|104| Paule| 25| Taipei| engineer| 6553|female| Taiwan| married|
+---+---------+---+------------+----------+------+------+----------+--------------+
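
The third argument is the join type; besides "inner", Spark also supports types such as "left", "right", "full" and "left_anti". For example, a left join keeps every row of df even when there is no matching id in df2; a small sketch:

# left join: all rows of df are kept, unmatched columns become null
df.join(df2, "id", "left").show(5)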

union()

The union() function returns a new DataFrame containing the rows of the current DataFrame followed by the rows of another DataFrame.

df2.show()
+---+---------+---+--------------+--------------+------+
| id|firstname|age| city| profession|salary|
+---+---------+---+--------------+--------------+------+
|105| Averyl| 35|Guatemala City| engineer| 5767|
|106| Mady| 23| Freetown|police officer| 6087|
|107| Ardeen| 36| Suez| musician| 2879|
|108| Audrie| 17| Puebla| musician| 7792|
|109| Mignon| 34| Saint-Pierre| developer| 2922|
+---+---------+---+--------------+--------------+------+
df.union(df2).show()
+---+---------+---+--------------+--------------+------+
| id|firstname|age| city| profession|salary|
+---+---------+---+--------------+--------------+------+
|100| Brianna| 32| San Fernando| doctor| 1540|
|101| Mariele| 18| Cairo| developer| 5800|
|102| Marnia| 30| Baku| developer| 7136|
|103| Riannon| 56| Philadelphia| doctor| 4986|
|104| Paule| 25| Taipei| engineer| 6553|
|105| Averyl| 35|Guatemala City| engineer| 5767|
|106| Mady| 23| Freetown|police officer| 6087|
|107| Ardeen| 36| Suez| musician| 2879|
|108| Audrie| 17| Puebla| musician| 7792|
|109| Mignon| 34| Saint-Pierre| developer| 2922|
+---+---------+---+--------------+--------------+------+
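
Note that union() matches columns by position, not by name, so both DataFrames must have the same column order. If the columns are the same but ordered differently, unionByName() (available since Spark 2.3) is the safer choice; a small sketch:

# rows are matched to columns by name rather than position
df.unionByName(df2).show(5)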

alias()

The alias() function renames a column (for example an aggregate column) in the returned DataFrame.

df.agg(sum("salary").alias("sum"),avg("salary").alias("average")).show(5)
+------+-------+
| sum|average|
+------+-------+
|478818|4788.18|
+------+-------+

sort() & orderBy()

The sort() and orderBy() functions sort the DataFrame by one or more columns, alphabetically or numerically, in ascending or descending order.

df.sort("age").show(5)
+---+---------+---+---------+--------------+------+-------------+
| id|firstname|age| city| profession|salary|yearly_salary|
+---+---------+---+---------+--------------+------+-------------+
|108| Audrie| 17| Puebla| musician| 7792| 93504|
|101| Mariele| 18| Cairo| developer| 5800| 69600|
|129| Kerrin| 18|Cartagena| pilot| 1093| 13116|
|123| Joceline| 18| Konya| worker| 4468| 53616|
|134| Estell| 18|Milwaukee|police officer| 5102| 61224|
+---+---------+---+---------+--------------+------+-------------+
from pyspark.sql.functions import desc

df.orderBy(desc("age")).show(5)
+---+---------+---+--------------------+-----------+------+-------------+
| id|firstname|age| city| profession|salary|yearly_salary|
+---+---------+---+--------------------+-----------+------+-------------+
|138| Tracey| 65| Port Vila| teacher| 3978| 47736|
|141| Tomasina| 65|Santa Cruz de Ten...|firefighter| 2333| 27996|
|127| Sadie| 64| Helsinki| musician| 1051| 12612|
|151| Philis| 64| Timbuktu|firefighter| 3194| 38328|
|110|Konstance| 62| Fukuoka| developer| 1507| 18084|
+---+---------+---+--------------------+-----------+------+-------------+

replace()

The replace() function replaces matching values with specified new values.

df.replace("Cairo","Egypt","city")\
.replace(["Brianna", "Paule"], ["Maria", "Pepper"], "firstname")\
.show(5)
+---+---------+---+------------+----------+------+-------------+
| id|firstname|age| city|profession|salary|yearly_salary|
+---+---------+---+------------+----------+------+-------------+
|100| Maria| 32|San Fernando| doctor| 1540| 18480|
|101| Mariele| 18| Egypt| developer| 5800| 69600|
|102| Marnia| 30| Baku| developer| 7136| 85632|
|103| Riannon| 56|Philadelphia| doctor| 4986| 59832|
|104| Pepper| 25| Taipei| engineer| 6553| 78636|
+---+---------+---+------------+----------+------+-------------+

fillna()

The fillna() function replaces null values with the ones we designate.

df_null.show(6)
+---+---------+----+--------------+----------+------+
| id|firstname| age| city|profession|salary|
+---+---------+----+--------------+----------+------+
|100| Brianna|null| San Fernando| doctor| 1540|
|101| Mariele| 18| Cairo| developer| 5800|
|102| Marnia| 30| Baku| developer| 7136|
|103| Riannon| 56| Philadelphia| doctor| null|
|104| Paule| 25| Taipei| engineer| 6553|
|105| Averyl| 35|Guatemala City| engineer| 5767|
+---+---------+----+--------------+----------+------+
df_null.fillna({"age":20,"salary":5000}).show(5)
+---+---------+---+------------+----------+------+
| id|firstname|age| city|profession|salary|
+---+---------+---+------------+----------+------+
|100| Brianna| 20|San Fernando| doctor| 1540|
|101| Mariele| 18| Cairo| developer| 5800|
|102| Marnia| 30| Baku| developer| 7136|
|103| Riannon| 56|Philadelphia| doctor| 5000|
|104| Paule| 25| Taipei| engineer| 6553|
+---+---------+---+------------+----------+------+

dropna()

The dropna() function drops rows with null values.

df_null.dropna().show(5)
+---+---------+---+--------------+--------------+------+
| id|firstname|age| city| profession|salary|
+---+---------+---+--------------+--------------+------+
|101| Mariele| 18| Cairo| developer| 5800|
|102| Marnia| 30| Baku| developer| 7136|
|104| Paule| 25| Taipei| engineer| 6553|
|105| Averyl| 35|Guatemala City| engineer| 5767|
|106| Mady| 23| Freetown|police officer| 6087|
+---+---------+---+--------------+--------------+------+
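
dropna() also accepts how and subset parameters to control which rows are removed; a small sketch:

# drop a row only when its salary value is null
df_null.dropna(subset=["salary"]).show(5)

# drop a row only when all of its values are null
df_null.dropna(how="all").show(5)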

toJSON()

The toJSON() function converts the rows of a DataFrame into JSON documents (as an RDD of strings).

df.toJSON().take(5)
['{"id":100,"firstname":"Brianna","age":32,"city":"San Fernando","profession":"doctor","salary":1540,"yearly_salary":18480}',
'{"id":101,"firstname":"Mariele","age":18,"city":"Cairo","profession":"developer","salary":5800,"yearly_salary":69600}',
'{"id":102,"firstname":"Marnia","age":30,"city":"Baku","profession":"developer","salary":7136,"yearly_salary":85632}',
'{"id":103,"firstname":"Riannon","age":56,"city":"Philadelphia","profession":"doctor","salary":4986,"yearly_salary":59832}',
'{"id":104,"firstname":"Paule","age":25,"city":"Taipei","profession":"engineer","salary":6553,"yearly_salary":78636}']

toPandas()

The toPandas() function converts the Spark DataFrame into a pandas DataFrame by collecting all rows to the driver, so it should only be used on data small enough to fit in driver memory.

df.toPandas().head()

cache()

The cache() function keeps the DataFrame in memory after it is first computed, so that subsequent actions on the cached DataFrame can reuse it and run significantly faster.

df.cache()
df.is_cached
True

rdd

The rdd attribute returns the content of the DataFrame as a Spark RDD of Row objects.

df.rdd.take(5)
[Row(id=100, firstname='Brianna', age=32, city='San Fernando', profession='doctor', salary=1540, yearly_salary=18480),
Row(id=101, firstname='Mariele', age=18, city='Cairo', profession='developer', salary=5800, yearly_salary=69600),
Row(id=102, firstname='Marnia', age=30, city='Baku', profession='developer', salary=7136, yearly_salary=85632),
Row(id=103, firstname='Riannon', age=56, city='Philadelphia', profession='doctor', salary=4986, yearly_salary=59832),
Row(id=104, firstname='Paule', age=25, city='Taipei', profession='engineer', salary=6553, yearly_salary=78636)]

columns

The columns attribute returns all column names as a list.

people.columns
['id', 'firstname', 'age', 'city', 'profession', 'salary', 'yearly_salary']

dtypes

The dtypes attribute returns all column names and their data types as a list of tuples.

df.dtypes
[('id', 'int'),
('firstname', 'string'),
('age', 'int'),
('city', 'string'),
('profession', 'string'),
('salary', 'int'),
('yearly_salary', 'int')]

schema

The schema attribute returns the schema of the DataFrame as a StructType.

df.schema
StructType(List(StructField(id,IntegerType,true),StructField(firstname,StringType,true),StructField(age,IntegerType,true),StructField(city,StringType,true),StructField(profession,StringType,true),StructField(salary,IntegerType,true),StructField(yearly_salary,IntegerType,true)))

write & save

The write and save functions save the DataFrame to storage in the specified format.

df.write.save("customers.csv", format="csv")
df.select("firstname","city").write.save("customers.csv", format="csv")
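
Note that writing to a path that already exists fails by default, so in practice you usually set a write mode; partitionBy() is also handy for splitting the output by a column. A small sketch (the output path is only an example):

# overwrite any existing output and partition the files by profession
df.write \
    .mode("overwrite") \
    .partitionBy("profession") \
    .parquet("output/customers_by_profession")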

Data Cleaning Example

Up to this point we have seen plenty of functions used for data processing. To finish, let's combine a few more of them in a small data cleaning example.

df_dirty.show(10)
+---+---------+---+--------------+----------+------+
| id|firstname|age| city|profession|salary|
+---+---------+---+--------------+----------+------+
|100| BRianna| 32| San Fernando| Doctor| 1540|
|101| Mariele| 18| Cairo | Developer| null|
|102| marnia | 30| Baku| Developer| 7136|
|103| Riannon| 56| Philadelphia| Doctor| 4986|
|104| Paule| 25| Taipei | Engineer| 6553|
|105| Averyl| 35|Guatemala City| engineer| 5767|
|106| Mady| 23| Freetown| null| 6087|
|107| Ardeen| 36| Suez| musician| 2879|
|108| Audrie| 17| Puebla| musician| 7792|
|109| Mignon| 34| Saint-Pierre| developer| 2922|
+---+---------+---+--------------+----------+------+
from pyspark.sql.functions import trim, initcap, lower, when, round, avg

# normalize names and cities, fill missing professions with "unknown",
# and replace missing salaries with the rounded average salary
df_dirty.withColumn("firstname", trim(initcap("firstname")))\
    .withColumn("city", trim("city"))\
    .withColumn("profession", lower(when(df_dirty.profession.isNull(), "unknown")\
        .otherwise(df_dirty.profession)))\
    .withColumn("salary", when(df_dirty.salary.isNull(), df_dirty.agg(round(avg("salary"))).head()[0])\
        .otherwise(df_dirty.salary).cast("float")).show(10)
+---+---------+---+--------------+----------+------+
| id|firstname|age| city|profession|salary|
+---+---------+---+--------------+----------+------+
|100| Brianna| 32| San Fernando| doctor|1540.0|
|101| Mariele| 18| Cairo| developer|4778.0|
|102| Marnia| 30| Baku| developer|7136.0|
|103| Riannon| 56| Philadelphia| doctor|4986.0|
|104| Paule| 25| Taipei| engineer|6553.0|
|105| Averyl| 35|Guatemala City| engineer|5767.0|
|106| Mady| 23| Freetown| unknown|6087.0|
|107| Ardeen| 36| Suez| musician|2879.0|
|108| Audrie| 17| Puebla| musician|7792.0|
|109| Mignon| 34| Saint-Pierre| developer|2922.0|
+---+---------+---+--------------+----------+------+
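
Finally, since this is the Spark SQL module, it is worth mentioning that any DataFrame can also be queried with plain SQL after registering it as a temporary view; a short sketch:

# register the DataFrame as a temporary view and query it with SQL
df.createOrReplaceTempView("customers")

spark.sql("""
    SELECT profession, COUNT(*) AS cnt, ROUND(AVG(salary), 2) AS avg_salary
    FROM customers
    GROUP BY profession
    ORDER BY avg_salary DESC
""").show(5)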

Conclusion

In this article, I have tried to briefly introduce some of the most common DataFrame operations (the structured part of Apache Spark) using the PySpark API, which you will need when working with data. There are many more operations defined on DataFrames; you can always deepen your knowledge with the Spark SQL Programming Guide and the PySpark API docs in the Apache Spark documentation.

I hope you find this article helpful. In the next article, I will talk about Spark Streaming, another very important part of Apache Spark programming.

I will be happy to hear any comments or questions from you. May the data be with you!
