Important PySpark SQL Functions with Examples
PySpark SQL has become synonymous with scalability and efficiency.
Understanding PySpark’s SQL module is becoming increasingly important as more Python developers use it to leverage the potential of distributed computing.
In this blog, we will look at a handpicked collection of some fundamental PySpark SQL functions, dissecting their importance and illustrating their use.
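All of the snippets in this post assume a running SparkSession named spark. A minimal setup sketch (the session name and the exact import list are assumptions of this post, not requirements of any particular dataset; the percentile() column function in particular is only available in recent Spark releases) could look like this:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.functions import (
    approx_count_distinct, hash, initcap, array_contains,
    lit, percentile, percent_rank,
)
from pyspark.sql.window import Window

# A local session is enough to try the examples.
spark = SparkSession.builder.getOrCreate()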
Advanced Functions
- APPROX_COUNT_DISTINCT: An aggregate function that returns a new Column with the approximate distinct count of column col.
>>> df.agg(approx_count_distinct(df.age).alias('distinct_ages')).collect()
[Row(distinct_ages=2)]
- HASH Function: The hash() function generates a hash value for the specified columns, producing a deterministic integer based on the column values.
>>> spark.createDataFrame([('ABC',)], ['a']).select(hash('a').alias('hash')).collect()
[Row(hash=-757602832)]
- STRING FUNCTIONS — INITCAP: The initcap() function capitalizes the first letter of each word in a string column.
>>> spark.createDataFrame([('ab cd',)], ['a']).select(initcap("a").alias('v')).collect()
[Row(v='Ab Cd')]
- ARRAY FUNCTIONS — ARRAY_CONTAINS: The array_contains() function checks whether a specified value exists in an array column.
>>> df = spark.createDataFrame([(["a", "b", "c"],), ([],)], ['data'])
>>> df.select(array_contains(df.data, "a")).collect()
[Row(array_contains(data, a)=True), Row(array_contains(data, a)=False)]
>>> df.select(array_contains(df.data, lit("a"))).collect()
[Row(array_contains(data, a)=True), Row(array_contains(data, a)=False)]
- PERCENTILE: PySpark SQL’s percentile() function enables the calculation of percentile values, offering insights into data distribution.
>>> df.groupBy("key").agg(
percentile("value", 0.5, lit(1)).alias("median")
).show()
+---+--------------------+
|key| median|
+---+--------------------+
| 0|-0.03449962216667901|
| 1| 9.990389751837329|
| 2| 19.967859769284075|
+---+--------------------+
- NTH_VALUE: The nth_value() function allows users to extract the nth value in an ordered window, adding a layer of flexibility to result retrieval. When ignoreNulls is set to true, it returns the offset-th non-null value it sees; if all values are null, null is returned.
nth_value_result = df.withColumn("result", F.nth_value("column_name", 2).over(Window.orderBy("ordering_column")))
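As a more complete sketch, here is a hypothetical grouped DataFrame (the grp/val column names and values are invented purely for illustration) where we pull the second value of each ordered window:
# Invented data: two groups, values already in order.
df = spark.createDataFrame(
    [("a", 1), ("a", 2), ("a", 3), ("b", 8), ("b", 9)], ["grp", "val"]
)
w = Window.partitionBy("grp").orderBy("val")
# With an ordered window, the default frame grows row by row, so the first
# row of each group has no second value yet and gets null; later rows get
# the group's second-smallest val.
df.withColumn("second_val", F.nth_value("val", 2).over(w)).show()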
- Pivot: The pivot() function is instrumental in reshaping data by rotating the distinct values of a column into new columns.
df.groupBy(["col1"]).pivot('col2').sum('col3').show()
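For instance, with a small made-up DataFrame (column names and values are purely illustrative), the distinct values of col2 become new columns:
# Invented data: group by col1, spread col2 values into columns, sum col3.
df = spark.createDataFrame(
    [("A", "x", 1), ("A", "y", 2), ("B", "x", 3)], ["col1", "col2", "col3"]
)
df.groupBy("col1").pivot("col2").sum("col3").show()
# Expected shape: one row per col1 value and one column per distinct col2
# value, e.g. row A -> x=1, y=2 and row B -> x=3, y=null.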
- Concatenation: The concat() function is employed for combining columns or strings within a DataFrame.
df.withColumn("concat",F.concat("col1","col2")).show()
- Conditional Logic (When): The when() function facilitates conditional logic within PySpark SQL queries.
ds.withColumn('cond',F.when((ds.TV>100)&(ds.Radio<40),1)\
.when(ds.Sales>10, 2)\
.otherwise(3)).show(4)
- COALESCE: The coalesce() function is a versatile tool for handling null values, returning the first non-null expression.
coalesced_df = df.withColumn("result", F.coalesce("col1", "col2", "col3"))
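A small sketch with invented columns, showing how the first non-null value per row wins:
# Invented data: nulls in different positions per row.
df = spark.createDataFrame(
    [(None, "b", "c"), (None, None, "z"), ("a", "b", "c")],
    ["col1", "col2", "col3"],
)
df.withColumn("result", F.coalesce("col1", "col2", "col3")).show()
# result is "b", then "z", then "a": the first non-null value in each row.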
- ROUND: The round() function simplifies precision management in numerical columns.
rounded_values = df.withColumn("rounded_column", F.round("numeric_column", 2))
WINDOW Functions:
- LAG Function: The lag() function in a windowed context allows users to access previous rows' values, useful for trend analysis.
window_spec = Window.orderBy("ordering_column")
lag_result = df.withColumn("lagged_value", F.lag("column_name").over(window_spec))
- LEAD Function: Similar to the lag() function, the lead() function allows you to access subsequent rows' values within a window; a combined sketch of both appears after the snippet below.
window_spec = Window.orderBy("ordering_column")
lead_result = df.withColumn("leading_value", F.lead("column_name").over(window_spec))
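Both functions are easiest to see side by side on a tiny made-up daily series (column names and values are assumptions of this sketch):
# Invented data: one row per day.
df = spark.createDataFrame([(1, 10), (2, 20), (3, 30)], ["day", "sales"])
w = Window.orderBy("day")
df.withColumn("prev_sales", F.lag("sales").over(w)) \
  .withColumn("next_sales", F.lead("sales").over(w)) \
  .show()
# prev_sales: null, 10, 20 and next_sales: 20, 30, null.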
- PERCENT_RANK: The percent_rank() function computes the relative rank of a row within a result set, providing insights into data distribution.
>>> from pyspark.sql import Window, types
>>> df = spark.createDataFrame([1, 1, 2, 3, 3, 4], types.IntegerType())
>>> w = Window.orderBy("value")
>>> df.withColumn("pr", percent_rank().over(w)).show()
+-----+---+
|value| pr|
+-----+---+
| 1|0.0|
| 1|0.0|
| 2|0.4|
| 3|0.6|
| 3|0.6|
| 4|1.0|
+-----+---+
FILTER (WHERE) Clause:
- FILTER with LIKE: Extending the basic filter() functionality, this example showcases the use of the LIKE clause for pattern matching.
filtered_df = df.filter(df["column_name"].like("pattern%"))
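On a hypothetical name column, "Ali%" behaves like SQL LIKE and keeps only values starting with that prefix:
# Invented data: only names starting with "Ali" should survive the filter.
df = spark.createDataFrame([("Alice",), ("Bob",), ("Alina",)], ["name"])
df.filter(df["name"].like("Ali%")).show()
# Keeps "Alice" and "Alina"; "Bob" is filtered out.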
GROUP BY and AGGREGATE Functions:
- AVG (Average): The avg() function calculates the average of a numerical column within grouped data.
avg_age = df.groupBy("group_column").agg(F.avg("numeric_column").alias("average_value"))
- MAX and MIN: Utilizing the max() and min() functions provides a quick overview of the maximum and minimum values in a grouped dataset.
max_min_values = df.groupBy("group_column").agg(F.max("numeric_column"), F.min("numeric_column"))
JOIN Operations:
- LEFT JOIN: The join() function, shown here with a left join, illustrates the seamless integration of datasets: every row of the left DataFrame is kept, with matching rows pulled in from the right.
joined_df = df1.join(df2, "common_column", "left")
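A minimal sketch with two invented DataFrames: in a left join every row of df1 survives, and keys with no match in df2 come back with nulls:
# Invented data: only key 1 exists in both DataFrames.
df1 = spark.createDataFrame([(1, "Alice"), (2, "Bob")], ["common_column", "name"])
df2 = spark.createDataFrame([(1, "NYC")], ["common_column", "city"])
df1.join(df2, "common_column", "left").show()
# Alice gets city "NYC"; Bob has no match, so city is null.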
Follow @brilliantprogrammer for more such blogs, and clap if you liked it.
Please reach out via LinkedIn or GitHub in case of any questions!