Partitioning Polars DataFrame on S3 with Apache Arrow

Matteo Arellano
9 min read · Sep 27, 2023


This entire research project on Apache Arrow began when I wanted to efficiently partition a large Polars data table on AWS S3, but the function was not yet implemented in Polars. I then discovered that Apache Arrow can take a Polars DataFrame and write it out in partitions, as well as read partitions back from S3 folders and convert them into a Polars DataFrame, all in a highly efficient way. First, I will show you the solution to the problem, and then we will talk a little about Apache Arrow and its implications for the development of the data science field. To partition a Polars data table on S3, we can run the following code:

import polars as pl
import pyarrow.dataset as ds

# Sample data
data = {
    "date": ["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04"],
    "value": [10, 20, 30, 40],
    "name": ["Alice", "Bob", "Charlie", "David"],
}

# Create a Polars DataFrame
df = pl.DataFrame(data)

# Convert the Polars DataFrame into an Arrow Table and write it to S3,
# partitioned by the "date" column
ds.write_dataset(
    df.to_arrow(),
    "s3://mybucket/data",
    format="parquet",
    partitioning=["date"],
)

An extremely useful feature of combining PyArrow and Polars is the ability to scan cloud-hosted or partitioned datasets via the pl.scan_pyarrow_dataset() function. This function lets you interact efficiently with large volumes of data distributed across multiple locations or partitions. For example, if you have a dataset stored in an AWS S3 bucket and distributed across several partitions, PyArrow can scan, read, and load the data directly into memory without needing to download the full files. This means you can perform operations on your data quickly and efficiently, even when the data is stored remotely and is large. Additionally, scanning with PyArrow allows you to filter data at read time, which reduces the amount of data you need to load into memory and further optimizes your data processing operations.

import polars as pl
import pyarrow.dataset as ds

# Read partitioned content from S3 (i.e. many files)
s3_loc = 's3://mybucket/data/year=2020/category=1'
myds = ds.dataset(s3_loc)

# Evaluate DataFrame lazily from many files
df = pl.scan_pyarrow_dataset(myds).collect()
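
To illustrate the filtering mentioned above, here is a minimal sketch (the bucket path, column names, and threshold are hypothetical) in which the predicate is pushed down to the scan, so only the matching data is read from S3:

import polars as pl
import pyarrow.dataset as ds

# Hypothetical partitioned dataset on S3
myds = ds.dataset("s3://mybucket/data")

# Build a lazy query: the filter on "value" is pushed down to the scan,
# so only the rows and columns we ask for are actually read
df = (
    pl.scan_pyarrow_dataset(myds)
    .filter(pl.col("value") > 15)
    .select(["date", "value"])
    .collect()
)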

Returning to our initial example where we created partitions, the PyArrow documentation describes the pyarrow.dataset.write_dataset function and all of its parameters. Let’s delve further into the most relevant of them.

1. `base_dir`:

  • What: The root directory where the dataset will be written.
  • How to Use: Provide a string path to the desired directory.
  • Why it’s Useful: It specifies the location where your dataset will be saved, allowing for organized storage and easy retrieval.

2. `basename_template`:

  • What: A template string to generate basenames of the written data files.
  • How to Use: Use the token `{i}` in the string, which will be replaced with an auto-incremented integer. If not provided, it defaults to `part-{i}.` + `format.default_extname`.
  • Why it’s Useful: Allows for custom naming conventions for the output files, aiding in data organization and identification.

3. `format`:

  • What: The format in which the dataset will be written.
  • How to Use: Supported values include “parquet”, “ipc”/”arrow”/”feather”, and “csv”.
  • Why it’s Useful: Different formats have different advantages in terms of compression, interoperability, and analytics capabilities. This parameter lets you choose the most suitable one for your use case.

4. `partitioning`:

  • What: The partitioning scheme for the dataset.
  • How to Use: You can specify a partitioning scheme using the `partitioning()` function or provide a list of field names for directory-based partitioning (a Hive-style sketch using `partitioning()` follows after this list).
  • Why it’s Useful: Partitioning can significantly improve query performance by reducing the amount of data that needs to be read. It organizes data into separate directories or files based on column values.

5. `schema`:

  • What: The schema of the data being written.
  • How to Use: Provide a `Schema` object that defines the structure of your data.
  • Why it’s Useful: Ensures data consistency and integrity. Especially useful when writing from an iterable source where the schema might not be implicitly defined.

6. `max_rows_per_file`:

  • What: The maximum number of rows per output file.
  • How to Use: Provide an integer value.
  • Why it’s Useful: Helps in controlling the size of individual files. Useful for managing large datasets by breaking them into smaller, more manageable files.

7. `create_dir`:

  • What: Determines whether directories should be created.
  • How to Use: Set to `True` (default) to create directories or `False` to avoid creating them.
  • Why it’s Useful: Some filesystems might not require directories. By setting this to `False`, you can write data without creating unnecessary directory structures.

8. `file_visitor`:

  • What: A function that’s called for each file created during the write process.
  • How to Use: Provide a function that takes a `WrittenFile` instance. This instance has a `path` attribute (string path to the created file) and a `metadata` attribute (Parquet metadata of the file).
  • Why it’s Useful: Allows for custom actions or logging for each written file. For example, you can collect all written file paths or gather metadata for further processing.

These parameters provide flexibility and control over how your dataset is written, stored, and organized. By understanding and utilizing them effectively, you can optimize your data storage and retrieval processes.
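
Before moving on to the full example, here is a small sketch of the `partitioning()` helper mentioned in parameter 4, assuming a toy table and a placeholder output path. With the Hive flavor, directory names take the key=value form (e.g. date=2023-01-01/city=NY):

import pyarrow as pa
import pyarrow.dataset as ds

# Tiny example table
table = pa.table({
    "date": ["2023-01-01", "2023-01-02"],
    "city": ["NY", "LA"],
    "value": [10, 20],
})

# Hive-flavored partitioning: directories become date=.../city=.../
part = ds.partitioning(
    pa.schema([("date", pa.string()), ("city", pa.string())]),
    flavor="hive",
)

ds.write_dataset(
    table,
    "/path/to/output_directory",  # placeholder path
    format="parquet",
    partitioning=part,
)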

Below, I show an example using them:

import pyarrow as pa
import pyarrow.dataset as ds

# Sample data to be written
data = {
    'date': ['2023-01-01', '2023-01-02', '2023-01-03', '2023-01-04'],
    'value': [10, 20, 30, 40],
    'city': ['NY', 'LA', 'NY', 'SF']
}
table = pa.table(data)

# Parameters for write_dataset
base_dir = '/path/to/output_directory'  # Replace with your desired directory path
basename_template = 'output-part-{i}.parquet'
file_format = 'parquet'
partitioning = ['date', 'city']  # Partitioning by date and city
schema = table.schema
max_rows_per_file = 2  # Limiting to 2 rows per file for this example

# Function to log written files
written_files = []

def file_visitor(written_file):
    written_files.append(written_file.path)

# Writing the dataset
ds.write_dataset(
    data=table,
    base_dir=base_dir,
    basename_template=basename_template,
    format=file_format,
    partitioning=partitioning,
    schema=schema,
    max_rows_per_file=max_rows_per_file,
    max_rows_per_group=max_rows_per_file,  # row groups must not exceed the per-file row limit
    file_visitor=file_visitor
)

# Print the paths of written files
print(written_files)

This implementation brings significant benefits to data science workflows. First, the ability to partition large data tables on AWS S3 allows for a more efficient and manageable data analysis process and facilitates the distribution and processing of large data sets. Second, the interoperability between Apache Arrow and Polars allows for more flexible data manipulation, which enhances the performance and efficiency of the workflow. Finally, the ability to convert the data back into a Polars DataFrame automatically improves the quality of data analysis, as Polars is known for its fast and efficient in-memory performance.

Polars

Did you know that Polars DataFrames have some notable features of their own?

Like PyArrow, Polars uses a columnar memory format; in fact, it is built on the Apache Arrow columnar specification (through its own Rust implementation) and is optimized for performance, especially for multithreaded and parallel operations. One of the distinctive features of Polars is its support for lazy evaluation. This means that operations on a DataFrame are not executed immediately, but are queued and executed in a single step, which allows the query engine to optimize the whole plan. Additionally, Polars is designed to be multithreaded from its foundation, allowing operations to be parallelized to take advantage of modern multicore CPUs. All of this makes working with Polars a highly efficient and optimized experience.
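
To make the lazy-evaluation point concrete, here is a minimal sketch using a toy DataFrame: the filter and column expression are only planned when declared, and the whole query is optimized and executed in one step when collect() is called.

import polars as pl

df = pl.DataFrame({
    "name": ["Alice", "Bob", "Charlie", "David"],
    "value": [10, 20, 30, 40],
})

# Build a lazy query plan; nothing is executed yet
lazy_query = (
    df.lazy()
    .filter(pl.col("value") > 15)
    .with_columns((pl.col("value") * 2).alias("double_value"))
)

# The whole plan is optimized and executed in one step
result = lazy_query.collect()
print(result)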

What is Apache Arrow?

Apache Arrow is an open-source project that aims to accelerate the performance of data analysis. It provides a standardized, language-independent columnar memory format, allowing for efficient in-memory processing. The project is key to improving interoperability between different data processing systems, enabling smoother integration and faster analytical performance.

Parquet files are an optimized columnar storage format, originally developed in the Hadoop ecosystem, for high-performance data queries. They provide efficient compression and support for schema evolution, along with efficient data reading thanks to their columnar structure. Parquet is also designed to handle complex and nested data, making it a natural fit for Apache Arrow.

Apache Arrow and Parquet are two projects that complement each other in the big data ecosystem. Parquet is an optimized columnar storage format for use with Apache Hadoop, offering high read performance. Apache Arrow, on the other hand, is a specification for in-memory storage. Arrow can read data from Parquet and convert it into its in-memory format, allowing for more efficient data processing. This interaction between Parquet and Arrow provides a large improvement in data analysis speed.
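
As a small sketch of that interaction (the file name is a placeholder), PyArrow can write an Arrow table to Parquet and load it straight back into Arrow's in-memory columnar format:

import pyarrow as pa
import pyarrow.parquet as pq

# An Arrow table in memory
table = pa.table({"id": [1, 2, 3], "score": [0.5, 0.7, 0.9]})

# Write it to a Parquet file on disk
pq.write_table(table, "example.parquet")

# Read the Parquet file back into Arrow's columnar in-memory format
loaded = pq.read_table("example.parquet")
print(loaded.schema)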

Next, we will discuss some of the key advantages of using PyArrow and the Arrow memory format in your data analysis workflow.

a) First of all, PyArrow provides native support for reading and writing Parquet files. Since both Parquet and Arrow are columnar formats, conversion between them is efficient, which results in better performance when manipulating and processing large data sets. Additionally, Parquet’s columnar storage is especially beneficial for analysis workloads. When combined with Arrow’s in-memory columnar representation, a notable increase in query performance is achieved, because columnar storage and partitioning both reduce the amount of data that needs to be read, allowing for faster, more efficient queries.

Column compression vs. per-row storage

The columnar format stores each column’s data contiguously in memory. Because every value in a column has the same data type and values often repeat (especially for categorical data), compression algorithms can work much more effectively. It also speeds up analytical queries that only touch a subset of columns, since the remaining columns never have to be read.
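
In practice this means a query can ask Parquet for just the columns it needs. A minimal sketch, reusing the hypothetical example.parquet file from the previous snippet:

import pyarrow.parquet as pq

# Only the "score" column is read from disk; the other columns are skipped entirely
scores = pq.read_table("example.parquet", columns=["score"])
print(scores)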

b) Another significant advantage of PyArrow is its interoperability between programming languages. The Arrow memory format is language-independent, which means that data in Arrow format can be generated in one language and consumed in another without the need for conversion. This proves particularly useful in multilingual environments or when integrating different systems that use different programming languages.

c) In addition to interoperability, the Arrow columnar memory format is designed for efficient data transfer. Its structure allows for fast serialization and deserialization, which makes it suitable for high-performance data movement, for example from a server to AWS S3, where it reduces serialization overhead and accelerates transfers. Moreover, Arrow is compatible with compression, helping to further reduce transfer times and storage costs.
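
As an illustration of that point (the file name below is a placeholder), the Arrow IPC/Feather v2 format can be written with compression enabled, shrinking the payload before it is uploaded to storage such as S3:

import pyarrow as pa
import pyarrow.feather as feather

table = pa.table({"date": ["2023-01-01", "2023-01-02"], "value": [10, 20]})

# Write the table in the Arrow IPC (Feather v2) format with zstd compression
feather.write_feather(table, "data.arrow", compression="zstd")

# Reading it back is a cheap, columnar load
restored = feather.read_table("data.arrow")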

Apache Arrow is a cross-language development platform for in-memory analytics

Data partitioning is a use case where Apache Arrow and Parquet files work seamlessly together. The process involves splitting large data files into smaller chunks, which lets us read only the data we need in our data science workflows, or load everything at once in an efficient way. With Arrow and Parquet, we can select and load only the columns necessary for analysis, significantly improving efficiency and performance. Moreover, if the data is optimally partitioned, we can leverage Arrow’s parallel and distributed processing capabilities to further accelerate queries and computations.

When we talk about interoperability between systems, Apache Arrow introduces an in-memory transport layer: a mechanism that enables data to be efficiently represented, moved, and shared in memory (RAM) between different systems or processes. “In memory” means that the data lives in the system’s RAM, allowing faster access and manipulation than disk storage, while “transport layer” refers to the mechanism or protocol that moves data between different parts of a system or between different systems.

Apache Arrow implements this in-memory transport layer in the following way:

  • Standardized Memory Format: Apache Arrow defines a standardized memory format for columnar data, ensuring that in-memory data structures are consistent across different systems and languages.
  • Zero-Copy Deserialization: Traditional data transport often involves serialization (conversion of data into a format suitable for transfer) and then deserialization (conversion back). The Arrow memory format allows “zero-copy” reads, meaning data can be read directly without needing deserialization, improving performance.
  • Language-Independent: The Arrow memory format is designed to be used with multiple programming languages without conversion. This means that data in Arrow format can be generated in one language (e.g., C++) and consumed in another (e.g., Python) without the need for data conversion.
  • Inter-Process Communication (IPC): Arrow supports efficient IPC mechanisms, allowing data to be shared between processes quickly.

In essence, an in-memory transport layer like Apache Arrow’s ensures that data can be efficiently represented, accessed, and transferred in memory, offering significant performance benefits, especially in data-intensive applications.
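
Here is a minimal sketch of the zero-copy idea, with a placeholder file name: one process writes a table in the Arrow IPC file format, and another memory-maps the file and accesses the columns without copying or deserializing the buffers.

import pyarrow as pa
import pyarrow.ipc as ipc

table = pa.table({"name": ["Alice", "Bob"], "value": [10, 20]})

# Producer: write the table in the Arrow IPC file format
with ipc.new_file("shared.arrow", table.schema) as writer:
    writer.write_table(table)

# Consumer: memory-map the file and read it back with zero-copy buffers
with pa.memory_map("shared.arrow", "r") as source:
    shared_table = ipc.open_file(source).read_all()

print(shared_table)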

To continue learning about this great project with Python, I recommend reading the PyArrow documentation; the project has immense potential.

Thank you,

Matteo Arellano
