5 Tips for optimizing Spark Structured Streaming applications

Roey Shem Tov
Aug 31, 2022

Developing a Spark Structured Streaming application is not an easy job, but optimizing it is a whole different level…

In this article I’ll share 5 tips we found useful while developing and deploying a Spark Structured Streaming application. Some of them are relevant for Spark batch applications as well. Every tip and configuration in this article should be adjusted to your application and your infrastructure. It can take a while to find the magic number for a configuration, the value that actually optimizes your job.

Speculation

Apache Spark has a ‘speculative execution’ feature to handle tasks in a stage that run slowly because of environment issues such as a slow network or disk. If one task is running slowly in a stage, the Spark driver can launch a speculative copy of it on a different host. Spark then takes the result from whichever of the two (the original task or its speculative copy) completes successfully first and kills the slower one. (Credit to Databricks for the explanation; you can find here a great video that explains speculation in depth.)

Speculation in Spark is enabled by spark.speculation, which is false by default, and it comes with some related configurations that can be helpful.

spark.speculation.quantile (default 0.75): the fraction of tasks which must be complete before speculation is enabled for a particular stage. We want to adjust this number to our application. For example, if we have 600 tasks in a stage, and during our performance investigation we find that about 10 tasks are struggling in each batch, the quantile should be ~0.98 (590 of the 600 tasks finish normally).

spark.speculation.multiplier (default 1.5): how many times slower than the median a task must be to be considered for speculation. When Spark starts speculating, it calculates the median runtime of the completed tasks and uses it as the base of the threshold for speculating running tasks. For example, if the median runtime of the tasks at the time speculation takes effect is 1 second and the speculation multiplier is 2, Spark will speculate a running task once it has been running for 2 seconds (calculatedMedianRunTime * speculationMultiplier).
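
The arithmetic behind the two examples above can be sketched in a few lines of Scala. This is only a back-of-the-envelope helper: the object and method names are made up for illustration, and the threshold formula is the simplified median * multiplier rule described above rather than Spark's full scheduler logic.

```scala
object SpeculationMath {

  /** Fraction of tasks expected to finish normally, given how many usually straggle. */
  def quantileFor(totalTasks: Int, expectedStragglers: Int): Double =
    (totalTasks - expectedStragglers).toDouble / totalTasks

  /** Runtime (ms) after which a running task becomes a speculation candidate. */
  def speculationThresholdMs(medianRuntimeMs: Long, multiplier: Double): Long =
    math.round(medianRuntimeMs * multiplier)

  def main(args: Array[String]): Unit = {
    // Numbers taken from the examples in the text
    println(quantileFor(totalTasks = 600, expectedStragglers = 10))                // 590 / 600, about 0.983
    println(speculationThresholdMs(medianRuntimeMs = 1000L, multiplier = 2.0))     // 2000 ms
  }
}
```

Rounding the first result down to 0.98 gives the quantile mentioned in the example.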

Remember: low values for these configurations will probably cause speculation to start too early, which wastes resources. On the other hand, high values will probably cause speculation to take effect too late and degrade our application’s performance, or in other words, increase our micro-batch duration.

You should be aware that the speculation mechanism in Spark will not help with problematic tasks whose cause is data skew, spill to disk, etc.
It is more helpful for causes such as network issues or heavily loaded nodes.

Notice: launching a speculative task does not kill the original task. As soon as one of the two is DONE, the other is KILLED.
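
As a minimal sketch, the speculation settings discussed in this section could be applied through a SparkConf like this. The values are the illustrative numbers from the examples above, not recommendations; they need to be tuned against your own task-runtime distribution.

```scala
import org.apache.spark.SparkConf

// Illustrative values only (taken from the examples above); tune them per application.
val speculationConf: SparkConf = new SparkConf()
  .set("spark.speculation", "true")            // speculation is disabled by default
  .set("spark.speculation.quantile", "0.98")   // wait until ~98% of the stage's tasks are done
  .set("spark.speculation.multiplier", "2")    // speculate tasks running 2x longer than the median
```

The same keys can also be passed at submit time with --conf, which makes it easier to experiment without rebuilding the application.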

Blacklisting

Blacklisting is Spark’s ability to prevent tasks from being scheduled on executors that have been blacklisted due to too many task failures (enabled by spark.blacklist.enabled, which is false by default). The blacklisting algorithm can be controlled by a number of related spark.blacklist.* configurations. Note that since Spark 3.1 this feature is named excludeOnFailure (spark.excludeOnFailure.*), and the spark.blacklist.* names are deprecated.

The need for blacklisting: sometimes tasks are launched on a problematic executor or node. The problem can be a network issue, a disk issue, a heavily loaded node, etc.
These issues can make tasks launched on the problematic executor fail, and when that happens repeatedly we want to stop scheduling tasks there.

You can adjust the blacklist parameters to your application, but I found that the default values fit our case. We only needed to enable two modes:

spark.blacklist.killBlacklistedExecutors (false by default): allows Spark to automatically kill, and attempt to re-create, executors when they are blacklisted. Note that when an entire node is added to the blacklist, all of the executors on that node will be killed. With this parameter enabled, a blacklisted executor does not remain idle; it is killed, and Spark tries to allocate a new executor for your application.

spark.blacklist.application.fetchFailure.enabled (false by default): Spark will blacklist the executor immediately when a fetch failure happens. A fetch failure exception, reported in a shuffle reduce task, indicates a failure to read one or more shuffle blocks from the hosting executors. Fetch failures can have several causes, but we found that most of the time, in our case, a task failing with a fetch failure exception was caused by a problem with the node’s local disk, so we want to blacklist that node immediately.
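
Put together, the setup described above (the feature itself plus the two extra modes, everything else left at defaults) could look like the following sketch. It uses the spark.blacklist.* key names from this article; on Spark 3.1 and later the equivalent keys are spark.excludeOnFailure.enabled, spark.excludeOnFailure.killExcludedExecutors and spark.excludeOnFailure.application.fetchFailure.enabled.

```scala
import org.apache.spark.SparkConf

// Key names as used in this article (pre-3.1 naming); see the note above for the newer names.
val blacklistConf: SparkConf = new SparkConf()
  .set("spark.blacklist.enabled", "true")                          // turn the feature on
  .set("spark.blacklist.killBlacklistedExecutors", "true")         // kill and replace blacklisted executors
  .set("spark.blacklist.application.fetchFailure.enabled", "true") // blacklist right away on a fetch failure
```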

Regex

This tip could be useful for your streaming application if it uses Spark’s built-in rlike method. Spark’s implementation of rlike is based on the Java regex engine (you can find the code implementation here), whose biggest drawback is that its running time is not linear in the length of the input. For long inputs, and depending on the regex itself, evaluation can take a long time. There is another regex engine, RE2 by Google, which guarantees running time linear in the size of the input. The tradeoff is that RE2 does not support all regex features, such as backreferences or lookaround. RE2 will also not always perform better than Java regex; it depends on the application’s regex usage and inputs. We replaced Spark’s rlike method with our own implementation, which first tries to compile the regex using RE2 (through the re2j library) and, in case of failure, falls back to the Java regex implementation.

import java.lang.{Boolean => JBoolean}
import java.util.concurrent.ConcurrentHashMap
import java.util.regex.{Pattern => JPattern}

import scala.util.Try

import com.google.re2j.{Pattern => GPattern}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.expressions.UserDefinedFunction
import org.apache.spark.sql.functions.udf

object RegexOp extends Logging {

  // Cache of compiled patterns: Left = RE2 (re2j), Right = java.util.regex fallback
  @transient private lazy val regexMapping =
    new ConcurrentHashMap[String, Either[GPattern, JPattern]]()

  val regexUDF: UserDefinedFunction = udf[JBoolean, String, String](isMatch)

  // Returns null when either argument is null (ConcurrentHashMap rejects null keys)
  private def isMatch(value: String, pattern: String): JBoolean = {
    if (value == null || pattern == null) {
      null
    } else {
      getOrBuild(pattern) match {
        case Left(gpattern) => gpattern.matcher(value).find(0)
        case Right(jpattern) => jpattern.matcher(value).find(0)
      }
    }
  }

  def getOrBuild(pattern: String): Either[GPattern, JPattern] = {
    regexMapping.get(pattern) match {
      case null =>
        // Try RE2 first; if it rejects the pattern, compile it with Java regex instead
        val matcher = Try {
          val gPattern = GPattern.compile(pattern)
          logDebug(s"Compiled pattern $pattern to google regex engine")
          Left(gPattern)
        } getOrElse {
          val jPattern = JPattern.compile(pattern)
          logDebug(s"Failed compile pattern $pattern to google engine, fall back to java")
          Right(jPattern)
        }
        regexMapping.put(pattern, matcher)
        matcher
      case matcher => matcher
    }
  }

}

After that, in your main code, you can override Spark’s default rlike implementation, gated by a Spark conf:

// Override spark built-in rlike in case of optimization google-regex enabled
if (sparkSession.conf.getOption("spark.optimization.enableGoogleRegex").exists(_.toBoolean)) {
  sparkSession.udf.register("rlike", RegexOp.regexUDF)
}
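
The UDF can also be called directly from the DataFrame API, without registering it under any name. The following is only a usage sketch: it assumes the RegexOp object above is on the classpath, and the sample data, column name and object name are hypothetical.

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, lit}

object RegexOpUsage {

  /** Returns the sample messages that match `pattern`, evaluated through RegexOp.regexUDF. */
  def matchingMessages(spark: SparkSession, pattern: String): Seq[String] = {
    import spark.implicits._

    // Hypothetical sample data; the null row yields a null match result and is filtered out
    val logs = Seq("ERROR: connection timeout", "INFO: started", null).toDF("message")

    logs
      .filter(RegexOp.regexUDF(col("message"), lit(pattern))) // pattern passed as a literal column
      .as[String]
      .collect()
      .toSeq
  }

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("regex-op-usage").getOrCreate()
    matchingMessages(spark, "^ERROR.*timeout$").foreach(println)
    spark.stop()
  }
}
```

Calling the UDF explicitly like this makes it obvious at the call site which regex engine is in play, at the cost of touching every place that currently uses rlike.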

Locality Wait

This tip could be useful if your streaming application is stateful. The spark.locality.wait configuration (3 seconds by default) controls how long Spark waits to launch a data-local task before giving up and launching it on a less-local node. There are several locality levels (process, node, rack, any), and we generally want to keep locality at the process or node level to save network bandwidth. In our case, we saw the micro-batch duration degrade some time after the streaming application was launched. We noticed that during the application’s runtime some executors had been killed, which left a lot of partitions local to one specific executor. Because of that, each task had to wait for locality on that node, which increased the launch latency of each task and also skewed the distribution of the state.

For example, say we have 100 tasks with an average runtime of 2 seconds each, and 2 executors with 1 core each. 90 of the tasks have node locality on executor A and the remaining 10 have node locality on executor B. We can run 2 tasks in parallel, so optimally the 100 tasks take about 100 seconds (100 tasks * 2 seconds / 2 cores). But in this case each task whose state is local to executor A will wait up to 3 seconds, by default, for node-level locality.

The result is that even if executor B has an empty task slot, most of the tasks will wait for locality on executor A, and that significantly increases our micro-batch duration.
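
To put rough numbers on this example, here is a back-of-the-envelope sketch. It compares the evenly balanced case with an assumed worst case in which no task ever falls back to the other executor; real scheduling lands somewhere between the two. The object and method names are made up for illustration.

```scala
object LocalityMath {

  /** Best case: tasks are spread evenly over all available cores. */
  def idealSeconds(tasks: Int, taskSeconds: Int, cores: Int): Int =
    tasks * taskSeconds / cores

  /** Assumed worst case: every task runs on its preferred single-core executor,
    * so the busiest executor determines the stage duration. */
  def pinnedSeconds(tasksPerExecutor: Seq[Int], taskSeconds: Int): Int =
    tasksPerExecutor.max * taskSeconds

  def main(args: Array[String]): Unit = {
    println(idealSeconds(tasks = 100, taskSeconds = 2, cores = 2))         // 100
    println(pinnedSeconds(tasksPerExecutor = Seq(90, 10), taskSeconds = 2)) // 180
  }
}
```

Under these assumptions the skewed placement can cost up to 80 extra seconds per stage compared with the balanced case.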

We fixed the issue by setting spark.locality.wait to zero, which spread the state evenly across the executors and removed the unnecessary waiting for locality.
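
A minimal sketch of that change, expressed as a SparkConf. Setting the wait to zero worked for our stateful job; whether it suits yours depends on how expensive non-local reads are in your setup.

```scala
import org.apache.spark.SparkConf

// Do not wait for a data-local slot: launch each task on the first free executor.
// The per-level settings (spark.locality.wait.process / .node / .rack) default to this value.
val localityConf: SparkConf = new SparkConf()
  .set("spark.locality.wait", "0s")
```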

Kubernetes Executors Allocation

This tip could be useful if your streaming applications are deployed on top of Kubernetes. The spark.kubernetes.allocation.batch.size configuration (5 by default) controls how many pods to launch at once in each round of executor pod allocation. The interval between rounds is configured by spark.kubernetes.allocation.batch.delay (1 second by default). If you have multiple streaming applications that scale up and down often, or applications that are proactively restarted, the default batch size might be too low. Assuming your application requests 100 executors, the driver will ask for 5 executors each round and will wait until the executor pods’ status turns from PENDING to RUNNING. If your Kubernetes cluster also has an autoscaler, this can take a long time; to resolve it, adjust this configuration according to your Kubernetes cluster’s limitations.
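
As a rough illustration of why the batch size matters, the sketch below counts the allocation rounds needed for a given number of executors. It is plain arithmetic with made-up helper names, and it ignores pod scheduling and startup time, which in practice usually dominate.

```scala
object AllocationRounds {

  /** Number of allocation rounds needed to request all executors (ceiling division). */
  def rounds(executors: Int, batchSize: Int): Int =
    (executors + batchSize - 1) / batchSize

  /** Lower bound on the time spent only on the delays between rounds, in seconds. */
  def minDelaySeconds(executors: Int, batchSize: Int, delaySeconds: Int): Int =
    (rounds(executors, batchSize) - 1) * delaySeconds

  def main(args: Array[String]): Unit = {
    println(rounds(executors = 100, batchSize = 5))   // 20 rounds with the default batch size
    println(rounds(executors = 100, batchSize = 25))  // 4 rounds with a larger, illustrative batch size
    println(minDelaySeconds(executors = 100, batchSize = 5, delaySeconds = 1)) // 19
  }
}
```

A larger batch size means fewer rounds, but it also means more pods requested from the cluster at once, which is why the value should respect your cluster's limits.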

Credits

Huge credit to my co-workers at the Data Platform group at Sentinel-One.
