Knut O. Hellan
Sep 7, 2018

We have recently written a Spark job that reads a TSV file, modifies each row and writes the results to HBase. The original plan was to use withColumn for the modification. Sadly, there seems to be a limit to how many withColumn operations one can apply to a dataframe, or perhaps the limit is in the complexity of the expression that goes into the new column. We ended up using map and operating on each row individually.
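For context, inputDF, which the snippets below operate on, might be read roughly like this (the path and reader options are assumptions, not the actual job code):

// Read the TSV into a Dataset[Row]; path and options are assumptions.
val inputDF = spark.read
  .option("sep", "\t")
  .option("header", "true")
  .csv("/path/to/input.tsv")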

inputDF
  .na.fill("")
  .withColumn("a", lit(""))
  .withColumn("b", lit(""))
  .withColumn("c", lit(""))
  .withColumn("d", lit(""))
  .withColumn("e", lit(""))
  .map(row => extractRow(row))(encoder)

inputDF is the Dataset[Row] as read from the file. The empty columns are added to ensure each Row has the right length. Now comes the extractRow function:

def extractRow(row: Row): Row = {
  val inputSeq = row.toSeq
  // a, b, c, d, e are the new field values computed for this row
  val newFields = Seq(a, b, c, d, e)
  // Drop the empty placeholder columns and append the new values
  Row.fromSeq(inputSeq.take(inputSeq.length - newFields.length) ++ newFields)
}
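The encoder passed to map is not shown in these snippets; it is presumably a Row encoder built from the output schema. A minimal sketch using RowEncoder, where the field names and types are placeholders rather than the real schema:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Encoder describing the Rows that extractRow produces.
// The field names and types here are assumptions, not the actual schema.
val outputSchema = StructType(Seq("a", "b", "c", "d", "e").map(StructField(_, StringType)))
val encoder = RowEncoder(outputSchema)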

This feels awkward and brittle, so we decided to try creating a Row from a Map instead. There is no direct way of doing so. However, a Dataset[Row] can be created from a case class, and one may then retrieve its first Row, so here goes:

case class MyRow(a: String, b: String, c: String, d: String, e: String)

We then instantiate that case class as newRow, create a Dataset from it and retrieve the first Row:

val newRow = MyRow(a = a, b = b, c = c, d = d, e = e)
spark.createDataFrame(Seq(newRow)).first()

Great, now each field in the Row is named, so it's not so brittle. I admit that creating a dataframe (Dataset[Row]) just to get a single Row smells like overhead, but how much overhead? Note that this needs a SparkSession in order to create the Dataset. The main SparkSession in the job lives in the driver and is not directly available in the executors where the Row processing is done. There are two solutions to this. The first is to broadcast the SparkSession. This must be done before running the map on inputDF, like this (note that we no longer need to create the empty placeholder columns):

val broadcastSparkSession = spark.sparkContext.broadcast(spark)
inputDF
  .na.fill("")
  .map(row => extractRow(broadcastSparkSession.value, row))(encoder)

And extractRow needs to receive broadcastSparkSession:

def extractRow(spark: SparkSession, row: Row): Row = {
val newRow = MyRow(a = a, b = b, c = c, d = d, e = e)
spark.createDataFrame(Seq(newRow)).first()
}

The other alternative is to create a SparkSession in the executor if one does not exist (note that the broadcast variable is no longer needed):

inputDF
  .na.fill("")
  .map(row => extractRow(row))(encoder)

And extractRow reuses an existing SparkSession or creates its own:

def extractRow(row: Row): Row = {
val spark = SparkSession.builder.getOrCreate()
val newRow = MyRow(a = a, b = b, c = c, d = d, e = e)
spark.createDataFrame(Seq(newRow)).first()
}

I benchmarked these approaches on a MacBook Pro 15" (2015) with two input files, one with 100 rows and one with 1000 rows. This is a tiny sample set. In my benchmark, each row of the input has about 40 fields and the output Row from extractRow has 19 fields. I would expect the Dataset[Row] approaches to have slightly more overhead, which could potentially be most noticeable on the smaller dataset. Here's what I found:

None of the dataframe alternatives scale. Interestingly, the median of my 1000-row runs was actually lower than for the 100-row runs, probably an artifact of an outlier in one of them. More seriously, while the dataframe approach is only slightly slower for 100 rows, it is much slower for 1000 rows. I also ran once with 10000 rows and found that the Seq-based approach finished in 5 seconds and 668 milliseconds while the dataframe approach needed 2 minutes and 19 seconds.
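For anyone wanting to reproduce this kind of comparison, a simple wall-clock harness is enough. A sketch, where timing the Seq-based variant and forcing execution with count() are assumptions about the setup, not the exact benchmark code:

// Rough wall-clock timing of a block; a sketch, not the harness used here.
def time[A](label: String)(block: => A): A = {
  val start = System.nanoTime()
  val result = block
  println(s"$label: ${(System.nanoTime() - start) / 1e6} ms")
  result
}

// Forcing the pipeline with count() is an assumption; the real job writes to HBase.
time("seq-based") {
  inputDF.na.fill("").map(row => extractRow(row))(encoder).count()
}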

In conclusion, while creating Rows from sequences of field values is brittle, the alternative is too slow to be viable. The input file we would run this on has about 200 million rows, and the difference in run time at that scale would be very noticeable. Now, I should probably contribute functionality for creating a Row from a Map to Spark. I might do that, but I have a couple of other things on my todo list, such as updating the Redis support in frontera, which I have committed to doing when I find time.
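For what it's worth, the missing piece would be roughly a schema-driven helper along these lines; this is a sketch of the idea, not an existing Spark API:

import org.apache.spark.sql.Row
import org.apache.spark.sql.types.StructType

// Build a Row from a Map by laying the values out in schema field order.
// Falling back to null for missing keys is an assumption about the desired behaviour.
def rowFromMap(values: Map[String, Any], schema: StructType): Row =
  Row.fromSeq(schema.fieldNames.map(name => values.getOrElse(name, null)).toSeq)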
