Data Engineering With Rust And Apache Arrow DataFusion 3/4 — Loading and Processing Data

MatthieuL
8 min read · Aug 21, 2022


Welcome to the third part of my article series, “Data Engineering With Rust And Apache Arrow DataFusion.” Access the first part here.

In the previous article, I developed a simple command-line interface using Clap. This CLI processes end-user command specifications for input/output files and provides a subcommand to configure a filter operation. These command-line options are then validated and aggregated into a Rust struct.

A simple app to load, transform and write CSV or Parquet files — Image by Author

In this article, I demonstrate a data transformation with a Filter operator, which takes a column and a value as CLI parameters. I implement the processing part with Apache Arrow DataFusion.

Apache Arrow DataFusion is a query execution engine that makes it easy to read, write, and transform data. It uses the Apache Arrow in-memory format, allowing fast storage access and processing for data analytics.

The Apache Arrow in-memory format is standardized and available in several languages (e.g., C++, Rust, Julia, Python, etc.). It is deployed in major data engineering projects (e.g., Apache Spark and Apache Parquet) and accelerates the implementation of complex operations: e.g., Parquet read/write, data transformations, and data sharing between processes locally or over the network.

The Apache Arrow DataFusion project builds on this ecosystem and allows fast design and implementation of data transformation pipelines. It provides a query engine written in Rust with extensive SQL support and a DataFrame API. In addition, this framework can leverage multi-core threading by parallelizing queries on CSV or Parquet files.
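Before diving into the program, here is a minimal, self-contained sketch of what the DataFrame API looks like. The file path, table name, and column names are placeholders, and a tokio runtime is assumed:

use datafusion::error::Result;
use datafusion::prelude::*;

#[tokio::main]
async fn main() -> Result<()> {
    // create an execution context and register a CSV file as the table "example"
    let ctx = SessionContext::new();
    ctx.register_csv("example", "data/example.csv", CsvReadOptions::new())
        .await?;

    // build a query with the DataFrame API: filter, project, and print the result
    let df = ctx
        .table("example")?
        .filter(col("value").gt(lit(10.0)))?
        .select(vec![col("id"), col("value")])?;
    df.show().await?;
    Ok(())
}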

Program Structure

In the previous article, I defined the following program structure.

→ Rust «mdata_app/src/main.rs» =

use std::fs;
use std::num::ParseFloatError;
use std::path::{Path, PathBuf};
use std::str::ParseBoolError;

use clap::{ArgEnum, Parser, Subcommand};
use datafusion::arrow::datatypes::DataType;
use datafusion::error::DataFusionError;
use datafusion::prelude::*;
use env_logger::{Builder, Env};
use log::{debug, info, LevelFilter, trace};
use thiserror::Error;

(1)
<<cli-args>>

(2)
<<structures>>

(3)
<<utilities>>

(4)
<<data-processing>>

(5)
<<program-main>>

(6)
<<unit-testing>>

In the following sections, I will focus on the implementation of the <<structures>> and <<data-processing>> code blocks.

Format and Error Management

Before diving into the data transformation code, I define two small enum utilities:

  • The WriteFormat enum specifies the file format handled by our application.
  • The MDataAppError enum is our error management type defining error cases.
→ Rust «structures» =

(1)
#[derive(Debug, PartialEq, Eq, PartialOrd, Ord, Clone, ArgEnum)]
enum WriteFormat {
    Undefined = 0,
    Csv = 1,
    Parquet = 2,
}

(2)
#[derive(Error, Debug)]
enum MDataAppError {
    #[error("invalid path encoding {path:?}")]
    PathEncoding { path: PathBuf },
    #[error("invalid input format {path:?}")]
    InputFormat { path: PathBuf },
    #[error("invalid output format {format:?}")]
    OutputFormat { format: WriteFormat },
    #[error("invalid filter value {error_message:?}")]
    FilterValue { error_message: String },
    #[error(transparent)]
    DataFusionOp(#[from] DataFusionError),
}

(3)
impl From<ParseBoolError> for MDataAppError {
    fn from(e: ParseBoolError) -> Self {
        MDataAppError::FilterValue { error_message: e.to_string() }
    }
}

impl From<ParseFloatError> for MDataAppError {
    fn from(e: ParseFloatError) -> Self {
        MDataAppError::FilterValue { error_message: e.to_string() }
    }
}
  • (1): Three cases for the input/output file formats. Undefined encodes the case in which our program cannot infer the file type; it is also the default value for the input format, which is inferred from the input file name or directory contents.
  • (2): MDataAppError uses the thiserror crate, which provides the Error derive macro. With thiserror, we quickly generate an error type with custom error messages. The errors derived from this crate implement the std::error::Error trait.
  • (3): The program can trigger several error types. We define an automatic conversion rule from Parse[Bool/Float]Error to our custom type MDataAppError (a short illustration follows this list).
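As a quick illustration (not part of the program, the helper name is hypothetical), the ? operator relies on these From implementations to convert a ParseFloatError into our MDataAppError automatically:

// the ParseFloatError returned by parse() is converted to MDataAppError
// through the From implementation defined in (3)
fn parse_filter_value(raw: &str) -> Result<f64, MDataAppError> {
    let value = raw.parse::<f64>()?;
    Ok(value)
}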

Error Management Focus

The Rust language does not implement an exception mechanism. Why? Exception systems enable complex error management by interrupting the control flow at any point. These exceptions can be caught and processed in many ways. But when poorly used (e.g., with catch-all handlers), they often produce code that is hard to read and maintain (see, for example, [1] G. B. de Pádua and W. Shang, “Studying the relationship between exception handling practices and post-release defects”).

Rust differentiates between unrecoverable errors, handled with the panic! macro, and recoverable errors, handled with the Result type. For unrecoverable errors, the program stops with the specified panic! error message. In the recoverable case, error management must be implemented within the control flow of your program using return values.

How can we do it? A basic way is to use the Option<T> type; in case of failure, you return None to the caller. But how should the caller interpret the result? Is it an error, or simply a case where the function has nothing to return? With Option<T>, you don't propagate any information about the error.
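As a small illustration of the problem (a hypothetical helper reading a configuration file):

// with Option, the caller only knows that "something" is missing:
// the underlying io::Error is discarded by ok()
fn read_config(path: &str) -> Option<String> {
    std::fs::read_to_string(path).ok()
}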

Error Management with Rust — Image by Author

A better option: Rust provides the Result<T, E> type for returning a value of type T in case of success or an error of type E. With the ? operator, you can then propagate errors up through your functions.
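Here is the same hypothetical helper rewritten with Result: the io::Error is preserved and propagated to the caller by the ? operator.

use std::io;

fn read_config(path: &str) -> Result<String, io::Error> {
    // any io::Error is returned to the caller via `?`
    let content = std::fs::read_to_string(path)?;
    Ok(content)
}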

But how do we manage a function that can return several internal and external error types (as in our example with DataFusionError)?

In this case, we implement an enumeration of error cases using the thiserror crate (2).

In the case of the DataFusionError type, the following lines transparently forward the error within our custom type.

#[error("transparent")]
DataFusionOp ( #[from] DataFusionError ),

Without this feature from the thiserror crate, we would need to implement the appropriate From type conversion.

impl From<DataFusionError> for MDataAppError {
    fn from(e: DataFusionError) -> MDataAppError {
        MDataAppError::DataFusionOp(e)
    }
}

We only implement these conversions for string parsing errors Parse[Bool/Float]Error (3).

Main Processing Function

The data processing function mdata_app takes the parsed options opts as input and returns an empty result (with a potential error type). The processing layer involves three steps:

  • Initialization: create the Apache Arrow DataFusion execution context, and get the input and output path.
  • Infer the input format based on file name or directory contents. For this step, we use a custom utility function infer_file_type. Then, we register our inferred data source in the Apache Arrow DataFusion context.
  • Apply the transform operations and write the resulting file.
→ Rust «data-processing» =

(1)
async fn mdata_app(opts: MDataAppArgs) -> Result<(), MDataAppError> {
    let ctx = SessionContext::new();
    let input_path = get_os_path(&opts.input)?;
    let output_path = get_os_path(&opts.output)?;

    (2)
    // the inferred format is returned
    let inferred_input_format = match infer_file_type(&opts.input, true) {
        WriteFormat::Csv => {
            (3)
            ctx.register_csv("input", input_path, CsvReadOptions::new())
                .await?;
            WriteFormat::Csv
        }
        WriteFormat::Parquet => {
            (3)
            ctx.register_parquet("input", input_path, ParquetReadOptions::default())
                .await?;
            WriteFormat::Parquet
        }
        WriteFormat::Undefined => {
            return Err(MDataAppError::InputFormat { path: opts.input });
        }
    };

    (4)
    <<data-transformation>>

    Ok(())
}
  • (1): the async keyword indicates an asynchronous function context.
  • (2): we call the function infer_file_type with our input path. The file type is inferred from the file extension or the directory structure (an illustrative sketch follows this list).
  • (3): we use the Apache Arrow DataFusion functions register_csv and register_parquet. These functions take as input the source name, path, and options. We register the data source in the Apache Arrow DataFusion query engine with its name; in our case "input". It enables us to access CSV or Parquet files in the following parts of the function using a generic approach.
  • (4): we finally apply the data load, transform, and write functions in the <<data-transformation>> block.
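The infer_file_type helper belongs to the <<utilities>> block and is not detailed in this article. As a purely illustrative sketch (an assumption, not the actual implementation), an extension-based version could look like this:

// illustrative sketch only: infer the format from the file extension,
// falling back to Undefined (a directory would require inspecting its contents)
fn infer_file_type_sketch(path: &Path) -> WriteFormat {
    match path.extension().and_then(|ext| ext.to_str()) {
        Some("csv") => WriteFormat::Csv,
        Some("parquet") => WriteFormat::Parquet,
        _ => WriteFormat::Undefined,
    }
}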

Apache Arrow DataFusion Operations

The Apache Arrow DataFusion library offers a “DataFrame-like” interface enabling easy and standard access to data operators. We load the previously registered "input" data source from our context ctx and can then apply transformations to it. In this function, we use two transform operations: limit and filter.

Final Data Transformation Pipeline — Image by Author
→ Rust «data-transformation» =

(1)
let df_input = ctx.table("input")?;

// get the table schema
(2)
let schema = df_input.schema();

// print it
if opts.schema {
    info!("# Schema\n{:#?}", schema)
}

// process user filters
(3)
let df_flt =
    if let Some(Filters::Eq { column: column_name, value }) = &opts.filter {
        (4)
        // get the data type of the filtered column
        let filter_type = schema.field_with_name(None, column_name)?.data_type();
        (5)
        // parse the filter value based on column type
        let filter_value = match filter_type {
            DataType::Boolean => lit(value.to_string().parse::<bool>()?),
            DataType::Utf8 => lit(value.to_string()),
            x if DataType::is_numeric(x) => lit(value.to_string().parse::<f64>()?),
            _ => {
                return Err(MDataAppError::FilterValue {
                    error_message: "invalid filter value".to_string(),
                })
            }
        };

        (6)
        // filter the current dataframe
        df_input.filter(col(column_name).eq(filter_value))?
    } else {
        df_input
    };

// if limit is active, only the N first rows are written
(7)
let df_lmt = if opts.limit > 0 {
    df_flt.limit(opts.limit)?
} else {
    df_flt
};

// if the output format is undefined, default to the input format
(8)
let output_format = match opts.format {
    WriteFormat::Undefined => {
        if inferred_input_format == WriteFormat::Undefined {
            WriteFormat::Undefined
        } else {
            inferred_input_format
        }
    }
    _ => opts.format.clone(),
};

(9)
match output_format {
    WriteFormat::Csv => df_lmt.write_csv(output_path).await?,
    WriteFormat::Parquet => df_lmt.write_parquet(output_path, None).await?,
    WriteFormat::Undefined => {
        return Err(MDataAppError::OutputFormat {
            format: opts.format.clone(),
        });
    }
}
  • (1): we link a dataframe object to the previously mapped “input” file. With this object, we can call the other Apache Arrow DataFusion operators.
  • (2): with the schema operator, we access the data schema inferred by the framework. I print it through the logger when the schema option is enabled.
  • (3)-(6): if the end-user has specified a filter transform, we load and validate the parameters (3), get the column type from the schema information (4), extract and validate the filter value (5) and finally apply the filter operator with eq (equality) mode using the extracted end-user specification (6).
  • (7): we apply a simple limit operator, limiting the number of lines written.
  • (8)-(9): Finally, the output format is inferred or specified by the end-user (8). We write the final results in CSV or Parquet format (9).

We have finalized our data transformation function. Some remarks about this code:

  • I define all my data operations with the pattern let df_xxx = <expression>. It shows that Rust is mainly an expression-based language: if, if let, match, ... code blocks return a value that we can assign to a variable. In C/C++, an if block is a statement: it doesn't produce any value.
  • Each transformation is assigned a new variable df_xxx. This is optional: you can also use variable shadowing in Rust. In that case, you define multiple let df = ... bindings with the same variable name, and each new df shadows the previous one (see the sketch after this list).
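For illustration, here is a minimal sketch of the shadowing alternative; the column name, limit, and output directory are placeholders, and the function is assumed to run inside the program's async context:

// variable shadowing: each `let df = ...` rebinds the name to the new dataframe
async fn run_pipeline(ctx: &SessionContext) -> Result<(), DataFusionError> {
    let df = ctx.table("input")?;
    let df = df.filter(col("flag").eq(lit(true)))?;
    let df = df.limit(10)?;
    df.write_csv("output_dir").await?;
    Ok(())
}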

Wrap-up

That’s it! We have finalized a short and efficient function to process our data files. In this function, we have had only a quick glimpse of the Apache Arrow DataFusion capabilities.

In this example, I have chosen the DataFrame API, but we can easily leverage the SQL language using the SQL API.
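For example, here is a hedged sketch of the same kind of filter expressed with the SQL API; the column name, value, and output directory are placeholders:

// the same filter expressed in SQL; the resulting dataframe is written as before
async fn filter_with_sql(ctx: &SessionContext) -> Result<(), DataFusionError> {
    let df = ctx.sql("SELECT * FROM input WHERE my_column = 42").await?;
    df.write_csv("output_dir").await?;
    Ok(())
}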

The final article will show how we can test this program using fake-generated datasets.

Stay tuned, and thanks for reading!

References

[1] G. B. de Pádua and W. Shang, “Studying the relationship between exception handling practices and post-release defects,” in Proceedings of the 15th international conference on mining software repositories, 2018, pp. 564–575.
