Real-time machine learning inference at scale has become an essential part of modern applications. GumGum’s Verity engine powers the industry’s most sophisticated contextual targeting product by analyzing thousands of pieces of digital content every second, around the clock. This is a challenging undertaking that requires deploying deep learning models with an event-driven streaming architecture on an elastic, cloud-native cluster.
At GumGum, we use Apache Kafka, a high-throughput, scalable streaming platform, to connect the various components of our machine learning pipelines. Until recently, we deployed the underlying inference micro-services solely on Amazon ECS, which has been a great choice due to its security, reliability, and scalability. However, as our scale multiplied over time, we decided to leverage Apache Spark Structured Streaming and migrate our inference micro-services to Databricks.
In this post, I summarize the advantages of adopting Spark Structured Streaming for inference workloads and provide an end-to-end demo of developing and deploying a natural language processing inference micro-service. The complete code is available on GitHub.
Advantages of Spark Structured Streaming
1. Achieve virtually unlimited scalability
There is an inherent scalability bottleneck with ECS deployment where each task directly consumes from the input Kafka topics. Namely, the number of tasks (consumers) in the ECS cluster is upper-bounded by the number of partitions in the Kafka topics, which in turn is limited according to benchmark recommendations (up to 10 partitions per topic or up to 4,000 partitions per broker). On the other hand, the driver node in a Spark cluster can read from a handful of input Kafka topics and delegate inference to an arbitrary number of worker nodes. This gives Spark virtually unlimited scalability.
2. Achieve optimal performance
The tasks in an ECS cluster can end up competing with each other to consume from the input Kafka topics, creating an unbalanced distribution of load across the workers. In contrast, the driver node in the Spark cluster acts as a central consumer and can be programmed to distribute the work optimally among the worker nodes.
3. Fast turn-around for automation, testing, and debugging
By migrating to Spark, machine learning inference becomes a data transformation task for both streaming and batch workloads. Consequently, the inference task can be placed in a unified, automated model training and data transformation pipeline. In other words, we can easily change the read, aggregation, and write operations according to our needs while the bulk of the code that handles inference remains the same. For example, large-scale A/B tests can be run quickly in a Databricks notebook that reads directly from and writes to Amazon S3. The same notebook can also be leveraged to localize hard-to-find bugs or edge cases in the inference model.
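As a rough illustration (the S3 paths, the text column name, and the placeholder UDF below are assumptions, not the actual Verity code), the inference logic can be wrapped in a single transformation function and reused, unchanged, in a batch notebook that reads from and writes to S3:

```python
from pyspark.sql import DataFrame, SparkSession
from pyspark.sql.functions import col, udf
from pyspark.sql.types import ArrayType, StringType

spark = SparkSession.builder.appName("ner-ab-test").getOrCreate()

@udf(returnType=ArrayType(StringType()))
def extract_entities(text):
    # Stand-in for the spaCy NER logic that lives in app/udfs.py.
    return [] if text is None else [token for token in text.split() if token.istitle()]

def add_entities(df: DataFrame) -> DataFrame:
    # The bulk of the inference code stays the same for streaming and batch workloads.
    return df.withColumn("entities", extract_entities(col("text")))

# Batch variant: read a sample from S3, run inference, and write the results back to S3.
batch_input = spark.read.json("s3://my-bucket/ab-test/input/")   # placeholder path
add_entities(batch_input).write.mode("overwrite").json("s3://my-bucket/ab-test/output/")
```

The streaming job can call the same add_entities function on a DataFrame read from Kafka, which is what makes the notebook-based A/B tests and debugging sessions cheap to set up.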
4. Save engineering time
Spark’s efficiency and high parallelism remove the need to maintain code that handles multi-processing or multi-threading. In addition, Spark Structured Streaming provides robust Kafka integration and offset management that readily guarantees at-least-once delivery of messages. This removes the need to maintain code for interfacing with Kafka. Less code maintenance and fast turn-around for automation, testing, and debugging improve efficiency and save engineering time.
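As an example, here is a minimal sketch of such a streaming query (the broker address, topic names, and checkpoint path are placeholders); Spark’s Kafka source and sink take care of the consumer group, offset tracking, and delivery guarantees:

```python
from pyspark.sql import SparkSession

# Requires the spark-sql-kafka package to be passed to spark-submit (see the demo below).
spark = SparkSession.builder.appName("ner-demo").getOrCreate()

# The driver is the single consumer of the input topic; work is distributed to the executors.
messages = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")   # placeholder broker address
    .option("subscribe", "input")                       # input topic
    .load()
)

query = (
    messages.selectExpr("CAST(value AS STRING) AS value")
    .writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "result")                          # output topic
    .option("checkpointLocation", "/tmp/checkpoints")   # offsets tracked here, not committed to Kafka
    .start()
)
query.awaitTermination()
```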
Demo: Getting Started
In this demo, we deploy spaCy’s open-source Named Entity Recognition (NER) model using Spark’s Python API, pyspark. You can get the complete code by running git clone https://github.com/rashadmoarref/spark-demo.git. To simplify local deployment, we use a Makefile that runs the application in Docker. Let’s give it a try:
1. Open a terminal and create a local demo network to be used by the Kafka and app containers. Then run the Kafka service and the NER app:
2. In a separate terminal, start consuming from the result Kafka topic to get the NER results:
3. Open a third terminal and send input messages to the input Kafka topic:
It’s that simple! I encourage you to play around by modifying the input messages or adding more messages to the input file and sending them to the app using the make produce-input command.
Now let’s dive a little deeper into the code structure. The app folder contains the source code. Specifically, run.py is the entry point to the app, udfs.py contains the necessary user-defined functions, and schemas.py includes the required schema structures. The unit tests are in the test folder, and the deploy folder contains files for deploying the app locally or on Databricks. Please feel free to explore the test folder on your own. In the remainder of this post, we focus on the application deployment.
The Dockerfile specifies how the NER Docker image is built. To avoid repeating ourselves, we use the same file to build images both for local deployment and for deployment on Databricks using Databricks Container Services. Therefore, the Docker image must use one of the recommended Databricks base images or satisfy a few minimal requirements. Note that, depending on the selected base image, we need to set the appropriate environment variables in the Dockerfile, as well as a few other variables, like PYSPARK_PYTHON, defined in deploy/local/app/local.env. In addition, unit tests are run using tox as part of the build process.
Since Databricks Container Services loads the Spark-related dependencies into our custom container at runtime, we use a BUILD_TARGET argument (set at build time) in the Dockerfile to differentiate between the image that will be used locally and the image that will be used on Databricks. Specifically, if BUILD_TARGET = local, we install Spark for the local environment. If BUILD_TARGET = databricks, we uninstall it, since Databricks provides the Spark dependencies at runtime.
The deploy/local/app/docker-compose.yml file specifies the app service. Note that this service uses the demo network and that the build argument BUILD_TARGET is set to local. The spark-submit command submits the /app/run.py job and specifies that the job needs the org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 package, which allows it to interact with Kafka. To make local development faster, we map the app folder into the container using the volume mapping ../../../app:/app. This allows us to modify the source code and run the app with docker-compose without rebuilding the image every time the code changes.
The local.env file provides the spark-demo service with the necessary paths that Spark needs at runtime. Note that some of these paths depend on the Databricks base image that you select.
The deploy/local/kafka/docker-compose.yml file specifies the Kafka and Zookeeper services to be run on the demo network. Note that we create a volume mapping ./..:/data so that the kafka service can access the deploy/local/input.json file, which contains the input messages to be sent to the input topic.
The Makefile contains simple commands to set up and tear down the local app services using docker-compose, and to send messages to the input topic and receive results from the result topic using docker exec on the kafka container. These handy commands can be used in a similar way as in our demo:
Deploying on Databricks
Databricks is a data-native platform for data science and machine learning, founded by the creators of Apache Spark and MLflow. GumGum relies heavily on Databricks for large-scale data engineering and business analytics, and the Verity team is adopting it for collaboration across the full lifecycle of machine learning.
Databricks Container Services allows us to specify a custom Docker image to create a Spark cluster. This is convenient since we were already using Docker images to deploy our micro-services to Amazon ECS. Deploying a micro-service in Databricks involves sending a few REST requests using the Databricks Jobs API.
A micro-service is deployed as a Spark job with a unique name on a new_cluster that runs a PythonTask with entry point /app/run.py. All of these are specified in a single JSON payload (a sketch of such a payload follows the list below). Other notable fields in this payload are:
- instance types for the driver and worker nodes
- the Docker image to be used
- autoscaling settings (minimum and maximum number of workers)
- configuration for using spot instances
- AWS instance profiles used to pass IAM roles
- the location of the driver and executor logs
- the location of any init scripts that we want Databricks to run immediately after spinning up our custom container
- for Structured Streaming jobs, we must set the following: retry an unlimited number of times ("max_retries": -1) and allow exactly one concurrent run of the job ("max_concurrent_runs": 1)
Once a job is created using the above settings, a job id is assigned to it. Since each micro-service has its own cluster name and job id, we can follow three steps, as outlined in deploy/databricks/deploy-job.sh, to deploy our micro-service (a Python sketch of these steps follows the list):
- List existing jobs and find the job id with the cluster name of interest.
- If no job was found, then create the job. Otherwise, update the job.
- Run the job.
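A minimal Python sketch of those three steps follows. The shell script uses the Jobs API directly; here I assume the requests library, a DATABRICKS_HOST environment variable, and the job_settings dict from the earlier sketch:

```python
import os
import requests

host = os.environ["DATABRICKS_HOST"]  # e.g. https://<workspace>.cloud.databricks.com
headers = {"Authorization": f"Bearer {os.environ['DATABRICKS_API_TOKEN']}"}

# 1. List existing jobs and find the job id for the cluster name of interest.
jobs = requests.get(f"{host}/api/2.0/jobs/list", headers=headers).json().get("jobs", [])
job_id = next((j["job_id"] for j in jobs if j["settings"]["name"] == job_settings["name"]), None)

if job_id is None:
    # 2a. No job was found: create it.
    resp = requests.post(f"{host}/api/2.0/jobs/create", headers=headers, json=job_settings)
    job_id = resp.json()["job_id"]
else:
    # 2b. The job exists: overwrite its settings.
    requests.post(f"{host}/api/2.0/jobs/reset", headers=headers,
                  json={"job_id": job_id, "new_settings": job_settings})

# 3. Run the job.
requests.post(f"{host}/api/2.0/jobs/run-now", headers=headers, json={"job_id": job_id})
```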
Creating and resetting jobs can be done using the /api/2.0/jobs/create and /api/2.0/jobs/reset endpoints, respectively. Note that after resetting a job, we need to cancel the currently running job and start it again with the updated settings. We use the /api/2.0/jobs/runs/list endpoint to find the current run id and cancel it using the /api/2.0/jobs/runs/cancel endpoint.
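Continuing the sketch above (same host, headers, and job_id assumptions), cancelling the active run after a reset could look like this:

```python
# Find any active runs of the job and cancel them so the next run picks up the new settings.
runs = requests.get(
    f"{host}/api/2.0/jobs/runs/list",
    headers=headers,
    params={"job_id": job_id, "active_only": "true"},
).json().get("runs", [])

for run in runs:
    requests.post(f"{host}/api/2.0/jobs/runs/cancel", headers=headers,
                  json={"run_id": run["run_id"]})
```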
The deploy/databricks/deploy-job.sh script can be invoked from your favorite CI/CD tool. Here, we use Drone, which uses a .drone.yml file to specify the various CI/CD steps. We distinguish the different stages of our CI/CD pipeline using a number of utility variables. For example, we can trigger the production pipeline when a commit is tagged refs/tags/*, or trigger the staging pipeline when a PR is merged to refs/heads/main. We also define an ecr_build variable to specify that the Docker image must be built for deploying to Databricks (BUILD_TARGET=databricks) and the Amazon ECR registry where we want to store the image.
The CI/CD pipeline includes a build step and a deploy step. The build step stores the image in ECR, where the image tag depends on whether we are in the development, staging, or production phase. The deploy step is triggered during the staging and production phases and invokes the deploy-job.sh script after setting appropriate environment variables such as:
DATABRICKS_API_TOKEN: the token used to authenticate with the Databricks Jobs API
CLUSTER_NAME: a unique name for the cluster that runs the job
IMAGE_NAME: the name of the image to be fetched from ECR
INIT_SCRIPTS_DESTINATION_S3: the S3 location of the init scripts
LOG_DESTINATION_S3: the S3 location of the Spark logs
INSTANCE_TYPE: the EC2 instance type for the driver or worker nodes
SPARK_VERSION: the runtime version of Spark
MAX_WORKERS: the maximum number of worker nodes used in autoscaling
Logging, Autoscaling, and Monitoring
Databricks dumps the driver and executor logs every 5 minutes to the LOG_DESTINATION_S3 defined in .drone.yml and passed as cluster_log_conf in the job settings. Note that the cluster id is automatically added to the specified S3 path.
The Databricks jobs clusters use Optimized Autoscaling, which can be somewhat aggressive for our 24/7 workloads. At the time of writing this post, I found three tuning parameters that can be used to slow down the default behavior (an illustrative spark_conf snippet follows the list):
spark.databricks.aggressiveWindowDownS: how often to check for an opportunity to scale in (default is 40 seconds).
spark.databricks.aggressiveFirstStepScaleUpFraction: scale out by this fraction of the maximum number of workers (default is 0.34, then 0.64, which means the cluster scales out to the maximum number of workers in two steps).
spark.databricks.aggressiveFirstStepScaleUpMax: scale out by adding this many workers in the first scale-out event, doubling the step size thereafter. For example, setting it to 1 scales out by adding 1, 2, 4, … workers in consecutive scale-out events (default is 40).
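For illustration only (the values below are assumptions, not recommendations), these overrides can be passed through the job cluster’s spark_conf:

```python
# Placeholder values for slowing down optimized autoscaling on a 24/7 streaming job.
spark_conf = {
    "spark.databricks.aggressiveWindowDownS": "600",              # check for scale-in every 10 minutes
    "spark.databricks.aggressiveFirstStepScaleUpFraction": "0.05",
    "spark.databricks.aggressiveFirstStepScaleUpMax": "1",        # add 1, 2, 4, ... workers per scale-out
}

# This dict would go under "new_cluster" in the job payload, e.g.
# job_settings["new_cluster"]["spark_conf"] = spark_conf
```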
Monitoring Kafka Consumer Lag
One of the most important metrics to monitor in a streaming application is the Kafka consumer group lag. It quantifies the difference between the most recent offsets in the input topics and the last committed offsets. In contrast to most consumers, Spark Structured Streaming does not commit the processed offsets back to Kafka. Instead, it tracks the Kafka offsets using offset files that are stored in a checkpoint location in S3. An offset file contains information about the input topics, partitions, and last successfully processed offsets. Checkpointing allows recovering from failures: when a job fails, the subsequent run starts from the last offsets in the checkpoint location. A new offset file is added for each batch of consumed messages that is successfully processed.
To avoid the need to work with both S3 and Kafka to calculate the consumer lag, we create a dummy consumer and report the Kafka offsets using a thread that runs on the driver node. The KafkaOffsetsReporter uses the lastProgress property of the streaming query to determine the last committed offsets as reflected in the endOffset field, and then commits them to the Kafka cluster. Once we do that, we can readily use tools like Burrow and Karrot to report lag to AWS CloudWatch as well as Prometheus.
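Here is a hedged sketch of the idea; the real KafkaOffsetsReporter may be implemented differently, and the confluent_kafka client, broker address, group id, and reporting interval are all assumptions:

```python
import json
import threading
import time

from confluent_kafka import Consumer, TopicPartition

def report_offsets(query, bootstrap_servers="kafka:9092", group_id="ner-lag-reporter"):
    # A dummy consumer whose only purpose is to commit offsets for lag monitoring.
    consumer = Consumer({"bootstrap.servers": bootstrap_servers, "group.id": group_id})
    while query.isActive:
        progress = query.lastProgress
        if progress:
            for source in progress["sources"]:
                # endOffset looks like {"input": {"0": 42, "1": 17}}; depending on the
                # Spark version it may arrive as a nested dict or as a JSON string.
                end = source["endOffset"]
                end_offsets = json.loads(end) if isinstance(end, str) else end
                commits = [
                    TopicPartition(topic, int(partition), int(offset))
                    for topic, partitions in end_offsets.items()
                    for partition, offset in partitions.items()
                ]
                consumer.commit(offsets=commits, asynchronous=False)
        time.sleep(60)

# Run the reporter on a daemon thread on the driver node:
# threading.Thread(target=report_offsets, args=(query,), daemon=True).start()
```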
Monitoring Databricks Prometheus Metrics
We use Pushgateway to expose Databricks’s internal Prometheus metrics to our external Prometheus cluster. To enable exporting Spark metrics to Prometheus, we must set spark.ui.prometheus.enabled=true in the job’s spark_conf and execute the following init-script.sh upon launching our custom containers:
Finally, we use a DatabricksPushgatewayExporter to run a separate thread on the driver node and expose our metrics.
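As a rough sketch of the idea only (the actual DatabricksPushgatewayExporter may work differently; the Pushgateway address, job name, Spark UI port, and push interval are assumptions), a driver-side thread can periodically scrape the Prometheus endpoints that spark.ui.prometheus.enabled exposes and push them to the Pushgateway:

```python
import threading
import time

import requests

def push_metrics(pushgateway_url="http://pushgateway:9091", job_name="ner-inference",
                 driver_ui="http://localhost:4040", interval_s=60):
    while True:
        # spark.ui.prometheus.enabled=true exposes driver and executor metrics
        # on the Spark UI in Prometheus text format.
        for path in ("/metrics/prometheus", "/metrics/executors/prometheus"):
            metrics = requests.get(driver_ui + path).text
            # Pushgateway accepts the Prometheus text exposition format at /metrics/job/<name>.
            requests.post(f"{pushgateway_url}/metrics/job/{job_name}", data=metrics)
        time.sleep(interval_s)

# threading.Thread(target=push_metrics, daemon=True).start()
```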
Acknowledgement: Many thanks to my friend and colleague Florian Dambrine for his initiative on using a Makefile to simplify local development in Verity, for putting together the scripts that made monitoring Databricks Prometheus metrics possible, and for building Karrot, which allows us to export Burrow consumer lags to CloudWatch.