Introduction to PySpark Parquet API: Read and Write with Parameters

Understanding Parquet File Format in PySpark and its Parameters for Efficient Data Processing

Ahmed Uz Zaman
Apr 5, 2023

Intro

PySpark's DataFrame API for Parquet provides a high-level interface for working with Parquet files in a distributed computing environment. It is designed to work with the PySpark SQL engine and offers a simple way to read, write, and manipulate data in Parquet format.

Advantages of using the DataFrame API for Parquet in PySpark include:

  1. High performance: The API is designed to work with the distributed computing framework of PySpark, which provides high-performance data processing capabilities.
  2. Columnar storage: Parquet is a columnar storage format, which makes it an efficient way to store and process large datasets.
  3. Compression: Parquet files can be compressed, which can reduce storage requirements and improve data processing performance.
  4. Schema evolution: Parquet files support schema evolution, which makes it possible to add or remove columns from a dataset without having to reprocess the entire dataset (a small sketch of this follows the list).
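
To make the schema-evolution point concrete, here is a minimal sketch (the paths and columns are illustrative, not from the article): two Parquet directories are written with slightly different columns, then read back together with the mergeSchema option so the combined DataFrame exposes the union of both schemas.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("SchemaEvolutionSketch").getOrCreate()

# Version 1 of the dataset has only name and age
spark.createDataFrame([("Alice", 30)], ["name", "age"]) \
    .write.mode("overwrite").parquet("/tmp/people/v1")

# Version 2 adds a salary column without rewriting the old files
spark.createDataFrame([("Bob", 40, 60000.0)], ["name", "age", "salary"]) \
    .write.mode("overwrite").parquet("/tmp/people/v2")

# mergeSchema combines both schemas; rows from v1 get null for salary
df = spark.read.option("mergeSchema", "true").parquet("/tmp/people/v1", "/tmp/people/v2")
df.printSchema()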

The DataFrame API for Parquet in PySpark can be used in several ways, including:

  1. Reading Parquet files: The read.parquet() method can be used to read Parquet files into a PySpark DataFrame.
  2. Writing Parquet files: The write.parquet() method can be used to write a PySpark DataFrame to a Parquet file.
  3. Filtering and selecting data: The DataFrame API provides a set of methods for filtering and selecting data in a PySpark DataFrame, including filter() and select().
  4. Aggregating data: The DataFrame API provides a set of methods for aggregating data in a PySpark DataFrame, including groupBy() and agg().
  5. Joining data: The DataFrame API provides a set of methods for joining data in a PySpark DataFrame, including join() and crossJoin().

Examples

Reading Parquet files

Suppose you have a Parquet file at /path/to/parquet/file.parquet with the following schema:

# Schema
root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)

You can read the Parquet file into a PySpark DataFrame using the following code:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("ReadingParquet").getOrCreate()

df = spark.read.parquet("/path/to/parquet/file.parquet")
df.show()

# Output
+-------+---+
|   name|age|
+-------+---+
|  Alice| 30|
|    Bob| 40|
|Charlie| 50|
+-------+---+

This will display the contents of the DataFrame in a tabular format.
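
In practice the path passed to spark.read.parquet() is often a directory of part files rather than a single file, and it is worth confirming the schema that Spark picked up from the Parquet metadata. A minimal sketch (the directory path is a placeholder):

# Reading a directory loads every Parquet part file inside it
df = spark.read.parquet("/path/to/parquet/dir")

# The schema comes straight from the Parquet metadata; no inference pass is needed
df.printSchema()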

Writing Parquet files

Using the same DataFrame as above, you can write it out in Parquet format with the following code:

df.write.parquet("/path/to/output/parquet/file.parquet")

This will write the contents of the DataFrame as Parquet part files under the specified path. Note that write.parquet() does not print anything to the console; it simply creates the output directory.
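
One practical note: the writer defaults to the "error" (errorifexists) save mode, so rerunning the snippet above against an existing path will raise an error. A minimal sketch of the usual workaround, using the same placeholder path:

# Replace the target path if it already exists
df.write.mode("overwrite").parquet("/path/to/output/parquet/file.parquet")

# Or add new part files alongside the existing ones
df.write.mode("append").parquet("/path/to/output/parquet/file.parquet")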

Filtering and selecting data

Using the same DataFrame as above, you can filter and select data with the following code:

filtered_df = df.filter(df.age > 30).select(df.name)
filtered_df.show()

# Output
+-------+
|   name|
+-------+
|    Bob|
|Charlie|
+-------+

This will filter the DataFrame to include only rows where the age is greater than 30, and select only the name column. The resulting DataFrame will be displayed in a tabular format.
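
Because Parquet is a columnar format, select() limits which columns are actually read from disk, and simple filter() predicates can be pushed down to the Parquet scan. The same query written with pyspark.sql.functions.col, plus a compound condition, might look like this (a sketch using the same hypothetical columns):

from pyspark.sql.functions import col

filtered_df = df.filter((col("age") > 30) & (col("age") < 50)).select("name")
# explain() typically shows PushedFilters on the Parquet file scan
filtered_df.explain()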

Aggregating data

Suppose you have a PySpark DataFrame called df with the following schema:

root
|-- name: string (nullable = true)
|-- age: integer (nullable = true)
|-- salary: double (nullable = true)

You can aggregate data in the DataFrame using the following code:

from pyspark.sql.functions import avg

agg_df = df.groupBy(df.age).agg(avg(df.salary))
agg_df.show()

# Output
+---+-----------+
|age|avg(salary)|
+---+-----------+
| 30|    50000.0|
| 40|    60000.0|
| 50|    70000.0|
+---+-----------+

This will group the DataFrame by age, and calculate the average salary for each age group. The resulting DataFrame will be displayed in a tabular format.
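
By default the aggregate column is named avg(salary); giving it an explicit name with alias() usually makes downstream code easier to read. A small sketch using the same hypothetical df:

from pyspark.sql.functions import avg

agg_df = df.groupBy("age").agg(avg("salary").alias("avg_salary"))
agg_df.show()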

Joining data

Suppose you have two PySpark DataFrames called df1 and df2 with the following schemas:

df1:
+---+-------+
|id |name |
+---+-------+
|1 |John |
|2 |Jane |
|3 |David |
+---+-------+

df2:
+---+-------+
|id |country|
+---+-------+
|1 |USA |
|2 |Canada |
|4 |France |
+---+-------+

You can join the DataFrames using the following code:

joined_df = df1.join(df2, "id", "left")
joined_df.show()

# Output
+---+------+-------+
|id |name |country|
+---+------+-------+
|1 |John |USA |
|2 |Jane |Canada |
|3 |David |null |
+---+------+-------+

This will join the two DataFrames on the id column, and display the resulting DataFrame in a tabular format.
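
The third argument selects the join type, and "left" keeps every row of df1 even when there is no match in df2. For comparison, an inner join keeps only the ids present in both DataFrames (a sketch with the same df1 and df2):

# Inner join: only ids 1 and 2 appear in both DataFrames, so id 3 and id 4 are dropped
inner_df = df1.join(df2, "id", "inner")
inner_df.show()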

Types of Parameters in Parquet

PySpark provides a number of options and parameters that can be used when reading from and writing to Parquet files. Here are some common options:

Reading Parquet files

  • path: The path to the Parquet file(s) to read from. This can be a single file or a directory containing multiple part files.
  • mergeSchema: Whether to merge the schemas of all Parquet part files into a single schema. This is useful when reading a directory whose files were written with slightly different schemas.
  • schema: Parquet files are self-describing, so PySpark reads the schema directly from the file metadata and the CSV-style inferSchema option is not needed. If you want to enforce particular column types, you can still supply a schema explicitly with the schema() method.
  • header: Not applicable to Parquet. Unlike CSV files, Parquet files have no header row; column names come from the file metadata.
  • compression: Not set on read. The compression codec is recorded in each file's metadata and detected automatically; it is only chosen when writing (see below).

A short sketch of these read-side options in use follows.
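
The path here is a placeholder, and in recent PySpark versions mergeSchema can also be passed as a keyword argument to parquet() instead of via option():

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Enforce column types with an explicit schema instead of relying purely on file metadata
schema = StructType([
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True)
])
df = spark.read.schema(schema).parquet("/path/to/parquet/dir")

# Two equivalent ways of enabling schema merging across part files
df = spark.read.option("mergeSchema", "true").parquet("/path/to/parquet/dir")
df = spark.read.parquet("/path/to/parquet/dir", mergeSchema=True)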

Writing Parquet files

  • path: The path to write the Parquet file(s) to.
  • mode: The save mode to use. Possible values are "overwrite", "append", "ignore", and "error" (also written "errorifexists", the default). With "overwrite", any existing data at the path is replaced. With "append", new part files are added alongside the existing ones. With "ignore", the write is silently skipped if the path already exists. With "error", an exception is raised if the path already exists.
  • compression: The compression codec to use when writing the Parquet file. Possible values are "uncompressed", "snappy", "gzip", "lzo", "lz4", "brotli", and "zstd".
  • partitionBy: A list of column names to use for partitioning the Parquet file. This is useful for creating partitioned Parquet files, where the data is stored in separate directories based on the values of one or more columns. For example, if you partition by the column "date", the data for each date will be stored in a separate directory.
  • bucketBy: One or more column names to bucket the output by, together with the number of buckets. Bucketing divides the data into a fixed number of buckets based on the bucketing columns, which can improve query performance when the data is accessed by those columns. Note that bucketing (and the companion sortBy) is only supported when saving as a table with saveAsTable(), not with the path-based parquet() writer.
  • numBuckets: The number of buckets, passed as the first argument of bucketBy(numBuckets, col, *cols) rather than as a separate option.

Example for Read and Write in Parquet

Here’s an example that demonstrates how to use all of the above options for reading and writing Parquet files in PySpark:

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

spark = SparkSession.builder.appName("ParquetReadWrite").getOrCreate()

# Define a schema for the data
schema = StructType([
    StructField("id", IntegerType(), True),
    StructField("name", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("gender", StringType(), True)
])

# Generate some sample data
data = [
    (1, "John", 30, "Male"),
    (2, "Jane", 35, "Female"),
    (3, "David", 25, "Male"),
    (4, "Emily", 40, "Female")
]
df = spark.createDataFrame(data, schema)

# Write the data to a partitioned, Snappy-compressed Parquet directory
df.write.mode("overwrite") \
    .partitionBy("gender") \
    .option("compression", "snappy") \
    .parquet("/path/to/output/parquet")

# Bucketing and sorting are only supported when saving as a table,
# so the bucketed copy is written with saveAsTable() instead of parquet()
df.write.mode("overwrite") \
    .partitionBy("gender") \
    .bucketBy(2, "age") \
    .sortBy("name") \
    .option("compression", "snappy") \
    .format("parquet") \
    .saveAsTable("people_bucketed")

# Read the data back from the Parquet directory, merging the schemas
# of the part files if they differ
df = spark.read \
    .option("mergeSchema", "true") \
    .parquet("/path/to/output/parquet")

# Show the result
df.show()

In this example, we first define a schema for the data that we want to write to the Parquet file. We then generate some sample data and create a DataFrame df from it.

Next, we use the write method of the DataFrameWriter to save df in Parquet format. We specify the following options:

  • mode: We set this to "overwrite" to replace any existing output at the same path (or table name).
  • partitionBy: We set this to "gender" to partition the data by the "gender" column.
  • bucketBy(2, "age"): We bucket the data into 2 buckets based on the "age" column. Because bucketing is only supported through saveAsTable(), the bucketed copy is saved as the table "people_bucketed" rather than to a plain path.
  • sortBy("name"): We sort the data within each bucket by the "name" column.
  • option("compression", "snappy"): We use the Snappy compression codec.

We then use the read method of the DataFrameReader to read the data back from the Parquet directory. Only one read option is needed:

  • option("mergeSchema", "true"): We merge the schemas of all part files into a single schema. There is no header or compression option on the read side, because Parquet files carry their column names in their metadata and the compression codec is detected automatically.
Finally, we use the show method to display the resulting DataFrame.

Conclusion

In this article, we have learned how to use the PySpark Parquet API to read and write data, which you can then use for data transformations, data analysis, data science work, and more. Do check out my other articles on the PySpark DataFrame API, SQL Basics, and Built-in Functions. Enjoy reading.
