Zipping multiple RDDs at once

I’m using H2O for some machine learning and, when I have to score multiple models on data that shares some features, I find it useful to represent the data in a columnar format and pass only the columns needed for scoring to each H2O model. This saves memory (the shared features are not replicated) and lets me parallelise the scoring, with multiple threads concurrently scoring a model on just a subset of the columns of my DataFrame. The result, however, is a collection of single-column DataFrames containing the predicted values, and ideally you want to zip these back onto the original DataFrame.

Spark does provide methods to zip RDDs (.zip and .zipPartitions), but each of them accepts only a fixed number of RDDs. That forces you to zip one RDD after another iteratively, which is slow and error-prone. Just recently I had to zip hundreds of RDDs and ended up with a StackOverflowError: every call to zip wraps the result in yet another RDD, and once the lineage is deep enough its recursive evaluation exceeds the stack limit.

How do we solve this? Ideally we want a zip that accepts a whole list of RDDs in a single call, which is far more space and time efficient. So I wrote one: the sketch below shows roughly what the one-by-one approach looks like and why it fails, and my implementation follows it.
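A rough sketch of the iterative approach (hypothetical names, with Row.merge standing in for however the aligned rows get combined): each zipPartitions call nests the previous result inside another two-way zipped RDD, so the lineage, and the stack needed to evaluate it, grows with every prediction column.

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row

// One-by-one zipping: with hundreds of predictions the recursive evaluation
// of the deeply nested lineage can overflow the stack.
def zipOneByOne(original: RDD[Row], predictions: Seq[RDD[Row]]): RDD[Row] =
  predictions.foldLeft(original) { (acc, pred) =>
    acc.zipPartitions(pred, preservesPartitioning = true) { (left, right) =>
      left.zip(right).map { case (l, r) => Row.merge(l, r) }
    }
  }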

package org.apache.spark.rdd

import org.apache.spark.{TaskContext, Partition, SparkContext}

import scala.reflect.ClassTag

/**
 * An RDD that zips the partitions of an arbitrary number of parent RDDs and
 * applies a user-supplied function to the aligned iterators of each partition.
 */
private[spark] class ZippedPartitionsList[T: ClassTag](
    sc: SparkContext,
    rdds: Seq[RDD[T]],
    var f: (Seq[Iterator[T]]) => Iterator[T],
    preservesPartitioning: Boolean = false)
  extends ZippedPartitionsBaseRDD[T](sc, rdds, preservesPartitioning) {

  override def compute(s: Partition, context: TaskContext): Iterator[T] = {
    tryPrepareParents()
    // Look up the partition of each parent that corresponds to this split and
    // hand all the resulting iterators to the user function in one go.
    val partitions = s.asInstanceOf[ZippedPartitionsPartition].partitions
    f(rdds.zipWithIndex.map(t => t._1.iterator(partitions(t._2), context)))
  }

  override def clearDependencies() {
    super.clearDependencies()
    f = null
  }
}

/** Exposes zipPartitionsList as an extension method on any RDD. */
object RddExt {

  implicit class RddZipExt[T: ClassTag](rdd: RDD[T]) {

    def zipPartitionsList(
        rdds: Seq[RDD[T]],
        preservesPartitioning: Boolean,
        f: (Seq[Iterator[T]]) => Iterator[T]): RDD[T] = {
      // Prepend the receiver so the user function sees its iterator first,
      // followed by the iterators of the passed-in RDDs in order.
      new ZippedPartitionsList[T](
        rdd.context, Seq[RDD[T]](rdd) ++ rdds, rdd.context.clean(f), preservesPartitioning)
    }
  }
}
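One caveat, which this approach shares with Spark’s built-in zips (ZippedPartitionsBaseRDD checks it): all the RDDs must have the same number of partitions, and a row-by-row combining function like the one shown later also assumes the aligned partitions contain the same number of elements. A small guard you might run before zipping (a sketch, with a hypothetical helper name):

import org.apache.spark.rdd.RDD

// Fail fast with a clear message instead of letting the zip fail mid-job.
def requireSamePartitioning(inputs: Seq[RDD[_]]): Unit = {
  val counts = inputs.map(_.partitions.length).distinct
  require(counts.size == 1, s"Cannot zip RDDs with differing partition counts: $counts")
}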

The implementation has to live in the org.apache.spark.rdd package because it relies on classes and methods that are private to Spark. The approach works really well: the speed-up over zipping the RDDs one by one is orders of magnitude. The following code shows how to zip an arbitrary number of DataFrames:

import scala.collection.mutable

import org.apache.spark.rdd.RddExt._
import org.apache.spark.sql.Row

// Zip the original DataFrame with every single-column prediction DataFrame in one pass.
val zipped = originals.rdd.zipPartitionsList(
  predictions.map(_.rdd), true,
  iterators => {
    val rows = new mutable.MutableList[Row]
    // Advance all iterators in lockstep, concatenating the aligned rows into a single row.
    while (iterators.forall(_.hasNext)) {
      val values = iterators.foldLeft(Seq[Object]())((acc, it) => acc ++ it.next().toSeq.asInstanceOf[Seq[Object]])
      rows += SparseRowFactory.create(newSchema, values: _*)
    }
    rows.iterator
  }
)
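
For a more minimal, self-contained picture of what the combining function receives (made-up data and a local SparkContext purely for illustration, assuming the extension above is compiled into your project), the function gets one iterator per RDD: the receiver’s first, then the passed-in RDDs in order.

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RddExt._

val sc = new SparkContext(new SparkConf().setAppName("zip-list-demo").setMaster("local[2]"))

// Three RDDs with identical partitioning (same range, same number of slices).
val base    = sc.parallelize(1 to 6, 2)
val doubled = sc.parallelize((1 to 6).map(_ * 2), 2)
val tripled = sc.parallelize((1 to 6).map(_ * 3), 2)

// For every partition the function receives Seq(base iterator, doubled iterator,
// tripled iterator) and emits one summed value per aligned position.
val summed = base.zipPartitionsList(Seq(doubled, tripled), true, iterators => {
  new Iterator[Int] {
    def hasNext: Boolean = iterators.forall(_.hasNext)
    def next(): Int = iterators.map(_.next()).sum
  }
})

summed.collect().foreach(println) // prints 6, 12, 18, 24, 30, 36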