A Self-Autoscaler for Structured Streaming Applications in Databricks

What if I need full control over my cluster size?


Apache Spark Structured Streaming deployed on Databricks is the perfect framework for running real-time workflows at scale. However, Databricks job clusters use Optimized Autoscaling, which can be somewhat aggressive for many 24/7 streaming workloads. Even though a few tuning parameters can be used to slow down the default behavior, we can easily run into situations where we need tighter control over the cluster’s size, mainly for cost-saving purposes.

For example, if the streaming query contains shuffle operations, e.g. when the query involves an aggregation or a join, Spark creates spark.sql.shuffle.partitions tasks (with a default of 200). In an autoscaling cluster, Databricks scales out the cluster such that no tasks remain pending. This can result in over-allocation of workers and prohibitive costs. To give you a concrete example, we have had workflows that could run on 3 workers for a given input without autoscaling, but would need between 20 and 50 workers if we allowed the cluster to autoscale to a maximum of 60 workers.

In this post, I present an AutoScaler class that runs a thread on the driver node and resizes the cluster using the Databricks Clusters API. This involves the following steps:

1. Getting the cluster info

Before we can resize a cluster, we need to obtain its cluster_id. In Databricks, each run of a job starts a new cluster, so the cluster_id needs to be obtained dynamically for the current run. Here, we assume that the job cluster has a unique cluster name, as reflected in the RunName field of the cluster’s default_tags. This is the same name that is specified in the name field of the request sent to the /create endpoint of the Jobs API.

We can get the cluster id by listing all of the clusters in the Databricks workspace and then searching for the one cluster that has the intended name and is in the RUNNING state. Note that there will be only one cluster satisfying these conditions, since a Structured Streaming job should always be configured to allow exactly one concurrent run ("max_concurrent_runs": 1).

The _get_cluster_info method returns the cluster id and the starting number of workers for our cluster.

2. Getting the last progress from the streaming query

We can get information about the last processed batch in a Structured Streaming query from the streamingQuery.lastProgress property, which exposes metrics such as the batch duration and the overall process rate.

The _get_progress_info method uses the durationMs.triggerExecution and processedRowsPerSecond values to extract the latest batch duration (batch_duration) and the process rate per worker (processed_rows_per_second_per_worker).

3. Calculating the desired number of workers

For a streaming application, a natural metric to try to minimize is the latency. Depending on the application, a streaming query can have a nominal batch duration time that strikes the right balance between what is feasible and what is acceptable in terms of latency. The nominal batch duration can be obtained by running a few tests.

We can directly penalize latency in the autoscaling logic by adding or removing worker nodes based on the difference between the latest and the nominal batch duration times. The _get_desired_number_workers method implements a step-scaling approach and limits the number of workers between specified minimum and maximum values.

In addition, we can safeguard this approach against excessive scale-out operations by checking whether the current workers are under-performing in terms of process rate. Specifically, we can remove a worker node if processedRowsPerSecond / number_workers falls below a nominal process rate per worker, nominal_RPS_per_worker.

4. Resizing the cluster

The _resize_cluster method resizes the cluster by simply sending a POST request to the /resize endpoint of the Databricks Clusters API.


Putting it all together

The complete AutoScaler class ties these methods together; note that it runs the _run_autoscaler method inside a thread.

The AutoScaler object should be instantiated on the driver node in the main block of the application. Here, the input stream source is a Kafka topic. The streaming query extracts the message content from Kafka and runs the foreach_batch_process function on each consumed batch. Also note that the streaming query is passed to the autoscaler along with a few other arguments needed for the autoscaling logic.
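The wiring in the main block might look like the following sketch (the bootstrap servers, topic name, foreach_batch_process body, and the AutoScaler parameter values are all placeholders for your own setup; AutoScaler is the class described above):

```python
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Placeholder configuration -- substitute your own values.
KAFKA_SERVERS = "broker-1:9092"
TOPIC = "input-topic"


def foreach_batch_process(batch_df, batch_id):
    # Your per-batch processing logic goes here.
    pass


if __name__ == "__main__":
    spark = SparkSession.builder.appName("streaming-job").getOrCreate()

    # Extract the message content from Kafka as a string column.
    stream = (spark.readStream
              .format("kafka")
              .option("kafka.bootstrap.servers", KAFKA_SERVERS)
              .option("subscribe", TOPIC)
              .load()
              .select(col("value").cast("string").alias("message")))

    query = (stream.writeStream
             .foreachBatch(foreach_batch_process)
             .start())

    # Pass the streaming query to the autoscaler along with the
    # autoscaling parameters (values here are illustrative).
    autoscaler = AutoScaler(query,
                            nominal_batch_duration=60,     # seconds
                            nominal_rps_per_worker=1000,   # rows/sec/worker
                            min_workers=2, max_workers=8)
    autoscaler.start()

    query.awaitTermination()
```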

Summary

I presented an AutoScaler class that runs a thread on the driver node and self-autoscales the Databricks cluster for a Structured Streaming application. We have used this autoscaler for one of the inference applications that support GumGum’s Verity engine. A screenshot of our monitoring dashboard (showing, from top to bottom, the input rate, the Kafka consumer lag, and the number of Databricks workers) confirms that the application uses between 2 and 8 nodes while being allowed to scale out up to 200 nodes. Without this autoscaler, the cluster would be scaled out up to 70 nodes, resulting in prohibitively large costs.

In this post I provided a solution for the autoscaling logic by penalizing the batch duration to minimize latency. Please feel free to modify the logic as you see fit for your application.
