Introducing ksqlDB: A Database on Streams

Abhishek Giri
DataPebbles
May 6, 2022

What is ksqlDB?

“ksqlDB is the streaming SQL database for Apache Kafka to write a real-time application in SQL.” It is used to build Kafka-native applications that process streams of real-time data. ksqlDB stores events immutably in Kafka using a simple key/value model. It is distributed, real-time, and scalable, and it combines real-time stream processing with lightweight SQL syntax.

What ksqlDB offers:

  • Streams and Tables
  • Materialized view
  • Push Queries
  • Pull Queries
  • Connect

Streams and Tables: A stream is an immutable, append-only sequence of events that represents the history of changes. A table holds the current state for each key, which is the result of applying many changes. ksqlDB relates both to Kafka topic data by declaring a schema on top of the topic.
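The stream/table duality can be illustrated with a minimal Python sketch. It is only a model of the idea, not how ksqlDB stores data: events are assumed to be (key, value) pairs, and the rider keys and city names are hypothetical. The stream is the full list of events; the table is what you get by replaying it and keeping the latest value per key.

```python
from typing import Dict, List, Tuple

# Hypothetical event shape: (key, value).
Event = Tuple[str, str]


def to_table(stream: List[Event]) -> Dict[str, str]:
    """Replay an append-only stream and keep only the latest value per key."""
    table: Dict[str, str] = {}
    for key, value in stream:  # events are applied in arrival order
        table[key] = value     # a later event for the same key overwrites the earlier state
    return table


# The stream keeps the full history; it is only ever appended to.
stream: List[Event] = [
    ("rider-1", "Amsterdam"),
    ("rider-2", "Leiden"),
    ("rider-1", "Utrecht"),
]
table = to_table(stream)
print(len(stream), table)  # 3 {'rider-1': 'Utrecht', 'rider-2': 'Leiden'}
```

The stream still has three events after the replay, while the table has one row per key.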

Materialized view: Converting a stream of events into a table that reflects the cumulative effect of those events is called materializing. ksqlDB builds materialized views with stateful aggregations and updates them incrementally as each new event arrives.
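Incremental materialization can be sketched in Python with a hypothetical per-key count. Each event updates only the state for its own key, and the result matches a full recomputation over the whole history. This illustrates the concept only. ksqlDB's real state handling (state stores backed by Kafka changelog topics) is not modeled here.

```python
from collections import Counter
from typing import Dict, Iterable, List


class CountView:
    """Hypothetical materialized view: number of events per key, updated incrementally."""

    def __init__(self) -> None:
        self.state: Dict[str, int] = {}

    def apply(self, key: str) -> None:
        # Stateful aggregation: one new event touches only its own key's state.
        self.state[key] = self.state.get(key, 0) + 1


def recompute(events: Iterable[str]) -> Dict[str, int]:
    # Full recomputation over the whole history, used only for comparison.
    return dict(Counter(events))


events: List[str] = ["rider-1", "rider-2", "rider-1"]
view = CountView()
for e in events:
    view.apply(e)

assert view.state == recompute(events)
print(view.state)  # {'rider-1': 2, 'rider-2': 1}
```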

Push Queries: A push query is a query issued by a client that subscribes to a result as it changes in real time. It is a continuous query that pushes incremental results to the client as they are produced. Push queries let you query a stream or a materialized table and subscribe to its results.
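A push query behaves like a subscription. A Python generator gives a rough analogy: it yields each matching row as soon as that row is processed, rather than returning one finished result. The row fields (`profileId`, `zone`) and their values are hypothetical, and the source here is finite only so that the sketch terminates.

```python
from typing import Callable, Dict, Iterator, List

Row = Dict[str, str]


def push_query(source: Iterator[Row], predicate: Callable[[Row], bool]) -> Iterator[Row]:
    """Yield each matching row as it arrives (a subscription, not a snapshot)."""
    for row in source:  # a real push query keeps running; this source is finite
        if predicate(row):
            yield row   # the incremental result is pushed to the subscriber immediately


# Hypothetical events arriving over time.
incoming: List[Row] = [
    {"profileId": "rider-1", "zone": "north"},
    {"profileId": "rider-2", "zone": "center"},
    {"profileId": "rider-3", "zone": "center"},
]
subscription = push_query(iter(incoming), lambda r: r["zone"] == "center")
print(next(subscription))  # {'profileId': 'rider-2', 'zone': 'center'}
```

The subscriber receives rider-2 before rider-3 has even been processed, which is the "push" part.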

Pull Queries: A pull query is a query issued by a client that retrieves a result as of “now”, like a query against a traditional RDBMS. Pull queries let you fetch the current state of a materialized view. Because materialized views are updated incrementally as new events arrive, a pull query can return the current state with low latency instead of recomputing it.
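For contrast with the push sketch above, a pull query can be modeled as a one-shot lookup against a view that is already up to date. The dictionary standing in for the materialized view and its contents are hypothetical.

```python
from typing import Dict, Optional

# Hypothetical materialized view, kept current by incoming events.
current_location: Dict[str, str] = {"rider-1": "Utrecht", "rider-2": "Leiden"}


def pull_query(view: Dict[str, str], key: str) -> Optional[str]:
    """Read the current value once and return; no subscription stays open."""
    return view.get(key)


print(pull_query(current_location, "rider-1"))  # Utrecht
current_location["rider-1"] = "Leiden"           # a new event updates the view
print(pull_query(current_location, "rider-1"))  # Leiden: a new pull sees the new state
```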

Connect: Kafka Connect is an open-source component of Apache Kafka that imports data from external systems into Kafka and exports data from Kafka to external systems. ksqlDB can create and describe connectors itself, so data brought in through Connect lands in Kafka topics that ksqlDB can then query.

Let's walk through a small hands-on exercise in which we find the latest location of each rider.

We will use docker-compose to start ZooKeeper, a Kafka broker, and ksqlDB (server and CLI) in standalone mode.

Once every component is up and running, the container logs will show that each service has started.

To check that every component is up and running, run:

docker-compose ps

Next, connect to the ksqlDB CLI so that we can issue queries:

docker exec -it ksqldb-cli ksql http://ksqldb-server:8088

Once the command succeeds, you will see the ksqlDB CLI prompt.

Now we will create a stream named riderLocations, backed by the underlying Kafka topic locations.

CREATE STREAM riderLocations (profileId VARCHAR, latitude DOUBLE, longitude DOUBLE) WITH (kafka_topic='locations', value_format='json', partitions=1);

kafka_topic: Name of the Kafka topic underlying the stream.
value_format: Encoding of the messages stored in the Kafka topic.
partitions: Number of partitions to create for the locations topic. This is optional if the topic already exists.
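With value_format='json', each record value in the locations topic is a JSON object whose fields correspond to the declared columns. ksqlDB applies the declared schema when it reads the topic. The Python sketch below shows roughly what such a value looks like and how a schema check on read works. It assumes, as we understand ksqlDB's default behaviour, that unquoted column names are stored in upper case (PROFILEID, LATITUDE, LONGITUDE). The `encode`/`decode` helpers are hypothetical and are not ksqlDB's serializer.

```python
import json
from typing import Dict, Union

Value = Dict[str, Union[str, float]]

# Declared columns; the upper-case names are an assumption about ksqlDB's JSON field naming.
SCHEMA = {"PROFILEID": str, "LATITUDE": float, "LONGITUDE": float}


def encode(profile_id: str, latitude: float, longitude: float) -> bytes:
    """Serialize one riderLocations row as a JSON record value."""
    return json.dumps(
        {"PROFILEID": profile_id, "LATITUDE": latitude, "LONGITUDE": longitude}
    ).encode("utf-8")


def decode(raw: bytes) -> Value:
    """Apply the declared schema on read: every column must exist with the right type."""
    obj = json.loads(raw)
    for column, typ in SCHEMA.items():
        if not isinstance(obj.get(column), typ):
            raise ValueError(f"column {column} is missing or not {typ.__name__}")
    return obj


raw = encode("c2309eec", 37.7877, -122.4205)
print(decode(raw))
```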

Create a materialized view

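Now we will create a materialized view, in the form of a table, that keeps track of the latest location of each rider. It uses LATEST_BY_OFFSET, which returns the most recent value of the specified column for each key, that is, the value from the record with the highest offset.

The second query builds another materialized view on top of currentLocation. It groups riders by their distance from a fixed reference point (latitude 37.4133, longitude -122.1162), rounded to the nearest 10 miles, and lists and counts the riders in each distance band.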
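The per-key logic of LATEST_BY_OFFSET can be sketched in Python, assuming records from a single partition are given as (offset, profileId, latitude, longitude) tuples. The first two rows reuse coordinates from this exercise. The third row, a later position for rider 4ab5cbad, is hypothetical. The function name mirrors the ksqlDB aggregate but is only an illustration of its semantics.

```python
from typing import Dict, List, Tuple

# (offset, profileId, latitude, longitude) from one partition.
Record = Tuple[int, str, float, float]


def latest_by_offset(records: List[Record]) -> Dict[str, Tuple[float, float]]:
    """Per profileId, keep latitude/longitude from the record with the highest offset."""
    best: Dict[str, Tuple[int, float, float]] = {}
    for offset, pid, lat, lon in records:
        if pid not in best or offset > best[pid][0]:
            best[pid] = (offset, lat, lon)
    return {pid: (lat, lon) for pid, (_, lat, lon) in best.items()}


records: List[Record] = [
    (0, "4ab5cbad", 37.3952, -122.0813),
    (1, "c2309eec", 37.7877, -122.4205),
    (2, "4ab5cbad", 37.4000, -122.0900),  # hypothetical later position for the same rider
]
print(latest_by_offset(records))
# {'4ab5cbad': (37.4, -122.09), 'c2309eec': (37.7877, -122.4205)}
```

Comparing offsets explicitly makes the "latest" rule visible. ksqlDB processes a partition in offset order, so in practice the last record seen for a key is also the one with the highest offset.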

-- Latest known position per rider.
CREATE TABLE currentLocation AS
  SELECT profileId,
         LATEST_BY_OFFSET(latitude) AS la,
         LATEST_BY_OFFSET(longitude) AS lo
  FROM riderLocations
  GROUP BY profileId
  EMIT CHANGES;

-- Riders grouped into 10-mile distance bands; 'MI' makes GEO_DISTANCE return miles (its default is kilometres).
CREATE TABLE ridersNearLeiden AS
  SELECT ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162, 'MI'), -1) AS distanceInMiles,
         COLLECT_LIST(profileId) AS riders,
         COUNT(*) AS count
  FROM currentLocation
  GROUP BY ROUND(GEO_DISTANCE(la, lo, 37.4133, -122.1162, 'MI'), -1);
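Two details decide which band a rider lands in: the unit of GEO_DISTANCE and how ROUND(x, -1) rounds. The Python sketch below reproduces both under stated assumptions. Distance is computed with the haversine formula on a 6371 km mean Earth radius. This is a stand-in, not ksqlDB's source. GEO_DISTANCE is treated as returning kilometres unless 'MI' is passed, which is our reading of the ksqlDB function reference. ROUND is assumed to round halves upward. `riders_by_bucket` is a hypothetical helper that mirrors the GROUP BY.

```python
import math
from collections import defaultdict
from typing import Dict, List, Tuple

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius
KM_PER_MILE = 1.609344
REF = (37.4133, -122.1162)  # reference point used in the query


def geo_distance(lat1: float, lon1: float, lat2: float, lon2: float, unit: str = "KM") -> float:
    """Haversine great-circle distance, standing in for GEO_DISTANCE."""
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlmb / 2) ** 2)
    km = 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))
    return km / KM_PER_MILE if unit == "MI" else km


def round_to_tens(x: float) -> int:
    """ROUND(x, -1) for non-negative x, rounding halves up to the next multiple of 10."""
    return math.floor(x / 10 + 0.5) * 10


def riders_by_bucket(current: Dict[str, Tuple[float, float]], unit: str) -> Dict[int, List[str]]:
    """Group riders by rounded distance band, like the GROUP BY plus COLLECT_LIST."""
    buckets: Dict[int, List[str]] = defaultdict(list)
    for pid, (la, lo) in current.items():
        buckets[round_to_tens(geo_distance(la, lo, *REF, unit=unit))].append(pid)
    return dict(buckets)


current = {"18f4ea86": (37.3903, -122.0643), "c2309eec": (37.7877, -122.4205)}
print(riders_by_bucket(current, "MI"))  # {0: ['18f4ea86'], 30: ['c2309eec']}
print(riders_by_bucket(current, "KM"))  # {10: ['18f4ea86'], 50: ['c2309eec']}
```

Rider 18f4ea86 is about 5.2 km (about 3.3 miles) from the reference point, so it falls in band 0 when measured in miles but in band 10 when measured in kilometres. That is why the unit has to match the column name distanceInMiles.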

Run a push query

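A push query emits a continuous stream of updates from ksqlDB. Its result isn't persisted in a Kafka topic; it is printed to the console or returned to the client. The query below emits, as they arrive, the rider locations within 5 km (GEO_DISTANCE's default unit) of the reference point.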

SELECT * FROM riderLocations WHERE GEO_DISTANCE(latitude, longitude, 37.4133, -122.1162) <= 5 EMIT CHANGES;

Next, we will insert some data into the riderLocations stream so that the push query starts producing output.

Start another ksqlDB CLI session and run the INSERT statements below:

INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('c2309eec', 37.7877, -122.4205); 
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('18f4ea86', 37.3903, -122.0643);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ab5cbad', 37.3952, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('8b6eae59', 37.3944, -122.0813);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4a7c7b41', 37.4049, -122.0822);
INSERT INTO riderLocations (profileId, latitude, longitude) VALUES ('4ddad000', 37.7857, -122.4011);
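To see which of these six rows the push query should emit, the Python sketch below applies the same filter offline. It assumes GEO_DISTANCE returns kilometres by default and approximates it with the haversine formula (6371 km Earth radius). `push_matches` is a hypothetical helper, not part of ksqlDB. Rows are checked one at a time in insert order, as a push query over a stream would see them.

```python
import math
from typing import List, Tuple

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius


def geo_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Haversine great-circle distance in kilometres (stand-in for GEO_DISTANCE)."""
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlmb / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


# The six rows from the INSERT statements, in insert order.
ROWS: List[Tuple[str, float, float]] = [
    ("c2309eec", 37.7877, -122.4205),
    ("18f4ea86", 37.3903, -122.0643),
    ("4ab5cbad", 37.3952, -122.0813),
    ("8b6eae59", 37.3944, -122.0813),
    ("4a7c7b41", 37.4049, -122.0822),
    ("4ddad000", 37.7857, -122.4011),
]


def push_matches(rows: List[Tuple[str, float, float]], limit_km: float) -> List[str]:
    """Profile IDs the push query would emit, in arrival order."""
    return [pid for pid, lat, lon in rows
            if geo_distance_km(lat, lon, 37.4133, -122.1162) <= limit_km]


print(push_matches(ROWS, 5))  # ['4ab5cbad', '8b6eae59', '4a7c7b41']
```

Rider 18f4ea86 sits at about 5.2 km, just outside the 5 km limit. If the threshold were meant as 5 miles (about 8 km), it would also be emitted.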

Run a pull query

A pull query fetches the current value from a materialized view and then terminates. Its result is not persisted in a Kafka topic and is only printed to the console.

This query returns the riders in every distance band up to 10 miles. Because distances are rounded to the nearest 10 miles, this covers riders less than 15 miles from the reference point.

SELECT * FROM ridersNearLeiden WHERE distanceInMiles <= 10;
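The whole pipeline for the six inserted rows can be traced in Python to predict what this pull query returns. Assumptions: distances are in miles, matching the column name distanceInMiles. They are computed with a haversine stand-in for GEO_DISTANCE (6371 km Earth radius). ROUND(x, -1) rounds halves up. `pull_riders_near` is a hypothetical helper. The order of the COLLECT_LIST output in ksqlDB is not something this sketch can promise; it simply reflects insert order here.

```python
import math
from typing import Dict, List, Tuple

EARTH_RADIUS_KM = 6371.0  # assumed mean Earth radius
KM_PER_MILE = 1.609344
REF = (37.4133, -122.1162)


def geo_distance_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Haversine great-circle distance in kilometres (stand-in for GEO_DISTANCE)."""
    dphi = math.radians(lat2 - lat1)
    dlmb = math.radians(lon2 - lon1)
    a = (math.sin(dphi / 2) ** 2
         + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlmb / 2) ** 2)
    return 2 * EARTH_RADIUS_KM * math.asin(math.sqrt(a))


def round_to_tens(x: float) -> int:
    """ROUND(x, -1) for non-negative x, rounding halves up."""
    return math.floor(x / 10 + 0.5) * 10


ROWS: List[Tuple[str, float, float]] = [
    ("c2309eec", 37.7877, -122.4205), ("18f4ea86", 37.3903, -122.0643),
    ("4ab5cbad", 37.3952, -122.0813), ("8b6eae59", 37.3944, -122.0813),
    ("4a7c7b41", 37.4049, -122.0822), ("4ddad000", 37.7857, -122.4011),
]


def pull_riders_near(rows: List[Tuple[str, float, float]],
                     max_band: int = 10) -> Dict[int, Tuple[List[str], int]]:
    # currentLocation: a later row for the same profileId overwrites an earlier one.
    latest: Dict[str, Tuple[float, float]] = {}
    for pid, lat, lon in rows:
        latest[pid] = (lat, lon)
    # ridersNearLeiden: band = distance in miles rounded to the nearest 10.
    bands: Dict[int, List[str]] = {}
    for pid, (la, lo) in latest.items():
        band = round_to_tens(geo_distance_km(la, lo, *REF) / KM_PER_MILE)
        bands.setdefault(band, []).append(pid)
    # Pull query: read the current bands once, keep distanceInMiles <= max_band, with COUNT(*).
    return {band: (riders, len(riders)) for band, riders in bands.items() if band <= max_band}


print(pull_riders_near(ROWS))
# {0: (['18f4ea86', '4ab5cbad', '8b6eae59', '4a7c7b41'], 4)}
```

The two riders near San Francisco land in the 30-mile band and are filtered out. The four nearby riders all fall in band 0.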

This was a short, basic hands-on introduction to ksqlDB. ksqlDB lets you derive value in real time by processing data in motion rather than data at rest, and it lets you build stream-processing applications using SQL statements.

For any questions, please reach out to me at agiri@datapebbles.com

Reference link: https://docs.ksqldb.io/en/latest/developer-guide/ksqldb-reference/
