ML in KSQL

Exploiting KSQL stream transformation and user-defined functions to deploy realtime machine learning models

At HomeAway, we use Apache Kafka as the backbone for our streaming architecture. We also like to deploy machine learning models to make realtime predictions on our data streams. Confluent KSQL provides an easy-to-use, interactive SQL interface for performing stream processing on Kafka. In this post we show how to build a model in Python and use it in KSQL to make predictions on a stream of data in Kafka. We use Predictive Model Markup Language (PMML) so that we can train the model with the Python library Scikit-learn but perform model inference in Java-based KSQL. The walkthrough below covers the steps necessary to run a simple example locally. KSQL and its user-defined functions (UDFs) are key parts of this implementation and are interesting even outside the context of machine learning.

Setting up KSQL

To run KSQL, you can either download the Confluent Platform or use Docker. In the following I will use the Docker version. Create the following docker-compose.yml file for a quick start with Docker:

This file assumes you have a subdirectory ./volume in the current directory. After running docker-compose up -d, your Kafka and KSQL servers will be running. You can verify this with docker-compose ps.

Training the Model

We will do what is often a “hello world” of ML models: We will train a model on the iris data set. In this data set we have 150 samples of data from three species of iris: Iris setosa, Iris virginica, and Iris versicolor. Each sample has four features: the sepal length, the sepal width, the petal length, and the petal width. We will train a logistic regression model to classify which species of iris a sample is, based on the four features. As mentioned above, we use PMML in order to train the model in Python but have it run in Java/KSQL. Specifically, we use the JPMML-Sklearn project to give us exactly what we need in this case. The Jupyter notebook to train the model can be found here, but we will also include the code below.

Aside from some usual libraries that a data scientist would be expected to have handy (e.g., pandas and sklearn), we need to pip install the sklearn2pmml package before running the following.

Implementing the Model in KSQL

Model evaluation will be done in KSQL by implementing the model inside a KSQL UDF. I have created a Maven project to generate the required jars. This is the main UDF:

You will notice that this code uses the file iris-pipeline.pmml. In order to obtain this, we need to convert the pipeline pickle file iris-pipeline.pkl.z produced in the previous section. To convert, you need to clone and build the JPMML-Sklearn project (or just download the jar here). You can also just skip the following step altogether by downloading the already-built PMML file. With the jar in hand, the command to run is:

java -jar files/jars/jpmml-sklearn-executable-1.5-SNAPSHOT.jar --pkl-input ./models/iris-pipeline.pkl.z --pmml-output ./models/iris-pipeline.pmml

Clone this project and build it using mvn clean package. You can then check that the PMML file works properly by running:

java -cp target/ksql-ml-pmml-example-0.0.1-jar-with-dependencies.jar ksqlexample.pmml.LocalRun models/iris-pipeline.pmml 7.0,3.2,4.7,4.4

Copy the PMML file and the jar to the directory you mounted in the ksql-server container.

cp models/iris-pipeline.pmml docker/volume/
cp target/ksql-ml-pmml-example-0.0.1-jar-with-dependencies.jar docker/volume/

Restart the KSQL server via the following.

docker-compose restart ksql-server

Realtime Model Evaluation Demo

KSQL should now be running and have access to the model and the UDF. We are now ready to send data to the Kafka topic and watch the machine learning model make predictions through KSQL.

First we must create the topic. Using Docker, the command is the following:

docker-compose exec broker kafka-topics --create --zookeeper zookeeper:2181 --replication-factor 1 --partitions 1 --topic iris_data

In another terminal, start the KSQL CLI.

docker-compose exec ksql-cli ksql http://ksql-server:8088

A stream plays roughly the role in KSQL that a table plays in SQL: it gives the messages in a Kafka topic a schema that queries can refer to. You can create one from within the KSQL CLI by issuing the following command.

CREATE STREAM iris_stream (sepal_length double,sepal_width double,petal_length double,petal_width double) WITH (kafka_topic='iris_data', value_format='json');

Finally, we apply our model to incoming data by calling the UDF in KSQL.

select ksqlexample.pmml.IrisLogReg(sepal_length,sepal_width, petal_length,petal_width) from iris_stream;

That’s it! Now we can start publishing data to the topic and watching the model make predictions. Run the console-producer and then paste example data in.

docker-compose exec broker kafka-console-producer  --broker-list broker:29092 --topic iris_data
>{"sepal_length": 5.1,"sepal_width": 3.5,"petal_length": 1.4,"petal_width": 0.2}
>{"sepal_length": 5.9,"sepal_width": 3.0,"petal_length": 5.1,"petal_width": 1.8}
>{"sepal_length": 6.3,"sepal_width": 2.5,"petal_length": 4.9,"petal_width": 1.5}
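If you would rather generate these messages than type them, the following is a small sketch that prints the same three samples as JSON lines. It uses only the Python standard library; the field names are taken from the stream definition above, while the function name is just an illustrative choice.

```python
import json

# Field order matches the columns declared in CREATE STREAM iris_stream.
FIELDS = ("sepal_length", "sepal_width", "petal_length", "petal_width")

# The three example samples from the walkthrough.
SAMPLES = [
    (5.1, 3.5, 1.4, 0.2),
    (5.9, 3.0, 5.1, 1.8),
    (6.3, 2.5, 4.9, 1.5),
]


def to_json_line(sample):
    """Serialize one (sepal_length, sepal_width, petal_length, petal_width) tuple."""
    if len(sample) != len(FIELDS):
        raise ValueError("expected %d values, got %d" % (len(FIELDS), len(sample)))
    return json.dumps(dict(zip(FIELDS, sample)))


if __name__ == "__main__":
    for sample in SAMPLES:
        print(to_json_line(sample))
```

Saved under a name of your choosing (say, a hypothetical make_samples.py), the output can be pasted into the console producer, or it should be possible to pipe it in by adding the -T flag to docker-compose exec so that no pseudo-terminal is allocated.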

If you switch back to the terminal running the KSQL CLI, you should see the model predict a class (0, 1, or 2) for each sample. Below is a video to illustrate.

The above could have easily been combined with various KSQL queries to perform manipulations on the data before doing model evaluation. For instance, if we only wanted to perform model inference on samples where the sepal width is larger than 3.0, we could have created the stream like this:

CREATE STREAM iris_filtered AS SELECT * FROM iris_stream WHERE sepal_width > 3.0;

Or if the data in the Kafka topic was published in centimeters and the model expects it to be in inches:

CREATE STREAM iris_to_inches AS SELECT sepal_length/2.54 AS sepal_length, sepal_width/2.54 AS sepal_width, petal_length/2.54 AS petal_length, petal_width/2.54 AS petal_width FROM iris_stream;
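To make the per-record effect of these two queries concrete, here is a small Python sketch of the same filter and unit conversion applied to plain dictionaries. It is only meant for checking expected values by hand and says nothing about how KSQL executes the queries; the function names are made up for this example.

```python
CM_PER_INCH = 2.54


def keep_wide_sepals(record, min_width=3.0):
    """Mirror of WHERE sepal_width > 3.0: True if the record passes the filter."""
    return record["sepal_width"] > min_width


def to_inches(record):
    """Mirror of dividing every measurement by 2.54 (centimeters to inches)."""
    return {name: value / CM_PER_INCH for name, value in record.items()}


if __name__ == "__main__":
    records = [
        {"sepal_length": 5.1, "sepal_width": 3.5, "petal_length": 1.4, "petal_width": 0.2},
        {"sepal_length": 5.9, "sepal_width": 3.0, "petal_length": 5.1, "petal_width": 1.8},
        {"sepal_length": 6.3, "sepal_width": 2.5, "petal_length": 4.9, "petal_width": 1.5},
    ]
    # Only records with sepal_width strictly greater than 3.0 survive the filter.
    for record in filter(keep_wide_sepals, records):
        print(to_inches(record))
```

Note that the comparison is strict, so the second sample, with a sepal width of exactly 3.0, does not pass the filter.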

Conclusion

KSQL lets you run SQL-like queries on a stream of data, which makes it simpler to transform the stream and feed it directly into a machine learning model. There is some extra overhead on top of your existing Kafka cluster, since you need to have KSQL servers running. In exchange, you can submit a simple SQL query from which KSQL will build and deploy a Kafka Streams job, which can include machine learning as in the example above.

Here we used PMML as the model serialization format. We also experimented with using MLeap instead of PMML. While we found that KSQL UDFs could also be written in Scala, a particular Scala reflection error proved problematic during the MLeap model’s deserialization in the UDF. H2O is another natural ML framework to use here. It produces models that run natively in Java and hence integrates well with KSQL, as illustrated in this Confluent blog. The Confluent blog was very helpful in deriving the walkthrough presented above, and our contribution can be viewed as supplementary: the example requires less setup and demonstrates cross-language (Python to Java) functionality.