Spark Streaming: Dynamic Scaling And Backpressure in Action

Priya Matpadi
7 min read · Oct 7, 2018


At Lookout, the platform services team runs a few Spark Streaming applications in production that ingest different types of telemetry events from mobile devices.

We have a public API that accepts telemetry from mobile devices with the Lookout client app installed. After authenticating and authorizing the devices, the microservice hosting the API endpoint accepts the telemetry payload and publishes it to different Kafka topics based on the telemetry type. Kafka provides a buffer that lets the telemetry processors consume the stream as fast as they can, without affecting the throughput of the gateway microservice that interfaces directly with devices. The Spark Streaming applications consume the telemetry events from the Kafka topics using the direct stream approach.
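
For context, the skeleton of such a consumer (Spark 2.2 with the spark-streaming-kafka-0-10 integration) looks roughly like the sketch below; the topic name, broker address, and group id are placeholders rather than our actual configuration.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    val conf = new SparkConf().setAppName("telemetry-processor") // placeholder app name
    val ssc  = new StreamingContext(conf, Seconds(5))            // 5s batch interval

    // Placeholder Kafka connection settings, for illustration only.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers"  -> "kafka-broker:9092",
      "key.deserializer"   -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id"           -> "telemetry-processor",
      "auto.offset.reset"  -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // Direct stream: each Kafka partition maps to a Spark partition, and the
    // consumed rate is bounded by maxRatePerPartition and backpressure settings.
    val telemetry = KafkaUtils.createDirectStream[String, String](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, String](Seq("device-telemetry"), kafkaParams)
    )

    telemetry.foreachRDD { rdd =>
      // process the micro-batch here
    }

    ssc.start()
    ssc.awaitTermination()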

The Spark Streaming applications are all deployed on a single AWS EMR cluster. They are configured to share cluster resources using the YARN capacity scheduler, so that one application cannot starve the others on the same cluster.
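
For illustration, carving out a queue for one of these applications in capacity-scheduler.xml looks roughly like this; the queue name safe-browsing appears later in this post, but the percentages are placeholders, not our actual allocation, and sibling queues are omitted.

    <property>
      <name>yarn.scheduler.capacity.root.queues</name>
      <value>default,safe-browsing</value>
    </property>
    <property>
      <!-- guaranteed share for this application's queue (placeholder value) -->
      <name>yarn.scheduler.capacity.root.safe-browsing.capacity</name>
      <value>60</value>
    </property>
    <property>
      <!-- hard ceiling the queue can scale up to (placeholder value) -->
      <name>yarn.scheduler.capacity.root.safe-browsing.maximum-capacity</name>
      <value>70</value>
    </property>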

Like typical streaming workloads, our Spark Streaming applications often see bursts of data due to situations such as:

  1. Over-the-air programming
  2. Brief interruptions in ingestion while a new application version is deployed
  3. Client bugs
  4. Throttling in downstream systems, like data stores, or other microservices

The processors must be able to handle such bursts or bottlenecks gracefully. Apache Spark allows controlling the ingestion rate in a couple of ways:

Dynamic Allocation

First, Apache Spark allows the application to adjust to variations in traffic by dynamically scaling the number of executors up to the maximum, or back down, within the capacity allowed for the application’s queue in the YARN capacity scheduler configuration.

Closely coupled with allocating appropriate capacity in the YARN configuration, there are specific spark-defaults.conf overrides to pay attention to:

Spark Streaming Dynamic Allocation Properties
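
In spark-defaults.conf form, these overrides look roughly like the following sketch. minExecutors=4 matches the value we use later in this post; the remaining values are illustrative placeholders to tune per application.

    # Use the streaming-specific allocation manager; the core
    # spark.dynamicAllocation.enabled flag must be off for it to take effect.
    spark.dynamicAllocation.enabled                     false
    spark.streaming.dynamicAllocation.enabled           true
    spark.streaming.dynamicAllocation.minExecutors      4
    spark.streaming.dynamicAllocation.maxExecutors      30
    # How often scaling decisions are made (seconds), and the
    # processing-time to batch-interval ratios that trigger scaling up or down.
    spark.streaming.dynamicAllocation.scalingInterval   60
    spark.streaming.dynamicAllocation.scalingUpRatio    0.9
    spark.streaming.dynamicAllocation.scalingDownRatio  0.3
    # Per-executor sizing, tuned to the application (illustrative values).
    spark.executor.cores                                4
    spark.executor.memory                               6g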

Notes

  • These can be configured specifically for each application. See Spark Properties.
  • Adjust minExecutors, executor.cores, and executor.memory appropriately for your application’s needs.
  • spark.dynamicAllocation.enabled is mutually exclusive to spark.streaming.dynamicAllocation.enabled.
  • The complete list of Spark Streaming dynamic allocation properties is missing from the Apache Spark documentation. We found them in the open source code here, and verified that they work on Apache Spark v2.2.0.

Backpressure

Backpressure refers to the situation where, during a temporary load spike, a system receives data at a higher rate than it can process.

A sudden spike in traffic can cause bottlenecks in downstream dependencies, which slow down the stream processing. For example, if your stream is being persisted to AWS DynamoDB and the consumption rate exceeds the table’s provisioned write throughput, the writes will start failing with ProvisionedThroughputExceededException, which slows down your processing. As another example, if your application indexes the ingested data in Elasticsearch, higher traffic could overwhelm the Elasticsearch cluster and cause socket exceptions.

If you fail to handle these exceptions, your Spark Streaming application will die after some default number of retries. The retries can be controlled by overriding the properties below:
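
For example, on YARN the relevant retry knobs include the following; the values here are illustrative, not our production settings.

    # Task-level retries before the stage, and eventually the job, is failed (default 4).
    spark.task.maxFailures              8
    # Re-submissions of the whole application by YARN
    # (capped by yarn.resourcemanager.am.max-attempts).
    spark.yarn.maxAppAttempts           4
    # Executor failures tolerated before the application is failed.
    spark.yarn.max.executor.failures    16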

Even if you do a thorough job of handling the exceptions and raising appropriate alerts, if the batch processing time exceeds the batch interval for too long, resources will eventually be exhausted and the Spark application will die.

Fear not! Apache Spark has a robust backpressure mechanism to ensure your application executes reliably. As with dynamic allocation, there are some spark-defaults.conf properties you can override to tune backpressure to your application’s requirements.

The specific spark-defaults.conf overrides of interest here are:
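
In spark-defaults.conf form they look like this sketch; maxRatePerPartition=3000 and pid.minRate=3000 are the values used by the job profiled below.

    # Let Spark's PID-based rate estimator adapt the ingestion rate
    # to the observed batch processing times.
    spark.streaming.backpressure.enabled         true
    # Per-partition, per-second cap on the first batches, before the
    # estimator has any feedback (used by the Kafka direct stream).
    spark.streaming.kafka.maxRatePerPartition    3000
    # Floor on the estimated rate, in records per second for the stream.
    spark.streaming.backpressure.pid.minRate     3000
    # The PID gains (spark.streaming.backpressure.pid.proportional,
    # .integral, .derived) can also be tuned if needed.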

We highly recommend this blog from Databricks to understand how Spark Streaming backpressure works and how it can be tuned to your application’s requirements.

Dynamic Scaling And Backpressure in Action

Now we will see the impact of these Spark configuration overrides in action. In the scenario below, our Spark Streaming applications were restarted after some downtime.

We will look closely at the streaming statistics of one application that experiences a higher input rate than our other applications. The streaming batch interval for this application is 5s, and we have allocated the highest percentage of cluster capacity to it.

The Streaming tab in the Spark UI provides great insight into how dynamic allocation and backpressure play together gracefully.

The Kafka topic this application consumes from has 8 partitions. As per the maxRatePerPartition=3000 setting for this job, when the application starts, each batch ingests

3000 events/sec/partition * 8 partitions * 5 sec = 120,000 events

New batches queue up every 5s with 120K events.
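
In other words, the ceiling on those first batches is just the per-partition rate limit multiplied out:

    // Ceiling on a single batch before backpressure has any feedback.
    val maxRatePerPartition = 3000  // spark.streaming.kafka.maxRatePerPartition
    val partitions          = 8     // partitions in the Kafka topic
    val batchIntervalSec    = 5     // streaming batch interval
    val maxEventsPerBatch   = maxRatePerPartition * partitions * batchIntervalSec
    // maxEventsPerBatch == 120000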

Dynamic Scaling kicks in

Initially, as per the spark.streaming.dynamicAllocation.minExecutors=4 setting, Spark launches 4 executors to pull the load.

Since 4 executors are clearly not able to deal with the incoming traffic rate, dynamic scaling of executors kicks in.

The first 13 batches are not able to finish within the 5s batch interval.

*It takes a few cycles for Spark to figure out that the application is not able to handle the incoming rate within the batch interval. The developer must ensure the application will not be brought to its knees before backpressure kicks in.

Backpressure kicks in

As per the backpressure.pid.minRate=3000 setting, for the 14th batch the input rate falls to 3000 events/sec * 5 sec = 15,000 events per batch. New batches continue queuing up every 5s with 15K events each.

Dynamic Scaling continues

As a result of dynamic scaling, the number of executors eventually grows to 27*.

*The picture below shows active and total executors as 28, which includes the driver process. There are some failed tasks, but our Spark configuration prevents Spark from killing the application; it continues to retry the failures.

Spark has launched the maximum number of executors allowed by the YARN queue configuration for this job. In the screenshot below, you can see that the last queue, called safe-browsing, now consumes 97% of its allocated resources.

Result of Dynamic Scaling and Backpressure

As more executors become available to process the incoming spike, the processing time drops back within the batch interval. Spark’s rate estimator now gradually starts pulling in more than 15K events per batch.

The final result of the dynamic scale-up is that only one batch is actively processing at a time. The earlier backlog has vanished.

Spark’s streaming statistics UI provides an excellent visual representation of the entire timeline described above and of the current health of our Spark Streaming application.

Observations

  1. Processing time averages 4s, which is well within the batch interval of 5s.
  2. Scheduling delay initially increases to 2.5 minutes, but as more executors are launched, it comes down to almost 0s.
  3. Executors must have a reasonable number of cores and amount of memory allocated. In our experience, a minimum of 4 cores and 4–7GB per executor, with the minimum number of executors set to 4, performs reasonably well on m4.2xl EC2 instances. Going lower could cause grief. Of course, your mileage will vary by use case.
  4. The average input rate was initially very high. Through dynamic scale-up and backpressure, our application caught up with the events buffered in Kafka.
  5. To ensure that the consuming application is really keeping pace with incoming events, i.e. there is no consumer lag, there is one more Kafka metric we have found useful to monitor. Note how net.bytes_in (blue line) stays in sync with net.bytes_out (grey line).

Conclusion

Every Spark Streaming application has a unique signature. It is important for developers to understand the expected input rate and cadence, the scenarios that could result in huge spikes, and the characteristics and constraints of the sink. Understanding the business requirements and the upstream and downstream components, profiling the application under load, and tuning the Spark configuration to override the relevant defaults will result in a healthy, streamlined, highly efficient streaming application.

We would love to hear your experience, thoughts, and/or questions. If you have found any other efficiencies, tips, or tricks in your streaming explorations, please share.
