Real-time Machine Learning Inference at Scale Using Spark Structured Streaming
A complete demo for developing locally and deploying on Databricks
Real-time machine learning inference at scale has become an essential part of modern applications. GumGum's Verity engine powers the industry's most sophisticated contextual targeting product by analyzing thousands of pieces of digital content every second, around the clock. This is a challenging undertaking that requires deploying deep learning models using an event-driven streaming architecture on an elastic cloud-native cluster.
At GumGum, we use Apache Kafka, a high-throughput and scalable streaming platform, to connect the various components of our machine learning pipelines. Until recently, we deployed the underlying inference micro-services solely on Amazon ECS, which is a great choice due to its security, reliability, and scalability. However, as our scale multiplied over time, we decided to leverage Apache Spark Structured Streaming and migrate our inference micro-services to Databricks.
In this post, I summarize the advantages of adopting Spark Structured Streaming for inference workloads and provide an end-to-end demo for developing and deploying a natural language processing inference micro-service. The complete code is available on GitHub.
Advantages of Spark Structured Streaming
1. Achieve virtually unlimited scalability
There is an inherent scalability bottleneck with ECS deployment where each task directly consumes from the input Kafka topics. Namely, the number of tasks (consumers) in the ECS cluster is upper-bounded by the number of partitions in the Kafka topics, which in turn is limited according to benchmark recommendations (up to 10 partitions per topic or up to 4,000 partitions per broker). On the other hand, the driver node in a Spark cluster can read from a handful of input Kafka topics and delegate inference to an arbitrary number of worker nodes. This gives Spark virtually unlimited scalability.
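As a sketch of this pattern (the broker address, topic names, and helper names below are hypothetical, not taken from the repo), the driver subscribes once to the input topics and Spark fans the resulting records out across all executors:

```python
# Sketch: a single streaming query on the driver consumes the input topics.
# The helper that builds the Kafka source options is pure Python, so it can
# be inspected without a cluster; pyspark is only needed inside build_stream,
# where an already-created SparkSession is passed in.

def kafka_source_options(brokers, topics):
    """Options for Spark's Kafka source; the values here are illustrative."""
    return {
        "kafka.bootstrap.servers": brokers,
        "subscribe": ",".join(topics),
        "startingOffsets": "latest",
    }

def build_stream(spark, brokers, topics):
    """Create a streaming DataFrame from Kafka; Spark distributes the work."""
    reader = spark.readStream.format("kafka")
    for key, value in kafka_source_options(brokers, topics).items():
        reader = reader.option(key, value)
    return reader.load()

# Usage (requires a SparkSession and reachable brokers):
# df = build_stream(spark, "broker:9092", ["input"])
```

Because only the driver acts as the consumer, the number of worker nodes is decoupled from the number of Kafka partitions.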
2. Achieve optimal performance
The tasks in an ECS cluster can end up competing with each other to consume from the input Kafka topics, creating an unbalanced distribution of load on the workers. In contrast, the driver node in the Spark cluster can be programmed to optimally distribute the work between worker nodes as a central consumer.
3. Fast turn-around for automation, testing, and debugging
By migrating to Spark, machine learning inference becomes a data transformation task for both streaming and batch workloads. Consequently, the inference task can be placed in a unified and automated model training and data transformation pipeline. In other words, we can easily change the read, aggregation, and write operations according to our needs while the bulk of the code that handles inference remains the same. For example, large-scale A/B tests can be done quickly in a Databricks notebook that directly reads from and writes to Amazon S3. The same notebook can also be leveraged for localizing hard-to-find bugs or edge cases in the inference model.
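As a minimal sketch of this idea (the function and column names are assumptions, not the repo's actual code), the inference step can be written once and applied to either a streaming or a batch DataFrame:

```python
# Sketch: the inference transformation does not care how the DataFrame was
# created, so the same function serves streaming and batch workloads.

def add_entities(df, ner_column):
    """Append an 'entities' column; df can come from readStream or read."""
    return df.withColumn("entities", ner_column)

# Streaming (production):
#   add_entities(spark.readStream.format("kafka")...load(), ner_udf("text"))
# Batch (an A/B test notebook reading from and writing to S3):
#   add_entities(spark.read.json("s3://bucket/ab-test/"), ner_udf("text"))
```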
4. Save engineering time
Spark's efficiency and high parallelism remove the need to maintain code that handles multi-processing or multi-threading. In addition, Spark Structured Streaming provides robust Kafka integration and offset management that readily guarantees at-least-once delivery of messages. This removes the need to maintain code for interfacing with Kafka. Less code maintenance and a fast turn-around for automation, testing, and debugging improve efficiency and save engineering time.
Demo: Getting Started
In this demo, we deploy spaCy's open-source Named Entity Recognition (NER) model using Spark's Python API, pyspark. You can get the complete code by running `git clone https://github.com/rashadmoarref/spark-demo.git`. To simplify local deployment, we use a `Makefile` that runs the application in Docker. Let's give it a try:
1. Open a terminal and create a local `demo` network to be used by the Kafka and app containers. Then run the Kafka service and the NER app:
2. In a separate terminal, start consuming from the `result` Kafka topic to get NER results:
3. Open a third terminal and send input messages to the `input` Kafka topic:
It's that simple! I encourage you to play around by modifying the input messages or adding more messages to the input file and sending them to the app using the `make produce-input` command.
Now let's dive a little deeper into the code structure. The `app` folder contains the source code. Specifically, `run.py` is the entry point to the app, `udfs.py` contains the necessary user-defined functions, and `schemas.py` includes the required schema structures. The unit tests are in the `test` folder, and the `deploy` folder contains files for deploying the app locally or on Databricks. Please feel free to explore the `app` and `test` folders on your own. In the remainder of this post, we focus on the application deployment.
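To give a flavor of what the UDF layer does, here is a hedged sketch (the function name and result shape are mine, not necessarily the repo's) of running spaCy NER over a batch of texts, kept separate from the Spark wiring so it can be unit-tested without a cluster:

```python
# Sketch: pure inference logic that a Spark UDF can wrap.
def extract_entities(nlp, texts):
    """Run a loaded spaCy pipeline (e.g. spacy.load('en_core_web_sm'))
    over a batch of texts and return one entity list per text."""
    return [
        [{"text": ent.text, "label": ent.label_} for ent in nlp(text).ents]
        for text in texts
    ]

# In Spark, this would typically be wrapped in a UDF along these lines:
# from pyspark.sql.functions import udf
# ner_udf = udf(lambda text: extract_entities(nlp, [text])[0], entity_schema)
```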
Local Deployment
The `Dockerfile` specifies how the NER Docker image is built. To avoid repeating ourselves, we use the same file to build images for both local deployment and deployment on Databricks using Databricks Container Services. Therefore, the Docker image must use one of the recommended Databricks base images or satisfy a few minimal requirements. Note that depending on the selected base image, we need to set the appropriate environment variables such as `PATH` and `PYTHONUSERBASE` in the `Dockerfile`, and a few other variables like `PYSPARK_PYTHON` defined in `deploy/local/app/local.env`. In addition, unit tests are run using tox as part of the build process.
Since Databricks Container Services loads the Spark-related dependencies into our custom container at runtime, we use a `BUILD_TARGET` argument (to be set at build time) in the `Dockerfile` to differentiate between the image that will be used locally and the image that will be used in Databricks. Specifically, if `BUILD_TARGET=local`, we install `jdk`, `scala`, and `spark` for the local environment. If `BUILD_TARGET=databricks`, we uninstall `pyspark`.
The `deploy/local/app/docker-compose.yml` file specifies the app service. Note that this service uses the `demo` network and the build argument `BUILD_TARGET` is set to `local`. The `spark-submit` command submits the `/app/run.py` job and specifies that the job needs the `org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1` package, allowing it to interact with Kafka. To make local development faster, we map the `app` folder using the `volumes` option `../../../app:/app`. This allows us to modify the source code and run the app using `docker-compose` without rebuilding the image every time the code is changed.
The `local.env` file provides the `spark-demo` service with the necessary paths that Spark needs at runtime. Note that some of these paths depend on the Databricks base image that you select.
The `deploy/local/kafka/docker-compose.yml` file specifies the Kafka and Zookeeper services to be run on the `demo` network. Note that we create a volume mapping `./..:/data` so that the `kafka` service can access the `deploy/local/input.json` file that contains the input messages to be sent to the `input` topic.
The `Makefile` contains simple commands to set up and tear down the local `kafka` and `app` services using `docker-compose`, and to send messages to the `input` topic and receive results from the `result` topic using `docker exec` on the `kafka` container. These handy commands can be used in the same way as in our demo:
Deploying on Databricks
Databricks is a data-native platform for data science and machine learning, founded by the creators of Apache Spark and MLflow. GumGum heavily relies on Databricks for large-scale data engineering and business analytics, and the Verity team is adopting it for collaboration across the full lifecycle of machine learning.
Databricks Container Services allows us to specify a custom Docker image to create a Spark cluster. This is convenient since we were already using Docker images to deploy our micro-services to Amazon ECS. Deploying a micro-service in Databricks involves sending a few REST requests using the Databricks Jobs API.
A micro-service is deployed as a Spark job with a unique `name` on a `new_cluster` that runs a `PythonTask` with entry point `/app/run.py`. All of these are specified in a single JSON payload. Other notable fields in this payload are:
- instance types for driver and worker nodes, e.g. `driver_node_type_id`
- the Docker image to be used, e.g. `ECR_IMAGE_URL="<aws-account>.dkr.ecr.<aws-region>.amazonaws.com/<image-name>"`
- autoscaling, e.g. `min_workers`, `max_workers`
- configuration for using spot instances, e.g. `spot_bid_price_percent`
- AWS instance profiles to pass IAM roles, e.g. `INSTANCE_PROFILE_ARN="arn:aws:iam::<aws-account>:instance-profile/<profile-name>"`
- location of the driver and executor logs: `cluster_log_conf`
- location of any init scripts that we want Databricks to run immediately after spinning up our custom container: `init_scripts`
- for Structured Streaming jobs, we must set the following configuration variables: retry an unlimited number of times (`"max_retries": -1`) and allow exactly one concurrent run of the job (`"max_concurrent_runs": 1`)
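Putting these fields together, the payload might look roughly like the following (all concrete values are placeholders; consult the Databricks Jobs API documentation for the authoritative schema):

```python
# Sketch of a Jobs API create payload; every concrete value is a placeholder.
def build_job_payload(name, image_url, python_file="/app/run.py"):
    return {
        "name": name,
        "new_cluster": {
            "spark_version": "7.3.x-scala2.12",
            "driver_node_type_id": "m5.xlarge",
            "node_type_id": "m5.xlarge",
            "autoscale": {"min_workers": 1, "max_workers": 8},
            "aws_attributes": {
                "spot_bid_price_percent": 100,
                "instance_profile_arn": "arn:aws:iam::<aws-account>:instance-profile/<profile-name>",
            },
            "docker_image": {"url": image_url},
            "cluster_log_conf": {"s3": {"destination": "s3://<bucket>/logs"}},
            "init_scripts": [{"s3": {"destination": "s3://<bucket>/init-script.sh"}}],
        },
        "spark_python_task": {"python_file": python_file},
        "max_retries": -1,        # retry forever for a streaming job
        "max_concurrent_runs": 1,  # exactly one concurrent run
    }
```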
Once a job is created using the above settings, a job id is assigned to it. Since each micro-service has its own cluster name and job id, we can follow three steps, as outlined in `deploy/databricks/deploy-job.sh`, to deploy our micro-service:
- List existing jobs and find the job id with the cluster name of interest.
- If no job was found, then create the job. Otherwise, update the job.
- Run the job.
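The steps above can be sketched as follows (the response shape matches the Jobs API 2.0 list endpoint, but the helper name is mine, not the script's):

```python
# Sketch: decide between jobs/create and jobs/reset based on jobs/list output.

def find_job_id(jobs_response, name):
    """Return the job_id whose settings.name matches, or None if absent."""
    for job in jobs_response.get("jobs", []):
        if job.get("settings", {}).get("name") == name:
            return job["job_id"]
    return None

# Deploy flow (HTTP calls omitted for brevity):
#   1. jobs = GET /api/2.0/jobs/list
#   2. job_id = find_job_id(jobs, CLUSTER_NAME)
#   3. if job_id is None: POST /api/2.0/jobs/create, else: POST /api/2.0/jobs/reset
#   4. POST /api/2.0/jobs/run-now with the job_id
```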
Creating and resetting jobs can be done using the `/api/2.0/jobs/create` and `/api/2.0/jobs/reset` endpoints, respectively. Note that after resetting a job, we need to cancel the currently running job and start the job with the updated settings. We use the `/api/2.0/jobs/runs/list` endpoint to find the current run id and cancel it using the `/api/2.0/jobs/runs/cancel` endpoint.
The above `deploy/databricks/deploy-job.sh` script can be invoked in your favorite CI/CD tool. Here, we use Drone, which uses a `.drone.yml` file to specify various CI/CD steps. We distinguish different stages of our CI/CD pipeline using a number of utility variables. For example, we can trigger the production pipeline when a commit is tagged (`refs/tags/*`), or we can trigger the staging pipeline when a PR is merged into the `main` branch (`refs/heads/main`). We also define an `ecr_build` variable to specify that the Docker image must be built for deploying to Databricks (`BUILD_TARGET=databricks`) and the Amazon ECR registry where we want to store the image.
The CI/CD pipeline includes a build step and a deploy step. The build step stores the image in ECR, where the image tag depends on whether we are in the development, staging, or production phase. The deploy step is triggered during the staging and production phases and invokes the `deploy-job.sh` script after setting appropriate environment variables such as:
- `DATABRICKS_API_TOKEN`: the token to be used when working with the Databricks Jobs API
- `CLUSTER_NAME`: a unique name for the cluster that is running the job
- `IMAGE_NAME`: the image name to be fetched from ECR
- `INIT_SCRIPTS_DESTINATION_S3`: S3 location of `init-script.sh`
- `LOG_DESTINATION_S3`: S3 location of Spark logs
- `INSTANCE_TYPE`: EC2 instance type for driver or worker nodes
- `SPARK_VERSION`: runtime version of Spark
- `MIN_WORKERS`, `MAX_WORKERS`: minimum and maximum number of worker nodes used in autoscaling
Logging, Autoscaling, and Monitoring
Logging
Databricks dumps the driver and executor logs every 5 minutes to the `LOG_DESTINATION_S3` defined in `.drone.yml` and passed as `cluster_log_conf` in the job settings. Note that the cluster `id` is automatically added to the specified S3 path.
Autoscaling
Databricks jobs clusters use Optimized Autoscaling, which can be somewhat aggressive for our 24/7 workloads. At the time of writing this post, I found three tuning parameters that can be used to slow down the default behavior:
- `spark.databricks.aggressiveWindowDownS`: how often to check for an opportunity to scale in (default is 40 sec)
- `spark.databricks.aggressiveFirstStepScaleUpFraction`: scale out by this fraction relative to the maximum number of workers (default is 0.34, then 0.64, which means the cluster scales out to max workers in 2 steps)
- `spark.databricks.aggressiveFirstStepScaleUpMax`: scale out by adding this many workers in the first scale-out event, doubling the previous step thereafter. For example, we can set it to 1 to scale out by adding 1, 2, 4, … workers in consecutive scale-out events (default is 40)
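For example, a `spark_conf` fragment that relaxes the defaults might look like this (the specific values are illustrative, not recommendations):

```python
# Illustrative spark_conf entries to slow down optimized autoscaling
# for a steady 24/7 streaming workload.
AUTOSCALING_SPARK_CONF = {
    # Check for a scale-in opportunity every 10 minutes instead of every 40 s.
    "spark.databricks.aggressiveWindowDownS": "600",
    # Add 1, 2, 4, ... workers per scale-out event instead of jumping toward max.
    "spark.databricks.aggressiveFirstStepScaleUpMax": "1",
}
```

This dictionary would be passed under `new_cluster.spark_conf` in the job payload.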
Monitoring Kafka Consumer Lag
One of the most important metrics to monitor in a streaming application is the Kafka consumer group lag. It quantifies the difference between the most recent offsets in the input topics and the last committed offsets. In contrast to most consumers, Spark Structured Streaming does not commit the processed offsets back to Kafka. Instead, it tracks the Kafka offsets using offset files that are stored in a checkpoint location in S3. An offset file contains information about input topics, partitions, and the last successfully processed offsets. Checkpointing allows recovery from failures: when a job fails, the subsequent run starts from the last offset in the checkpoint location. An offset file is added for each batch of consumed messages that is successfully processed.
To avoid the need to work with both S3 and Kafka to calculate the consumer lag, we create a dummy consumer and report the Kafka offsets using a thread that runs on the driver node. The `KafkaOffsetsReporter` uses the `lastProgress` property of the streaming query to determine the last processed offsets as reflected in the `endOffset` field, and then commits them to the Kafka cluster. Once we do that, we can readily use tools like Burrow and Karrot to report lag to AWS CloudWatch as well as Prometheus.
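A sketch of the core of that reporter (the function name is mine, and the shape of `endOffset` is handled defensively since it may surface as a nested dict or a JSON string depending on how the progress object is read):

```python
# Sketch: turn a streaming query's lastProgress into per-(topic, partition)
# offsets that a dummy Kafka consumer could commit.
import json

def offsets_from_progress(progress):
    """Extract {(topic, partition): offset} from a progress dictionary."""
    offsets = {}
    for source in progress.get("sources", []):
        end_offset = source.get("endOffset")
        if isinstance(end_offset, str):  # may arrive as a JSON string
            end_offset = json.loads(end_offset)
        for topic, partitions in (end_offset or {}).items():
            for partition, offset in partitions.items():
                offsets[(topic, int(partition))] = offset
    return offsets

# A background thread on the driver would poll query.lastProgress and commit
# these offsets to Kafka, e.g. via kafka-python's consumer.commit(...).
```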
Monitoring Databricks Prometheus Metrics
We use Pushgateway to expose Databricks's internal Prometheus metrics to our external Prometheus cluster. To enable exporting of Spark metrics to Prometheus, we must set `spark.ui.prometheus.enabled=true` in the job's `spark_conf` and execute the following `init-script.sh` upon launching our custom containers:
Finally, we use a `DatabricksPushgatewayExporter` to run a separate thread on the driver node and expose our metrics.
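The exporter class in the post is not public, so the following is only a guess at its shape, using the standard library: periodically scrape the driver's Prometheus endpoint and push the text payload to a Pushgateway (all URLs and names are placeholders).

```python
# Sketch: scrape the driver's Prometheus metrics and push them to a gateway.
import time
import threading
import urllib.request

def pushgateway_url(gateway, job_name):
    """Pushgateway accepts metric payloads at /metrics/job/<job_name>."""
    return f"{gateway}/metrics/job/{job_name}"

def export_once(driver_metrics_url, gateway, job_name):
    payload = urllib.request.urlopen(driver_metrics_url).read()
    req = urllib.request.Request(
        pushgateway_url(gateway, job_name), data=payload, method="PUT"
    )
    urllib.request.urlopen(req)

def start_exporter(driver_metrics_url, gateway, job_name, interval_s=60):
    """Run the scrape-and-push loop on a daemon thread on the driver node."""
    def loop():
        while True:
            export_once(driver_metrics_url, gateway, job_name)
            time.sleep(interval_s)
    thread = threading.Thread(target=loop, daemon=True)
    thread.start()
    return thread
```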
Acknowledgement: Many thanks to my friend and colleague Florian Dambrine for his initiative on using a `Makefile` to simplify local development in Verity, for putting together the scripts that made monitoring Databricks Prometheus metrics possible, and for building Karrot, which allows us to export Burrow consumer lags to CloudWatch.