Leveraging custom accumulators in Apache Spark 2.x

shreya chakraborty
2 min read · May 22, 2018


But first, what are accumulators?

  • As the name suggests, accumulators let you compute statistics on the data your job processes, alongside the normal execution of the job. The built-in ones are primarily sums and counters.

You can define and register accumulators on your SparkContext like so:

val dAcc: DoubleAccumulator = sc.doubleAccumulator("dc")
val lAcc: LongAccumulator = sc.longAccumulator("lc")

Let us assume you have a records DataFrame defined. Adding a value to an accumulator is then as simple as calling its add method inside a transformation on the DataFrame (the updates materialize when an action runs):

recordsDataframe.map { record =>
  ...
  dAcc.add(1.0)
  lAcc.add(1L)
  ...
}
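
Since map is a lazy transformation, the accumulator updates above only take effect once an action runs. Here is a minimal, self-contained sketch (the toy range dataset stands in for your records DataFrame, purely for illustration) that triggers the updates with foreach, an action, and reads the totals back on the driver:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("accumulator-demo").master("local[*]").getOrCreate()
val sc = spark.sparkContext

val dAcc = sc.doubleAccumulator("dc")
val lAcc = sc.longAccumulator("lc")

// A toy dataset standing in for your records DataFrame
val records = spark.range(0, 100)

// foreach is an action, so the accumulator updates fire immediately
records.foreach { _ =>
  dAcc.add(1.0)
  lAcc.add(1L)
}

// Accumulator values are only reliable on the driver, after the action completes
println(s"double total = ${dAcc.value}, long count = ${lAcc.value}")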

Spark also gives you the ability to use collections as accumulators. For example, you can define a collection accumulator and keep accumulating elements; each call to the add() method appends to the list:

val listAcc = sc.collectionAccumulator[List[String]]("name")

listAcc.add(List("some element"))
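
The value of a collection accumulator is a java.util.List of whatever element type you declared, so reading it back on the driver takes a small conversion. A sketch, assuming Scala 2.11/2.12 as shipped with Spark 2.x:

import scala.collection.JavaConverters._ // Scala 2.11/2.12; on 2.13 use scala.jdk.CollectionConverters

val listAcc = sc.collectionAccumulator[List[String]]("name")

// Each add() appends one element to the accumulated java.util.List
sc.parallelize(Seq("a", "b", "c")).foreach { elem =>
  listAcc.add(List(elem))
}

// listAcc.value is a java.util.List[List[String]]
val collected: Seq[List[String]] = listAcc.value.asScala.toSeq
println(collected)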

The AccumulatorV2 API in Spark 2.x lets you define clean custom accumulators that compute job statistics you would otherwise end up deriving by running separate jobs or SQL queries on your output data.

For example, say you want the following user-defined, map-typed metric to be available after the job runs:


case class SampleOutputStat(stats: Map[String, Long] = Map()) extends Serializable {

  // Logic for adding a record to the metric during a transformation
  def add(record: String): SampleOutputStat = {
    val existingCount = stats.getOrElse(record, 0L)
    this.copy(stats = stats + (record -> (existingCount + 1L)))
  }

  // Logic for merging two metric instances during an action
  def merge(other: SampleOutputStat): SampleOutputStat =
    this.copy(stats = mergeMaps(this.stats, other.stats))

  // Used by the accumulator's isZero check below
  def isEmpty: Boolean = stats.isEmpty

  private def mergeMaps(l: Map[String, Long], r: Map[String, Long]): Map[String, Long] =
    (l.keySet union r.keySet).map { key =>
      key -> (l.getOrElse(key, 0L) + r.getOrElse(key, 0L))
    }.toMap
}

Now define your custom accumulator class as follows. It accepts records of type String and produces a SampleOutputStat object that keeps track of the derived metrics you care about:

import org.apache.spark.util.AccumulatorV2

// Accumulator class
class CustomAccumulator(var validationAccumulators: SampleOutputStat)
  extends AccumulatorV2[String, SampleOutputStat] {

  override def reset(): Unit = {
    validationAccumulators = SampleOutputStat()
  }

  override def add(input: String): Unit = {
    validationAccumulators = validationAccumulators.add(input)
  }

  override def value: SampleOutputStat = validationAccumulators

  override def isZero: Boolean = validationAccumulators.isEmpty

  override def copy(): CustomAccumulator = new CustomAccumulator(validationAccumulators)

  override def merge(other: AccumulatorV2[String, SampleOutputStat]): Unit = {
    validationAccumulators = validationAccumulators.merge(other.value)
  }
}

You need to implement all of the abstract methods defined in the base class; Spark calls them when aggregating the accumulator's value across tasks and stages.

And that's it! This accumulator now behaves just like the primitive accumulators defined earlier. Just register it with your SparkContext:

sparkContext.register(<instance of CustomAccumulator>, "name")

and the associated metrics will flow through the stages of your job, just as they do for a primitive-type accumulator.
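
For completeness, here is a rough end-to-end sketch of how the custom accumulator fits into a job (the accumulator name and input strings are made up for illustration):

val statAcc = new CustomAccumulator(SampleOutputStat())
sc.register(statAcc, "recordStats")

// Updates happen on the executors as the action runs
sc.parallelize(Seq("host-a", "host-b", "host-a")).foreach { record =>
  statAcc.add(record)
}

// The merged result is available on the driver once the action completes,
// e.g. Map(host-a -> 2, host-b -> 1)
println(statAcc.value.stats)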
