Real-time Machine Learning Inference at Scale Using Spark Structured Streaming

A complete demo for developing locally and deploying on Databricks

Rashad Moarref
GumGum Tech Blog
10 min read · May 24, 2021

Real-time machine learning inference at scale has become an essential part of modern applications. GumGum’s Verity engine powers the industry’s most sophisticated contextual targeting product by analyzing thousands of pieces of digital content every second, around the clock. This is a challenging undertaking that requires deploying deep learning models using an event-driven streaming architecture on an elastic, cloud-native cluster.

At GumGum, we use Apache Kafka, a high-throughput and scalable streaming platform, to connect the various components of our machine learning pipelines. Until recently, we deployed the underlying inference micro-services solely on Amazon ECS, which is a great choice thanks to its security, reliability, and scalability. However, as our scale multiplied over time, we decided to leverage Apache Spark Structured Streaming and migrate our inference micro-services to Databricks.

In this post, I summarize the advantages of adopting Spark Structured Streaming for inference workloads and provide an end-to-end demo for developing and deploying a natural language processing inference micro-service. The complete code is available on GitHub.

Advantages of Spark Structured Streaming

1. Achieve virtually unlimited scalability

There is an inherent scalability bottleneck in the ECS deployment, where each task directly consumes from the input Kafka topics: the number of tasks (consumers) in the ECS cluster is upper-bounded by the number of partitions in the Kafka topics, which in turn is limited according to benchmark recommendations (up to 10 partitions per topic or up to 4,000 partitions per broker). On the other hand, the driver node in a Spark cluster can read from a handful of input Kafka topics and delegate inference to an arbitrary number of worker nodes. This gives Spark virtually unlimited scalability.
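
To make this concrete, here is a minimal sketch of the pattern (not the demo’s actual code; the broker address, topic name, stand-in transformation, and degree of parallelism are placeholders): the driver holds the single Kafka subscription, and repartitioning spreads the downstream work across however many executors the cluster has.

```python
# Minimal sketch: one central Kafka subscription on the driver, work fanned
# out to executors. Broker, topic, and parallelism are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.appName("ner-demo-sketch").getOrCreate()

raw = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "input-topic")
    .load()
)

# Repartitioning decouples processing parallelism from the number of Kafka
# partitions, so scaling out the cluster scales inference throughput.
parsed = (
    raw.selectExpr("CAST(value AS STRING) AS text")
    .repartition(64)
    .withColumn("length", F.length("text"))  # stand-in for the real inference UDF
)

parsed.writeStream.format("console").start().awaitTermination()
```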

2. Achieve optimal performance

The tasks in an ECS cluster can end up competing with each other to consume from the input Kafka topics, creating an unbalanced distribution of load on the workers. In contrast, the driver node in the Spark cluster can be programmed to optimally distribute the work between worker nodes as a central consumer.

3. Fast turn-around for automation, testing, and debugging

By migrating to Spark, machine learning inference becomes a data transformation task for both streaming and batch workloads. Consequently, the inference task can be placed in a unified, automated model training and data transformation pipeline. In other words, we can easily change the read, aggregation, and write operations according to our needs while the bulk of the code that handles inference remains the same. For example, large-scale A/B tests can be run quickly in a Databricks notebook that reads directly from and writes to Amazon S3. The same notebook can also be leveraged to localize hard-to-find bugs or edge cases in the inference model.
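
As an illustration of that flexibility (the S3 paths, column name, and stand-in transformation below are placeholders, not the repo’s code), the same logic can be dropped into a batch notebook that reads from and writes to S3:

```python
# Sketch of reusing the inference logic in batch mode from a notebook;
# paths and the stand-in transformation are placeholders.
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()

batch = spark.read.json("s3a://my-bucket/ab-test/input/")  # placeholder path

# The same transformation used in streaming (represented here by a simple
# text-length column) is applied to a static DataFrame.
results = batch.withColumn("length", F.length("text"))

results.write.mode("overwrite").json("s3a://my-bucket/ab-test/output/")
```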

4. Save engineering time

Spark’s efficiency and high parallelism remove the need to maintain code that handles multi-processing or multi-threading. In addition, Spark Structured Streaming provides robust Kafka integration and offset management that readily guarantees at-least-once delivery of messages, which removes the need to maintain code for interfacing with Kafka. Less code maintenance and a fast turn-around for automation, testing, and debugging improve efficiency and save engineering time.
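
For reference, here is roughly what that built-in Kafka integration looks like in a streaming job; the broker, topic names, and checkpoint path are placeholders, and the checkpointLocation is what provides the at-least-once guarantee without any hand-written offset management.

```python
# Sketch of Structured Streaming's built-in Kafka integration; broker, topic
# names, and the checkpoint path are placeholders. Offsets are tracked in the
# checkpoint, so at-least-once delivery survives restarts with no custom code.
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

query = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "input-topic")
    .load()
    .selectExpr("CAST(value AS STRING) AS value")  # the real app applies its NER UDF here
    .writeStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("topic", "result-topic")
    .option("checkpointLocation", "s3a://my-bucket/checkpoints/ner-demo/")
    .start()
)
query.awaitTermination()
```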

Demo: Getting Started

In this demo, we deploy spaCy’s open-source Named Entity Recognition (NER) model using Spark’s Python API pyspark. You can get the complete code by running git clone https://github.com/rashadmoarref/spark-demo.git. To simplify local deployment, we use a Makefile that runs the application in Docker. Let’s give it a try:

1. Open a terminal and create a local demo network to be used by the Kafka and app containers, then start the Kafka service and the NER app. Each of these steps maps to a Makefile target, described in the Local Deployment section below.

2. In a separate terminal, start consuming from the result Kafka topic to get the NER results.

3. Open a third terminal and send input messages to the input Kafka topic.

It’s that simple! I encourage you to play around by modifying the input messages or adding more messages to the input file, then sending them to the app using the make produce-input command.

Now let’s dive a little deeper into the code structure. The app folder contains the source code. Specifically, run.py is the entry point to the app, udfs.py contains the necessary user-defined functions, and schemas.py includes the required schema structures. The unit tests are in the test folder, and the deploy folder contains files for deploying the app locally or on Databricks. Please feel free to explore the app and test folders on your own. In the remainder of this post, we focus on the application deployment.
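
To give a flavor of what udfs.py and schemas.py contain (the snippet below is a simplified sketch with assumed names, not the repo’s exact code), a spaCy NER function can be written as a pandas UDF so that each executor loads the model once and processes rows in batches:

```python
# Simplified sketch of the kind of code that lives in udfs.py / schemas.py;
# names are assumptions and the real schema may be a richer struct type.
from typing import Iterator

import pandas as pd
import spacy
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import ArrayType, StringType

# Each row maps to a list of "entity_text:entity_label" strings.
ENTITY_SCHEMA = ArrayType(StringType())


@pandas_udf(ENTITY_SCHEMA)
def extract_entities(batches: Iterator[pd.Series]) -> Iterator[pd.Series]:
    # Load the spaCy model once per executor process, not once per row.
    nlp = spacy.load("en_core_web_sm")
    for texts in batches:
        yield texts.apply(
            lambda text: [f"{ent.text}:{ent.label_}" for ent in nlp(text).ents]
        )
```

In run.py, such a UDF would then be applied to the decoded message text, for example with df.withColumn("entities", extract_entities("text")).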

Local Deployment

The Dockerfile specifies how the NER Docker image is built. To avoid repeating ourselves, we use the same file to build images for both local deployment and deployment on Databricks using Databricks Container Services. Therefore, the Docker image must use one of the recommended Databricks base images or satisfy a few minimal requirements. Note that, depending on the selected base image, we need to set the appropriate environment variables such as PATH and PYTHONUSERBASE in the Dockerfile, and a few other variables like PYSPARK_PYTHON defined in deploy/local/app/local.env. In addition, unit tests are run using tox as part of the build process.

Since Databricks Container Services loads the Spark-related dependencies into our custom container at runtime, we use a BUILD_TARGET argument (set at build time) in the Dockerfile to differentiate between the image used locally and the image used on Databricks. Specifically, if BUILD_TARGET = local, we install the JDK, Scala, and Spark for the local environment; if BUILD_TARGET = databricks, we uninstall pyspark.

The deploy/local/app/docker-compose.yml file specifies the app service. Note that this service uses the demo network and the build argument BUILD_TARGET is set to local. The spark-submit command submits the /app/run.py job and specifies that the job needs to use the org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.1 package, allowing it to interact with Kafka. To make local development faster, we map the app folder using the volumes option ../../../app:/app. This allows us to modify the source code and run the app using docker-compose without rebuilding the image every time the code changes.

The local.env file provides the spark-demo service with the necessary paths that Spark needs at runtime. Note that some of these paths depend on the Databricks base image that you select.

The deploy/local/kafka/docker-compose.yml file specifies the Kafka and Zookeeper services to be run on the demo network. Note that we are creating a volume mapping ./..:/data so that the kafka service can access the deploy/local/input.json file that contains the input messages to be sent to the input topic.

The Makefile contains simple commands to set up and tear down local kafka and app services using docker-compose and to send messages to the input topic and receive results from the result topic using docker exec on the kafka container.

These handy commands can be used in the same way as in the demo above.

Deploying on Databricks

Databricks is a data-native platform for data science and machine learning, founded by the creators of Apache Spark and MLflow. GumGum relies heavily on Databricks for large-scale data engineering and business analytics, and the Verity team is adopting it for collaboration across the full machine learning lifecycle.

Databricks Container Services allows us to specify a custom Docker image to create a Spark cluster. This is convenient since we were already using Docker images to deploy our micro-services to Amazon ECS. Deploying a micro-service in Databricks involves sending a few REST requests using the Databricks Jobs API.

A micro-service is deployed as a Spark job with a unique name on a new_cluster that runs a PythonTask with entry point /app/run.py. All of these are specified in a single JSON payload; a sketch of such a payload follows the list below. Other notable fields in this payload are

  • instance types for driver and worker nodes, e.g. driver_node_type_id
  • the docker image to be used, e.g. ECR_IMAGE_URL="<aws-account>.dkr.ecr.<aws-region>.amazonaws.com/<image-name>"
  • autoscaling, e.g. min_workers, max_workers
  • configuration for using spot instances, e.g. spot_bid_price_percent
  • using AWS instance profiles to pass IAM roles, e.g. INSTANCE_PROFILE_ARN="arn:aws:iam::<aws-account>:instance-profile/<profile-name>"
  • location of the driver and executor logs: cluster_log_conf
  • location of any init scripts that we want Databricks to run immediately after spinning up our custom container: init_scripts
  • for Structured Streaming jobs, we must set the following configuration values: an unlimited number of retries ("max_retries": -1) and exactly one concurrent run of the job ("max_concurrent_runs": 1)
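
For orientation, here is a trimmed sketch of such a payload expressed as a Python dict. This is not the repo’s exact payload: the values are read from the environment variables described later, the entry-point path format and the spot bid value are assumptions, and the full field list is in the Databricks Jobs API documentation.

```python
# Trimmed sketch of the job payload; values come from environment variables
# set by the CI/CD pipeline, and the full schema is in the Jobs API docs.
import os

job_settings = {
    "name": os.environ["CLUSTER_NAME"],
    "max_retries": -1,         # retry forever: keep the 24/7 stream alive
    "max_concurrent_runs": 1,  # never run two copies of the same stream
    "spark_python_task": {"python_file": "file:/app/run.py"},  # entry point inside the container
    "new_cluster": {
        "spark_version": os.environ["SPARK_VERSION"],
        "driver_node_type_id": os.environ["INSTANCE_TYPE"],
        "node_type_id": os.environ["INSTANCE_TYPE"],
        "autoscale": {
            "min_workers": int(os.environ["MIN_WORKERS"]),
            "max_workers": int(os.environ["MAX_WORKERS"]),
        },
        "aws_attributes": {
            "instance_profile_arn": os.environ["INSTANCE_PROFILE_ARN"],
            "spot_bid_price_percent": 100,  # example value
        },
        "docker_image": {"url": os.environ["ECR_IMAGE_URL"]},
        "cluster_log_conf": {"s3": {"destination": os.environ["LOG_DESTINATION_S3"]}},
        "init_scripts": [{"s3": {"destination": os.environ["INIT_SCRIPTS_DESTINATION_S3"]}}],
    },
}
```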

Once a job is created using the above settings, a job id is assigned to it. Since each micro-service has its own cluster name and job id, we follow three steps, outlined in deploy/databricks/deploy-job.sh, to deploy a micro-service:

  1. List existing jobs and find the job id with the cluster name of interest.
  2. If no job was found, then create the job. Otherwise, update the job.
  3. Run the job.

Creating and resetting jobs can be done using the /api/2.0/jobs/create and /api/2.0/jobs/reset endpoints, respectively. Note that after resetting a job, we need to cancel its currently running run and start the job again with the updated settings. We use the /api/2.0/jobs/runs/list endpoint to find the current run id and cancel it using the /api/2.0/jobs/runs/cancel endpoint.
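
A rough Python equivalent of this flow is sketched below. The real deploy-job.sh is a shell script using curl; DATABRICKS_HOST is an assumed variable (it is not in the environment-variable list later in this post), and job_settings is the payload dict sketched above.

```python
# Python approximation of deploy/databricks/deploy-job.sh (the real script is
# bash + curl). DATABRICKS_HOST is an assumption; job_settings is the payload
# dict sketched earlier.
import os

import requests

API = os.environ["DATABRICKS_HOST"] + "/api/2.0"
HEADERS = {"Authorization": "Bearer " + os.environ["DATABRICKS_API_TOKEN"]}


def call(method, path, **payload):
    kwargs = {"params": payload} if method == "GET" else {"json": payload}
    resp = requests.request(method, API + path, headers=HEADERS, **kwargs)
    resp.raise_for_status()
    return resp.json()


# 1. Find an existing job whose name matches our cluster name.
jobs = call("GET", "/jobs/list").get("jobs", [])
job_id = next(
    (j["job_id"] for j in jobs if j["settings"]["name"] == job_settings["name"]), None
)

if job_id is None:
    # 2a. No job yet: create it.
    job_id = call("POST", "/jobs/create", **job_settings)["job_id"]
else:
    # 2b. Job exists: update its settings, then cancel the active run so the
    #     next run picks up the new settings.
    call("POST", "/jobs/reset", job_id=job_id, new_settings=job_settings)
    runs = call("GET", "/jobs/runs/list", job_id=job_id, active_only="true").get("runs", [])
    for run in runs:
        call("POST", "/jobs/runs/cancel", run_id=run["run_id"])

# 3. Start the job with the (possibly updated) settings.
call("POST", "/jobs/run-now", job_id=job_id)
```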

The above deploy/databricks/deploy-job.sh script can be invoked from your favorite CI/CD tool. Here, we use Drone, which uses a .drone.yml file to specify the various CI/CD steps. We distinguish the different stages of our CI/CD pipeline using a number of utility variables. For example, we can trigger the production pipeline when a commit is tagged (refs/tags/*), or the staging pipeline when a PR is merged into the main branch (refs/heads/main). We also define an ecr_build variable to specify that the Docker image must be built for deployment to Databricks (BUILD_TARGET=databricks) and the Amazon ECR registry where we want to store the image.

The CI/CD pipeline includes a build step and a deploy step. The build step stores the image in ECR, where the image tag depends on whether we are in the development, staging, or production phase. The deploy step is triggered in the staging and production phases and invokes the deploy-job.sh script after setting appropriate environment variables such as:

  • DATABRICKS_API_TOKEN: the token to be used when working with Databricks Jobs API
  • CLUSTER_NAME: a unique name for the cluster that is running the job
  • IMAGE_NAME: the image name to be fetched from ECR
  • INIT_SCRIPTS_DESTINATION_S3: S3 location of init-script.sh
  • LOG_DESTINATION_S3: S3 location of Spark logs
  • INSTANCE_TYPE: EC2 instance type for driver or worker nodes
  • SPARK_VERSION: runtime version of Spark
  • MIN_WORKERS, MAX_WORKERS: minimum and maximum number of worker nodes used in autoscaling

Logging, Autoscaling, and Monitoring

Logging

Databricks dumps the driver and executor logs every 5 minutes to LOG_DESTINATION_S3 defined in .drone.yml and passed as cluster_log_conf in the job settings. Note that the cluster id is automatically added to the specified S3 path.

Autoscaling

The Databricks jobs clusters use Optimized Autoscaling, which can be somewhat aggressive for our 24/7 workloads. At the time of writing this post, I found three tuning parameters that can be used to slow down the default behavior; a configuration sketch follows the list:

  • spark.databricks.aggressiveWindowDownS: How often to check for an opportunity to scale in (default is 40 sec).
  • spark.databricks.aggressiveFirstStepScaleUpFraction: Scale out by this fraction of the maximum number of workers (the default is 0.34, then 0.64, meaning the cluster scales out to the maximum number of workers in two steps).
  • spark.databricks.aggressiveFirstStepScaleUpMax: Scale out by adding this many workers in the first scale out event and increase by doubling the previous step thereafter. For example, we can set it to 1 to scale out by adding 1, 2, 4, … workers in consecutive scale out events (default is 40).
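
In the job payload, these parameters go under the cluster’s spark_conf; the values below are illustrative examples only, not tuned recommendations.

```python
# Illustrative spark_conf entries for slowing down autoscaling on a 24/7
# stream; the values are examples, not recommendations.
autoscaling_conf = {
    "spark.databricks.aggressiveWindowDownS": "600",        # look for scale-in every 10 min
    "spark.databricks.aggressiveFirstStepScaleUpFraction": "0.1",
    "spark.databricks.aggressiveFirstStepScaleUpMax": "1",  # add 1, 2, 4, ... workers per step
}

# e.g. merged into the payload sketched earlier:
# job_settings["new_cluster"]["spark_conf"] = autoscaling_conf
```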

Monitoring Kafka Consumer Lag

One of the most important metrics to monitor in a streaming application is the Kafka consumer group lag, which quantifies the difference between the most recent offsets in the input topics and the last committed offsets. Unlike most consumers, Spark Structured Streaming does not commit the processed offsets back to Kafka. Instead, it tracks the Kafka offsets using offset files that are stored in a checkpoint location in S3. An offset file contains information about the input topics, partitions, and the last successfully processed offsets, and one is added for each batch of consumed messages that is successfully processed. Checkpointing allows recovering from failures: when a job fails, the subsequent run starts from the last offsets in the checkpoint location.

To avoid having to work with both S3 and Kafka to calculate the consumer lag, we create a dummy consumer and report the Kafka offsets from a thread that runs on the driver node. The KafkaOffsetsReporter uses the lastProgress property of the streaming query to determine the last processed offsets, as reflected in the endOffset field, and commits them to the Kafka cluster. Once we do that, we can readily use tools like Burrow and Karrot to report lag to AWS CloudWatch as well as Prometheus.
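
Our KafkaOffsetsReporter is internal to Verity, but the idea can be sketched as follows, assuming the kafka-python client (whose OffsetAndMetadata fields vary slightly across versions) and heavily simplified configuration and error handling:

```python
# Stripped-down sketch of a KafkaOffsetsReporter-style thread: it reads the
# query's lastProgress and commits the endOffset back to Kafka under the
# group id that Burrow/Karrot monitor. Assumes the kafka-python client.
import json
import threading
import time

from kafka import KafkaConsumer
from kafka.structs import OffsetAndMetadata, TopicPartition


def report_offsets(query, bootstrap_servers, group_id, interval_s=60):
    # Dummy consumer whose only job is to hold committed offsets for the group.
    consumer = KafkaConsumer(bootstrap_servers=bootstrap_servers, group_id=group_id)
    while query.isActive:
        progress = query.lastProgress
        if progress:
            for source in progress["sources"]:
                end_offset = source["endOffset"]
                if isinstance(end_offset, str):  # may arrive as a JSON string
                    end_offset = json.loads(end_offset)
                offsets = {
                    # OffsetAndMetadata fields differ across kafka-python versions.
                    TopicPartition(topic, int(partition)): OffsetAndMetadata(offset, None)
                    for topic, partitions in end_offset.items()
                    for partition, offset in partitions.items()
                }
                consumer.commit(offsets=offsets)
        time.sleep(interval_s)


# started on the driver, e.g.:
# threading.Thread(target=report_offsets,
#                  args=(query, "kafka:9092", "ner-demo"), daemon=True).start()
```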

Monitoring Databricks Prometheus Metrics

We use Pushgateway to expose Databricks’s internal Prometheus metrics to our external Prometheus cluster. To enable exporting of Spark metrics to Prometheus, we set spark.ui.prometheus.enabled=true in the job’s spark_conf and run the init-script.sh (passed via init_scripts in the job settings) when our custom containers launch.

Finally, we use a DatabricksPushgatewayExporter to run a separate thread on the driver node and expose our metrics.
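
The DatabricksPushgatewayExporter is likewise internal; conceptually, it amounts to a driver-side loop like the sketch below, which scrapes the executor-metrics endpoint that spark.ui.prometheus.enabled exposes on the Spark UI and pushes the raw text to a Pushgateway. The Pushgateway URL and job label are placeholders.

```python
# Conceptual sketch of a Pushgateway exporter loop running on the driver.
# The Pushgateway URL and job label are placeholders.
import time

import requests


def export_metrics(spark, pushgateway_url, job_name, interval_s=60):
    # spark.ui.prometheus.enabled=true exposes executor metrics on the Spark UI
    # under /metrics/executors/prometheus.
    metrics_url = spark.sparkContext.uiWebUrl + "/metrics/executors/prometheus"
    while True:
        try:
            metrics = requests.get(metrics_url, timeout=10).text
            # Pushgateway accepts the Prometheus text exposition format via PUT.
            requests.put(pushgateway_url + "/metrics/job/" + job_name,
                         data=metrics, timeout=10)
        except requests.RequestException:
            pass  # tolerate transient failures and keep exporting
        time.sleep(interval_s)

# e.g. started as a daemon thread on the driver:
# threading.Thread(target=export_metrics,
#                  args=(spark, "http://pushgateway:9091", "ner-demo"),
#                  daemon=True).start()
```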

Acknowledgement: Many thanks to my friend and colleague Florian Dambrine for his initiative on using a Makefile for simplifying local development in Verity, for putting together the scripts that made Monitoring Databricks Prometheus Metrics possible, and for building Karrot that allows us to export Burrow consumer lags to CloudWatch.
