Scalable Machine Learning with Kafka Streams and KServe

Jakob Edding
bakdata
Published in
9 min readJun 3, 2022

--

Photo by Kyle Hinkson on Unsplash

For the impatient: Go right to the demo repository or the kserve-client

With the advances made in the last couple of years in machine learning (ML), state-of-the-art models have reached milestones, making them suitable for solving an astonishing variety of tasks. In order to make decisions in real-time, there is a demand for applying ML models on live data. Such advances put pressure on companies to remain competitive in being fast and scalable while keeping solutions cost-efficient. Moreover, while developing ML models is by itself complex, serving models for production must be as easy as possible for DevOps, Data Scientists and Data Engineers.

In this blog post, we demonstrate how to combine state-of-the-art stream data processing with modern ML on Kubernetes. We present how we can use Apache Kafka and Kafka Streams in combination with the KServe inference platform for an easy integration of ML models with data streams.

Use Case

Popular libraries such as PyTorch, TensorFlow, or scikit-learn have become the de-facto standards for modern ML. However, using ML models created with these libraries on stream data is challenging. This is especially true if scalability, fault-tolerance, and exactly-once processing must be guaranteed. Another requirement is that Data Scientists have to continuously refine models easily.

Kafka Streams meets the above requirements for stream processing. Using KEDA and its Kafka scaler makes it easy to scale streaming apps automatically on Kubernetes. KServe compliments this by supporting popular ML libraries with out-of-the-box production model serving, i.e., an inference service for REST and gRPC.

Combining Kafka Streams and KServe enables efficient resource allocation of streaming applications and related inference services independently. This ensures optimal performance at scale and improves cost efficiency, which is especially important when ML models require expensive hardware, such as GPUs.

You can find the code for this post and the demo in our GitHub repository kafka-streams-kserve-demo.

KServe

KServe is a scalable, standardized model serving platform for Kubernetes. It provides strong abstractions to ease the delivery of ML models as services in Kubernetes. Deploying your model to a Kubernetes cluster is as easy as providing an ML model on your object storage of choice and a small Kubernetes YAML resource definition file. KServe publishes it via a standard inference protocol (REST/gRPC). It handles networking, authentication and model versioning. It provides a solution not only for the prediction but also for pre/post processing of data, monitoring and explainability. Further, the inference service will be automatically scaled by KServe depending on the load.

Out of the box, KServe currently supports TensorFlow, PyTorch, scikit-learn, XGBoost, ONNX and TensorRT.

KServe gives you tools to implement your own predictor using a custom model server. The MLServer, which provides the default KServe runtimes for scikit-learn and XGBoost, also enables us to implement custom model servers. This is an easy way to serve your models via REST or gRPC, fully compliant with the KServe’s V2 Dataplane spec.

For this post, we are going to use the MLServer to implement our own custom model server because we demonstrate an unsupported ML framework in our streaming pipeline.

Demo

The goal of the demo is to translate English texts to Spanish using a Deep Learning Model. First, English tweets from the Twitter API are produced into a topic. Then, the Kafka Streams app reads from there and calls our custom translation predictor to obtain a translation. Finally, the Spanish translation is written to an output topic.

A Diagram which shows the data flow inside a Kubernetes cluster from the Text Producer to the Kafka Streams Translation App which calls the MLServer Translator in KServe
Architecture overview of the demo system

Building a Custom Predictor with MLServer

To obtain the translation, we use the Argos Translate library, which is an open-source offline translation library written in Python. It conveniently abstracts the translation process from preliminary NLP tasks such as tokenization. We wrap the Argos model and serve this through MLServer. For simplicity’s sake, we load the model from the project directory instead of from cloud storage.

Every predictor has the following structure:

We need to create a class that implements the MLModel class. It requires a load and a predict method. In the load method, you should initialize the model. For our Argos predictor, the load method looks like this:

This method loads the Argos model and returns true if it succeeds. Now we take a look at the predict method:

Note that the code is simple and uses the Argos translation library at its core. The most important parts are input and output types. The MLServer library and KServe’s V2 Dataplane specification define two types for receiving and responding to an inference request.

The first one is the InferenceRequest object which encodes and serializes the data. The __get_input_data method in line 29 takes care of extracting the data from the object and assumes that it is a string type. Normally, this object has some useful metadata and MLServer supports multiple data types. For our case, we know that we receive strings. The documentation explains how to use other types of data.

On the other hand, we have the InferenceResponse type. This type inherits the aforementioned metadata capabilities from the InferenceRequest type. This type is composed of multiple ResponseOutput types (lines 16–24) where we describe the data format (under shape and datatype) and the actual response data from our predictor (under the data attribute). With this, we can test our predictor via the MLServer REST API. We tell MLServer where our predictor is and how to load it in a model-settings.json file:

With this file, MLServer can load our predictor. Next, we test it from within the mlserver-translator directory:

We now receive a response from our predictor and are ready to continue our journey to scale ML models.

Building the Kafka Streams Application

We demonstrate the Java Kafka Streams app which calls the MLServer inference service. The app class forms the entry point for the Translation Kafka Streams app.
To connect to Kafka, it inherits functionality from the generic KafkaStreamsApplication from our Kafka Streams Bootstrap project. Applications created with this project automatically send a dead letter to a dedicated error topic in case a record cannot be processed. To send requests to KServe, it makes use of our open-source Java KServe Client. The library includes a mechanism to automatically retry requests to the inference service in case the request times out, e.g. when the inference service is scaled to zero initially.

Let’s take a closer look at the process method of the App class. It takes as input a TextToTranslate object deserialized from the input topic, calls the KServe inference service and outputs a Translation object containing both texts (original and translation).

The method makes a request to the inference service for which a connection was set up according to the provided base endpoint, inference service and model name arguments. The request contains a list of inputs to translate. For simplicity, we process only one text (lines 6–17). To build the input, we set an input tensor name and the input data type to variable-length bytes (lines 8–9). Since the input is a single string, the shape is a one-dimensional list with one element (line 10). Extra parameters for the request are not set.

The number of outputs for this model corresponds to the number of inputs we sent. As we only send one input, we flat map the outputs to obtain the single output instance. From this, we obtain the output data — i.e. the translated text — and return it.

Running the Demo

Clone our demo from https://github.com/bakdata/kafka-streams-kserve-demo.

To run the demo, you need a Kubernetes cluster with KServe set up. Use the KServe docs if needed. The demo also requires Kafka with an input, output and error topic (for possible dead letters) configured for the Translator app. We use 10 partitions for each topic.

For the demo, the text-producer sends tweets to the input topic. Therefore, a Twitter bearer token is required to use the producer. Alternatively, you can look at the TextProducer and TextToTranslate class and produce your own records to an input topic.

First, we deploy the Translation inference service to the KServe inference service namespace in Kubernetes.

You now should add our Helm repository which contains the charts for the Producer app and the Kafka Streams app.

We can then deploy the Kafka Streams app that processes incoming records and makes prediction requests to the inference service. The environment variables BASE_ENDPOINT and INFERENCE_SERVICE_NAME (lines 8–9 below) should be chosen such that http://{$INFERENCE_SERVICE_NAME}{$BASE_ENDPOINT} matches the URL of the translator inference service.

Lastly, we deploy the Producer app that writes records to the input Kafka topic.

Scaling Kafka Streams and KServe

With these basic components up and running, the question of how to scale this system arises.

For this experiment, we scale the Kafka Streams app manually by increasing and decreasing the number of replicas. In production, we use an event-driven autoscaler like KEDA and thus automatically scale applications based on the Kafka consumer lag.

KServe autoscales the inference service’s number of predictors in response to the request load created by the Kafka Streams app replicas.

To showcase the scaling, we measure the number of:

  • incoming messages
  • replicas of our Kafka Streams app
  • predictors of the MLServer Translator

We observe the scaling mechanism and its effects.

By default, we configure our Kafka Streams app deployment with one replica. As you can see below, we adapt the replica count to two after some time, then to three and later to four before scaling down to three again.

Number of Ready Kafka Streams App Replicas

In the graph depicting the number of inference service predictors, we see that KServe automatically scales the service to two predictors to handle the initial load. This configuration remains unchanged when the Kafka Streams App is scaled to two replicas. Now the two predictors can be fully utilized.

With three Kafka Streams app replicas, the load on the service increases so that it is scaled to five predictors. After a short while, all requests have been served so the predictors are scaled down to three and then to two. This is a repeating pattern until the workload has been fully processed. Obviously, KServe attempts to find the sweet spot for the number of predictors. With only two predictors, requests from the three or four Kafka Streams app replicas cannot be served instantly and are held back by KServe. Once enough requests have been collected, the predictors are scaled up.

Number of MLServer Translator Predictors

The measured throughput of messages per second being written to the topic for translated texts resembles as expected the scaling of the predictors. Regular spikes in the throughput occur when the number of predictors is scaled up for a while. Two even bigger spikes in throughput are related to the manual scaling of the Kafka Streams App to four replicas to which KServe responds to by keeping five predictors for a prolonged period of time. Hence, the throughput sees strong spikes.

Messages in per Second

In an earlier iteration, we experienced some frequent up and downscaling of Kafka Streams replicas and KServe predictors. To “smooth” the scaling (avoid frequent up and downscaling) and achieve the presented results, we applied a Kafka configuration with static membership that we discuss in a previous blog post.

Conclusion

We have demonstrated how Kafka Streams and KServe can be combined to build a scalable system for ML inference. This way, we can leverage Kafka Streams’ scalability, fault-tolerance, and exactly-once stream data processing. KServe adds compatibility with current ML frameworks and auto-scaling features to the mix.

--

--