A Self-Autoscaler for Structured Streaming Applications in Databricks
What if I need full control over my cluster size?
Apache Spark Structured Streaming deployed on Databricks is a great framework for running real-time workflows at scale. However, Databricks job clusters use Optimized Autoscaling, which can be somewhat aggressive for many 24/7 streaming workloads. Even though a few tuning parameters can be used to slow down the default behavior, we can easily run into situations where we need tighter control over the cluster’s size, mainly for cost-saving purposes.
For example, if the streaming query contains shuffle operations, e.g. when the query involves an aggregation or a join, Spark creates spark.sql.shuffle.partitions tasks (with a default of 200). In an autoscaling cluster, Databricks scales out the cluster such that no tasks remain pending. This can result in over-allocation of workers and prohibitive costs. To give a concrete example, we have had workflows that could run on 3 workers for a given input without autoscaling, but would need between 20 and 50 workers if we allowed the cluster to autoscale to a maximum of 60 workers.
In this post, I present an AutoScaler class that runs a thread on the driver node and resizes the cluster using the Databricks Clusters API. This involves the following steps:
- Getting the cluster info
- Getting the last progress from the streaming query
- Calculating the desired number of workers
- Resizing the cluster
1. Getting the cluster info
Before we can resize a cluster, we need to obtain its cluster_id. In Databricks, each run of a job starts a new cluster, and therefore the cluster_id needs to be obtained dynamically for the current run. Here, we assume that the job cluster has a unique cluster name, as reflected in the RunName field of the cluster’s default_tags. This is the same name that is specified in the name field of the request sent to the /create endpoint of the Jobs API.
We can get the cluster id by listing all of the clusters in the Databricks workspace and then searching for the one cluster that has the intended name and is in the RUNNING state. Note that there will be only one cluster that satisfies these conditions, since a Structured Streaming job should always be configured to allow exactly one concurrent run. The _get_cluster_info method, sketched below, returns the cluster id and the starting number of workers for our cluster.
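Here is a minimal sketch of this method, assuming self.host holds the workspace URL, self.token holds a personal access token, and self.cluster_name holds the job’s RunName (all hypothetical attribute names):

```python
import requests

def _get_cluster_info(self):
    """Return (cluster_id, num_workers) of the RUNNING job cluster
    whose RunName default tag matches self.cluster_name."""
    resp = requests.get(
        f"{self.host}/api/2.0/clusters/list",
        headers={"Authorization": f"Bearer {self.token}"},
    )
    resp.raise_for_status()
    for cluster in resp.json().get("clusters", []):
        run_name = cluster.get("default_tags", {}).get("RunName")
        if run_name == self.cluster_name and cluster["state"] == "RUNNING":
            return cluster["cluster_id"], cluster["num_workers"]
    raise RuntimeError(f"No RUNNING cluster named {self.cluster_name}")
```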
2. Getting the last progress from the streaming query
We can get the information about the last processed batch in a Structured Streaming query from the streamingQuery.lastProgress variable (see the example after the list below). Specifically, we can get:
- The input sources: The name and the start and end offsets of the input sources. Here, the input is a Kafka topic named input_text with one partition. This information can be useful if we want to commit the processed Kafka offsets back to the Kafka cluster.
- Processing rates: The number of input messages processed per second, e.g. “processedRowsPerSecond” : 71.11.
- Latencies: The durationMs field represents the batch duration times in milliseconds, e.g. the total duration “triggerExecution” : 74323.
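For reference, in PySpark lastProgress is returned as a dictionary. The shape below is illustrative; the batch id, row count, and offsets are made-up values, while the rate and duration match the figures quoted above:

```python
progress = {
    "batchId": 42,                        # hypothetical batch id
    "numInputRows": 5286,                 # hypothetical row count
    "processedRowsPerSecond": 71.11,
    "durationMs": {"triggerExecution": 74323},
    "sources": [{
        "description": "KafkaV2[Subscribe[input_text]]",
        "startOffset": {"input_text": {"0": 1000}},  # hypothetical offsets
        "endOffset": {"input_text": {"0": 6286}},
    }],
}
```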
The _get_progress_info method uses these values to extract the latest batch_duration and the process rate per worker (processedRowsPerSecond divided by the current number of workers).
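A minimal sketch of this method, assuming self.query holds the StreamingQuery handle and self.last_batch_id (a hypothetical attribute) tracks the last batch already acted upon:

```python
def _get_progress_info(self):
    """Return (batch_duration, rate_per_worker) from the latest progress,
    or None if no new batch has completed since the last check."""
    progress = self.query.lastProgress
    if progress is None or progress["batchId"] == self.last_batch_id:
        return None  # nothing new to act on
    self.last_batch_id = progress["batchId"]
    batch_duration = progress["durationMs"]["triggerExecution"]
    rate_per_worker = progress["processedRowsPerSecond"] / self.num_workers
    return batch_duration, rate_per_worker
```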
3. Calculating the desired number of workers
For a streaming application, a natural metric to try to minimize is the latency. Depending on the application, a streaming query can have a nominal batch duration time that strikes the right balance between what is feasible and what is acceptable in terms of latency. The nominal batch duration can be obtained by running a few tests.
We can directly penalize latency in the autoscaling logic by adding or removing worker nodes based on the difference between the latest and the nominal batch duration times. The _get_desired_number_workers method implements a step-scaling approach and limits the number of workers between specified minimum and maximum values.
In addition, we can safeguard this approach against excessive scale-out operations by checking whether the current workers are under-performing in terms of process rate. Specifically, we can remove a worker node if processedRowsPerSecond / number_workers is less than a nominal process rate per worker.
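A sketch of this logic with one-worker steps; the nominal thresholds (self.nominal_batch_duration, self.nominal_rate_per_worker) and the bounds (self.min_workers, self.max_workers) are assumed to be set in the constructor, and the single-step size is an assumption:

```python
def _get_desired_number_workers(self, batch_duration, rate_per_worker):
    desired = self.num_workers
    if rate_per_worker < self.nominal_rate_per_worker:
        # Safeguard: workers are under-performing on process rate,
        # so adding more will not help; remove one instead.
        desired -= 1
    elif batch_duration > self.nominal_batch_duration:
        # Latency above the nominal target: scale out by one step.
        desired += 1
    elif batch_duration < self.nominal_batch_duration:
        # Latency below the nominal target: scale in by one step.
        desired -= 1
    # Keep the cluster size within the configured bounds.
    return max(self.min_workers, min(self.max_workers, desired))
```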
4. Resizing the cluster
The _resize_cluster method resizes the cluster by simply sending a POST request to the /resize endpoint of the Databricks Clusters API.
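A minimal sketch, under the same host/token assumptions as above:

```python
import requests

def _resize_cluster(self, cluster_id, num_workers):
    """Ask the Databricks Clusters API to resize the running cluster."""
    resp = requests.post(
        f"{self.host}/api/2.0/clusters/resize",
        headers={"Authorization": f"Bearer {self.token}"},
        json={"cluster_id": cluster_id, "num_workers": num_workers},
    )
    resp.raise_for_status()
```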
Putting it all together
Here is the complete code for the AutoScaler class. Note that it runs the _run_autoscaler method inside a thread.
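A condensed sketch of the complete class, combining the snippets above; the constructor arguments, the polling interval, and the daemon-thread setup are assumptions rather than the exact original code:

```python
import threading
import time

import requests


class AutoScaler:
    def __init__(self, query, cluster_name, host, token,
                 nominal_batch_duration, nominal_rate_per_worker,
                 min_workers, max_workers, interval_s=60):
        self.query = query                  # the StreamingQuery handle
        self.cluster_name = cluster_name    # the job's RunName
        self.host = host                    # workspace URL
        self.token = token                  # personal access token
        self.nominal_batch_duration = nominal_batch_duration  # in ms
        self.nominal_rate_per_worker = nominal_rate_per_worker
        self.min_workers = min_workers
        self.max_workers = max_workers
        self.interval_s = interval_s
        self.last_batch_id = -1
        self.cluster_id, self.num_workers = self._get_cluster_info()
        # Run the autoscaling loop in a daemon thread on the driver node.
        threading.Thread(target=self._run_autoscaler, daemon=True).start()

    def _headers(self):
        return {"Authorization": f"Bearer {self.token}"}

    def _get_cluster_info(self):
        resp = requests.get(f"{self.host}/api/2.0/clusters/list",
                            headers=self._headers())
        resp.raise_for_status()
        for c in resp.json().get("clusters", []):
            if (c.get("default_tags", {}).get("RunName") == self.cluster_name
                    and c["state"] == "RUNNING"):
                return c["cluster_id"], c["num_workers"]
        raise RuntimeError(f"No RUNNING cluster named {self.cluster_name}")

    def _get_progress_info(self):
        progress = self.query.lastProgress
        if progress is None or progress["batchId"] == self.last_batch_id:
            return None  # no new completed batch since the last check
        self.last_batch_id = progress["batchId"]
        batch_duration = progress["durationMs"]["triggerExecution"]
        rate_per_worker = (progress["processedRowsPerSecond"]
                           / self.num_workers)
        return batch_duration, rate_per_worker

    def _get_desired_number_workers(self, batch_duration, rate_per_worker):
        desired = self.num_workers
        if rate_per_worker < self.nominal_rate_per_worker:
            desired -= 1  # under-performing workers: scale in
        elif batch_duration > self.nominal_batch_duration:
            desired += 1  # latency above target: scale out
        elif batch_duration < self.nominal_batch_duration:
            desired -= 1  # latency below target: scale in
        return max(self.min_workers, min(self.max_workers, desired))

    def _resize_cluster(self, num_workers):
        resp = requests.post(f"{self.host}/api/2.0/clusters/resize",
                             headers=self._headers(),
                             json={"cluster_id": self.cluster_id,
                                   "num_workers": num_workers})
        resp.raise_for_status()

    def _run_autoscaler(self):
        # Periodically inspect the query progress and resize as needed.
        while self.query.isActive:
            info = self._get_progress_info()
            if info is not None:
                desired = self._get_desired_number_workers(*info)
                if desired != self.num_workers:
                    self._resize_cluster(desired)
                    self.num_workers = desired
            time.sleep(self.interval_s)
```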
The AutoScaler object should be instantiated on the driver node in the main block of the application. Here, the input stream source is a Kafka topic. The streaming query extracts the message content from Kafka and runs foreach_batch_process for each consumed batch. Also note that the streaming query is passed to the autoscaler along with a few other arguments that are needed for the autoscaling logic.
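A sketch of such a main block; the Kafka broker address, topic options, secret handling, and threshold values are all hypothetical:

```python
import os

from pyspark.sql import SparkSession


def foreach_batch_process(batch_df, batch_id):
    # Application-specific processing of each micro-batch goes here.
    batch_df.show(5)


if __name__ == "__main__":
    spark = SparkSession.builder.appName("streaming-app").getOrCreate()

    # Read from Kafka and extract the message content as a string.
    stream = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", "broker:9092")  # hypothetical
              .option("subscribe", "input_text")
              .load()
              .selectExpr("CAST(value AS STRING) AS message"))

    query = (stream.writeStream
             .foreachBatch(foreach_batch_process)
             .start())

    # Instantiate the autoscaler on the driver, passing the streaming
    # query along with the arguments the scaling logic needs.
    autoscaler = AutoScaler(
        query=query,
        cluster_name="my-streaming-job",  # the job's RunName
        host="https://<workspace>.cloud.databricks.com",
        token=os.environ["DATABRICKS_TOKEN"],  # hypothetical secret source
        nominal_batch_duration=60_000,  # ms, from test runs
        nominal_rate_per_worker=25.0,   # rows/s, from test runs
        min_workers=2,
        max_workers=8,
    )

    query.awaitTermination()
```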
I presented an AutoScaler class that runs a thread on the driver node and self-autoscales the Databricks cluster for a Structured Streaming application. We have used this autoscaler for one of the inference applications that supports GumGum’s Verity engine. The screenshot below shows the input rate, the Kafka consumer lag, and the number of Databricks workers, from top to bottom. You can see that the application uses between 2 and 8 nodes while being allowed to scale out up to 200 nodes. Without this autoscaler, the cluster would be scaled out to as many as 70 nodes, resulting in prohibitively large costs.
In this post, I provided a solution in which the autoscaling logic penalizes the batch duration in order to minimize latency. Please feel free to modify the logic as you see fit for your application.