Mastering Accumulators in Apache Spark and not screwing yourself in the process

Rishi Arora
4 min readJun 19, 2023

Apache Spark is a powerful distributed computing framework known for its ability to process large-scale data efficiently. Among its features, accumulators play a crucial role in aggregating information and collecting metrics during Spark job execution.

Accumulators are shared variables that enable the accumulation of values across worker nodes in a distributed environment. They are primarily used for aggregating data or tracking global statistics during computation. Unlike regular variables, accumulators can be updated by worker nodes but can only be read by the driver program. This makes them an invaluable tool for collecting and processing essential information across a cluster.

Code Example 1: Calculating Word Count (2 minutes):
Let’s start by examining a practical example that demonstrates the usage of accumulators to count the occurrences of words in a dataset:

scala
val spark = SparkSession.builder().appName(“WordCount”).getOrCreate()
val sc = spark.sparkContextval

wordCount = sc.longAccumulator(“wordCount”)val data = sc.textFile(“input.txt”)data.foreach(line => {
val words = line.split(“ “)
wordCount.add(words.length)
})
println(“Total words: “ + wordCount.value)

Here, we create a long accumulator named “wordCount” and initialize it to zero. Next, we load the data from a text file and iterate over each line. For each line, we split it into individual words and increment the accumulator by the number of words in that line. Finally, we retrieve the accumulated value using the `value` property.

Here we use accumulators to gather some statistics for a word count program.

val spark = SparkSession.builder().appName(“WordCountWithStats”).getOrCreate()
val sc = spark.sparkContextval wordCount = sc.longAccumulator(“wordCount”)
val totalLines = sc.longAccumulator(“totalLines”)
val emptyLines = sc.longAccumulator(“emptyLines”)

val data = sc.textFile("input.txt")
data.foreach(line => {
totalLines.add(1)
if (line.trim.isEmpty)
emptyLines.add(1)
else {
val words = line.split(" ")
wordCount.add(words.length)
}
})
println("Total words: " + wordCount.value)
println("Total lines: " + totalLines.value)
println("Empty lines: " + emptyLines.value)
println("Average words per line: " + wordCount.value.toDouble / totalLines.value)
println("Percentage of empty lines: " + (emptyLines.value.toDouble / totalLines.value) * 100 + "%")

Real-life Example: Monitoring Data Quality

Accumulators can be beneficial in real-life scenarios, such as monitoring data quality during ETL (Extract, Transform, Load) processes.

Let’s consider a situation where we are processing customer data, and we want to monitor the number of invalid email addresses encountered:

val spark = SparkSession.builder().appName("DataQuality").getOrCreate()
val sc = spark.sparkContext

val invalidEmails = sc.longAccumulator("invalidEmails")
val customerData = spark.read.csv("customer_data.csv")
customerData.foreach(row => {
val email = row.getString(2)
if (!isValidEmail(email))
invalidEmails.add(1)
})
println("Total invalid emails: " + invalidEmails.value)
def isValidEmail(email: String): Boolean = {
// Perform email validation logic here
}

In this example, we create an accumulator named “invalidEmails” to track the number of invalid email addresses. We read customer data from a CSV file and iterate over each row. For each row, we extract the email address from the specified column (in this case, column 2) and check its validity using a custom isValidEmail function. If the email is deemed invalid, we increment the "invalidEmails" accumulator by 1. Finally, we print the total count of invalid emails.

Rare Use Case: Distributed Unique Identifier Generation :

Accumulators can also be used in rare and unconventional scenarios. For instance, consider a distributed system where you need to generate unique identifiers across multiple worker nodes:

val spark = SparkSession.builder().appName("UniqueIdGenerator").getOrCreate()
val sc = spark.sparkContext
val uniqueIdPrefix = "ID"

val uniqueIds = sc.collectionAccumulator[String]("uniqueIds")
val data = sc.parallelize(1 to 100)

data.foreach(_ => {
val uniqueId = generateUniqueId()
uniqueIds.add(uniqueId)
})

println("Total unique IDs generated: " + uniqueIds.value.length)

def generateUniqueId(): String = {
val uniqueNumber = scala.util.Random.nextInt(1000)
uniqueIdPrefix + uniqueNumber
}

In this example, we create a collection accumulator named “uniqueIds” to collect unique identifiers generated across worker nodes. We parallelize a range of numbers (1 to 100) as the input data. For each element in the dataset, we invoke the generateUniqueId function to create a unique identifier by appending a random number to the specified prefix. We then add the generated unique identifier to the "uniqueIds" accumulator. Finally, we print the total count of unique IDs generated.

When NOT to use accumulators

While accumulators are powerful, they can lead to unexpected results if used incorrectly. One common antipattern is attempting to access the accumulator’s value within a transformation operation, such as `map` or `filter`. Due to Spark’s lazy evaluation, this can result in incorrect or inconsistent values.

Incorrect Usage of Accumulators

scala
val spark = SparkSession.builder().appName(“IncorrectUsage”).getOrCreate()
val sc = spark.sparkContext

val wordCount = sc.longAccumulator(“wordCount”)
val data = sc.textFile(“input.txt”)

data.map(line => {
val words = line.split(“ “)
wordCount.add(words.length)
line
}).count()

println(“Total words: “ + wordCount.value)

In this example, we attempt to increment the accumulator within the `map` operation. However, due to Spark’s lazy evaluation, the `count` operation is executed before the `map` operation, resulting in an incorrect value for `wordCount`.

Accumulators are a powerful feature in Apache Spark, facilitating the collection of global statistics and metrics during distributed computations. Understanding how to leverage accumulators correctly is essential for accurate data processing. By using accumulators in actions rather than transformations, you can avoid common pitfalls and maximize the potential of Spark’s distributed computing capabilities.

Here, we covered the basics of accumulators, provided a code example showcasing their usage, and highlighted an antipattern to avoid. Armed with this knowledge, you can confidently incorporate accumulators into your Spark workflows and achieve more efficient and accurate data processing.

--

--