Faster Dataset transformations with Spark's InternalRow

Dilip Kasana
Airtel Digital
Jul 1, 2020

When working with large datasets, the goal is to run as efficiently as possible: write optimized code and save as much CPU as we can.

In one of our applications we process 150 TB of data every day, and we often need to transform that data beyond what SQL alone can express.

When Spark Datasets in Java/Scala have to go beyond SQL, we fall back on the map/mapPartitions APIs of the DataFrame and implement operations such as join, filter, write, read, and zipPartitions in custom ways.

Using InternalRow as shown in the code below gave us 10x faster performance over the default mapPartitions and saved 90% of the CPU cost.

The InternalRow implementation used for a Dataset's internal operations is UnsafeRow. An UnsafeRow is backed by raw bytes instead of Java objects, and each tuple has three parts: [null bit set] [values] [variable length portion].
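
To make that layout concrete, here is a minimal sketch of inspecting one such row; the ordinals and column types used (a long at ordinal 0 and a string at ordinal 1) are assumptions for illustration only:

import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
import org.apache.spark.unsafe.types.UTF8String;

public class UnsafeRowInspect {
  // Inspect one row handed out by queryExecution().toRdd(); such rows are typically UnsafeRow.
  static void inspect(InternalRow row) {
    if (row instanceof UnsafeRow) {
      // Total length of [null bit set] [values] [variable length portion], in bytes.
      System.out.println("row size: " + ((UnsafeRow) row).getSizeInBytes());
    }
    if (!row.isNullAt(0)) {                         // nullability is answered from the null bit set
      System.out.println("id = " + row.getLong(0)); // fixed-width value read by ordinal (assumed LongType)
    }
    // For an UnsafeRow this UTF8String points into the variable length portion, no copy is made.
    UTF8String name = row.getUTF8String(1);         // assumed StringType at ordinal 1
    System.out.println("name = " + name);
  }
}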

Instead of using the default mapPartitions implementation over the Dataset, we first get the internal RDD of InternalRow from the Dataset and define a mapPartitionsInternalInternalRow helper that gives us this faster access:

def mapPartitionsInternalInternalRow(dataRDD: Dataset[Row],
    f: FlatMapFunction[JIterator[InternalRow], InternalRow]): RDD[InternalRow] = {
  // Wrap the Java FlatMapFunction as a Scala Iterator => Iterator function.
  def fn: Iterator[InternalRow] => Iterator[InternalRow] =
    (x: Iterator[InternalRow]) => f.call(x.asJava).asScala
  // RDD.mapPartitionsInternal is private[spark], so this helper sits in an org.apache.spark package.
  dataRDD.queryExecution.toRdd.mapPartitionsInternal(fn, false)
}

In Java, the same internal RDD is obtained directly from the Dataset (here parquet):

RDD<InternalRow> internalRowRDD = parquet.queryExecution().toRdd();

This is the internal representation of the Dataset: the raw bytes of the data, with no schema attached. InternalRow is the abstract row class used internally by Spark SQL, and it holds the columns only as internal types.
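
Because no schema travels with the rows, column ordinals have to be resolved up front from the Dataset's schema, and values come back as Spark's internal types rather than plain Java ones. A small sketch, with hypothetical column names "userName" and "eventTime":

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.unsafe.types.UTF8String;

public class InternalTypesSketch {
  static void read(Dataset<Row> parquet, InternalRow row) {
    // Resolve ordinals once from the Dataset's schema; InternalRow itself carries no field names.
    int nameOrdinal = parquet.schema().fieldIndex("userName");   // hypothetical column
    int timeOrdinal = parquet.schema().fieldIndex("eventTime");  // hypothetical column
    UTF8String name = row.getUTF8String(nameOrdinal); // StringType is stored as UTF8String
    long micros = row.getLong(timeOrdinal);           // TimestampType is stored as microseconds since the epoch
    System.out.println(name + " @ " + micros);
  }
}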

This is the typical mapPartitions snippet people use over a Spark Dataset:

long count = parquet.mapPartitions(new MapPartitionsFunction<Row, Row>() {
  @Override
  public Iterator<Row> call(Iterator<Row> input) throws Exception {
    return new Iterator<Row>() {
      @Override
      public boolean hasNext() {
        return input.hasNext();
      }

      @Override
      public Row next() {
        return input.next();
      }
    };
  }
}, RowEncoder.apply(parquet.schema())).count();

We used the following snippet instead:

long count = mapPartitionsInternalInternalRow(parquet, new FlatMapFunction<Iterator<InternalRow>, InternalRow>() {
  @Override
  public Iterator<InternalRow> call(Iterator<InternalRow> internalRowIterator) throws Exception {
    return new Iterator<InternalRow>() {
      @Override
      public boolean hasNext() {
        return internalRowIterator.hasNext();
      }

      @Override
      public InternalRow next() {
        return internalRowIterator.next();
      }
    };
  }
}).count();

Both snippets above were tested on real data: 200 MB of Parquet with almost 400 columns and 1 million rows. In the Spark UI below, the first three iterations use Row and the next three use the optimized InternalRow; the very first run also caches the data.

The optimized iterations complete in 3 seconds compared to 30 seconds.
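
The benchmark iterators above are deliberate pass-throughs; in a real job the same FlatMapFunction carries the per-row logic. Below is a minimal sketch under assumed conditions (a single StringType column at ordinal 0, uppercased purely for illustration) that emits new rows with GenericInternalRow without ever leaving the internal representation:

import java.util.Iterator;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;

public class UpperCaseColumn implements FlatMapFunction<Iterator<InternalRow>, InternalRow> {
  @Override
  public Iterator<InternalRow> call(Iterator<InternalRow> input) {
    return new Iterator<InternalRow>() {
      @Override
      public boolean hasNext() {
        return input.hasNext();
      }

      @Override
      public InternalRow next() {
        InternalRow in = input.next();
        Object[] values = new Object[1];
        // UTF8String.toUpperCase() works directly on the internal string type, no java.lang.String detour.
        values[0] = in.isNullAt(0) ? null : in.getUTF8String(0).toUpperCase();
        return new GenericInternalRow(values);
      }
    };
  }
}

It plugs into the same helper, e.g. mapPartitionsInternalInternalRow(oneColumnDataset, new UpperCaseColumn()), where oneColumnDataset is a hypothetical Dataset<Row> with that single string column.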
