PySpark Collection Functions: A Comprehensive Guide

Practical Examples of PySpark Array and Collection Functions

Ahmed Uz Zaman
6 min read · Mar 17, 2023

Intro

Collection functions in Spark are functions that operate on a collection of data elements, such as an array or a sequence. These functions allow you to manipulate and transform the data in various ways.

Here are some of the most commonly used collection functions in Spark:

Sample Data

# Import required PySpark modules
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    array_contains, array_sort, array_union, array_intersect,
    array, lit, slice, explode, size   # note: slice shadows Python's built-in slice
)

spark = SparkSession.builder.appName("array_functions_example").getOrCreate()

# Create sample data
data = [("Alice", [2, 4, 6]),
        ("Bob", [1, 2, 3]),
        ("Charlie", [4, 5, 6])]

df = spark.createDataFrame(data, ["Name", "Numbers"])
df.show()

# Output
+-------+---------+
|   Name|  Numbers|
+-------+---------+
|  Alice|[2, 4, 6]|
|    Bob|[1, 2, 3]|
|Charlie|[4, 5, 6]|
+-------+---------+

1. ARRAY_CONTAINS

array_contains(col, value) : Returns true if the input array contains the specified value, false otherwise.

# Using array_contains function to filter rows
df.filter(array_contains(df.Numbers, 4)).show()

# Output
+-----+---------+
| Name|  Numbers|
+-----+---------+
|Alice|[2, 4, 6]|
+-----+---------+

This filters the rows in the DataFrame to only show rows where the “Numbers” array contains the value 4. The output only includes the row for Alice since only her array contains 4.
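
If you want to keep every row and just flag whether it contains the value, array_contains can also be used inside select to produce a boolean column. A minimal sketch against the same sample DataFrame (the column name "Has_4" is just an illustrative choice):

# Add a boolean column instead of filtering rows away
df.select("Name", array_contains(df.Numbers, 4).alias("Has_4")).show()

# Expected output
+-------+-----+
|   Name|Has_4|
+-------+-----+
|  Alice| true|
|    Bob|false|
|Charlie| true|
+-------+-----+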

2. ARRAY_SORT

array_sort(col) : Sorts the input array in ascending order.

# Using array_sort function to sort the array elements
df.select("Name", array_sort(df.Numbers).alias("Sorted_Numbers")).show()

# Output
+-------+--------------+
|   Name|Sorted_Numbers|
+-------+--------------+
|  Alice|     [2, 4, 6]|
|    Bob|     [1, 2, 3]|
|Charlie|     [4, 5, 6]|
+-------+--------------+

This selects the “Name” column and a new column called “Sorted_Numbers”, which contains the “Numbers” array sorted in ascending order. The output shows the sorted arrays for each row.
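
array_sort in this form only sorts in ascending order; if you need a descending sort, the related sort_array function takes an asc flag. A quick sketch on the same data:

# Sort each array in descending order with sort_array
from pyspark.sql.functions import sort_array

df.select("Name", sort_array(df.Numbers, asc=False).alias("Desc_Numbers")).show()

# Expected output
+-------+------------+
|   Name|Desc_Numbers|
+-------+------------+
|  Alice|   [6, 4, 2]|
|    Bob|   [3, 2, 1]|
|Charlie|   [6, 5, 4]|
+-------+------------+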

3. ARRAY_UNION

array_union(col1, col2) : Returns an array of the elements in the union of col1 and col2, without duplicates.

# Using array_union function to combine the "Numbers" array with a literal array [1, 2]
df.select("Name", array_union(df.Numbers, array(lit(1), lit(2))).alias("Union_Numbers")).show()

# Output
+-------+---------------+
|   Name|  Union_Numbers|
+-------+---------------+
|  Alice|   [2, 4, 6, 1]|
|    Bob|      [1, 2, 3]|
|Charlie|[4, 5, 6, 1, 2]|
+-------+---------------+

This selects the "Name" column and a new column called "Union_Numbers", which contains every element that appears in either the "Numbers" array or the literal array [1, 2], with duplicates removed. Bob's row is unchanged because his array already contains both 1 and 2.
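
For reference, unioning the column with itself is a quick way to see the de-duplication behaviour: since array_union removes duplicates, the result is just the original elements once. A small sketch:

# Union of the "Numbers" column with itself returns each element once
df.select("Name", array_union(df.Numbers, df.Numbers).alias("Union_Self")).show()

# Expected output
+-------+----------+
|   Name|Union_Self|
+-------+----------+
|  Alice| [2, 4, 6]|
|    Bob| [1, 2, 3]|
|Charlie| [4, 5, 6]|
+-------+----------+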

4. ARRAY_INTERSECT

array_intersect(col1, col2) : Returns an array of the elements in the intersection of col1 and col2, without duplicates.

# Using array_intersect function to find the elements common to the arrays and [2, 4]
df.select("Name", array_intersect(df.Numbers, array(lit(2), lit(4))).alias("Common_Numbers")).show()

# Output
+-------+--------------+
|   Name|Common_Numbers|
+-------+--------------+
|  Alice|        [2, 4]|
|    Bob|           [2]|
|Charlie|           [4]|
+-------+--------------+

This selects the "Name" column and a new column called "Common_Numbers", which contains the elements that appear in both the "Numbers" array and the literal array [2, 4]. The output shows the common elements for each row.
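
The complement of array_intersect is array_except, which keeps the elements of the first array that are not in the second. A quick sketch using the same literal array [2, 4]:

# Elements of "Numbers" that are NOT in [2, 4]
from pyspark.sql.functions import array_except

df.select("Name", array_except(df.Numbers, array(lit(2), lit(4))).alias("Remaining")).show()

# Expected output
+-------+---------+
|   Name|Remaining|
+-------+---------+
|  Alice|      [6]|
|    Bob|   [1, 3]|
|Charlie|   [5, 6]|
+-------+---------+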

5. FILTER

filter(col, function) : Returns an array containing only the elements of the input array for which the given predicate function returns true. It is available in pyspark.sql.functions from Spark 3.1 onwards.

# Keep only the even numbers in the "Numbers" array
from pyspark.sql.functions import filter  # note: shadows Python's built-in filter

filtered_df = df.select("Name", filter(df.Numbers, lambda x: x % 2 == 0).alias("Even_Numbers"))
filtered_df.show()

# Output
+-------+------------+
|   Name|Even_Numbers|
+-------+------------+
|  Alice|   [2, 4, 6]|
|    Bob|         [2]|
|Charlie|      [4, 6]|
+-------+------------+

In this example, we're using the filter function with a lambda to keep only the even elements of each array in the "Numbers" column. The resulting DataFrame, filtered_df, contains the "Name" column and a new column called "Even_Numbers" with the filtered arrays.
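
filter belongs to a family of higher-order array functions; another useful one is transform, which applies a lambda to every element instead of dropping some. A short sketch, assuming Spark 3.1+:

# Multiply every element of the "Numbers" array by 10
from pyspark.sql.functions import transform

df.select("Name", transform(df.Numbers, lambda x: x * 10).alias("Times_Ten")).show()

# Expected output
+-------+------------+
|   Name|   Times_Ten|
+-------+------------+
|  Alice|[20, 40, 60]|
|    Bob|[10, 20, 30]|
|Charlie|[40, 50, 60]|
+-------+------------+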

6. SLICE

slice(col, start, length) : Returns a subset of the array, starting at the given start index (1-based; a negative value counts from the end) and containing at most length elements.

# Select the "Name" column and a slice of the "Numbers" array column
sliced_df = df.select("Name", slice(df.Numbers, 2, 3).alias("Sliced_Numbers"))
sliced_df.show()

# Output
+-------+--------------+
|   Name|Sliced_Numbers|
+-------+--------------+
|  Alice|        [4, 6]|
|    Bob|        [2, 3]|
|Charlie|        [5, 6]|
+-------+--------------+

In this example, we're using the slice function to take up to 3 elements of each array in the "Numbers" column, starting from the second position (positions are 1-based). Since each sample array has only three elements, the slice contains the second and third elements. The resulting DataFrame, sliced_df, contains the "Name" column and a new column called "Sliced_Numbers" that contains the sliced arrays.
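
slice also accepts a negative start index, which counts from the end of the array; for example, slice(col, -2, 2) returns the last two elements. A quick sketch:

# Take the last two elements of each array
df.select("Name", slice(df.Numbers, -2, 2).alias("Last_Two")).show()

# Expected output
+-------+--------+
|   Name|Last_Two|
+-------+--------+
|  Alice|  [4, 6]|
|    Bob|  [2, 3]|
|Charlie|  [5, 6]|
+-------+--------+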

7. EXPLODE

explode(col) : The explode function generates a new row for each element in the "Numbers" array. In the resulting DataFrame, each row now represents a single element in the array, and the "Name" column is repeated for each new row.

# Explode the "Numbers" array column into a new row for each element in the array
exploded_df = df.select("Name", explode(df.Numbers).alias("Number"))
exploded_df.show()

# Output
+-------+------+
|   Name|Number|
+-------+------+
|  Alice|     2|
|  Alice|     4|
|  Alice|     6|
|    Bob|     1|
|    Bob|     2|
|    Bob|     3|
|Charlie|     4|
|Charlie|     5|
|Charlie|     6|
+-------+------+

In this example, we’re using the explode function to transform the "Numbers" array column into a new row for each element in the array. The resulting DataFrame, exploded_df, contains two columns: "Name" and "Number".
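
If you also need to know where each element sat in the original array, posexplode works like explode but adds a 0-based position column. A short sketch:

# Explode the array and keep each element's position within it
from pyspark.sql.functions import posexplode

df.select("Name", posexplode(df.Numbers).alias("Pos", "Number")).show()

# Expected output (first rows shown)
+-------+---+------+
|   Name|Pos|Number|
+-------+---+------+
|  Alice|  0|     2|
|  Alice|  1|     4|
|  Alice|  2|     6|
|    Bob|  0|     1|
+-------+---+------+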

8. SIZE

size(col) : The size function returns the size of each array in the "Numbers" column. The resulting DataFrame, sized_df, contains a new column called "Size" that contains the size of each array.

# Add a new column to the DataFrame containing the size of the "Numbers" array
sized_df = df.withColumn("Size", size(df.Numbers))
sized_df.show()

# Output
+-------+---------+----+
|   Name|  Numbers|Size|
+-------+---------+----+
|  Alice|[2, 4, 6]|   3|
|    Bob|[1, 2, 3]|   3|
|Charlie|[4, 5, 6]|   3|
+-------+---------+----+

In this example, we’re using the size function to compute the size of each array in the "Numbers" column. We add a new column to the DataFrame called "Size" that contains the size of each array.
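
size only reports how many elements an array holds; the related array_min and array_max functions return the smallest and largest element. A quick sketch on the same column:

# Smallest and largest element of each array
from pyspark.sql.functions import array_min, array_max

df.select("Name", array_min(df.Numbers).alias("Min"), array_max(df.Numbers).alias("Max")).show()

# Expected output
+-------+---+---+
|   Name|Min|Max|
+-------+---+---+
|  Alice|  2|  6|
|    Bob|  1|  3|
|Charlie|  4|  6|
+-------+---+---+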

9. FROM_JSON

from_json(col, schema) : Parses a column containing a JSON string into a struct (or map) column using the specified schema.

10. TO_JSON

to_json(col) : Converts a struct, array, or map column into a JSON string.

from pyspark.sql.functions import from_json, to_json, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define a schema for the JSON data
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("address", StringType())
])

# Create a PySpark DataFrame with sample data
data = [
    ('{"name":"Alice", "age": 25, "address":"123 Main St"}',),
    ('{"name":"Bob", "age": 30, "address":"456 Elm St"}',),
    ('{"name":"Charlie", "age": 35, "address":"789 Oak St"}',)
]
df = spark.createDataFrame(data, ["json"])

# Convert the "json" column to a struct type using the specified schema
json_df = df.withColumn("data", from_json(df.json, schema))

# Wrap the "name" and "age" fields from the "data" struct in a new struct and convert it to JSON
json_string_df = json_df.select(to_json(struct(json_df.data.name, json_df.data.age)).alias("json_string"))

# Show the resulting DataFrame
json_string_df.show()

# Output
+--------------------+
|         json_string|
+--------------------+
|{"name":"Alice","...|
|{"name":"Bob","ag...|
|{"name":"Charlie"...|
+--------------------+

The select call returns a DataFrame with a single column called "json_string", which contains the JSON representation of the "name" and "age" fields from the "data" struct.
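
When you only need one or two fields out of a JSON string, it can be simpler to skip the schema entirely and use get_json_object with a JSON path. A minimal sketch against the same "json" column defined above:

# Pull a single field straight out of the raw JSON string
from pyspark.sql.functions import get_json_object

df.select(get_json_object(df.json, "$.name").alias("name")).show()

# Expected output
+-------+
|   name|
+-------+
|  Alice|
|    Bob|
|Charlie|
+-------+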

11. FROM_CSV

from_csv(col, schema) : Parses a column containing a CSV string into a struct column using the specified schema, given as a DDL-formatted string.

12. TO_CSV

to_csv(col) : Converts a struct column into a CSV string.

from pyspark.sql.functions import from_csv, to_csv, struct

# Define a schema for the CSV data as a DDL-formatted string
schema = "name STRING, age INT, address STRING"

# Create a PySpark DataFrame with sample data
data = [
    ('Alice,25,123 Main St',),
    ('Bob,30,456 Elm St',),
    ('Charlie,35,789 Oak St',)
]
df = spark.createDataFrame(data, ["csv"])

# Convert the "csv" column to a struct type using the specified schema
csv_df = df.select(from_csv(df.csv, schema).alias("data"))

# Wrap the "name" and "age" fields from the "data" struct in a new struct and convert it to CSV
csv_string_df = csv_df.select(to_csv(struct(csv_df.data.name, csv_df.data.age)).alias("csv_string"))

# Show the resulting DataFrame
csv_string_df.show()

# Output
+--------------+
|    csv_string|
+--------------+
|  Alice,25\r\n|
|    Bob,30\r\n|
|Charlie,35\r\n|
+--------------+

The select call returns a DataFrame with a single column called "csv_string", which contains the CSV representation of the "name" and "age" fields from the "data" struct.
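
If you don't want to write the schema by hand, schema_of_csv can infer a DDL schema from a sample record, which you can then pass to from_csv. A small sketch (the sample record is taken from the data above):

# Infer a schema string from an example CSV record
from pyspark.sql.functions import schema_of_csv, lit

spark.range(1).select(schema_of_csv(lit("Alice,25,123 Main St")).alias("inferred")).show(truncate=False)

# Expected output: a DDL-style string such as STRUCT<_c0: STRING, _c1: INT, _c2: STRING>
# (exact formatting varies between Spark versions)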

Conclusion

The above article explains a few collection functions in PySpark and shows how they can be used, with examples. It is part of my PySpark functions series; check out my PySpark SQL 101 series and other articles. Enjoy reading.

Apache Spark Functions Guide — https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html


Ahmed Uz Zaman

Lead QA Engineer | ETL Test Engineer | PySpark | SQL | AWS | Azure | Improvising Data Quality through innovative technologies | linkedin.com/in/ahmed-uz-zaman/