How to configure KSQL, a streaming SQL Engine on Apache Kafka

Vishwa Teja Vangari
Egen Engineering & Beyond
8 min read · Jul 15, 2019

KSQL, a SQL framework on Kafka for real-time data analysis.

Apache Kafka's core messaging concepts have remained fairly stable over time, but the frameworks around Kafka are evolving at a rapid pace. Among the many frameworks in the Kafka ecosystem, such as Kafka producers and consumers, Kafka Connect, Kafka Streams, and Spark Streaming, KSQL stands out for real-time data analysis: alerts based on pre-defined rules are easy to express as SQL-like queries, the results plug easily into any time-series database, and these can be integrated with real-time alerting and dashboard frameworks like Grafana or Splunk for visualization.

This blog focuses on configuring KSQL on Kafka and understanding KSQL concepts and queries for real-time data analysis; a subsequent blog will cover configuring Grafana visualization dashboards.

KSQL-Kafka Workflow

KSQL is a wrapper on top of the Kafka Streams API. Kafka Streams involves writing code and understanding processing topologies, which makes it harder to read and write. KSQL is implemented on Kafka Streams and makes it far easier to do the same things: it only requires SQL knowledge, and it feels like writing plain SQL queries with SELECT clauses, WHERE clauses, and JOIN conditions.

  • KSQL is the streaming SQL engine on Apache Kafka for data analysis.
  • KSQL is a declarative stream processing language.
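
To make this concrete before any setup: a KSQL query is a continuous query over a stream. Here is a hypothetical example against the pageviews stream we will create later in this post; it runs until interrupted, emitting a row for each matching event as it arrives.

ksql> SELECT userid, pageid FROM pageviews WHERE pageid = 'Page_10';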

To get started with KSQL concepts, below are the DDL and DML statements available in the KSQL API. We can refer to the Confluent documentation for more detail on each of these:

  • CREATE STREAM
  • CREATE TABLE
  • CREATE STREAM AS SELECT
  • CREATE TABLE AS SELECT
  • DESCRIBE
  • EXPLAIN
  • DROP STREAM
  • DROP TABLE
  • PRINT
  • SELECT
  • SHOW TOPICS
  • SHOW STREAMS
  • SHOW TABLES
  • SHOW QUERIES
  • SHOW PROPERTIES
  • TERMINATE

KSQL Supported Data Types:

  • BOOLEAN
  • INTEGER
  • BIGINT
  • DOUBLE
  • VARCHAR (or STRING)
  • ARRAY<ArrayType> (JSON and AVRO only)
  • MAP<VARCHAR, ValueType> (JSON and AVRO only)

Setting up KSQL

KSQL servers can be set up on separate machines or containers and are configured with access to the Kafka broker cluster. The KSQL CLI connects to KSQL servers over HTTP.
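
Since the protocol is plain HTTP, we can also talk to a KSQL server directly through its REST API. A quick illustration, assuming the server is reachable at ksql-server:8088 as in the setup below:

❯ curl -X POST http://ksql-server:8088/ksql \
    -H "Content-Type: application/vnd.ksql.v1+json; charset=utf-8" \
    -d '{"ksql": "SHOW STREAMS;", "streamsProperties": {}}'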

KSQL Server and KSQL CLI

To run the KSQL server and CLI, we will use Confluent-supported Docker images to spin up containers. We can refer to the docker-compose-ksql.yml file below to spin up all the required containers (zookeeper, kafka, schema-registry, ksql-server, ksql-cli, and the ksql-datagen generators).
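
A minimal sketch of what docker-compose-ksql.yml could look like is below. The image tags, environment settings, and ksql-datagen flags are assumptions modeled on the Confluent 5.x quickstart (in practice the datagen containers should also wait for Kafka to be ready), so adjust versions to your setup:

version: '2'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.2.1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:5.2.1
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
  schema-registry:
    image: confluentinc/cp-schema-registry:5.2.1
    depends_on:
      - kafka
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: PLAINTEXT://kafka:29092
  ksql-server:
    image: confluentinc/cp-ksql-server:5.2.1
    depends_on:
      - kafka
      - schema-registry
    environment:
      KSQL_BOOTSTRAP_SERVERS: kafka:29092
      KSQL_LISTENERS: http://0.0.0.0:8088
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8081
  ksql-cli:
    image: confluentinc/cp-ksql-cli:5.2.1
    depends_on:
      - ksql-server
    entrypoint: /bin/sh
    tty: true
  ksql-datagen-pageviews:
    image: confluentinc/ksql-examples:5.2.1
    depends_on:
      - kafka
    command: "ksql-datagen quickstart=pageviews format=delimited topic=pageviews bootstrap-server=kafka:29092 maxInterval=500"
  ksql-datagen-users:
    image: confluentinc/ksql-examples:5.2.1
    depends_on:
      - kafka
    command: "ksql-datagen quickstart=users format=json topic=users bootstrap-server=kafka:29092 maxInterval=500"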

Limitation: as of now, the KSQL CLI can only connect to one KSQL server at a time; it does not support automatic failover to another KSQL server.

To start all the containers, open a command line terminal and execute docker-compose -f docker-compose-ksql.yml up -d. To ensure all the containers started well, verify with docker ps -a.

In another terminal, start the KSQL CLI for writing KSQL queries:
docker-compose -f docker-compose-ksql.yml exec ksql-cli ksql http://ksql-server:8088

Execute show topics; in the KSQL terminal. We should see three topics: _schemas, pageviews, and users. The _schemas topic is used by the schema registry for storing all topic schemas; the pageviews and users topics are populated by the ksql-datagen-pageviews and ksql-datagen-users containers, which produce random pageview and user data. We will use these topics for writing KSQL queries.

ksql> show topics;

Kafka Topic | Registered | Partitions | Partition Replicas | Consumers | ConsumerGroups
-----------------------------------------------------------------------------------------
_schemas    | false      | 1          | 1                  | 0         | 0
pageviews   | false      | 1          | 1                  | 0         | 0
users       | false      | 1          | 1                  | 0         | 0
-----------------------------------------------------------------------------------------

Now let’s see the data produced by ksql-datagen-pageviews and ksql-datagen-users by consuming each topic from the beginning with the console consumer.

❯ docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic users --from-beginning
{"registertime":1491619254221,"userid":"User_6","regionid":"Region_9","gender":"OTHER"}
{"registertime":1496738051659,"userid":"User_8","regionid":"Region_8","gender":"MALE"}
{"registertime":1504516387265,"userid":"User_9","regionid":"Region_7","gender":"FEMALE"}
{"registertime":1496899358848,"userid":"User_2","regionid":"Region_6","gender":"FEMALE"}
{"registertime":1496783188672,"userid":"User_1","regionid":"Region_9","gender":"OTHER"}
{"registertime":1510565416470,"userid":"User_6","regionid":"Region_7","gender":"FEMALE"}
......
❯ docker-compose exec kafka kafka-console-consumer --bootstrap-server kafka:29092 --topic pageviews --from-beginning
1563073112727,User_2,Page_23
1563073113653,User_5,Page_15
1563073114497,User_2,Page_44
1563073115453,User_8,Page_78
1563073115542,User_9,Page_82
1563073115979,User_8,Page_19
1563073116534,User_7,Page_35
1563073117304,User_8,Page_38
1563073117733,User_5,Page_80
......

STREAM:
In KSQL, we can create streams from Kafka topics, and we can also create streams from the query results of other streams. Let’s create a pageviews stream from the pageviews Kafka topic, specifying KSQL-supported data types as below.

  • Use the CREATE STREAM statement to create a stream from a Kafka topic.
  • Use the CREATE STREAM AS SELECT statement to create a stream with query results from an existing table or stream.
ksql> CREATE STREAM pageviews
> (viewtime BIGINT,
> userid VARCHAR,
> pageid VARCHAR)
> WITH (KAFKA_TOPIC='pageviews',
> VALUE_FORMAT='DELIMITED');
Message
----------------
Stream created
----------------
ksql>

STREAM WITH KEY
Just as we specify a key for a Kafka topic, we can also specify a key for a stream. This key can be the same as the topic key or derived from other fields.

ksql> CREATE STREAM pageviews_withkey
> (viewtime BIGINT,
> userid VARCHAR,
> pageid VARCHAR)
> WITH (KAFKA_TOPIC='pageviews',
> VALUE_FORMAT='DELIMITED',
> KEY='pageid');
Message
----------------
Stream created
----------------

STREAM WITH TIMESTAMP
We can also create a stream with a timestamp. Message timestamps are used for window-based operations like windowed aggregations, and to support event-time processing.

ksql> CREATE STREAM pageviews_timestamped
> (viewtime BIGINT,
> userid VARCHAR,
> pageid VARCHAR)
> WITH (KAFKA_TOPIC='pageviews',
> VALUE_FORMAT='DELIMITED',
> KEY='pageid',
> TIMESTAMP='viewtime');
Message
----------------
Stream created
----------------
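
This is where the timestamp pays off. Below is a hypothetical windowed aggregation over pageviews_timestamped (the table name and window size are made up for illustration): because TIMESTAMP='viewtime' is set, the 30-second windows are computed on event time rather than on processing time.

ksql> CREATE TABLE pageviews_per_page_30s AS
> SELECT pageid, COUNT(*)
> FROM pageviews_timestamped
> WINDOW TUMBLING (SIZE 30 SECONDS)
> GROUP BY pageid;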

SHOW STREAMS: show streams lists all the existing streams in the KSQL cluster.

ksql> show streams;

Stream Name           | Kafka Topic | Format
-------------------------------------------------
PAGEVIEWS_TIMESTAMPED | pageviews   | DELIMITED
PAGEVIEWS             | pageviews   | DELIMITED
PAGEVIEWS_WITHKEY     | pageviews   | DELIMITED
-------------------------------------------------

STREAM WITH WHERE CLAUSE: the WHERE clause is used to specify conditions. In our example, let's create a pageviews_intro stream with the condition pageid < 'Page_20'. Then let's see the result of the query using PRINT, which writes the stream's records to the console.

ksql> CREATE STREAM pageviews_intro AS
> SELECT * FROM pageviews
> WHERE pageid < 'Page_20';
Message
----------------------------
Stream created and running
----------------------------
ksql> PRINT pageviews_intro;
Format:STRING
7/14/19 3:43:19 AM UTC , 60411 , 1563075799952,User_4,Page_14
7/14/19 3:43:29 AM UTC , 60581 , 1563075809088,User_6,Page_18
7/14/19 3:43:32 AM UTC , 60651 , 1563075812370,User_9,Page_13
7/14/19 3:43:39 AM UTC , 60771 , 1563075819012,User_1,Page_15
7/14/19 3:43:40 AM UTC , 60791 , 1563075820027,User_8,Page_11
^CTopic printing ceased

SHOW QUERIES lists all the running queries (persistent queries such as streams created from other streams, or WHERE-condition queries on a stream).

ksql> SHOW QUERIES;

Query ID               | Kafka Topic     | Query String
-----------------------------------------------------------------------------
CSAS_PAGEVIEWS_INTRO_0 | PAGEVIEWS_INTRO | CREATE STREAM pageviews_intro AS
SELECT * FROM pageviews
WHERE pageid < 'Page_20';
-----------------------------------------------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;
ksql>

CREATE TABLE

  • Use the CREATE TABLE statement to create a table from a Kafka topic.
  • Use the CREATE TABLE AS SELECT statement to create a table with query results from an existing table or stream.

Unlike a stream, it is mandatory to specify a key for a KSQL table. Since a table represents state at a point in time, it uses the key to compute that state.

ksql> CREATE TABLE users
> (registertime BIGINT,
> gender VARCHAR,
> regionid VARCHAR,
> userid VARCHAR,
> interests array<VARCHAR>,
> contactinfo map<VARCHAR, VARCHAR>)
> WITH (KAFKA_TOPIC='users',
> VALUE_FORMAT='JSON',
> KEY = 'userid');
Message
---------------
Table created
---------------

Inspect the table by using the SHOW TABLES and DESCRIBE statements:

ksql> SHOW TABLES;

Table Name | Kafka Topic | Format | Windowed
----------------------------------------------
USERS      | users       | JSON   | false
----------------------------------------------
ksql> DESCRIBE USERS;

Name : USERS
Field | Type
---------------------------------------------
ROWTIME | BIGINT (system)
ROWKEY | VARCHAR(STRING) (system)
REGISTERTIME | BIGINT
GENDER | VARCHAR(STRING)
REGIONID | VARCHAR(STRING)
USERID | VARCHAR(STRING)
INTERESTS | ARRAY<VARCHAR(STRING)>
CONTACTINFO | MAP<STRING, VARCHAR(STRING)>
---------------------------------------------
For runtime statistics and query details run: DESCRIBE EXTENDED <Stream,Table>;

TABLE vs STREAM

A stream in Kafka records the full history of world events from the beginning of time to today. As we go from today to tomorrow, new events are constantly being added to the world’s history. This history is a sequence or “chain” of events, so you know which event happened before another event.
A stream records history: the sequence of all events.

A table in Kafka is the state of the world today. It represents the present. It is an aggregation of the history of world events, and this aggregation is changing constantly as we go from today to tomorrow.
A table represents the state at a particular point in time.

If we need aggregations such as average, mean, maximum, or minimum, then go with a KSQL table; if we want the series of events themselves, for a dashboard or analysis showing change over time, then go with a KSQL stream.
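
One way to internalize the difference is to read the same topic both ways. In the sketch below (users_stream and users_table are hypothetical names, with an abbreviated column list), the stream yields one row per update event, while the table keeps only the latest row per userid:

ksql> CREATE STREAM users_stream
> (registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR)
> WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON');
ksql> CREATE TABLE users_table
> (registertime BIGINT, userid VARCHAR, regionid VARCHAR, gender VARCHAR)
> WITH (KAFKA_TOPIC='users', VALUE_FORMAT='JSON', KEY='userid');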

JOIN Clause

We can join streams and tables in these ways:

  • Join two streams to create a new stream.
  • Join two tables to create a new table.
  • Join a stream and a table to create a new stream.
ksql> CREATE STREAM pageviews_enriched AS
> SELECT users.userid AS userid, pageid, regionid, gender FROM pageviews
> LEFT JOIN users ON pageviews.userid = users.userid;
Message
----------------------------
Stream created and running
----------------------------
ksql> SELECT * FROM pageviews_enriched;
1563079177384 | User_1 | User_1 | Page_90 | Region_4 | MALE
1563079177756 | User_1 | User_1 | Page_91 | Region_4 | MALE
1563079177967 | User_9 | User_9 | Page_23 | Region_3 | OTHER
1563079178115 | User_9 | User_9 | Page_49 | Region_3 | OTHER
1563079178804 | User_4 | User_4 | Page_66 | Region_5 | MALE
1563079179230 | User_3 | User_3 | Page_46 | Region_3 | MALE

AGGREGATION queries with the GROUP BY clause

ksql> CREATE TABLE pageviews_per_region AS
> SELECT regionid, count(*)
> FROM pageviews_enriched
> GROUP BY regionid;
Message
---------------------------
Table created and running
---------------------------
ksql> SELECT * FROM pageviews_per_region;
1563079826153 | Region_4 | Region_4 | 12
1563079825358 | Region_5 | Region_5 | 12
1563079825453 | Region_8 | Region_8 | 3
1563079827106 | Region_4 | Region_4 | 13
1563079827808 | Region_7 | Region_7 | 11
1563079829354 | Region_8 | Region_8 | 4
1563079829634 | Region_6 | Region_6 | 4
1563079829263 | Region_7 | Region_7 | 12
1563079830602 | Region_2 | Region_2 | 3

TERMINATE: the TERMINATE statement stops a running query.

ksql> TERMINATE CSAS_PAGEVIEWS_INTRO_0;
Message
-------------------
Query terminated.
-------------------
ksql> SHOW QUERIES;
Query ID | Kafka Topic | Query String
---------------------------------------
---------------------------------------
For detailed information on a Query run: EXPLAIN <Query ID>;

DROP STREAM:

ksql> DROP STREAM pageviews_intro;
Message
--------------------------------------
Source PAGEVIEWS_INTRO was dropped.
--------------------------------------

DROP TABLE: in order to drop a table, we first have to terminate the queries that use it; otherwise we get the error below. Terminate the query, then drop the table.

ksql> DROP TABLE users;
Cannot drop USERS.
The following queries read from this source: [CSAS_PAGEVIEWS_ENRICHED_1].
The following queries write into this source: [].
You need to terminate them before dropping USERS.
ksql> TERMINATE CSAS_PAGEVIEWS_ENRICHED_1;
Message
-------------------
Query terminated.
-------------------
ksql> DROP TABLE users;
Message
----------------------------
Source USERS was dropped.
----------------------------

SHOW PROPERTIES: lists the configuration properties currently in effect on the KSQL server.

KSQL Properties
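
Properties can also be overridden for the current CLI session with the SET statement. A common example, which makes subsequent queries read topics from the beginning instead of only new records:

ksql> SET 'auto.offset.reset' = 'earliest';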

Summary:

In this blog, we covered some KSQL concepts: the ways we can create tables and streams from Kafka topics, the difference between a stream and a table, and some KSQL queries on streams and tables using the KSQL client. If you want to read more about KSQL, refer to this documentation.

If you find this blog helpful, be sure to give it a few claps, read more, or follow me on LinkedIn.
