Using Kafka value(s) as KSQL DB primary key

Fables of the repartition (a twist on R.E.M.'s album ;-))

Pini Faran
Machines talk, we tech.
Nov 20, 2022


Why we used KSQL DB (in a nutshell)

As part of a major refactor of our process health product at Augury, we switched from a batch (SQL-based) architecture to a streaming (microservice-based) architecture.

We chose Confluent Kafka as our streaming platform. More details on why we switched to this architecture, along with some additional implementation/design aspects, can be found in the trilogy I’ve published here, as well as here and in the closing episode.

The data we stream on Kafka is classic time-series data, composed of
{ sensor-id, timestamp, sensor-value } in JSON format.

Because some of our microservices keep state using Faust windows (e.g. for aggregation purposes), we partition the data by sensor-id (i.e. use it as part of our Kafka key). This allows us to calculate, for example, the average value of a sensor over a period of time.

So looking at a sample payload, it would be something like this:

[Image: Kafka messages in our pipeline]
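For illustration only (the exact field names and values below are made up, but the shape matches the description above), a single message carries the sensor id in the Kafka key and the reading in the Kafka value:

Key (sensor id): "127"
Value:           { "timestamp": "2022-11-20T12:00:00", "value": 42.50000 }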

Simple enough.

Alas, sometimes just viewing the data (from the Confluent UI, microservice logs, etc.) isn’t good enough. Sometimes, for troubleshooting purposes, you may need to query the data that is streaming through your Kafka topics.

KSQL DB may come in handy here. While KSQL DB has many applicable use cases, in our case we used it as a debugging tool, so to speak.

In a nutshell, KSQL DB is a stream processing engine which allows you to process your data using SQL syntax. You may find this introduction video quite useful, as well as all these sessions on the Confluent Developer web site.

So our initial thought was: “This is great! We can set up a KSQL DB instance in our Confluent Cloud environment and use it to query the data on our topics!”

Well… creating a KSQL DB table from the topic, using the sensor id as the primary key, is quite simple:

CREATE TABLE MyTable (
  sensorId VARCHAR PRIMARY KEY,
  timestamp VARCHAR,
  value DECIMAL(19,5)
) WITH (
  KAFKA_TOPIC = '<x>-telemetry',
  VALUE_FORMAT = 'JSON'
);

And doing:

SELECT * FROM MyTable

was simple enough. So we just got all the data from the topic :-)
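One small caveat: depending on your ksqlDB version and on how the table was created, a bare SELECT like the one above may be rejected as a pull query against a non-materialized table. If that happens, appending EMIT CHANGES turns it into a push query that streams rows as they arrive:

SELECT * FROM MyTable EMIT CHANGES;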
But what about filtering?

Deep-Dive Filtering

As it turned out, filtering by sensor ID was a no-brainer too:

SELECT * FROM MyTable WHERE sensorId = '127';

Cool.
But what about filtering using sensor ID and timestamp?

Suppose I want to see the values of sensor 127 from 12:00 to 13:00…
As it turns out, this proved to be a harder nut to crack.

The issue here is that the timestamp resides in our Kafka value and NOT in our Kafka key.

Note that we CANNOT “move”/“put” the timestamp into the Kafka key, as doing so would cause telemetry records of the same sensor to be spread across different partitions, which would prevent us from doing our stateful operations (aggregations, etc.) in our microservices.

… And KSQL DB does not support using elements from the Kafka value payload as part of the primary key.

So what’s next?

(Re-)visiting KSQL DB streams and (re-)partitions

As it turns out, when working with KSQL DB streams, you can re-partition the stream in a way that allows you to “surface” the value payload (or parts of it) up into the key.

Here’s how it would go in our use case:

Step 1 → create a stream from our topic:

CREATE STREAM MyStream (
  sensorId VARCHAR KEY,
  timestamp VARCHAR,
  value DECIMAL(19,5)
) WITH (
  KAFKA_TOPIC = '<x>-telemetry',
  VALUE_FORMAT = 'JSON'
);
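Before moving on, a quick sanity check never hurts. A push query with a LIMIT clause (and, if you want to read from the beginning of the topic, setting auto.offset.reset to earliest first) lets you peek at a few records flowing through the new stream:

SET 'auto.offset.reset' = 'earliest';
SELECT * FROM MyStream EMIT CHANGES LIMIT 5;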

Step 2 → Repartition the stream:

CREATE STREAM MyStreamRepartitioned WITH (KEY_FORMAT='json')
AS SELECT
  STRUCT(sensorId := sensorId, timestamp := timestamp) AS myStruct,
  value
FROM MyStream
PARTITION BY STRUCT(sensorId := sensorId, timestamp := timestamp);

Step 3 → Create a source table over the repartitioned stream (a SOURCE table is read-only and is materialized by KSQL DB, which is what makes it queryable):

CREATE SOURCE TABLE RepartitionedTable (
  myStruct STRUCT<sensorId VARCHAR, timestamp VARCHAR> PRIMARY KEY,
  value VARCHAR
) WITH (
  KAFKA_TOPIC = '<abc>REPARTITIONED',
  VALUE_FORMAT = 'json',
  KEY_FORMAT = 'json'
);

Note the <abc>REPARTITIONED topic name. This topic is created automatically by KSQL DB for the repartitioned stream in step 2.
It is visible in the list of topics in your Confluent Cloud environment, so you can take it from there.
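If you prefer to stay in the ksqlDB editor (or CLI), you can usually find it there as well, either by listing the topics or by describing the repartitioned stream:

SHOW TOPICS;
DESCRIBE MyStreamRepartitioned EXTENDED;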

Now you can query by the timestamp field too. Note how the query extracts values from the JSON struct serving as the primary key of our source table:

SELECT EXTRACTJSONFIELD(myStruct -> sensorId, '$.sensorId') AS sensor,
       myStruct -> timestamp AS time,
       value AS val
FROM RepartitionedTable
WHERE EXTRACTJSONFIELD(myStruct -> sensorId, '$.sensorId') = '127'
  AND myStruct -> timestamp >= '12:00'
EMIT CHANGES;

Final Note

The example above can be further improved and simplified, of course. I did not convert the timestamp and value into more convenient types (I left them as strings), and the EXTRACTJSONFIELD operator is used more times than necessary in the query ;-)
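For instance, since myStruct is already declared as a STRUCT in the table definition, a leaner variant along these lines should work as well (a sketch, assuming the struct fields deserialize directly as plain strings, so no EXTRACTJSONFIELD is needed):

SELECT myStruct -> sensorId AS sensor,
       myStruct -> timestamp AS time,
       value AS val
FROM RepartitionedTable
WHERE myStruct -> sensorId = '127'
  AND myStruct -> timestamp >= '12:00'
EMIT CHANGES;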

My main purpose in this post was to illustrate how, with repartitioning, you can use additional fields from the value payload as part of your primary key for queries. Hope this proved the point! :-)
