A Self-Autoscaler for Structured Streaming Applications in Databricks

What if I need full control over my cluster size?


Apache Spark Structured Streaming deployed on Databricks is the perfect framework for running real-time workflows at scale. However, Databricks job clusters use Optimized Autoscaling, which can be somewhat aggressive for many 24/7 streaming workloads. Even though there are a few tuning parameters that can slow down the default behavior, we can easily run into situations where we need tighter control over the cluster's size, mainly for cost-saving purposes.

For example, if the streaming query contains shuffle operations, e.g. when the query involves an aggregation or a join, Spark creates as many shuffle tasks as spark.sql.shuffle.partitions (200 by default). In an autoscaling cluster, Databricks scales out the cluster such that no tasks remain pending. This can result in over-allocation of workers and prohibitive costs. To give you a concrete example, we have had workflows that could run on 3 workers for a given input without autoscaling, but would need between 20 and 50 workers if we allowed the cluster to autoscale to a maximum of 60 workers.

In this post, I present a Python class that runs a thread on the driver node and resizes the cluster through the Databricks Clusters API. This involves the following steps:

  1. Getting the cluster info
  2. Getting the last progress from the streaming query
  3. Calculating the desired number of workers
  4. Resizing the cluster

1. Getting the cluster info

Before we can resize a cluster, we need to obtain its cluster ID. In Databricks, each run of a job starts a new cluster, and therefore the cluster ID needs to be obtained dynamically for the current run. Here, we assume that the job cluster has a unique cluster name, as reflected in the cluster_name field of the cluster's metadata. This is the same name that is specified in the cluster specification of the request sent to the Jobs API when the job is created.

We can get the cluster ID by listing all of the clusters in the Databricks workspace and then searching for the one cluster that has the intended name and is in the RUNNING state. Note that there will be only one cluster satisfying these conditions, since a Structured Streaming job should always be configured to allow exactly one concurrent run of the job.

This lookup returns the cluster ID and the starting number of workers for our cluster.
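The lookup above can be sketched as follows. This is a minimal illustration, not the original implementation: the function names, the environment in which host and token are supplied, and the error handling are my own assumptions; the endpoint is the standard Clusters API list call.

```python
import json
import urllib.request


def find_running_cluster(clusters, cluster_name):
    """Return (cluster_id, num_workers) for the RUNNING cluster with this name.

    `clusters` is the "clusters" array from a Clusters API list response.
    """
    for cluster in clusters:
        if cluster.get("cluster_name") == cluster_name and cluster.get("state") == "RUNNING":
            return cluster["cluster_id"], cluster.get("num_workers", 0)
    raise ValueError(f"no RUNNING cluster named {cluster_name!r}")


def get_cluster_info(host, token, cluster_name):
    """List all clusters in the workspace and find the one for the current run."""
    request = urllib.request.Request(
        f"{host}/api/2.0/clusters/list",
        headers={"Authorization": f"Bearer {token}"},
    )
    with urllib.request.urlopen(request) as response:
        clusters = json.load(response).get("clusters", [])
    return find_running_cluster(clusters, cluster_name)
```

Separating the search from the HTTP call keeps the matching logic testable without a live workspace.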

2. Getting the last progress from the streaming query

We can get the information about the last processed batch in a Structured Streaming query from the query's lastProgress attribute. Specifically, we can get:

  • The input sources: The name and the start and end offsets of each input source. Here, the input is a Kafka topic with one partition. This information can be useful if we want to commit the processed Kafka offsets back to the Kafka cluster.
  • Processing rates: The number of input rows processed per second (processedRowsPerSecond).
  • Latencies: The triggerExecution field in durationMs represents the total batch duration in milliseconds.

A helper method uses these values to extract the latest batch duration and the process rate per worker (the processing rate divided by the current number of workers).
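A minimal sketch of that extraction, assuming the standard shape of query.lastProgress in PySpark (the function name parse_progress is my own):

```python
def parse_progress(last_progress, num_workers):
    """Extract latency and per-worker throughput from a StreamingQuery progress dict.

    `last_progress` has the shape of `query.lastProgress` in PySpark:
    `durationMs["triggerExecution"]` is the total batch duration in milliseconds,
    and `processedRowsPerSecond` is the overall processing rate.
    """
    batch_duration_ms = last_progress["durationMs"]["triggerExecution"]
    rate_per_worker = last_progress["processedRowsPerSecond"] / max(num_workers, 1)
    return batch_duration_ms, rate_per_worker
```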

3. Calculating the desired number of workers

For a streaming application, a natural metric to try to minimize is the latency. Depending on the application, a streaming query can have a nominal batch duration time that strikes the right balance between what is feasible and what is acceptable in terms of latency. The nominal batch duration can be obtained by running a few tests.

We can directly penalize latency in the autoscaling logic by adding or removing worker nodes based on the difference between the latest and the nominal batch duration times. The scaling logic implements a step-scaling approach and limits the number of workers between specified minimum and maximum values.

In addition, we can safeguard this approach against excessive scale-out operations by checking whether the current workers are under-performing in terms of process rate. Specifically, we can remove a worker node if the process rate per worker is less than a nominal process rate per worker.
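The step-scaling logic described above can be sketched as a pure function. This is an illustration under my own naming and a step size of one worker, not the original code:

```python
def desired_num_workers(current_workers, batch_duration_ms, nominal_duration_ms,
                        rate_per_worker, nominal_rate_per_worker,
                        min_workers, max_workers, step=1):
    """Step scaling: move by `step` workers based on latency, capped to [min, max]."""
    if batch_duration_ms > nominal_duration_ms:
        if rate_per_worker >= nominal_rate_per_worker:
            # Latency is too high and workers are pulling their weight: scale out.
            target = current_workers + step
        else:
            # Workers are under-performing; adding more is unlikely to help.
            target = current_workers - step
    elif batch_duration_ms < nominal_duration_ms:
        # We are ahead of the nominal latency: save cost by scaling in.
        target = current_workers - step
    else:
        target = current_workers
    return max(min_workers, min(max_workers, target))
```

A step size of one keeps the cluster size from oscillating; a larger step reacts faster at the risk of overshooting.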

4. Resizing the cluster

The final step resizes the cluster by simply sending a POST request to the clusters/resize endpoint of the Databricks Clusters API.
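A sketch of that call, using only the standard library; the payload fields (cluster_id, num_workers) follow the Clusters API, while the function names are my own:

```python
import json
import urllib.request


def build_resize_request(host, token, cluster_id, num_workers):
    """Build the POST request for the Clusters API clusters/resize endpoint."""
    payload = {"cluster_id": cluster_id, "num_workers": num_workers}
    return urllib.request.Request(
        f"{host}/api/2.0/clusters/resize",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


def resize_cluster(host, token, cluster_id, num_workers):
    """Send the resize request and return the HTTP status code."""
    request = build_resize_request(host, token, cluster_id, num_workers)
    with urllib.request.urlopen(request) as response:
        return response.status
```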


Putting it all together

Here is the complete code for the autoscaler class. Note that it runs the scaling loop inside a thread.

The autoscaler object should be instantiated on the driver node in the main block of the application. Here, the input stream source is a Kafka topic. The streaming query extracts the message content from Kafka and runs the processing function on each consumed batch. Also note that the streaming query is passed to the autoscaler along with a few other arguments that are needed for the autoscaling logic.
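Putting the pieces together, here is a minimal sketch of such a class. The structure and names (get_metrics, resize, scale_once) are my own; in a real job, get_metrics would wrap query.lastProgress and resize would call the Clusters API as shown earlier. Injecting both as functions keeps the thread's logic testable without a cluster:

```python
import threading


class StreamingAutoscaler:
    """Periodically reads query progress and resizes the cluster from a driver thread."""

    def __init__(self, get_metrics, resize, nominal_duration_ms,
                 nominal_rate_per_worker, min_workers, max_workers,
                 initial_workers, interval_s=60):
        self.get_metrics = get_metrics  # returns (batch_duration_ms, rate_per_worker)
        self.resize = resize            # e.g. a Clusters API resize call
        self.nominal_duration_ms = nominal_duration_ms
        self.nominal_rate_per_worker = nominal_rate_per_worker
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.num_workers = initial_workers
        self.interval_s = interval_s
        self._stop = threading.Event()

    def scale_once(self):
        """One scaling decision: penalize latency, guard against under-performance."""
        duration_ms, rate_per_worker = self.get_metrics(self.num_workers)
        if duration_ms > self.nominal_duration_ms and rate_per_worker >= self.nominal_rate_per_worker:
            target = self.num_workers + 1
        elif duration_ms < self.nominal_duration_ms or rate_per_worker < self.nominal_rate_per_worker:
            target = self.num_workers - 1
        else:
            target = self.num_workers
        target = max(self.min_workers, min(self.max_workers, target))
        if target != self.num_workers:
            self.resize(target)
            self.num_workers = target

    def _loop(self):
        while not self._stop.is_set():
            self.scale_once()
            self._stop.wait(self.interval_s)

    def start(self):
        # Daemon thread so the autoscaler dies with the driver process.
        threading.Thread(target=self._loop, daemon=True).start()

    def stop(self):
        self._stop.set()
```

In the main block, you would construct the object after starting the streaming query and call start() once, letting the daemon thread run for the lifetime of the job.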


I presented a class that runs a thread on the driver node and self-autoscales the Databricks cluster for a Structured Streaming application. We have used this autoscaler for one of the inference applications that supports GumGum's Verity engine. The screenshot below shows, from top to bottom, the input rate, the Kafka consumer lag, and the number of Databricks workers. You can see that the application uses between 2 and 8 nodes while being allowed to scale out up to 200 nodes. Without this autoscaler, the cluster would be scaled out up to 70 nodes, resulting in prohibitively large costs.

In this post I provided a solution for the autoscaling logic by penalizing the batch duration to minimize latency. Please feel free to modify the logic as you see fit for your application.



Software Engineer with entrepreneurial spirit. Passionate about building Machine Learning applications at scale. PhD in ECE, Univ. Minnesota. Caltech Alumnus.