
Are You Using Parquet with Pandas in the Right Way?

Hussein Awala · Published in Tech Blog · Jul 24, 2023

Apache Parquet is an open-source columnar storage format that is designed to efficiently store and process large amounts of structured data. It was developed as part of the Apache Hadoop ecosystem and is widely used in big data processing frameworks like Apache Spark.

Why do we use Parquet?

The Parquet format is specifically optimized for analytical workloads and is commonly used in data engineering and data science tasks. It provides several key benefits that make it an excellent choice for storing and processing data:

  1. Columnar Storage: Unlike row-based storage formats, Parquet organizes data in a columnar fashion, which means that all values of a single column are stored together. This layout allows for better compression and encoding of similar data, leading to reduced storage space and improved query performance.
  2. Compression: Parquet supports various compression algorithms, such as Snappy, Gzip, and Zstandard, which further contribute to reducing storage requirements and can speed up data reading from disk.
  3. Schema Evolution: Parquet allows for schema evolution, meaning you can easily add, remove, or modify columns in the data without requiring a full rewrite of the dataset. This flexibility is particularly useful in scenarios where the schema evolves over time.
  4. Cross-Platform Compatibility: Parquet is designed to be a portable format, enabling seamless data exchange between different systems and programming languages.

Why do we use Parquet with pandas?

Pandas is a popular Python library for data manipulation and analysis. When working with large datasets in pandas, the performance of traditional file formats like CSV or JSON can be suboptimal due to their inherent limitations, such as row-based storage and lack of compression.

By using Parquet files with pandas, you can take advantage of the benefits provided by the columnar storage format. Reading data from Parquet files into pandas DataFrames can be significantly faster compared to row-based formats, especially when dealing with large datasets. Additionally, since pandas natively supports Parquet, it can efficiently leverage the format’s compression and encoding techniques, resulting in reduced memory usage and faster data processing.

Moreover, Parquet’s schema evolution feature aligns well with pandas’ flexibility, allowing you to easily handle data with varying column structures or add new columns to existing DataFrames without much overhead.

Parquet file structure

(Figure: Apache Parquet file structure)

At its core, a Parquet file is composed of a collection of row groups, and each row group contains a set of column chunks. These column chunks store the actual data for each column of the dataset.

Magic Number

The magic number is a sequence of bytes at the very beginning of a Parquet file (in the header) that identifies it as a Parquet file. It is a fixed set of bytes (PAR1) that every Parquet file must begin with, allowing software to quickly determine whether a file is in the Parquet format; the same four bytes also appear at the very end of the file, right after the footer.
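
As a quick sanity check, you can look at those bytes yourself. Here is a minimal sketch (the file path is just an example; any Parquet file on disk will do):

# Check the PAR1 magic bytes at the start of the file
with open("test_data.parquet", "rb") as f:
    header_magic = f.read(4)

print(header_magic == b"PAR1")  # True for a valid Parquet file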

Row Groups

A Parquet file is divided into one or more row groups. Each row group is essentially a horizontal partition of the dataset, containing a specific number of rows. This design enables efficient read and write operations, as it allows applications to read only the required row groups instead of scanning the entire file.
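
For example, pyarrow lets you read a single row group without touching the rest of the file. A small sketch (the file path is hypothetical):

import pyarrow.parquet as pq

# Opening the file only reads the footer metadata, not the data itself
pf = pq.ParquetFile("some_file.parquet")
print(pf.metadata.num_row_groups)  # number of horizontal partitions

# Read just the first row group into a pyarrow Table, then into pandas
first_group_df = pf.read_row_group(0).to_pandas()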

Pages

Within each row group, data is further divided into pages. Pages are the smallest unit of read and write in Parquet files. They represent a contiguous set of values for a particular column. Pages can be compressed independently, allowing for efficient compression and decompression.
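
If the defaults don't fit your workload, pyarrow exposes a knob for the target page size at write time, and the footer lets you inspect the resulting column chunks. A short sketch (the table and file name are made up for illustration):

import numpy as np
import pyarrow as pa
import pyarrow.parquet as pq

# Write a small table with a target data page size of ~64 KiB
table = pa.table({"x": np.arange(1_000_000)})
pq.write_table(table, "pages_demo.parquet", data_page_size=64 * 1024)

# Inspect one column chunk of the first row group
col_chunk = pq.read_metadata("pages_demo.parquet").row_group(0).column(0)
print(col_chunk.compression, col_chunk.total_compressed_size, col_chunk.data_page_offset)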

Column Statistics

For every column chunk within a row group, Parquet can store column statistics. These statistics provide summary information about the data in each column, such as minimum and maximum values, the number of nulls, and other relevant data characteristics. Storing column statistics is beneficial for query optimization, as it allows Parquet readers to skip unnecessary row groups during data scanning, thus improving query performance.

Footer

The footer is located at the end of the Parquet file and contains essential metadata about the file’s structure. It includes information about the schema of the dataset, the compression algorithms used for each column chunk, and the data location of each row group. The footer provides a high-level overview of the file, enabling quick access to critical metadata for processing the Parquet file.
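
Because the footer is self-contained, you can read it without scanning any data. A short sketch (the path is hypothetical):

import pyarrow.parquet as pq

# Read only the footer: row counts, row-group layout, and writer information
meta = pq.read_metadata("some_file.parquet")
print(meta.num_rows, meta.num_row_groups, meta.created_by)

# The schema stored in the footer
print(pq.read_schema("some_file.parquet"))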

Processing Parquet files using pandas

When working with Parquet files in pandas, you have the flexibility to choose between two engines: fastparquet and pyarrow. Both engines are third-party libraries that provide support for reading and writing Parquet files, and pandas seamlessly integrates with them to offer enhanced performance and efficiency.

Fastparquet is a popular Python library optimized for fast reading and writing of Parquet files. It is known for its speed and low memory footprint, making it an excellent choice for working with large datasets.

Pyarrow is part of the Apache Arrow project and is designed to provide efficient data interchange between different systems and languages. It offers seamless integration with pandas, allowing for fast and optimized data processing with Parquet files.
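
Both engines plug into the same pandas API; you can pick one explicitly instead of relying on pandas' default (engine="auto", which prefers pyarrow when it is installed). A small sketch with a hypothetical file path:

import pandas as pd

# Read with pyarrow, then rewrite the file with Zstandard compression
df = pd.read_parquet("some_file.parquet", engine="pyarrow")
df.to_parquet("some_file_zstd.parquet", engine="pyarrow", compression="zstd")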

In this section, I will explain how to push your filter predicates down to pyarrow to reduce the size of the dataset processed by pandas, and thereby reduce processing time and resource consumption.

Creating a simple file for testing

import pandas as pd
import numpy as np

parquet_file_path = "test_data.parquet"

# Number of rows to generate
num_rows = 10**8 # 100M

# Generate the DataFrame
data = {
    "user_id": np.arange(num_rows),
    "value": np.random.randint(-10000, 10001, size=num_rows),
}
df = pd.DataFrame(data)

# Write the result to a Parquet file with 20 row groups (5M records per row group)
df.to_parquet(parquet_file_path, index=False, row_group_size=5 * 10**6)

The above script creates a single Parquet file sorted by user_id, consisting of 20 row groups of 5 million rows each. You can check the row-group metadata using the pyarrow.parquet module:

import pyarrow.parquet as pq

parquet_file = pq.ParquetFile(parquet_file_path)

for i in range(parquet_file.metadata.num_row_groups):
    user_id_col_stats = parquet_file.metadata.row_group(i).column(0).statistics
    print(f"row group: {i}, num of rows: {user_id_col_stats.num_values}, min: {user_id_col_stats.min}, max: {user_id_col_stats.max}")

And here is the output:

row group: 0, num of rows: 5000000, min: 0, max: 4999999
row group: 1, num of rows: 5000000, min: 5000000, max: 9999999
row group: 2, num of rows: 5000000, min: 10000000, max: 14999999
row group: 3, num of rows: 5000000, min: 15000000, max: 19999999
row group: 4, num of rows: 5000000, min: 20000000, max: 24999999
row group: 5, num of rows: 5000000, min: 25000000, max: 29999999
row group: 6, num of rows: 5000000, min: 30000000, max: 34999999
row group: 7, num of rows: 5000000, min: 35000000, max: 39999999
row group: 8, num of rows: 5000000, min: 40000000, max: 44999999
row group: 9, num of rows: 5000000, min: 45000000, max: 49999999
row group: 10, num of rows: 5000000, min: 50000000, max: 54999999
row group: 11, num of rows: 5000000, min: 55000000, max: 59999999
row group: 12, num of rows: 5000000, min: 60000000, max: 64999999
row group: 13, num of rows: 5000000, min: 65000000, max: 69999999
row group: 14, num of rows: 5000000, min: 70000000, max: 74999999
row group: 15, num of rows: 5000000, min: 75000000, max: 79999999
row group: 16, num of rows: 5000000, min: 80000000, max: 84999999
row group: 17, num of rows: 5000000, min: 85000000, max: 89999999
row group: 18, num of rows: 5000000, min: 90000000, max: 94999999
row group: 19, num of rows: 5000000, min: 95000000, max: 99999999

Querying the file using pandas


Typically, when working with Parquet files in pandas, the common approach involves loading the data into a pandas DataFrame and then performing the necessary data processing tasks on the loaded DataFrame:

%%time

pd.read_parquet(parquet_file_path).query("user_id == 8767068").to_dict()

CPU times: user 4.07 s, sys: 12.5 s, total: 16.6 s
Wall time: 31.6 s

> {'user_id': {8767068: 8767068}, 'value': {8767068: 5893}}

As observed, scanning and loading the entire Parquet file into a pandas DataFrame, and then filtering it to obtain the matching row, took approximately 16 seconds of CPU time. Pandas does not push the filter down to the Parquet engine on its own, so it does not fully exploit the Parquet metadata.

To optimize the process, we can push our filters down to the pyarrow engine ourselves by adding a filter predicate to the reading method, leveraging the file metadata and pyarrow’s processing optimizations. By doing so, we can significantly improve query performance and reduce the processing time for data filtering and retrieval:

%%time

pd.read_parquet(parquet_file_path, filters=[("user_id", "=", 8767068)]).to_dict()

CPU times: user 132 ms, sys: 120 ms, total: 252 ms
Wall time: 868 ms

> {'user_id': {0: 8767068}, 'value': {0: 5893}}

In the last query, locating the desired row took only 252 ms of CPU time, roughly 65 times faster than the previous approach. Comparing the row index between the two queries, we see that the second query returned a DataFrame containing only the matching row, with an index of 0: when we provide a filter predicate, rows that do not match it are removed from the scanned data before it is converted into a DataFrame. This not only makes retrieval significantly faster but also uses system resources more efficiently, thanks to the reduced data processing and memory overhead.

What about unsorted columns?

Pyarrow follows a specific process for data retrieval. Initially, it reads the footer metadata (file metadata) to determine the data’s range, denoted as (file_min, file_max). If the desired value falls outside this range, Pyarrow swiftly skips reading the file and returns an empty result. On the other hand, if the value lies within the range, Pyarrow proceeds to iterate through the row groups. For each row group, it first reads the column statistics and checks whether the value falls within the range of values for that row group, denoted as (row_group_min, row_group_max). Based on this check, Pyarrow decides whether to scan the entire row group or skip it altogether.
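
To make this concrete, here is a sketch of the same row-group pruning done by hand with pyarrow, reusing the parquet_file object from above (simplified: the real reader also filters the remaining rows so only exact matches are returned):

# Keep only the row groups whose [min, max] range can contain the target value
target = 8767068
matching_row_groups = []
for i in range(parquet_file.metadata.num_row_groups):
    stats = parquet_file.metadata.row_group(i).column(0).statistics  # user_id stats
    if stats.min <= target <= stats.max:
        matching_row_groups.append(i)

# Only the surviving row groups are read and decoded
table = parquet_file.read_row_groups(matching_row_groups)
print(matching_row_groups)  # [1]: the only row group whose range contains the target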

It’s essential to note that the second query’s performance is not solely attributable to this pruning process. Other factors include pyarrow’s overall efficiency and the small size of the filtered result: when only a few rows survive the filter, there is no large Arrow result to deserialize and load into a pandas DataFrame, which speeds up the query even further.

To illustrate this, let’s filter the dataset on the second column, which is not sorted, and see how pyarrow performs when the row-group statistics cannot be used to skip anything. First, let’s look at the statistics of the value column:

for i in range(parquet_file.metadata.num_row_groups):
    value_col_stats = parquet_file.metadata.row_group(i).column(1).statistics
    print(f"row group: {i}, num of rows: {value_col_stats.num_values}, min: {value_col_stats.min}, max: {value_col_stats.max}")

Result:
row group: 0, num of rows: 5000000, min: -10000, max: 10000
row group: 1, num of rows: 5000000, min: -10000, max: 10000
row group: 2, num of rows: 5000000, min: -10000, max: 10000
row group: 3, num of rows: 5000000, min: -10000, max: 10000
row group: 4, num of rows: 5000000, min: -10000, max: 10000
row group: 5, num of rows: 5000000, min: -10000, max: 10000
row group: 6, num of rows: 5000000, min: -10000, max: 10000
row group: 7, num of rows: 5000000, min: -10000, max: 10000
row group: 8, num of rows: 5000000, min: -10000, max: 10000
row group: 9, num of rows: 5000000, min: -10000, max: 10000
row group: 10, num of rows: 5000000, min: -10000, max: 10000
row group: 11, num of rows: 5000000, min: -10000, max: 10000
row group: 12, num of rows: 5000000, min: -10000, max: 10000
row group: 13, num of rows: 5000000, min: -10000, max: 10000
row group: 14, num of rows: 5000000, min: -10000, max: 10000
row group: 15, num of rows: 5000000, min: -10000, max: 10000
row group: 16, num of rows: 5000000, min: -10000, max: 10000
row group: 17, num of rows: 5000000, min: -10000, max: 10000
row group: 18, num of rows: 5000000, min: -10000, max: 10000
row group: 19, num of rows: 5000000, min: -10000, max: 10000

After examining the statistics of the second column, we discovered that all the row groups share the same minimum and maximum values. As a result, querying the file using any value within this range will not yield any advantage from utilizing these statistics:

%%time
# apply the filter on the dataframe
pd.read_parquet(parquet_file_path).query("value == 6666").count().to_dict()

Result:
CPU times: user 3.8 s, sys: 10.7 s, total: 14.5 s
Wall time: 26.4 s
{'user_id': 4994, 'value': 4994}

%%time
# push down the filter to pyarrow
pd.read_parquet(parquet_file_path, filters=[("value", "=", 6666)]).count().to_dict()

Result:

CPU times: user 3.02 s, sys: 1.28 s, total: 4.31 s
Wall time: 6.28 s
{'user_id': 4994, 'value': 4994}

Even without benefiting from the metadata, we still observed a significant improvement by pushing the predicate filter down to pyarrow: the query ran roughly three times faster (in CPU time) than without the pushdown. This shows that pyarrow can speed up filtering even in scenarios where the row-group statistics cannot be exploited.

Filtering None values

Most of the time, before processing our dataset, we need to filter out the None values. To achieve this, we typically load the file into a DataFrame and then use the dropna method to eliminate the rows with None values. However, the predicate syntax accepted by the filters argument (tuples of column, operator, and value) does not include an is-null or not-null operator, so we cannot express this filter directly. Nevertheless, there is a clever workaround we can employ.

In the following code, I demonstrate how to create a Parquet file with 2 columns, where approximately 50% of the values in the first column are set to None:

parquet_file_path = "another_test.parquet"

# Set the number of rows for the DataFrame
num_rows = 10**8 # 100M

# Generate random data for the second column
second_column_data = np.random.rand(num_rows)

# Create a mask to set 50% of the first column to None
mask = np.random.rand(num_rows) < 0.5
first_column_data = np.where(mask, None, np.random.rand(num_rows))

# Create the DataFrame
data = {"Column1": first_column_data, "Column2": second_column_data}
df = pd.DataFrame(data)

# Write the result to a Parquet file with 20 row groups (5M records per row group)
df.to_parquet(parquet_file_path, index=False, row_group_size=5 * 10**6)

Loading the whole file and summing it with pandas takes about 8 seconds of CPU time, while loading it and dropping the None values with dropna takes more than 20 seconds:

%%time

pd.read_parquet(parquet_file_path).sum().to_dict()

Result:
CPU times: user 3.96 s, sys: 3.78 s, total: 7.74 s
Wall time: 8.69 s
{'Column1': 24999424.127602533, 'Column2': 50000578.561534435}

%%time

pd.read_parquet(parquet_file_path).dropna(subset=["Column1"]).sum().to_dict()

Result:
CPU times: user 4.99 s, sys: 15.5 s, total: 20.4 s
Wall time: 46.7 s
{'Column1': 24999424.127602667, 'Column2': 25001477.173362922}

To let pyarrow filter out the None values, we can use a filter such as <= MAX_VALUE or >= MIN_VALUE, with a bound that every non-null value satisfies. Since comparisons with None never match, any row with a None value in the filtered column will be excluded from the loaded dataset:

%%time

# Column1 is a DOUBLE column with values in [0, 1), so 2**53 exceeds every non-null value
pd.read_parquet(parquet_file_path, filters=[("Column1", "<=", 2**53)]).sum().to_dict()

Result:
CPU times: user 4.44 s, sys: 2.2 s, total: 6.64 s
Wall time: 5.83 s
{'Column1': 24999424.127602667, 'Column2': 25001477.173362922}

As observed, the result is obtained in just 6.64 seconds of CPU time, roughly 3 times faster than using the pandas dropna method.
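
If you prefer not to rely on a sentinel bound, another option is to build the not-null predicate with the pyarrow.dataset API directly instead of pandas’ filters argument. A sketch:

import pyarrow.dataset as ds

# Build a dataset over the Parquet file and drop the rows where Column1 is null
dataset = ds.dataset(parquet_file_path, format="parquet")
table = dataset.to_table(filter=~ds.field("Column1").is_null())
df = table.to_pandas()
print(df["Column1"].isna().sum())  # 0: no None values left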

Summary

In conclusion, when working with Parquet files using pandas, it is crucial to push predicate filters down to PyArrow to optimize performance and reduce memory usage. By utilizing PyArrow’s support for the operators =, ==, !=, <, >, <=, >=, in, and not in, we can apply predicate filters that exploit the Parquet metadata and load only the relevant data into memory, minimizing unnecessary reads and significantly improving overall processing speed. Embracing predicate pushdown with PyArrow ensures that we harness the full benefits of Parquet’s metadata-driven storage, making data analysis and manipulation more efficient and scalable.
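
For reference, the filters argument also accepts compound predicates in disjunctive normal form: a list of lists of tuples, where the tuples inside an inner list are ANDed together and the inner lists themselves are ORed. A hedged sketch reusing the test file from above:

# (1M <= user_id < 2M AND value > 0) OR (user_id in {42, 4242, 424242})
pd.read_parquet(
    "test_data.parquet",
    filters=[
        [("user_id", ">=", 10**6), ("user_id", "<", 2 * 10**6), ("value", ">", 0)],
        [("user_id", "in", [42, 4242, 424242])],
    ],
)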

Hussein Awala, Sr. Data Engineer @Voodoo & Apache Airflow Committer & PMC member