Apache Kafka Streaming, count on SQL

Bocutiu Stefan
6 min read · Nov 14, 2017


Some of you might know there are two SQL solutions available for Apache Kafka. For those who don’t, well … you know now. There are quite a few differences between them. While the L(enses)SQL engine offers a topology view, full Avro support from day one, multiple execution modes (Kubernetes and Cluster, targeting enterprise customers), joins on key or value fields, and a few extra things, I am not going to focus on the comparison. Instead, I will show you how quick and easy it is to perform a count aggregation on Kafka Streams using the LSQL engine.

Count aggregation is a very common scenario in stream processing. Typical use cases include time-windowed reports on the transactions a payment provider processes, page views for any given product on an e-commerce website, the number of customers accessing a hotel booking system, and many more.

Count Kafka Streams records

Assume a global payments provider receiving a large volume of multi-currency transactions into a Kafka topic. Understanding the data volume requires real-time reports counting the number of transactions in every 1-minute interval, or counting the transactions per currency over the same interval. This empowers the business to react and take relevant actions.

Let’s take the first use case and write the LSQL query to implement the requirement:

SET autocreate=true; 

INSERT INTO `cc_payments_1m_freq`
SELECT STREAM
COUNT(*) AS `count`
FROM `cc_payments`
WHERE _ktype='BINARY'
AND _vtype='AVRO'
GROUP BY tumble(1, m)

The outcome of registering and running this SQL processor in Lenses can be seen below. Besides the flow topology view, you can also see the results as they are computed!

The count processor in Lenses.

But how does it work, you might ask?

Lenses SQL engine is responsible for translating the code above into a Kafka Streams application while Lenses takes care of execution and scaling.

If I were to quickly explain the code, I would say: it computes the number of records flowing through in each 1-minute interval and inserts the results into the cc_payments_1m_freq topic.

The first line of code instructs the engine to `autocreate` the target topic (cc_payments_1m_freq in this case) if it does not already exist. The next lines resemble an INSERT statement.

You might have spotted a few keywords which are not typical SQL syntax. The first one is the STREAM keyword; in Lenses SQL this means the query will translate into an instance of KStream. If you are not familiar with the Kafka Streams DSL, a KStream is an abstraction of a record stream where each data record represents a self-contained datum in the unbounded data set. For further details, check out the Apache Kafka documentation.
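If you are curious what the generated application might look like, here is a rough, hand-written Kafka Streams equivalent of the query above. It is only a sketch under assumptions (topic names taken from the query, a constant grouping key, a recent Kafka Streams API, and both key and value consumed as raw bytes since the count never inspects the payload, an optimisation the article comes back to below); it is not the actual code the LSQL engine produces:

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class PaymentsPerMinuteCount {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cc-payments-1m-freq"); // illustrative id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative broker

    StreamsBuilder builder = new StreamsBuilder();

    // Read the payments topic; key and value are consumed as raw bytes because
    // this query never looks inside the payload.
    KStream<byte[], byte[]> payments =
        builder.stream("cc_payments", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()));

    payments
        // Group every record under one constant key so the count covers the whole stream.
        .groupBy((key, value) -> "all", Grouped.with(Serdes.String(), Serdes.ByteArray()))
        // Count within 1-minute tumbling windows: the GROUP BY tumble(1, m) part.
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
        .count()
        .toStream((windowedKey, count) ->
            windowedKey.key() + "@" + windowedKey.window().start()) // flatten the windowed key
        .to("cc_payments_1m_freq", Produced.with(Serdes.String(), Serdes.Long()));

    KafkaStreams streams = new KafkaStreams(builder.build(), props);
    streams.start();
  }
}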

The next two keywords, _ktype and _vtype, are LSQL specific. A Kafka message is made up of two parts: a key and a value; to read them, a de-serializer must be provided for each. Through the syntax

_ktype='BINARY' AND _vtype='AVRO'

we let the engine know the key part is just an array of bytes, whereas the value part is an Avro record. In this example the cc_payments topic receives Avro records. However, the same will work if JSON or any other type of payload is used; all that is required is to change the _vtype value.
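To make the analogy concrete, here is roughly what the same choice looks like in a hand-written consumer. The broker address, consumer group, schema registry URL, and the use of Confluent’s Avro deserializer are assumptions for illustration only:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class PaymentsConsumerConfig {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");   // illustrative broker address
    props.put("group.id", "payments-report");           // illustrative consumer group
    // Key read as raw bytes, value read as an Avro record (Confluent's deserializer
    // plus a schema registry are assumed here purely for illustration).
    props.put("key.deserializer", ByteArrayDeserializer.class.getName());
    props.put("value.deserializer", "io.confluent.kafka.serializers.KafkaAvroDeserializer");
    props.put("schema.registry.url", "http://localhost:8081");

    try (KafkaConsumer<byte[], Object> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("cc_payments"));
      // ... poll and process records
    }
  }
}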

The resulting Kafka Streams flow keeps a count of the messages received within each 1-minute time window. The following part of the code instructs the engine to count all records received within a 1-minute interval:

GROUP BY tumble(1, m)

Windowing allows you to control how records with the same key are grouped for stateful operations (aggregations or joins). In this scenario we have used a tumbling window, which models fixed-size, non-overlapping, gap-less windows.
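In Kafka Streams DSL terms, a tumbling window is simply a time window whose advance equals its size. A minimal sketch for contrast (again, not code Lenses generates):

import java.time.Duration;
import org.apache.kafka.streams.kstream.TimeWindows;

public class WindowKinds {
  public static void main(String[] args) {
    // Tumbling: fixed 1-minute windows, no overlap, no gaps.
    TimeWindows tumbling = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1));

    // Hopping, for contrast: same size but advancing by 30 seconds, so windows overlap.
    TimeWindows hopping = TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1))
                                     .advanceBy(Duration.ofSeconds(30));

    System.out.println("tumbling window size (ms): " + tumbling.size());
    System.out.println("hopping window advance (ms): " + hopping.advanceMs);
  }
}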

Since we are not using any of the Avro record fields, an optimisation can be made. To speed up the stream processing we can use the binary decoder for the value part as well, thus avoiding the cost of de-serialising the Avro record.
The optimised LSQL code will be:

SET autocreate=true; 

INSERT INTO `cc_payments_1m_freq`
SELECT STREAM
COUNT(*) AS `count`
FROM `cc_payments`
WHERE _ktype='BINARY'
AND _vtype='BINARY'
GROUP BY tumble(1, m)

The calculated results should look similar to the entries below (of course, the actual numbers depend on your data volume):

{"key":0, "timestamp":"2017-10-15 18:21:00"}      | 192
{"key":0, "timestamp":"2017-10-15 18:22:00"} |
92
{"key":0, "timestamp":"2017-10-15 18:23:00"} |
2001

Why are there records with the same key?

Using the LSQL above, it is quite likely you will see results such as:

{"key":0, "timestamp":"2017-10-15 18:21:00"}      | 192 
{"key":0, "timestamp":"2017-10-15 18:21:00"} |
602
{"key":0, "timestamp":"2017-10-15 18:23:00"} |
201
{"key":0, "timestamp":"2017-10-15 18:23:00"} |
831

You might think there is something wrong with counting the records. However, that is not the case. The rate of results depends on your input data rate, the number of Kafka Streams instances running in parallel, and the configuration parameter `commit.interval.ms`.

If you are not tech savvy, you can skip this paragraph. The commit interval is the frequency at which a Kafka Streams flow saves the position of its processors. Please consult the documentation if you are not familiar with the terminology. The default value is 30 seconds if you are not using exactly-once semantics. This means the Kafka Streams processor punctuate method is called every 30 seconds. Maybe you think the solution is to set this value to 60 seconds, but there is a catch: the punctuate interval is not aligned with the timestamps of the records received on the stream, and there is quite a bit of luck involved in having them match. This means the result value for each window interval will be emitted twice. That is not double counting, far from it, but it does mean the downstream consumer has to reconcile the multiple emitted values to get the count for each minute, and that is not optimal. To reduce the number of emitted records and get one result for each 1-minute interval, you need to change the commit interval to 120 seconds.
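In a hand-written Kafka Streams application, the same behaviour would be controlled through the commit.interval.ms streams configuration. A minimal sketch, with an assumed application id and broker address:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class CommitIntervalSettings {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cc-payments-1m-freq"); // illustrative id
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // illustrative broker

    // Commit (and therefore emit windowed results) every 120 seconds rather than the
    // 30-second default, so each 1-minute window produces a single result record.
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 120_000);

    // ... build the topology and start KafkaStreams with these properties
  }
}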

With the tech part covered, here is the LSQL code to achieve the count of transactions on a 1-minute interval:

SET autocreate=true; 
SET `commit.interval.ms`='120000';

INSERT INTO `cc_payments_1m_freq`
SELECT STREAM COUNT(*) AS `count`
FROM `cc_payments`
WHERE _ktype='BINARY'
AND _vtype='BINARY'
GROUP BY tumble(1, m)

How can I count on a field in the Kafka value payload?

Using the same example of payments, we can easily count the transactions at every 1-minute interval, for each currency involved in the payment request.
Here is the LSQL allowing you to quickly describe and execute such a Kafka Streams flow:

SET autocreate=true;
SET `auto.offset.reset`='latest';
SET `commit.interval.ms`='120000';

INSERT INTO `cc_payments_1m_ccy_freq`
SELECT STREAM
currency
, COUNT(*) AS `count`
FROM `cc_payments`
WHERE _ktype='BINARY'
AND _vtype='AVRO'
GROUP BY tumble(1, m)

This time it is important to define the _vtype since we are using the currency field. Aggregations in Kafka are always performed on the key part. Since we are accessing a field in the value part, we need to re-map the stream such that each new record contains the currency in the key part. Once that is completed, the aggregation is applied using the specified tumbling window. The resulting count is written to the `cc_payments_1m_ccy_freq` topic every 120 seconds. Easy!
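For Kafka Streams developers, the re-mapping step looks roughly like the sketch below. It is only an illustration: the value is read here as a JSON string and the currency is pulled out with a naive, hypothetical helper, whereas a real application (and Lenses) would deserialise the Avro record. The topic names come from the query; everything else is assumed:

import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.kstream.TimeWindows;

public class PaymentsPerCurrencyCount {

  // Naive, illustrative helper: extract the currency from a JSON payload.
  static String extractCurrency(String json) {
    int i = json.indexOf("\"currency\":\"");
    if (i < 0) return "UNKNOWN";
    int start = i + "\"currency\":\"".length();
    return json.substring(start, json.indexOf('"', start));
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "cc-payments-1m-ccy-freq"); // illustrative
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");       // illustrative
    props.put(StreamsConfig.COMMIT_INTERVAL_MS_CONFIG, 120_000);               // emit every 120s

    StreamsBuilder builder = new StreamsBuilder();
    KStream<byte[], String> payments =
        builder.stream("cc_payments", Consumed.with(Serdes.ByteArray(), Serdes.String()));

    payments
        // Re-map the stream so the currency becomes the record key; this forces a
        // repartition through an intermediary topic before the aggregation can run.
        .groupBy((key, value) -> extractCurrency(value),
                 Grouped.with(Serdes.String(), Serdes.String()))
        .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(1)))
        .count()
        .toStream((windowedKey, count) ->
            windowedKey.key() + "@" + windowedKey.window().start())
        .to("cc_payments_1m_ccy_freq", Produced.with(Serdes.String(), Serdes.Long()));

    new KafkaStreams(builder.build(), props).start();
  }
}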

Counting on currency as we have done above comes with a performance cost: the stream re-mapping involves an intermediary topic. For the currency example, we advise producing the records with the currency value already in the key, as sketched below.
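If you control the producers, here is a sketch of what that looks like. The broker address and the JSON payload are illustrative, and a real producer would most likely send an Avro record:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class PaymentProducerWithCurrencyKey {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // illustrative broker
    props.put("key.serializer", StringSerializer.class.getName());
    props.put("value.serializer", StringSerializer.class.getName());

    try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
      // The currency goes into the record key, so counting per currency downstream
      // needs no re-keying and no intermediary repartition topic.
      producer.send(new ProducerRecord<>("cc_payments", "EUR",
          "{\"amount\": 42.50, \"currency\": \"EUR\"}"));
      producer.flush();
    }
  }
}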

Final Notes

Although count aggregations are relatively easy to implement in code (yes, it requires you to be a Java/Scala/Clojure developer), they won’t be delivered faster than registering an LSQL processor via a simple web interface, with deployment and scale-out functionality provided out of the box.

You have learned in this article how quick and easy it is to do count aggregations on Kafka Streams by leveraging the LSQL engine and the Lenses platform. This combination allows you to focus on your business requirements and deliver your reports a lot faster.

The key information for you to take away from this entry is:

Lenses, as a platform, takes care of deployment, execution, scaling, and monitoring of Kafka Streams applications described through LSQL.

In future entries we will look at how you can do transformations and joins with Lenses SQL.

If you want to learn more about LSQL Engine and Lenses you can do so by reading the documentation here.

You can get Lenses for development for free. Just navigate to Landoop.com and download it!


Bocutiu Stefan

Head of Engineering @Lenses.io, Streaming platforms advocate