How to spread receivers over worker hosts evenly in Spark streaming

In Spark Streaming, you can spawn multiple receivers to increase parallelism, for example so that each receiver reads from one of the partitions of a Kafka topic. You then union the resulting streams and process them in batches. The code is sketched as follows:

val ssc = new StreamingContext(sc, Seconds(batchInterval))
val dstream = if (numReceivers == 1)
  ssc.receiverStream(new MyKafkaReceiver(storageLevel))
else {
  val streams = (0 until numReceivers).map { receiverNo =>
    ssc.receiverStream(new MyKafkaReceiver(storageLevel))
  }
  ssc.union(streams)
}
dstream.foreachRDD(…) // your business logic
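
MyKafkaReceiver above stands in for a custom receiver whose implementation is not shown here. A minimal sketch of what such a receiver might look like, assuming a hypothetical background thread that pulls records from a single Kafka partition, is:

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

// Hypothetical sketch: a custom receiver that would consume one Kafka partition.
// The actual Kafka consumer logic is elided; only the Receiver plumbing is shown.
class MyKafkaReceiver(storageLevel: StorageLevel) extends Receiver[String](storageLevel) {

  override def onStart(): Unit = {
    // Start a background thread so that onStart() returns immediately, as required.
    new Thread("kafka-partition-reader") {
      override def run(): Unit = {
        while (!isStopped()) {
          // In a real receiver: poll Kafka here and hand each record to Spark.
          store("record")
        }
      }
    }.start()
  }

  override def onStop(): Unit = {
    // The reader thread exits once isStopped() returns true; nothing else to clean up.
  }
}

The rest of this article only relies on the fact that the receiver extends org.apache.spark.streaming.receiver.Receiver.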

In this code snippet, the receivers will be “randomly” scheduled onto a set of worker hosts. Sounds perfect, right? Ideally, the receivers should be spread over the worker hosts as evenly as possible so that they read from the data sources via separate network interfaces. However, this default random scheduling often cannot guarantee an even distribution. First, it depends on which executors are known to the scheduler at that moment; executors take time to be created and registered, and this does not happen simultaneously, especially in a large distributed system. Second, the scheduling maps receivers to the set of executors, and a single worker host may run multiple executors. Consequently, you will sometimes observe that some hosts run several receivers while others run none. This imbalance also hurts the subsequent stream processing because of data locality, yielding poor resource utilization: the default scheduler favors CPU and memory over networking resources.

To address these two limitations, we first need to figure out the entire set of worker hosts, namely the machines that run executors in which the data receiving and processing will happen. Unfortunately, there is no convenient Spark API that directly gives us this information, so we have to obtain it indirectly.

One approach is to make up a fake workload and schedule it at a very aggressive level of parallelism. The workload does nothing but register the local hostname with an accumulator. In the end, the driver learns the set of hosts from the accumulator and can then provide hints to the scheduler. Note that we must repeat this until convergence, or for a bounded number of trials, in order to see all hosts, because of the registration delays discussed above. The code is as follows.

import java.net.InetAddress

import org.apache.spark.{AccumulatorParam, SparkContext}

// Accumulator that unions the host names reported by the tasks.
implicit object HostsAccParam extends AccumulatorParam[Set[String]] {
  def addInPlace(t1: Set[String], t2: Set[String]): Set[String] = t1 ++ t2
  def zero(initialValue: Set[String]): Set[String] = Set[String]()
}

def getActiveWorkerHostSet(sc: SparkContext, estimatedNumberWorkers: Int): Set[String] = {
  val hostsAccumulator = sc.accumulator[Set[String]](Set[String]())
  var foundNewHosts: Boolean = true
  var trials: Int = 0
  while (foundNewHosts && trials < 5) {
    trials += 1
    val oldSet = hostsAccumulator.value
    // Make the fake workload more aggressive on each trial.
    val dataSet = (1 to estimatedNumberWorkers * 10000 * trials)
    val numTasks = estimatedNumberWorkers * 100 * trials
    sc.parallelize(dataSet, numTasks).foreach { _ =>
      val hostname = InetAddress.getLocalHost.getHostName
      hostsAccumulator += Set(hostname)
    }
    val newSet = hostsAccumulator.value
    foundNewHosts = (newSet.size > oldSet.size)
  }
  hostsAccumulator.value
}
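
A minimal usage sketch, assuming a rough, hypothetical estimate of ten worker machines:

// Hypothetical usage: run the probing job once on the driver, before creating receivers.
val discoveredHosts = getActiveWorkerHostSet(sc, estimatedNumberWorkers = 10)
println(s"Worker hosts found by the fake workload: ${discoveredHosts.mkString(", ")}")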

The underlying assumption in the above approach is that the fake workload is aggressive “enough” relative to the available resources. However, it broke in one of my experimental environments, where the driver happened to run on a worker host with a sufficient amount of resources: all the tasks were executed on that host alone, and hence all receivers were scheduled to that very host. In practice, it is difficult to create a fake workload that is guaranteed to spread over all the worker hosts without wasting compute resources. In other words, this approach may end up being worse than the default scheduling in some environments.

It turns out that a better approach is to leverage an existing API in SparkContext: the method getExecutorMemoryStatus(), which returns the block manager addresses that have been registered with the driver’s block manager master. When a SparkContext is created during system initialization, it creates a driver with a block manager master, with which the block managers created in the driver and in every executor register themselves. It therefore covers all the hosts we need. The only concern is that the returned addresses include not only those of the executors but also that of the driver. Fortunately, we can figure out the driver address and exclude it. The code is as follows.

def getActiveWorkerHosts(sc: SparkContext): Set[String] = {
  val driverHost: String = sc.getConf.get("spark.driver.host")
  var workerSet = Set[String]()
  var foundNewHosts: Boolean = true
  val beginTimeMillis = System.currentTimeMillis
  var timeout = false
  while (foundNewHosts && !timeout) {
    Thread.sleep(3000)
    val oldSet = workerSet
    // Block manager addresses are "host:port"; keep the host part and drop the driver.
    val allHosts = sc.getExecutorMemoryStatus.map(_._1.split(":")(0)).toList
    workerSet = allHosts.diff(List(driverHost)).toSet
    foundNewHosts = workerSet.diff(oldSet).nonEmpty
    if (System.currentTimeMillis - beginTimeMillis >= 30000)
      timeout = true
  }
  workerSet
}

Now we can distribute the receivers evenly over the set of worker hosts by overriding the preferredLocation method provided in the Receiver class (which returns None by default). The following code shuffles the set and then suggests to the scheduler that the receivers be run on the worker hosts in a round-robin manner:

val workerSet = getActiveWorkerHosts(sc)
val candidates = scala.util.Random.shuffle(workerSet.toSeq).toArray
(0 until numReceivers).map { receiverNo =>
  val host = candidates(receiverNo % candidates.length)
  ssc.receiverStream(new MyKafkaReceiver(storageLevel) {
    override def preferredLocation: Option[String] = Some(host)
  })
}

Furthermore, we can be a bit more considerate when the master hosts also run workers/executors, as in some economical configurations. Masters themselves communicate a lot, so we avoid placing communication-bound receivers on those hosts if possible. In the following code, which replaces the simple shuffle above, we give a higher priority to hosts that are only workers and a lower priority to those that also run master daemons. When the candidates are assigned in a round-robin manner, hosts at the rear of the array are assigned no more often, and possibly less often, than those at the front.

val masterSet: Set[String] = sc.master.split("spark://")(1).split(",").map(_.split(":")(0)).toSet
val nonMasterWorkerSet = workerSet -- masterSet
val bothMasterWorkerSet = workerSet & masterSet
val prioritizedSeq = scala.util.Random.shuffle(nonMasterWorkerSet.toSeq) ++ bothMasterWorkerSet.toSeq
val candidates = prioritizedSeq.toArray
