How to Optimize Reading CSV in Apache Spark

Tarun Kumar
Airtel Digital
Published in
3 min readJul 1, 2020

Reading CSV files is a very common practice in big data. Normally we use the Spark API to read CSV.

Using the method below, you can read CSV 30% faster in Spark.

The standard API call looks like this, e.g. spark.read().csv("path")

In one of our applications we were reading and processing 150 terabytes of raw CSV files daily. In order to improve the performance of the pipeline, we started with the read process.

Analysing how much time was spent in the various Java methods gave us this result.

Most of the time was spent on converting bytes to String and vice versa, and in the trim methods. Spark's internal Catalyst engine uses the InternalRow format. The read-CSV method was converting bytes from the InternalRow format to String, splitting, trimming, and then converting back to the bytes format.

We could eliminate this altogether. We read the files in text format and did all the parsing and trimming in the InternalRow format. We didn't call the String split and String trim methods, as they are very heavy. You can clearly see that most of the time is now spent just on doing some internal work.

This resulted in a 30% reduction in read time in Spark.

The code is attached below. You need to read the files with Spark's text reader and call this class in a map-partitions step, so that it iterates over each partition, splits every line, and emits the rows as if they had been read in CSV format.


/**
 * Turns rows that carry one delimited text line in column 0 into multi-column rows.
 *
 * <p>Each line is read as raw UTF-8 bytes, split on a single delimiter character and
 * space-trimmed without creating intermediate {@code String} objects; the resulting
 * fields are written through an {@link UnsafeRowWriter}.
 */
public class MapPartitionInternalRow implements Serializable
{
    /**
     * Applies the byte-level CSV split to every partition of {@code sql}.
     *
     * @param sql                      dataset whose rows hold the raw line in column 0
     * @param schemaLength             number of fields every output row must have
     * @param splitchar                single-character field delimiter
     * @param inputRowCountAccumulator optional per-row callback; may be {@code null}
     * @return the RDD produced by
     *         {@code MapPartitionPerformanceUtil.mapPartitionsInternalInternalRow}
     */
    public static RDD<InternalRow> execute(Dataset<Row> sql,
                                           final int schemaLength,
                                           final char splitchar,
                                           final Work inputRowCountAccumulator) {
        // NOTE(review): mapPartitionsInternalInternalRow is defined outside this file;
        // presumably it runs the function once per partition over the internal rows.
        return MapPartitionPerformanceUtil.mapPartitionsInternalInternalRow(
                sql,
                new FlatMapFunction<Iterator<InternalRow>, InternalRow>() {
                    @Override
                    public Iterator<InternalRow> call(Iterator<InternalRow> internalRowIterator)
                            throws Exception {
                        return new InternalRowIterator(
                                internalRowIterator,
                                schemaLength,
                                splitchar,
                                inputRowCountAccumulator);
                    }
                });
    }

    /**
     * Splits {@code s} on {@code splitChar} and writes each trimmed field to {@code writer}.
     *
     * <p>Fields whose ordinal would be {@code >= size} are counted but never written, so a
     * line with too many delimiters cannot make the writer touch an out-of-range ordinal.
     * When the line yields exactly {@code size - 1} fields, the last ordinal is set to null
     * and counted as well.
     *
     * @param s         raw bytes of one line
     * @param splitChar delimiter; bytes are signed, so only values up to 127 can ever match
     * @param size      number of fields the writer was created for
     * @param writer    destination row writer
     * @return number of fields found (including the null padding field, if added);
     *         {@code 0} for an empty line
     */
    private static int splitBySingleChar(final byte[] s,
                                         final char splitChar,
                                         int size,
                                         UnsafeRowWriter writer) {
        final int length = s.length;
        if (length == 0) {
            return 0;
        }

        int numField = 0;
        int offset = 0;
        for (int i = 0; i < length; i++) {
            if (s[i] == splitChar) {
                // Only write ordinals the writer actually has room for.
                if (numField < size) {
                    trim(numField, s, offset, i - offset, writer);
                }
                numField++;
                offset = i + 1;
            }
        }

        // Whatever follows the last delimiter is the final field; it is empty (and thus
        // written as null) when the line ends with the delimiter.
        if (numField < size) {
            trim(numField, s, offset, length - offset, writer);
        }
        numField++;

        // A line that is exactly one field short is padded with a trailing null field.
        if (numField == size - 1) {
            writer.setNullAt(numField);
            numField++;
        }
        return numField;
    }

    /** Returns {@code true} only for the ASCII space byte. */
    private static boolean isSpace(byte b) {
        return b == ' ';
    }

    /**
     * Writes {@code bytes[start, start + length)} to ordinal {@code index} with leading and
     * trailing spaces removed; a field that is empty after trimming is written as null.
     *
     * @param index  ordinal of the field in the output row
     * @param bytes  raw bytes of the whole line
     * @param start  offset of the first byte of the field
     * @param length number of bytes in the field
     * @param writer destination row writer
     */
    private static void trim(int index,
                             byte[] bytes,
                             int start,
                             int length,
                             UnsafeRowWriter writer) {
        int end = start + length - 1;
        while (start <= end && isSpace(bytes[start])) {
            start++;
        }
        while (start <= end && isSpace(bytes[end])) {
            end--;
        }
        if (start > end) {
            writer.setNullAt(index);
        } else {
            writer.write(index, bytes, start, end - start + 1);
        }
    }

    /**
     * Iterator that converts each incoming single-column row into a split row and skips
     * lines whose field count does not equal {@code schemaLength}.
     *
     * <p>NOTE(review): one {@link UnsafeRowWriter} is reused for every line, so the row
     * returned by {@link #next()} presumably shares its buffer with later rows - confirm
     * that consumers copy a row if they need to keep it.
     */
    static class InternalRowIterator implements Iterator<InternalRow> {
        private final UnsafeRowWriter unsafeRowWriter;
        private final Iterator<InternalRow> internalRowIterator;
        private final int schemaLength;
        private final char splitChar;
        private final Work work;
        private final boolean enableWork;

        /** Look-ahead slot: the next valid row, or {@code null} if none is buffered. */
        InternalRow row;

        /**
         * @param internalRowIterator source rows holding the raw line in column 0
         * @param schemaLength        required number of fields per output row
         * @param splitChar           single-character field delimiter
         * @param work                optional per-row callback; may be {@code null}
         */
        public InternalRowIterator(Iterator<InternalRow> internalRowIterator,
                                   int schemaLength,
                                   char splitChar,
                                   Work work) {
            this.schemaLength = schemaLength;
            this.splitChar = splitChar;
            // NOTE(review): 120 is passed as the writer's second constructor argument,
            // presumably an initial buffer size in bytes - confirm against the Spark version.
            this.unsafeRowWriter = new UnsafeRowWriter(schemaLength, 120);
            this.internalRowIterator = internalRowIterator;
            this.work = work;

            // Guard against a missing task context instead of failing with an NPE.
            TaskContext taskContext = TaskContext.get();
            String localProperty = taskContext == null
                    ? null
                    : taskContext.getLocalProperty(Constants.enableMeta);
            // The callback is used only when it exists and the task property enables it.
            this.enableWork = work != null && Boolean.parseBoolean(localProperty);
        }

        /**
         * Advances to the next valid line, if any.
         *
         * @return {@code true} when a row is buffered for {@link #next()}
         */
        @Override
        public boolean hasNext() {
            if (row != null) {
                return true;
            }
            while (internalRowIterator.hasNext()) {
                InternalRow next = internalRowIterator.next();
                if (enableWork) {
                    work.doWork(next);
                }
                unsafeRowWriter.reset();
                unsafeRowWriter.zeroOutNullBytes();
                byte[] arr = next.getUTF8String(0).getBytes();
                int numFields = splitBySingleChar(arr, splitChar, schemaLength, unsafeRowWriter);
                if (numFields == schemaLength) {
                    // Buffer the row only once it is known to be valid, so a rejected
                    // trailing line can never be handed out by a later next().
                    row = unsafeRowWriter.getRow();
                    return true;
                }
                System.out.println("Invalid Row :" + next.getUTF8String(0)+" "+System.currentTimeMillis());
            }
            if (enableWork) {
                work.markComplete();
            }
            return false;
        }

        /**
         * Returns the buffered row, advancing first if necessary.
         *
         * @throws java.util.NoSuchElementException if no valid row remains
         */
        @Override
        public InternalRow next() {
            if (!hasNext()) {
                throw new java.util.NoSuchElementException();
            }
            InternalRow current = row;
            row = null;
            return current;
        }
    }
}

--

--