Important PySpark SQL Functions with Examples

Deepanshu Tyagi
Published in DataEngineering.py
3 min read · Jan 4, 2024

PySpark SQL is a popular choice for scalable, efficient processing of large datasets.

Understanding PySpark’s SQL module is becoming increasingly important as more Python developers use it to leverage the potential of distributed computing.

In this blog, we will look at a handpicked collection of some fundamental PySpark SQL functions, dissecting their importance and illustrating their use.

Advanced Functions

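  • APPROX_COUNT_DISTINCT: An aggregate function that returns a new Column with the approximate number of distinct values in a column. It trades exactness for speed; the optional rsd argument sets the maximum allowed relative standard deviation (default 0.05).
>>> from pyspark.sql.functions import approx_count_distinct
>>> df.agg(approx_count_distinct(df.age).alias('distinct_ages')).collect()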
[Row(distinct_ages=2)]
  • HASH Function: The hash() function computes an integer hash code of the specified columns. Equal inputs always produce the same hash, but different inputs can collide, so the result is not a guaranteed-unique identifier.
>>> spark.createDataFrame([('ABC',)], ['a']).select(hash('a').alias('hash')).collect()
[Row(hash=-757602832)]
  • STRING FUNCTIONS — INITCAP: The initcap() function capitalizes the first letter of each word in a string column.
>>> spark.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect()
[Row(v='Ab Cd')]
  • ARRAY FUNCTIONS — ARRAY_CONTAINS: The array_contains() function checks whether a specified value exists in an array column.
>>> df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>>> df.select(array_contains(df.data, "a")).collect()
[Row(array_contains(data, a)=True), Row(array_contains(data, a)=False)]
>>> df.select(array_contains(df.data, lit("a"))).collect()
[Row(array_contains(data, a)=True), Row(array_contains(data, a)=False)]
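  • PERCENTILE: The percentile() function (available since Spark 3.5) returns the exact percentile of a numeric column at the given percentage, which must be between 0.0 and 1.0; 0.5 gives the median. The optional third argument is a frequency (weight) applied to each value and defaults to 1.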
>>> df.groupBy("key").agg(
...     percentile("value", 0.5, lit(1)).alias("median")
... ).show()
+---+--------------------+
|key|              median|
+---+--------------------+
|  0|-0.03449962216667901|
|  1|   9.990389751837329|
|  2|  19.967859769284075|
+---+--------------------+
  • NTH_VALUE: The nth_value() window function returns the value at the given offset (counting from 1) within the window frame, or null if the frame has fewer rows than the offset.

When ignoreNulls is set to true, it returns the offset-th non-null value it sees. If all values are null, null is returned.

from pyspark.sql import Window, functions as F
nth_value_result = df.withColumn("result", F.nth_value("column_name", 2).over(Window.orderBy("ordering_column")))
  • Pivot: The pivot() function reshapes grouped data by turning the distinct values of one column into separate columns, with an aggregate filling each cell.
df.groupBy(["col1"]).pivot('col2').sum('col3').show()
  • Concatenation: The concat() function joins multiple columns into a single column. For string inputs, the result is null if any input is null; concat_ws() skips nulls instead.
df.withColumn("concat", F.concat("col1", "col2")).show()
  • Conditional Logic (When): The when() function facilitates conditional logic within PySpark SQL queries.
ds.withColumn(
    "cond",
    F.when((ds.TV > 100) & (ds.Radio < 40), 1)
    .when(ds.Sales > 10, 2)
    .otherwise(3),
).show(4)
  • COALESCE: The coalesce() function handles null values by returning the first non-null value among its arguments, checked from left to right.
coalesced_df = df.withColumn("result", F.coalesce("col1", "col2", "col3"))
  • ROUND: The round() function rounds a numeric column to the given number of decimal places using HALF_UP rounding; bround() uses HALF_EVEN rounding instead.
rounded_values = df.withColumn("rounded_column", F.round("numeric_column", 2))
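
To make the reshaping done by pivot() more concrete, here is a minimal plain-Python sketch of the result that df.groupBy("col1").pivot("col2").sum("col3") produces. It only models the output and is not how Spark computes it; the helper name pivot_sum and the sample rows are made up for illustration.

```python
from collections import defaultdict


def pivot_sum(rows, group_key, pivot_key, value_key):
    """Model of groupBy(group_key).pivot(pivot_key).sum(value_key) on a list of dicts."""
    # Each distinct value of the pivot column becomes an output column.
    # They are sorted here only to get a stable column order.
    pivot_values = sorted({row[pivot_key] for row in rows})

    # sums[group][pivot_value] accumulates the values for that cell.
    sums = defaultdict(lambda: defaultdict(int))
    for row in rows:
        sums[row[group_key]][row[pivot_key]] += row[value_key]

    result = []
    for group in sorted(sums):
        out = {group_key: group}
        for value in pivot_values:
            # A combination that never occurred stays None (null in Spark).
            out[value] = sums[group].get(value)
        result.append(out)
    return result


# Hypothetical sample data with the column names used in the snippet above.
rows = [
    {"col1": "a", "col2": "x", "col3": 1},
    {"col1": "a", "col2": "y", "col3": 2},
    {"col1": "a", "col2": "x", "col3": 3},
    {"col1": "b", "col2": "y", "col3": 5},
]
pivoted = pivot_sum(rows, "col1", "col2", "col3")
for row in pivoted:
    print(row)
```

Group "a" ends up with one cell per distinct col2 value, while group "b" has no "x" rows, so that cell stays empty.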

WINDOW Functions:

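  • LAG Function: The lag() window function returns the value from a previous row in the window (one row back by default), or null when no such row exists. It is useful for trend analysis, such as comparing each row with the one before it.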
window_spec = Window.orderBy("ordering_column")
lag_result = df.withColumn("lagged_value", F.lag("column_name").over(window_spec))
  • LEAD Function: The lead() function mirrors lag(): it returns the value from a subsequent row in the window (one row ahead by default), or null when no such row exists.
window_spec = Window.orderBy("ordering_column")
lead_result = df.withColumn("leading_value", F.lead("column_name").over(window_spec))
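  • PERCENT_RANK: The percent_rank() window function computes the relative rank of a row within its window as (rank - 1) / (number of rows - 1), so results range from 0.0 to 1.0 and tied rows share the same value.
>>> from pyspark.sql import Window, types
>>> from pyspark.sql.functions import percent_rank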
>>> df = spark.createDataFrame([1, 1, 2, 3, 3, 4], types.IntegerType())
>>> w = Window.orderBy("value")
>>> df.withColumn("pr", percent_rank().over(w)).show()
+-----+---+
|value| pr|
+-----+---+
|    1|0.0|
|    1|0.0|
|    2|0.4|
|    3|0.6|
|    3|0.6|
|    4|1.0|
+-----+---+
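
The window functions above are easier to reason about once you see the rule each one applies. The following is a small plain-Python sketch of lag, lead, and percent_rank over a single ordered window. It is an illustrative model only, not Spark's implementation, and the function names simply mirror the PySpark ones.

```python
def lag(values, offset=1):
    """Value from `offset` rows earlier; None where no such row exists."""
    pad = min(offset, len(values))
    return [None] * pad + values[: len(values) - pad]


def lead(values, offset=1):
    """Value from `offset` rows later; None where no such row exists."""
    pad = min(offset, len(values))
    return values[pad:] + [None] * pad


def percent_rank(values):
    """(rank - 1) / (n - 1) for each value, where tied values share a rank."""
    ordered = sorted(values)
    n = len(ordered)
    result = []
    for v in ordered:
        rank = ordered.index(v) + 1  # position of the first occurrence
        result.append((v, 0.0 if n == 1 else (rank - 1) / (n - 1)))
    return result


# Already-ordered sample values (hypothetical).
series = [10, 20, 30]
lagged = lag(series)
leading = lead(series)

# Same input as the percent_rank() example above.
ranks = percent_rank([1, 1, 2, 3, 3, 4])
for value, pr in ranks:
    print(value, pr)
```

With six rows, the two rows holding 3 both have rank 4, which gives (4 - 1) / (6 - 1) = 0.6, matching the table above.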

FILTER (WHERE) Clause:

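  • FILTER with LIKE: The filter() method (where() is an alias) keeps only the rows that satisfy a condition. Combined with Column.like(), it supports SQL pattern matching, where % matches any sequence of characters and _ matches exactly one character.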
filtered_df = df.filter(df["column_name"].like("pattern%"))
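
If the LIKE wildcards are unfamiliar, this plain-Python sketch shows how a pattern such as "pattern%" is matched. It is a simplified model that assumes no escape characters in the pattern; the helper name sql_like and the sample strings are hypothetical.

```python
import re


def sql_like(value, pattern):
    """Simplified model of SQL LIKE: % is any sequence, _ is exactly one character."""
    regex = "".join(
        ".*" if ch == "%" else "." if ch == "_" else re.escape(ch)
        for ch in pattern
    )
    # LIKE must match the whole string, and the match is case-sensitive.
    return re.fullmatch(regex, value, flags=re.DOTALL) is not None


# Hypothetical column values filtered with the pattern from the snippet above.
values = ["pattern_1", "pattern", "my pattern", "Pattern_2"]
kept = [v for v in values if sql_like(v, "pattern%")]
print(kept)
```

Only values that start with the literal prefix survive, because % sits at the end of the pattern.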

GROUP BY and AGGREGATE Functions:

  • AVG (Average): The avg() function calculates the average of a numerical column within grouped data.
avg_age = df.groupBy("group_column").agg(F.avg("numeric_column").alias("average_value"))
  • MAX and MIN: The max() and min() functions return the largest and smallest values of a column within each group.
max_min_values = df.groupBy("group_column").agg(F.max("numeric_column"), F.min("numeric_column"))
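
As a rough illustration of what these grouped aggregates return, here is a plain-Python sketch that computes avg, max, and min per group while skipping missing values, as SQL aggregates do with nulls. The helper name group_stats and the sample rows are assumptions made for this example.

```python
from collections import defaultdict


def group_stats(rows):
    """Model of groupBy(key).agg(avg, max, min) on (key, value) pairs."""
    groups = defaultdict(list)
    for key, value in rows:
        groups[key]  # create the group even if all of its values are None
        if value is not None:  # aggregates ignore nulls
            groups[key].append(value)

    stats = {}
    for key, vals in groups.items():
        if vals:
            stats[key] = {"avg": sum(vals) / len(vals), "max": max(vals), "min": min(vals)}
        else:
            # A group with only nulls still appears, with null aggregates.
            stats[key] = {"avg": None, "max": None, "min": None}
    return stats


# Hypothetical (group_column, numeric_column) pairs.
rows = [("a", 10), ("a", 20), ("b", 5), ("b", None), ("c", None)]
stats = group_stats(rows)
for key, values in stats.items():
    print(key, values)
```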

JOIN Operations:

  • LEFT JOIN: The join() method combines two DataFrames on a key. A left join keeps every row of the left DataFrame and fills the columns coming from the right DataFrame with null wherever no match exists.
joined_df = df1.join(df2, "common_column", "left")
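
The left-join behaviour can be sketched in plain Python as follows. This is only a model of the result on lists of dicts, assuming every right-hand row has the same columns; the helper name left_join and the sample data are hypothetical.

```python
from collections import defaultdict


def left_join(left, right, key):
    """Model of left.join(right, key, "left") on lists of dicts."""
    right_cols = []
    index = defaultdict(list)
    for row in right:
        for col in row:
            if col != key and col not in right_cols:
                right_cols.append(col)
        if row[key] is not None:  # null keys never match in a join
            index[row[key]].append(row)

    joined = []
    for row in left:
        matches = index.get(row[key], [])
        if not matches:
            # No match: keep the left row and fill the right columns with None.
            joined.append({**row, **{c: None for c in right_cols}})
        for match in matches:
            # One output row per matching right row.
            joined.append({**row, **{c: match[c] for c in right_cols}})
    return joined


# Hypothetical stand-ins for df1 and df2, joined on "id".
df1 = [{"id": 1, "name": "Ann"}, {"id": 2, "name": "Bob"}, {"id": 3, "name": "Cy"}]
df2 = [{"id": 1, "city": "Oslo"}, {"id": 1, "city": "Rome"}, {"id": 3, "city": "Lima"}]
joined = left_join(df1, df2, "id")
for row in joined:
    print(row)
```

Note that a left row with several matches is repeated once per match, which is a common source of unexpected row counts after a join.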
