Schema Definition and Ranking using DataFrame and SQL in PySpark

Raviteja Pasarakonda
Analytics Vidhya
Published in
3 min readSep 21, 2020

Ranking is one of the most important concepts in Spark when it comes to analytics use-cases. This blog will be covering about implementing rank, dense rank, and row number in Spark. Along with it, will cover about creating a schema on top of data.

The difference between rank and denseRank is that denseRank leaves no gaps in ranking sequence when there are ties. That is, if you were ranking a competition using denseRank and had three people tie for second place, you would say that all three were in second place and that the next person came in third.

Problem statement: Rank the top 25 cities based on the total number of customers in ‘TX’ state.

Pre-requisites: Hortonworks or Cloudera VM

Steps of execution:

  1. Importing data from MySQL to HDFS
  2. Spark Execution

Importing data from MySQL to HDFS

The solution requires “customers” tables from MySQL to be imported in CSV format.

Checkout my post “Use-case on Sqoop, HDFS, Hive, and Spark” on how to import data from MySQL to HDFS and for more info on how to load data into MySQL.

#Create folder in HDFS
hdfs dfs -mkdir /user/hdfs/spark_usecase
#Customers Table: CSV Format
sqoop import --connect jdbc:mysql://localhost:3306/retail_db --username retail_dba --password hadoop --table customers --target-dir /user/cloudera/spark_usecase/customers;

Spark Execution using DataFrame

Required data is now imported to HDFS. Using Spark, data is now loaded by defining the schema while reading.

#Spark shell
pyspark
#Reading CSV data and defining schema
customers = spark.read.csv('/user/cloudera/spark_usecase/customers', schema='id int, fname string, lname string, email string, password string, street string, city string, state string, zipcode string')
#Importing required function(s)
from pyspark.sql.functions import count, col
#Filtering TX customers, finding # of customers per city
cus_count = customers.where(customers.state == 'TX').groupBy('state', 'city').agg(count('id').alias('cus_count')).orderBy('cus_count', ascending=False).limit(25)
#Importing required function(s)
from pyspark.sql import Window
#Creating window based on state and order by # of customers
window =
Window.partitionBy('state').orderBy(col('cus_count').desc())
#Importing required function(s)
from pyspark.sql.functions import rank, dense_rank, row_number
#Finding rank, dense rank and row number on data
result = cus_count.withColumn('rank', rank().over(window)).withColumn('dense_rank', dense_rank().over(window)).withColumn('row_num', row_number().over(window))
#Printing result
result.show(25)
#Writing result in JSON into HDFS
result.write.json('/user/cloudera/spark_usecase/output')

Spark Execution using SQL

The same use-case can be achieved using Spark SQL with the following approach.

#Spark shell
pyspark
#Reading CSV data and defining schema
customers = spark.read.csv('/user/cloudera/spark_usecase/customers', schema='id int, fname string, lname string, email string, password string, street string, city string, state string, zipcode string')
#Create a global temporary view with this DataFrame
customers.createTempView('customers')
#Create a global temporary view with this DataFrame
result = spark.sql('select state, city, cus_count, rank() over (partition by state order by cus_count desc) as rank, dense_rank() over (partition by state order by cus_count desc) as dense_rank, row_number() over (partition by state order by cus_count desc) as row_num from (select state, city, count(*) as cus_count from customers where state = "TX" group by state, city order by cus_count desc limit 25)')
#Printing result
result.show(25)
#Writing result in JSON into HDFS
result.write.json('/user/cloudera/spark_usecase/output')

Result

“Houston” ranks 1st with 91 customers, “Brownsville” and “Plano” ranks at 10 with an equal number of customers. The difference between rank and dense rank can be observed for the city “San Benito” where denseRank leaves no gaps while ranking.

Note: The result below is based on the data loaded in MySQL mentioned above.

+-----+--------------------+---------+----+----------+-------+
|state| city|cus_count|rank|dense_rank|row_num|
+-----+--------------------+---------+----+----------+-------+
| TX| Houston| 91| 1| 1| 1|
| TX| Dallas| 75| 2| 2| 2|
| TX| San Antonio| 53| 3| 3| 3|
| TX| El Paso| 42| 4| 4| 4|
| TX| Fort Worth| 27| 5| 5| 5|
| TX| Austin| 25| 6| 6| 6|
| TX| Laredo| 23| 7| 7| 7|
| TX| Amarillo| 19| 8| 8| 8|
| TX| Sugar Land| 17| 9| 9| 9|
| TX| Brownsville| 16| 10| 10| 10|
| TX| Plano| 16| 10| 10| 11|
| TX| Irving| 15| 12| 11| 12|
| TX| San Benito| 13| 13| 12| 13|
| TX| College Station| 13| 13| 12| 14|
| TX| Carrollton| 12| 15| 13| 15|
| TX| Weslaco| 12| 15| 13| 16|
| TX|North Richland Hills| 12| 15| 13| 17|
| TX| Mission| 11| 18| 14| 18|
| TX| Del Rio| 11| 18| 14| 19|
| TX| Richmond| 10| 20| 15| 20|
| TX| Mesquite| 10| 20| 15| 21|
| TX| Harlingen| 10| 20| 15| 22|
| TX| Katy| 10| 20| 15| 23|
| TX| Pharr| 9| 24| 16| 24|
| TX| San Marcos| 9| 24| 16| 25|
+-----+--------------------+---------+----+----------+-------+

Reference

Check out my post “How I completed CCA Spark and Hadoop Developer (CCA175) Certification” to know about my learning details. Also, Spark Documentation on method definitions.

--

--