PySpark Collection Functions: A Comprehensive Guide
Practical Examples of PySpark Array and Collection Functions
Intro
Collection functions in Spark operate on a collection of data elements, such as an array or a map, and allow you to manipulate and transform the data in various ways.
Here are some of the most commonly used collection functions in Spark:
Sample Data
# Import required PySpark modules
from pyspark.sql.functions import array, array_contains, array_intersect, array_sort, array_union, lit
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("array_functions_example").getOrCreate()
# Create sample data
data = [("Alice", [2, 4, 6]),
        ("Bob", [1, 2, 3]),
        ("Charlie", [4, 5, 6])]
df = spark.createDataFrame(data, ["Name", "Numbers"])
df.show()
# Output
+-------+---------+
|   Name|  Numbers|
+-------+---------+
|  Alice|[2, 4, 6]|
|    Bob|[1, 2, 3]|
|Charlie|[4, 5, 6]|
+-------+---------+
1. ARRAY_CONTAINS
array_contains(col, value)
: Returns true if the input array contains the specified value, false otherwise.
# Using array_contains function to filter rows
df.filter(array_contains(df.Numbers, 4)).show()
# Output
+-------+---------+
|   Name|  Numbers|
+-------+---------+
|  Alice|[2, 4, 6]|
|Charlie|[4, 5, 6]|
+-------+---------+
This filters the rows in the DataFrame to only those where the “Numbers” array contains the value 4. The output includes the rows for Alice and Charlie, since their arrays both contain 4.
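Outside of filter, array_contains can also produce a boolean column directly. A minimal sketch (the “Has_4” column name is just for illustration):
# Add a boolean column indicating whether each array contains 4
df.select("Name", array_contains(df.Numbers, 4).alias("Has_4")).show()
# Output
+-------+-----+
|   Name|Has_4|
+-------+-----+
|  Alice| true|
|    Bob|false|
|Charlie| true|
+-------+-----+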
2. ARRAY_SORT
array_sort(col)
: Sorts the input array in ascending order.
# Using array_sort function to sort the array elements
df.select("Name", array_sort(df.Numbers).alias("Sorted_Numbers")).show()
# Output
+-------+--------------+
|   Name|Sorted_Numbers|
+-------+--------------+
|  Alice|     [2, 4, 6]|
|    Bob|     [1, 2, 3]|
|Charlie|     [4, 5, 6]|
+-------+--------------+
This selects the “Name” column and a new column called “Sorted_Numbers”, which contains the “Numbers” array sorted in ascending order. The output shows the sorted arrays for each row.
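array_sort always sorts ascending (placing nulls last); to sort in descending order, one option is the related sort_array function, which takes an asc flag. A minimal sketch:
from pyspark.sql.functions import sort_array
# Sort each array in descending order
df.select("Name", sort_array(df.Numbers, asc=False).alias("Desc_Numbers")).show()
# Output
+-------+------------+
|   Name|Desc_Numbers|
+-------+------------+
|  Alice|   [6, 4, 2]|
|    Bob|   [3, 2, 1]|
|Charlie|   [6, 5, 4]|
+-------+------------+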
3. ARRAY_UNION
array_union(col1, col2)
: Returns an array of the elements in the union of col1 and col2, without duplicates.
# Using array_union to combine each array with the literal array [1, 2]
df.select("Name", array_union(df.Numbers, array(lit(1), lit(2))).alias("Union_Numbers")).show()
# Output
+-------+---------------+
|   Name|  Union_Numbers|
+-------+---------------+
|  Alice|   [2, 4, 6, 1]|
|    Bob|      [1, 2, 3]|
|Charlie|[4, 5, 6, 1, 2]|
+-------+---------------+
This selects the “Name” column and a new column called “Union_Numbers”, which contains the union of each “Numbers” array and the literal array [1, 2], with duplicates removed. The output shows the combined arrays for each row.
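Because the result is deduplicated, taking the union of a column with itself simply returns its distinct elements, which makes for a quick sanity check:
# Union of an array with itself returns its distinct elements
df.select("Name", array_union(df.Numbers, df.Numbers).alias("Distinct_Numbers")).show()
# Output
+-------+----------------+
|   Name|Distinct_Numbers|
+-------+----------------+
|  Alice|       [2, 4, 6]|
|    Bob|       [1, 2, 3]|
|Charlie|       [4, 5, 6]|
+-------+----------------+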
4. ARRAY_INTERSECT
array_intersect(col1, col2)
: Returns an array of the elements in the intersection of col1 and col2, without duplicates.
# Using array_intersect to find the elements common to each array and [2, 4]
df.select("Name", array_intersect(df.Numbers, array(lit(2), lit(4))).alias("Common_Numbers")).show()
# Output
+-------+--------------+
|   Name|Common_Numbers|
+-------+--------------+
|  Alice|        [2, 4]|
|    Bob|           [2]|
|Charlie|           [4]|
+-------+--------------+
This selects the “Name” column and a new column called “Common_Numbers”, which contains the elements that are common between the “Numbers” array and the literal array [2, 4]. The output shows the common elements for each row.
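The complementary set operation is array_except, which returns the elements of the first array that are not present in the second, without duplicates. A minimal sketch:
from pyspark.sql.functions import array_except
# Elements of each "Numbers" array that are NOT in [2, 4]
df.select("Name", array_except(df.Numbers, array(lit(2), lit(4))).alias("Remaining")).show()
# Output
+-------+---------+
|   Name|Remaining|
+-------+---------+
|  Alice|      [6]|
|    Bob|   [1, 3]|
|Charlie|   [5, 6]|
+-------+---------+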
5. FILTER
filter(col, function)
: Returns an array of only those elements for which the given predicate function returns true.
# Using filter to keep only the even numbers in each array
from pyspark.sql.functions import filter  # note: this shadows Python's built-in filter
filtered_df = df.select("Name", filter(df.Numbers, lambda x: x % 2 == 0).alias("Even_Numbers"))
filtered_df.show()
# Output
+-------+------------+
|   Name|Even_Numbers|
+-------+------------+
|  Alice|   [2, 4, 6]|
|    Bob|         [2]|
|Charlie|      [4, 6]|
+-------+------------+
In this example, we’re using the filter function (available since Spark 3.1) to keep only the elements of each “Numbers” array for which the predicate x % 2 == 0 returns true. The resulting DataFrame, filtered_df, contains the “Name” column and a new column called “Even_Numbers” that contains the filtered arrays.
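The predicate passed to filter can also take a second argument holding the element’s index. A minimal sketch that keeps the elements at even positions (0-based):
# Keep only the elements at even positions in each array
df.select("Name", filter(df.Numbers, lambda x, i: i % 2 == 0).alias("Even_Positions")).show()
# Output
+-------+--------------+
|   Name|Even_Positions|
+-------+--------------+
|  Alice|        [2, 6]|
|    Bob|        [1, 3]|
|Charlie|        [4, 6]|
+-------+--------------+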
6. SLICE
slice(col, start, length)
: Returns an array containing the elements of col starting at position start (positions are 1-based; negative values count from the end), for up to length elements.
# Select the "Name" column and a slice of the "Numbers" array column
from pyspark.sql.functions import slice
sliced_df = df.select("Name", slice(df.Numbers, 2, 3).alias("Sliced_Numbers"))
sliced_df.show()
# Output
+-------+--------------+
|   Name|Sliced_Numbers|
+-------+--------------+
|  Alice|        [4, 6]|
|    Bob|        [2, 3]|
|Charlie|        [5, 6]|
+-------+--------------+
In this example, we’re using the slice function to extract a slice of each array in the “Numbers” column, starting at position 2 and taking up to 3 elements; since each array holds only three elements, two are returned per row. The resulting DataFrame, sliced_df, contains the “Name” column and a new column called “Sliced_Numbers” that contains the sliced arrays.
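A negative start counts from the end of the array, which is handy for grabbing a trailing window. A minimal sketch:
# Take the last two elements of each array
df.select("Name", slice(df.Numbers, -2, 2).alias("Last_Two")).show()
# Output
+-------+--------+
|   Name|Last_Two|
+-------+--------+
|  Alice|  [4, 6]|
|    Bob|  [2, 3]|
|Charlie|  [5, 6]|
+-------+--------+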
7. EXPLODE
explode(col)
: Generates a new row for each element in the input array. In the resulting DataFrame, each row represents a single element of the array, and the remaining columns are repeated for each new row.
# Explode the "Numbers" array column into a new row for each element in the array
from pyspark.sql.functions import explode
exploded_df = df.select("Name", explode(df.Numbers).alias("Number"))
exploded_df.show()
# Output
+-------+------+
|   Name|Number|
+-------+------+
|  Alice|     2|
|  Alice|     4|
|  Alice|     6|
|    Bob|     1|
|    Bob|     2|
|    Bob|     3|
|Charlie|     4|
|Charlie|     5|
|Charlie|     6|
+-------+------+
In this example, we’re using the explode function to transform the “Numbers” array column into a new row for each element in the array. The resulting DataFrame, exploded_df, contains two columns: “Name” and “Number”.
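If you also need each element’s position, the related posexplode function emits a pos column alongside the value. A minimal sketch:
from pyspark.sql.functions import posexplode
# Explode with the element's position in the original array
df.select("Name", posexplode(df.Numbers)).show(3)
# Output
+-----+---+---+
| Name|pos|col|
+-----+---+---+
|Alice|  0|  2|
|Alice|  1|  4|
|Alice|  2|  6|
+-----+---+---+
only showing top 3 rows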
8. SIZE
size(col)
: Returns the number of elements in the input array (or map).
# Add a new column to the DataFrame containing the size of the "Numbers" array
from pyspark.sql.functions import size
sized_df = df.withColumn("Size", size(df.Numbers))
sized_df.show()
# Output
+-------+---------+----+
|   Name|  Numbers|Size|
+-------+---------+----+
|  Alice|[2, 4, 6]|   3|
|    Bob|[1, 2, 3]|   3|
|Charlie|[4, 5, 6]|   3|
+-------+---------+----+
In this example, we’re using the size function to compute the size of each array in the “Numbers” column. We add a new column to the DataFrame called “Size” that contains the size of each array.
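size pairs naturally with filter when you need row-level conditions on array length. A minimal sketch (here every row qualifies, since each sample array has three elements):
# Keep only rows whose "Numbers" array has at least three elements
df.filter(size(df.Numbers) >= 3).show()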
9. FROM_JSON
from_json(col, schema)
: Parses a column containing a JSON string into a struct (or array/map) using the specified schema.
10. TO_JSON
to_json(col)
: Converts a column containing a struct (or array/map) into a JSON string.
from pyspark.sql.functions import from_json, to_json, struct
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
# Define a schema for the JSON data
schema = StructType([
    StructField("name", StringType()),
    StructField("age", IntegerType()),
    StructField("address", StringType())
])
# Create a PySpark DataFrame with sample data
data = [
    ('{"name":"Alice", "age": 25, "address":"123 Main St"}',),
    ('{"name":"Bob", "age": 30, "address":"456 Elm St"}',),
    ('{"name":"Charlie", "age": 35, "address":"789 Oak St"}',)
]
df = spark.createDataFrame(data, ["json"])
# Convert the "json" column to a struct type using the specified schema
json_df = df.withColumn("data", from_json(df.json, schema))
# Select the "name" and "age" fields from the "data" struct and convert them to JSON
# (to_json takes a single struct column, so the fields are combined with struct first)
json_string_df = json_df.select(to_json(struct(json_df.data.name, json_df.data.age)).alias("json_string"))
# Show the resulting DataFrame
json_string_df.show()
# Output
+--------------------+
|         json_string|
+--------------------+
|{"name":"Alice","...|
|{"name":"Bob","ag...|
|{"name":"Charlie"...|
+--------------------+
The to_json function returns a DataFrame with a single column called "json_string", which contains the JSON representation of the "name" and "age" fields from the "data" struct.
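To serialize the entire parsed struct back to a JSON string, to_json can also be applied to the struct column directly, giving a simple parse/serialize round trip. A minimal sketch:
# Round-trip: serialize the whole parsed struct back to a JSON string
json_df.select(to_json(json_df.data).alias("roundtrip")).show(truncate=False)
# Each row becomes e.g. {"name":"Alice","age":25,"address":"123 Main St"}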
11. FROM_CSV
from_csv(col, schema)
: Parses a column containing a CSV string into a struct using the specified schema, given as a DDL-formatted string.
12. TO_CSV
to_csv(col)
: Converts a column containing a struct into a CSV string.
from pyspark.sql.functions import from_csv, to_csv, struct
# Define a schema for the CSV data (from_csv expects a DDL-formatted string)
schema = "name STRING, age INT, address STRING"
# Create a PySpark DataFrame with sample data
data = [
    ('Alice,25,123 Main St',),
    ('Bob,30,456 Elm St',),
    ('Charlie,35,789 Oak St',)
]
df = spark.createDataFrame(data, ["csv"])
# Convert the "csv" column to a struct type using the specified schema
csv_df = df.select(from_csv(df.csv, schema).alias("data"))
# Select the "name" and "age" fields from the "data" struct and convert them to CSV
# (like to_json, to_csv takes a single struct column)
csv_string_df = csv_df.select(to_csv(struct(csv_df.data.name, csv_df.data.age)).alias("csv_string"))
# Show the resulting DataFrame
csv_string_df.show()
# Output
+----------+
|csv_string|
+----------+
|  Alice,25|
|    Bob,30|
|Charlie,35|
+----------+
The to_csv function returns a DataFrame with a single column called "csv_string", which contains the CSV representation of the "name" and "age" fields from the "data" struct.
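Both from_csv and to_csv accept an options dictionary for settings such as the field separator. A minimal sketch, assuming a semicolon separator:
# Use a semicolon instead of a comma as the field separator
csv_df.select(to_csv(struct(csv_df.data.name, csv_df.data.age), {"sep": ";"}).alias("csv_string")).show()
# Produces rows like Alice;25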
Conclusion
The above article explains a few collection functions in PySpark and how they can be used, with examples. This is part of my PySpark functions series; check out my PySpark SQL 101 series and other articles. Enjoy reading!
Apache Spark Functions Guide — https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html