Data Stream Processing for Newbies with Kafka, KSQL, and Postgres

A fully-reproducible, Dockerized, step-by-step tutorial for dipping your toes into the data stream.

Maria Patterson
Apr 8 · 8 min read
Let’s get streamin’.

With today’s terabyte and petabyte scale datasets and an uptick in demand for real-time analytics, traditional batch-oriented data processing doesn’t suffice. To keep up with the flow of incoming data, organizations are increasingly moving towards stream processing.

In batch processing, data are collected over a given period of time. These data are processed non-sequentially as a bounded unit, or batch, and pushed into an analytics system that periodically executes.

In stream processing, data are continuously processed as new data become available for analyzing. These data are processed sequentially as an unbounded stream and may be pulled in by a “listening” analytics system.

Though stream processing does not need to be real-time, it can enable data processing and analysis used by many real-time systems that may require fast reactions to incoming data — log or clickstream monitoring, financial analysis on a trading floor, or sensor data from an Internet of Things (IOT) device, for example.

Stream Processing Tools

Stream processing requires different tools from those used in traditional batch processing architecture. With large datasets, the canonical example of batch processing architecture is Hadoop’s MapReduce over data in HDFS. A number of new tools have popped up for use with data streams — e.g., a bunch of Apache tools like Storm / Twitter’s Heron, Flink, Samza, Kafka, Amazon’s Kinesis Streams, and Google DataFlow. And some tools are available for both batch and stream processing — e.g., Apache Beam and Spark. (Spark only sort of / kinda but I guess good enough. It is dropping support for its original streaming in favor of “structured streaming” for microbatch processing, which is non-ideal in some cases so I am pretend mad at it right now).

A Simple Recipe for Data Processing with Kafka and KSQL

My favorite new stream processing tool is Apache Kafka, originally a pub/sub messaging queue thought up by folks at LinkedIn and rebranded as a more general distributed data stream processing platform. Kafka takes data published by ‘producers’ (which may be, e.g., apps, files / file systems, or databases) and makes it available for ‘consumers’ subscribed to streams of different ‘topics.’ In my previous life as an astronomer, I did a lot of playing with Kafka for real-time distribution of alert data on new and changing astronomical object detection from some cool new telescopes [link for the curious].

Image pulled from: https://kafka.apache.org/intro.html

The Kafka ecosystem is growing in support and has been supplemented with the Kafka Streams system, for building streaming apps, and KSQL, a SQL-like stream interface. I like Kafka especially because of the availability of an API for user-friendly Python and its easy integration with many other tools via Kafka Connect.

Here I’ll outline a fully reproducible step-by-step tutorial on how to stream tables from Postgres to Kafka, perform calculations with KSQL, and sync results back to Postgres using Connect.

All materials are available on my GitHub at https://github.com/mtpatter/postgres-kafka-demo. To follow along, clone the repo to your local environment. You should need only Docker and docker-compose on your system.

Ingredients

We will be using the following technologies through Docker containers:

  • Kafka, the main data streaming platform
  • Zookeeper, Kafka’s sidekick used for managing consumers
  • KSQL server, which we will use to create live updating tables
  • Kafka’s schema registry, needed to use the Avro data format, a json-based binary format that enforces schemas on our data
  • Kafka Connect (pulled from Debezium), which will source and sink data back and forth to/from Postgres through Kafka
  • PostgreSQL (also pulled from Debezium and tailored for use with Connect)

Directions

The data used here were originally taken from the Graduate Admissions open dataset available on Kaggle. The admit csv files are records of students and test scores with their chances of college admission. The research csv files contain a flag per student for whether or not they have research experience.

1. Bring up the compute environment

docker-compose up -d

2. Load data into Postgres

We will bring up a container with a psql command line, mount our local data files inside, create a database called students, and load the data on students’ chance of admission into the admission table.

docker run -it — rm — network=postgres-kafka-demo_default \
-v $PWD:/home/data/ \
postgres:11.0 psql -h postgres -U postgres

Password = postgres

At the psql command line, create a database and connect:

CREATE DATABASE students;
\connect students;

Load the admission data table with:

CREATE TABLE admission
(student_id INTEGER, gre INTEGER, toefl INTEGER, cpga DOUBLE PRECISION, admit_chance DOUBLE PRECISION,
CONSTRAINT student_id_pk PRIMARY KEY (student_id));
\copy admission FROM ‘/home/data/admit_1.csv’ DELIMITER ‘,’ CSV HEADER

Load the research data table with:

CREATE TABLE research
(student_id INTEGER, rating INTEGER, research INTEGER,
PRIMARY KEY (student_id));
\copy research FROM ‘/home/data/research_1.csv’ DELIMITER ‘,’ CSV HEADER

3. Connect the Postgres database as a source to Kafka

The postgres-source.json file contains the configuration settings needed to sink all of the students database to Kafka:

{"name": "postgres-source",
"config": {"connector.class":"io.debezium.connector.postgresql.PostgresConnector",
"tasks.max":"1",
"database.hostname": "postgres",
"database.port": "5432",
"database.user": "postgres",
"database.password": "postgres",
"database.dbname" : "students",
"database.server.name": "dbserver1",
"database.whitelist": "students",
"database.history.kafka.bootstrap.servers": "kafka:9092",
"database.history.kafka.topic": "schema-changes.students",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"transforms": "unwrap",
"transforms.unwrap.type": "io.debezium.transforms.UnwrapFromEnvelope"
}
}

Submit the source file to Connect via curl:

curl -X POST -H “Accept:application/json” -H “Content-Type: application/json” --data @postgres-source.json http://localhost:8083/connectors

The connector postgres-source should show up when curling for the list of existing connectors:

curl -H “Accept:application/json” localhost:8083/connectors/

The two tables in the students database will now show up as topics in Kafka. You can check this by entering the Kafka container with the container ID (which you can get via docker ps);

docker exec -it <kafka-container-id> /bin/bash

and listing available topics:

/usr/bin/kafka-topics — list — zookeeper zookeeper:2181

4. Start KSQL

Bring up a KSQL server command line client as a container:

docker run — network postgres-kafka-demo_default \
— interactive — tty — rm \
confluentinc/cp-ksql-cli:latest \
http://ksql-server:8088

To see your updates, a few settings need to be configured by first running:

set ‘commit.interval.ms’=’2000';
set ‘cache.max.bytes.buffering’=’10000000';
set ‘auto.offset.reset’=’earliest’;

5. Mirror Postgres tables in KSQL

The Postgres table topics will be visible as dbserver1.public.admission and dbserver1.public.research in KSQL:

SHOW TOPICS;

We will create KSQL streams (a source stream subscribed to the corresponding Kafka topic and a rekeyed stream we need to populate a table) to auto update KSQL tables mirroring the Postgres tables. Since the data are sourced from Postgres via Connect, the data format will be set to Avro.

CREATE STREAM admission_src (student_id INTEGER, gre INTEGER, toefl INTEGER, cpga DOUBLE, admit_chance DOUBLE) \
WITH (KAFKA_TOPIC=’dbserver1.public.admission’, VALUE_FORMAT=’AVRO’);
CREATE STREAM admission_src_rekey WITH (PARTITIONS=1) AS \
SELECT * FROM admission_src PARTITION BY student_id;
SHOW STREAMS;CREATE TABLE admission (student_id INTEGER, gre INTEGER, toefl INTEGER, cpga DOUBLE, admit_chance DOUBLE)\
WITH (KAFKA_TOPIC=’ADMISSION_SRC_REKEY’, VALUE_FORMAT=’AVRO’, KEY=’student_id’);
SHOW TABLES;CREATE STREAM research_src (student_id INTEGER, rating INTEGER, research INTEGER)\
WITH (KAFKA_TOPIC=’dbserver1.public.research’, VALUE_FORMAT=’AVRO’);
CREATE STREAM research_src_rekey WITH (PARTITIONS=1) AS \
SELECT * FROM research_src PARTITION BY student_id;
CREATE TABLE research (student_id INTEGER, rating INTEGER, research INTEGER)\
WITH (KAFKA_TOPIC=’RESEARCH_SRC_REKEY’, VALUE_FORMAT=’AVRO’, KEY=’student_id’);

The table and stream names I’ve used above are lowercase, but currently KSQL will enforce uppercase casing convention for stream, table, and field names no matter what.

6. Create downstream tables

We will create a new KSQL streaming table to join students’ chance of admission with research experience.

CREATE TABLE research_boost AS \
SELECT a.student_id as student_id, \
a.admit_chance as admit_chance, \
r.research as research \
FROM admission a \
LEFT JOIN research r on a.student_id = r.student_id;

and another table calculating the average chance of admission for students with and without research experience:

CREATE TABLE research_ave_boost AS \
SELECT research, SUM(admit_chance)/COUNT(admit_chance) as ave_chance \
FROM research_boost \
WITH (KAFKA_TOPIC=’research_ave_boost’, VALUE_FORMAT=’delimited’, KEY=’research’) \
GROUP BY research;

7. Add a connector to sink a KSQL table back to Postgres

The postgres-sink.json configuration file will create a RESEARCH_AVE_BOOST table and send the data back to Postgres:

{"name": "postgres-sink",
"config": {"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
"tasks.max":"1",
"topics": "RESEARCH_AVE_BOOST",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"connection.url": "jdbc:postgresql://postgres:5432/students?user=postgres&password=postgres",
"key.converter.schemas.enable": "false",
"value.converter.schemas.enable": "true",
"auto.create": "true",
"auto.evolve": "true",
"insert.mode": "upsert",
"pk.fields": "RESEARCH",
"pk.mode": "record_key"
}
}

Submit the sink file to Connect:

curl -X POST -H “Accept:application/json” -H “Content-Type: application/json” --data @postgres-sink.json http://localhost:8083/connectors

8. Update the source Postgres tables and watch the Postgres sink table update

The RESEARCH_AVE_BOOST table should now be available in Postgres to query:

SELECT “AVE_CHANCE” FROM “RESEARCH_AVE_BOOST”
WHERE cast(“RESEARCH” as INT)=0;

With these data the average admission chance will be 65.19%.

Add some new data to the admission and research tables in Postgres:

\copy admission FROM ‘/home/data/admit_2.csv’ DELIMITER ‘,’ CSV HEADER\copy research FROM ‘/home/data/research_2.csv’ DELIMITER ‘,’ CSV HEADER

With the same query above on the RESEARCH_AVE_BOOST table, the average chance of admission for students without research experience has been updated to 63.49%.

A few things to note:

  • The tables are forced by KSQL to uppercase and are case sensitive, which is annoying and also buggy. You can watch this KSQL GitHub issue for updates. The research field needs to be cast because it has been typed as text instead of integer (though it is integer type in KSQL), which may also be a bug in KSQL or Connect.
  • I use a Debezium PostgresConnector for my connector.class in my Connect source file and a Confluent JdbcSinkConnector for my connector.class in my Connect sink file. I tried both for source and sink, but this was the only configuration I could get to work correctly.

Wrapping Up

Now you should be in good shape to be able to start trying out KSQL with continuously running queries on your own database tables. Here we’ve walked through new data arriving via database table updates. Because we are using Kafka already, we could easily substitute Kafka producers publishing data directly to a Kafka topic or a continuously updating file (like a log, for example) to replace a Postgres table source. These data would appear as a stream available to KSQL just as above. There are a number of different Kafka connectors available for sourcing/sinking to/from databases, file systems, and even Twitter on the Confluent Hub.

Stream processing allows data analysis pipeline results to be continuously updated with the arrival of new data, which enables automation and scalability. This architecture is one of the many cool things we work with to build and scale analysis pipelines on the Data Science team at High Alpha.

If you have any comments/questions about data stream pipelines, feel free to drop me a line here or hit me up via Twitter @OpenSciPinay.


High Alpha is a venture studio pioneering a new model for entrepreneurship that unites company building and venture capital. To learn more, visit highalpha.com or subscribe to our newsletter.

High Alpha

Stories, resources, and ideas on the future of enterprise cloud from the High Alpha network and team. Learn more at www.highalpha.com.

Maria Patterson

Written by

Filipina data doofus, Cubs fan, distance runner, dachshund enthusiast, have code will Docker

High Alpha

Stories, resources, and ideas on the future of enterprise cloud from the High Alpha network and team. Learn more at www.highalpha.com.