The Power Of Streaming ETL — Flatten JSON With ksqlDB

Bhuvanesh
Jan 8 · 6 min read

TL;DR: Flatten and filter the nested JSON output of the Debezium MySQL connector.

Image Credit: Confluent

In the data analytics world, ETL is the starting point where data gets enriched before it moves on to further analysis. But the world now demands real-time reports, and there is no time to run ETL as a separate step before analytics. That's why we are moving to solutions that perform the ETL on the fly, while the data is streaming. We recently explored ksqlDB, the successor to KSQL from Confluent. In this blog, we show how to flatten Debezium's nested JSON output.

Our scenario:

We are capturing CDC events from a MySQL database using the Debezium MySQL connector. Debezium's output is a nested JSON document.

For example, insert a row in MySQL:

insert into inventory.customers (id, fn, ln, phone) values
(1,'Sally','Thomas',1111);

Debezium produces output like the following (truncated here):

{
  "schema": {
    "type": "struct",
    ...
The changed row is captured under payload: { after: { ... } }. For analytics, we may want only the new row values from after, the timestamp from source: {}, and the type of operation from op.

Expected JSON Output:

{
  "id": 1,
  "fn": "Sally",
  "ts_ms": 0,
  "op": "c"
}

A short intro to ksqlDB:

ksqlDB is an ETL component: with ksqlDB we can do ETL using SQL queries, such as extracting data from nested JSON and arrays, and we can compute aggregated results over every n-second window. Everything happens in real time. Once the ETL is done, the results are published to a new Kafka topic. If you are familiar with AWS Kinesis, it is the equivalent of Kinesis Data Analytics.
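For instance, a windowed aggregation over the stream we create later in this post would look something like this (the 10-second window and the count by op are just an illustration, not part of this tutorial):

SELECT op, COUNT(*)
FROM rawdata_mysql_table
WINDOW TUMBLING (SIZE 10 SECONDS)
GROUP BY op;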

Streams:

Streams are responsible for the ETL. A stream reads data from a Kafka topic and applies the transformations. To publish the transformed data to a new Kafka topic, we create one more stream on top of the existing one. If you come from a relational database background, think of the base stream as a table and the derived stream as a view. (But ksqlDB also has its own concept called tables, so don't confuse the two.)

Configure the Debezium MySQL Connector:

  1. Read this post to configure Debezium with AWS MSK.
  2. Read this post to configure Debezium with self-managed Kafka clusters.

MySQL Connector config file:

{
  "connector.class": "io.debezium.connector.mysql.MySqlConnector",
  "snapshot.locking.mode": "none",
  "tasks.max": "1",
  "database.history.kafka.topic": "schema-changes.mysql",
  "database.whitelist": "bhuvi",
  "database.user": "bhuvi",
  "database.server.id": "1",
  "database.history.kafka.bootstrap.servers": "KAFKA-BROKER-IP:9092",
  "database.server.name": "mysql-db01",
  "database.port": "3306",
  "database.hostname": "MYSQL-IP-ADDRESS",
  "database.password": "*****",
  "name": "mysql-connector-db01",
  "snapshot.mode": "initial",
  "key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "key.converter.schemas.enable": "false",
  "value.converter.schemas.enable": "false",
  "internal.key.converter": "org.apache.kafka.connect.json.JsonConverter",
  "internal.value.converter": "org.apache.kafka.connect.json.JsonConverter",
  "internal.key.converter.schemas.enable": "false",
  "internal.value.converter.schemas.enable": "false"
}
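Save the config as a JSON file and register it through the Kafka Connect REST API (the file name and the Connect host/port below are assumptions; adjust for your setup):

curl -X PUT -H "Content-Type: application/json" \
  --data @mysql-connector-db01.json \
  http://localhost:8083/connectors/mysql-connector-db01/config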

Install ksqldb:

It is part of the Confluent Platform, so you can get it from the Confluent repository.

sudo apt-get update
sudo apt-get install default-jre
wget -qO - https://packages.confluent.io/deb/5.3/archive.key | sudo apt-key add -
sudo add-apt-repository "deb [arch=amd64] https://packages.confluent.io/deb/5.3 stable main"
sudo apt-get update && sudo apt-get install confluent-ksql

Create a service file for ksql:

vi /lib/systemd/system/ksql-server.service
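Here is a minimal unit file sketch, assuming the default start script and properties file paths from the confluent-ksql package:

[Unit]
Description=KSQL Server
After=network.target

[Service]
Type=simple
ExecStart=/usr/bin/ksql-server-start /etc/ksql/ksql-server.properties
Restart=on-failure

[Install]
WantedBy=multi-user.target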

Start the KSQL server:

systemctl enable ksql-server.service
systemctl start ksql-server
systemctl status ksql-server

Create a table in MySQL:

create database bhuvi;
use bhuvi;
create table rohi (
  id int,
  fn varchar(10),
  ln varchar(10),
  phone int
);
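Insert a couple of sample rows (the values here are just placeholders):

mysql> insert into rohi values (1, 'rohit', 'ayare', '87611');
mysql> insert into rohi values (2, 'bhuvi', 'tester', '87612');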

Debezium will then push these two rows to the Kafka topic. You can verify with the console consumer:

kafka-console-consumer --bootstrap-server KAFKA-IP:9092 --topic mysql-db01.bhuvi.rohi

Create a stream for transformation:

Flatten the JSON:

ksql> CREATE STREAM rawdata_mysql_table (
        before STRUCT<
          id INT,
          fn VARCHAR,
          ln VARCHAR,
          phone INT>,
        after STRUCT<
          id INT,
          fn VARCHAR,
          ln VARCHAR,
          phone INT>,
        source STRUCT<
          ts_ms BIGINT>,
        op VARCHAR)
      WITH (KAFKA_TOPIC='mysql-db01.bhuvi.rohi', VALUE_FORMAT='JSON');
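If your test rows were inserted before the stream was created, tell KSQL to read the topic from the beginning first:

ksql> SET 'auto.offset.reset'='earliest';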

Read the flattened data with SQL:

ksql> SELECT before->id AS before_id,
             before->fn AS before_fn,
             before->ln AS before_ln,
             before->phone AS before_phone,
             after->id AS id,
             after->fn AS fn,
             after->ln AS ln,
             after->phone AS phone,
             source->ts_ms AS ts_ms,
             op AS op
      FROM rawdata_mysql_table;

Insert a row, and you will see the flattened output in the KSQL window.

mysql> insert into rohi values (100, 'rohit', 'ayare','87611');

Data in Kafka topic:

{
  "before": null,
  "after": {
    "id": 100,
    "fn": "rohit",
    "ln": "ayare",
    "phone": 87611
  },
  "source": {
    "version": "1.0.0.Final",
    "connector": "mysql",
    "name": "mysql-db01",
    "ts_ms": 1578455264000,
    "snapshot": "false",
    "db": "bhuvi",
    "table": "rohi",
    "server_id": 1,
    "gtid": null,
    "file": "mysql-bin.000009",
    "pos": 1464,
    "row": 0,
    "thread": 14,
    "query": null
  },
  "op": "c",
  "ts_ms": 1578455264796
}

Data in KSQLDB Stream:
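Based on the event above, the SELECT from the previous section would print a row along these lines (column order matches the query; exact CLI formatting may vary):

null | null | null | null | 100 | rohit | ayare | 87611 | 1578455264000 | c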

Stream the KSQL results to a new stream:

To publish the flattened output to Kafka, we need to create one more Stream on top of the existing stream.

CREATE STREAM ksql_flatten_table AS
  SELECT before->id AS before_id,
         before->fn AS before_fn,
         before->ln AS before_ln,
         before->phone AS before_phone,
         after->id AS id,
         after->fn AS fn,
         after->ln AS ln,
         after->phone AS phone,
         source->ts_ms AS ts_ms,
         op AS op
  FROM rawdata_mysql_table;

This creates a topic in Kafka, continuously reads data from the source stream, transforms it, and publishes it to the output topic.
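From the KSQL prompt, you can confirm the new stream and its running query with standard commands:

ksql> SHOW STREAMS;
ksql> DESCRIBE EXTENDED ksql_flatten_table;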

# See the output topic in Kafka
kafka-topics --zookeeper localhost:2181 --list

Now insert a new row in MySQL.

mysql> insert into rohi values (200, 'searce', 'inc',123123);

See the flattened data in the output topic:

kafka-console-consumer --bootstrap-server localhost:9092 --topic KSQL_FLATTEN_TABLE --from-beginning

I hope this blog post helps you understand basic transformations with ksqlDB. We can do a lot more with ksqlDB: you can add and remove ksqlDB servers without downtime and distribute your workload across multiple nodes. Go through the documentation and unleash the power of streaming ETL.

Next steps:

  1. Deploy ksqlDB servers in a cluster.
  2. Concepts of ksqlDB.
  3. Developer guide for ksqlDB SQL syntax and functions.
  4. Kafka Streams and ksqlDB Compared
  5. Exploring ksqlDB with Twitter Data

Searce Engineering

We identify better ways of doing things!

Written by Bhuvanesh

BigData | Cloud & Database Architect | Blogger at thedataguy.in
