Extending ksqlDB built-in capability

Adil Houmadi
adidoescode
Published in
5 min readJul 30, 2021

Introduction

As we may know, Apache Kafka handles your data in real-time, but when it comes to stream processing usually developers use a set of components that require Java programming skills, such as Kafka streams, Apache Flink, Apache Spark.
Hence, what about non-tech users? Is there any alternative to process the data without these set of skills?

How to stream your flow (Photo by @robertlukeman on Unsplash)

Fortunately, there is a great alternative developed by Confluent, through only using an SQL-like language which is called: ksqlDB. This event streaming database allows non tech-users to play with data in real time. It provides a powerful interface via SQL-like queries without the need for programming language. This technology is also suitable for developers who are interested in building modern IoT system or big data platforms.

As mentioned previously working with high level abstraction is effortless compared to a programming language, the API and CLI of ksqlDB makes the interaction with the data much more easier.

Furthermore, managing a Kafka connector can be executed using similar technology that simplifies the architecture, and speed up the integrations.

ksqlDB allows stream processing operations, such as filtering, transforming, enhancing, and aggregating the data.

Hence, in case the built-in functions are not sufficient, the technology could extended easily to achieve the same capability like in Kafka streams.

Good news is that ksqlDB supports this feature via user defined function (UDF). In one of our projects, we had a requirement to split data from a main topic to a regional one that will contain only data from that region, at the same time, some fields of the new events needed to be hashed using a specific encryption algorithm.

Implementation

Often, you are looking to have a custom behaviour or to have some computation in your ksqlDB queries, regardless of the complexity of the usage case, ksqlDB provides a Java API to achieve this goal, it gives the possibility to extend the core of the engine by creating your own user-defined function.

If you check the documentation of the technology, filtering or masking data is already supported by default, but when it comes to hashing a field using a specific algorithm, a custom development is needed.

In our case, we created a hashing function to encrypt the data according to a set of algorithms. The first implementation was able to accept a string as input, hash the entry according to the specified algorithm, and send it back as hash.

The Java implementation looks like the following:

Couple of annotations are needed to make the function compatible with ksqlDB engine:

An example of Utils Class can be found here: Utils.java

Once we finished the development of our extension, we need to package it into a fat JAR file, using our favourite package manager in our case, we did use maven to generate this JAR.

We need to deploy the generated JAR to ksqlDB server, in our case, we do have a dockerized environment that will facilitate the task for us, by mounting the JAR to KSQL_KSQL_EXTENSION_DIR, it will be scanned via the internal ksqlDB framework and will be recognised.

Once the extension is deployed and recognised in the ksqlDB Server, finally we will be able to browse its documentation and start using it in our queries.

Let’s try out our transient query by selecting and hashing the regionid using SHA-256 algorithm, the results will be returned to the client and will not be written to any Kafka topic.

Here comes the results, to persist this output we will use persistent query in the next section, it will write back the results to a Kafka topic, this proves very useful, thus, we would want to share this data with another client.

Demonstration

In this section we will demonstrate a working example using ksqlDB, we will cover the following case study:

We will be interested to filter data that match the gender FEMALE and hash the regionid of each record.

This section contains the following details:

  • Setup the environment
  • Generate random data
  • Create a stream and apply the hash function
  • Browse the output topic
Stream processing workflow

Setup the environment

Using docker and docker-compose, we can spawn our environment.

You can spawn your own environment using this docker-compose.yml file

The environment contains the following components:

  • Zookeeper, Kafka broker, Schema registry: To handle the topic and the schema of the data
  • Kowl UI: To browse the data
  • ksqlDB server, ksqldDB client, ksqlDB-datagen: To stream the data and generate random data.
Docker Environment

Generate random data

We will use ksqlDB-datagen to generate random data in Avro Format and push it forward to the topic.

The random data that will be generated, will have the following structure:

We are generating 1000 msgs/second in 1000 iterations.

An example of the data that has been pushed to the topic: ‘users’

Create a stream and apply the hash function

We will create a stream on top of our topic. First, we can check the content of our topic in ksqlDB by executing this command, it will give us the format for key, the value, and print the rows:

In order to create the stream and start interacting with the topic, we use the following command:

We can check the stream creation by listing all the streams.

This transient query selects the userid, gender and hash the regionid:

Create the stream with a persistent query, the result of the query will be written back to the users_female topic:

Browse the output topic

The data is flowing in the final topic (users_female) in AVRO format. As we keep pushing random data to input topic ‘users’ the ksqlDB query is processing each record and pushing back to ‘users_female’ topic

Browse the data via Kowl UI

Out of the box, ksqlDB provides a set of functions that will allow filtering, grouping, and transforming the data easily and achievably.

It’s a good alternative to Kafka streams, no need to write code in a programming language to meet your requirement. Thus, when it comes achieving it: the API is very simple to use, and it will also eventually cover corner use cases, similar to case we have demonstrated in this article.

References

https://docs.ksqldb.io/en/latest/

https://docs.ksqldb.io/en/latest/how-to-guides/create-a-user-defined-function/

https://tomershaiman.medium.com/tutorial-building-a-s3-parquet-datalake-without-a-single-line-of-code-5ea4455edc1e

https://github.com/cloudhut/kowl

https://github.com/confluentinc/cp-all-in-one/tree/6.2.x/cp-all-in-one

--

--