IDX Stock — Real Time Data Streaming with Kafka Raft

dmitri yanno mahayana
19 min read · Sep 19, 2023


Kafka Ecosystem

Apache Kafka

Apache Kafka, a powerful and widely adopted open-source stream processing platform, is designed to handle the real-time processing of data streams efficiently and reliably. At its core, Kafka functions as a distributed messaging system, allowing various applications to publish and consume streams of data in a fault-tolerant and scalable manner.

It employs a publish-subscribe model, where producers send data to topics, and consumers subscribe to those topics to receive data. Kafka’s durability and fault tolerance are ensured by its distributed architecture, which replicates data across multiple broker nodes. This replication also supports high availability and data redundancy. Additionally, Kafka boasts excellent throughput and low-latency data processing, making it an ideal choice for use cases like real-time analytics, event sourcing, and log aggregation. Its ecosystem includes connectors and stream processing libraries like Kafka Streams and Apache Flink, further extending its capabilities.

Overall, Apache Kafka has become a cornerstone technology for building data pipelines and enabling real-time data processing in modern, data-driven applications.

IDX Stock API

Normally, you would retrieve real-time stock exchange data through the IDX Data Services API (https://data.idx.co.id/). With that API we can get real-time transactions during market hours and filter by ticker, date, history, and so on. For learning purposes, we are using GoAPI (https://goapi.id/p/data-saham-indonesia) to retrieve stock exchange transactions instead. Keep in mind that this API is still under development and does not provide real-time transactions, so what we can do is call the API in an infinite loop.

Before starting development, register at GoAPI (https://app.goapi.id/register) and retrieve your API-Key. GoAPI gives you a limited monthly quota, so we want to use it efficiently. Hence, we will use these endpoints as our data source.

1. Get Trending Stock:
https://api.goapi.id/v1/stock/idx/trending
Parameter: API-Key

2. Get Companies:
https://api.goapi.id/v1/stock/idx/companies
Parameter: API-Key

3. Get History Stock:
https://api.goapi.id/v1/stock/idx/BBCA/historical?from=2023-08-08&to=2023-09-22
Parameter: API-Key, Ticker, From, and To (YYYY-MM-DD)

You can find more GoAPI endpoints in the documentation here.
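
Before wiring anything into Kafka, it is worth sanity-checking your key against one of these endpoints. Here is a minimal sketch using Python's requests package, assuming the same X-API-KEY header that our producer code uses later in this article (replace the placeholder with your own key):

import requests

API_KEY = '[your-goapi-key]'  # placeholder, use the key from your GoAPI dashboard

# Hit the trending endpoint once to confirm the key and quota are working
response = requests.get('https://api.goapi.id/v1/stock/idx/trending',
                        headers={'Accept': '*/*', 'X-API-KEY': API_KEY},
                        timeout=30)
print(response.status_code)
print(response.json())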

Kafka Architecture

We will have 3 brokers, 1 Schema Registry, and 1 ksqlDB server inside our Kafka cluster. This Kafka cluster will be deployed using Docker; in production we would normally use Amazon ECS, EKS, or Fargate to orchestrate the containers. We also need 1 Kafka producer to capture API data from GoAPI and push it to the Kafka cluster, which handles the decoupling, de-duplication, and other real-time processing concerns.

Kafka Ecosystem

Additionally, we add Schema Registry to enable schema evolution and ksqlDB as a presentation layer to perform data transformation and sink the data to MongoDB.

We will cover Kafka monitoring and the machine learning model in a separate tutorial.

KRaft — Kafka Raft

KRaft became production-ready in Kafka 3.3.0. It was introduced to overcome the challenges of running ZooKeeper alongside Kafka (duplicated metadata management, error-prone operations, scalability limits, etc.): ZooKeeper is replaced by an internal Raft quorum of controllers.

Assuming we have a real-time source using GoAPI, we need to decide how many brokers and partitions to use and what replication factor to set.

  • The replication factor should be at least 2 and at most 4. The recommended number is 3, as it provides the right balance between performance and fault tolerance, and cloud providers usually offer 3 data centers / availability zones per region to deploy into [source]. That means a minimum of 3 brokers is required in our cluster.
  • If you have a small cluster of fewer than 6 brokers, create three times (3x) as many partitions as you have brokers; the reasoning is that if you add brokers over time, you will already have enough partitions to cover them. If you have a big cluster of more than 12 brokers, create two times (2x) as many partitions as brokers. Since we have only 3 brokers, we need 9 partitions; this partition count is applied during topic creation. The sketch below simply restates this sizing rule in code.
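
As a quick illustration, here is the sizing rule above written out as plain arithmetic:

brokers = 3

# Rule of thumb from the bullets above:
#   fewer than 6 brokers -> partitions = 3 x brokers
#   more than 12 brokers -> partitions = 2 x brokers
partitions = 3 * brokers if brokers < 6 else 2 * brokers
print(partitions)  # 9 partitions for our 3-broker cluster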

Let’s build our first docker-compose using this KRaft config.

version: "3.5"
services:

kafka-gen:
image: confluentinc/cp-kafka:7.3.3
hostname: kafka-gen
container_name: kafka-gen
volumes:
- ./scripts/create_cluster_id.sh:/tmp/create_cluster_id.sh
- ./clusterID:/tmp/clusterID
command: "bash -c '/tmp/create_cluster_id.sh'"

  kafka1:
    image: confluentinc/cp-kafka:7.3.3
    container_name: kafka1
    ports:
      - "39092:39092"
      - "49092:49092"
    environment:
      KAFKA_NODE_ID: 1
      KAFKA_JMX_PORT: 49092
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka1:19092,EXTERNAL://localhost:39092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: BROKER://kafka1:19092,EXTERNAL://kafka1:39092,CONTROLLER://kafka1:9093
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      # 2 Days of Retention
      KAFKA_LOG_RETENTION_HOURS: 48
      # For testing small segments 16MB and retention of 128MB
      KAFKA_LOG_SEGMENT_BYTES: 16777216
      KAFKA_LOG_RETENTION_BYTES: 134217728
    volumes:
      - kafka1-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"
    restart: always

  kafka2:
    image: confluentinc/cp-kafka:7.3.3
    container_name: kafka2
    ports:
      - "39093:39093"
      - "49093:49093"
    environment:
      KAFKA_NODE_ID: 2
      KAFKA_JMX_PORT: 49093
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka2:19093,EXTERNAL://localhost:39093
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: BROKER://kafka2:19093,EXTERNAL://kafka2:39093,CONTROLLER://kafka2:9093
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      # 2 Days of Retention
      KAFKA_LOG_RETENTION_HOURS: 48
      # For testing small segments 16MB and retention of 128MB
      KAFKA_LOG_SEGMENT_BYTES: 16777216
      KAFKA_LOG_RETENTION_BYTES: 134217728
    volumes:
      - kafka2-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"
    restart: always

  kafka3:
    image: confluentinc/cp-kafka:7.3.3
    container_name: kafka3
    ports:
      - "39094:39094"
      - "49094:49094"
    environment:
      KAFKA_NODE_ID: 3
      KAFKA_JMX_PORT: 49094
      KAFKA_ADVERTISED_LISTENERS: BROKER://kafka3:19094,EXTERNAL://localhost:39094
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: BROKER:PLAINTEXT,EXTERNAL:PLAINTEXT,CONTROLLER:PLAINTEXT
      KAFKA_CONTROLLER_QUORUM_VOTERS: '1@kafka1:9093,2@kafka2:9093,3@kafka3:9093'
      KAFKA_LISTENERS: BROKER://kafka3:19094,EXTERNAL://kafka3:39094,CONTROLLER://kafka3:9093
      KAFKA_LOG_DIRS: '/tmp/kraft-combined-logs'
      KAFKA_INTER_BROKER_LISTENER_NAME: BROKER
      KAFKA_CONTROLLER_LISTENER_NAMES: CONTROLLER
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
      KAFKA_PROCESS_ROLES: 'controller,broker'
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: 'false'
      # 2 Days of Retention
      KAFKA_LOG_RETENTION_HOURS: 48
      # For testing small segments 16MB and retention of 128MB
      KAFKA_LOG_SEGMENT_BYTES: 16777216
      KAFKA_LOG_RETENTION_BYTES: 134217728
    volumes:
      - kafka3-data:/var/lib/kafka/data
      - ./scripts/update_run.sh:/tmp/update_run.sh
      - ./clusterID:/tmp/clusterID
    command: "bash -c '/tmp/update_run.sh && /etc/confluent/docker/run'"
    restart: always

volumes:
  kafka1-data:
  kafka2-data:
  kafka3-data:

networks:
  default:
    name: my_docker_network
    external: true

Schema Registry

Schema Registry provides a centralized repository for managing and validating schemas for topic message data, and for serialization and deserialization of the data over the network. Producers and consumers to Kafka topics can use schemas to ensure data consistency and compatibility as schemas evolve. Schema Registry is a key component for data governance, helping to ensure data quality, adherence to standards, visibility into data lineage, audit capabilities, collaboration across teams, efficient application development protocols, and system performance.

Confluent Schema Registry supports Avro, JSON Schema, and Protobuf serializers and deserializers (serdes). When you write producers and consumers using these supported formats, they handle the details of the wire format for you, so you don’t have to worry about how messages are mapped to bytes.

Let’s update the previous docker-compose with a Schema Registry container.

  schema-registry:
    image: confluentinc/cp-schema-registry
    hostname: schema-registry
    container_name: schema-registry
    depends_on:
      - kafka1
      - kafka2
      - kafka3
    ports:
      - "8282:8282"
      - "1088:1088"
    environment:
      SCHEMA_REGISTRY_HOST_NAME: schema-registry
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:19093,kafka3:19094
      SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8282
      SCHEMA_REGISTRY_JMX_OPTS: >-
        -Djava.rmi.server.hostname=localhost
        -Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.port=1088
        -Dcom.sun.management.jmxremote.authenticate=false
        -Dcom.sun.management.jmxremote.ssl=false
        -Dcom.sun.management.jmxremote.rmi.port=1088
    restart: always
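
Once the stack is up (we run docker compose in a later section), a quick way to confirm the registry is reachable on the port configured above is to list its subjects; an empty list simply means no schema has been registered yet. A minimal sketch in Python:

import requests

# GET /subjects lists every registered subject; [] means the registry is up but still empty
response = requests.get('http://localhost:8282/subjects', timeout=10)
print(response.status_code, response.json())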

Kafka SQL Database — ksqlDB

ksqlDB is a database that’s purpose-built for stream processing applications. It consolidates the many components found in virtually every stream processing architecture. ksqlDB aims to provide one mental model for doing everything you need. You can build a complete streaming app against ksqlDB, which in turn has just one dependency: Apache Kafka.

ksqlDB can integrate with Confluent Schema Registry. ksqlDB automatically retrieves (reads) and registers (writes) schemas as needed, which spares you from defining columns and data types manually in CREATE statements and from manual interaction with Schema Registry.

Let’s update our docker-compose again with a ksqlDB container that uses the Schema Registry.

  ksqldb-server:
    image: confluentinc/ksqldb-server:0.29.0
    hostname: ksqldb-server
    container_name: ksqldb-server
    depends_on:
      - kafka1
      - kafka2
      - kafka3
      - schema-registry
    ports:
      - "9088:9088"
      - "1099:1099"
    environment:
      KSQL_LISTENERS: http://0.0.0.0:9088
      KSQL_BOOTSTRAP_SERVERS: kafka1:19092,kafka2:19093,kafka3:19094
      KSQL_KSQL_SCHEMA_REGISTRY_URL: http://schema-registry:8282
      KSQL_KSQL_LOGGING_PROCESSING_STREAM_AUTO_CREATE: "true"
      KSQL_KSQL_LOGGING_PROCESSING_TOPIC_AUTO_CREATE: "true"
      KSQL_KSQL_QUERY_PULL_METRICS_ENABLED: "true"
      KSQL_JMX_OPTS: >-
        -Djava.rmi.server.hostname=localhost
        -Dcom.sun.management.jmxremote
        -Dcom.sun.management.jmxremote.port=1099
        -Dcom.sun.management.jmxremote.authenticate=false
        -Dcom.sun.management.jmxremote.ssl=false
        -Dcom.sun.management.jmxremote.rmi.port=1099
    restart: always

  ksqldb-cli:
    image: confluentinc/ksqldb-cli:0.29.0
    container_name: ksqldb-cli
    depends_on:
      - ksqldb-server
    entrypoint: /bin/sh
    tty: true
    restart: always

Kafka Monitoring

Java Management Extensions (JMX) and Managed Beans (MBeans) are technologies for monitoring and managing Java applications. They are enabled by default for Kafka and provide metrics for its components: brokers, controllers, producers, and consumers.

You will see several JMX configurations in our docker-compose file, but we are not using them yet; we will cover them in the Kafka monitoring tutorial later.

Run Docker Compose

Once the docker-compose file is complete, we can run this command on our machine.

docker compose up -d

It will download all the images and build the containers in our Docker virtual machine. Wait until all containers are ready and running before proceeding to the next step.
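
To double-check that the brokers are reachable from the host before moving on, here is a small sketch using confluent_kafka's AdminClient against the external listener ports from the compose file (it assumes the Python prerequisites from a later section are already installed):

from confluent_kafka.admin import AdminClient

# Connect through the EXTERNAL listeners exposed by docker-compose
admin = AdminClient({'bootstrap.servers': 'localhost:39092,localhost:39093,localhost:39094'})

# list_topics() returns cluster metadata; all three brokers should show up
metadata = admin.list_topics(timeout=10)
print('Brokers:', list(metadata.brokers.keys()))
print('Topics:', list(metadata.topics.keys()))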

Kafka Schema Creation

We need to create 2 Kafka schemas via the Schema Registry API, using Postman to perform the POST requests. You can use other tools like Katalon, JMeter, SoapUI, etc., as long as you can hit the Schema Registry API. Put the schema content in the request body; our schemas use the Avro format. You could switch to another format like Protobuf or JSON Schema, but I prefer Avro because it uses a compact binary encoding that is highly efficient in both space and speed, making it suitable for scenarios where bandwidth or storage is a concern. So, please make sure you understand how to work with the Avro format.

  1. POST URL: http://localhost:8282/subjects/IDX-Stock/versions/

Use Body request below:

{ "schema":
"{ \"type\": \"record\",\"name\": \"Stock\", \"fields\": [{\"name\": \"id\", \"type\":[\"null\",\"string\"],\"default\":null}, {\"name\": \"ticker\", \"type\":[\"null\",\"string\"],\"default\":null}, {\"name\": \"date\", \"type\":[\"null\",\"string\"],\"default\":null}, {\"name\": \"open\", \"type\":[\"null\",\"double\"],\"default\":null}, {\"name\": \"high\", \"type\":[\"null\",\"double\"],\"default\":null}, {\"name\": \"low\", \"type\":[\"null\",\"double\"],\"default\":null}, {\"name\": \"close\", \"type\":[\"null\",\"double\"],\"default\":null}, {\"name\": \"volume\", \"type\":[\"null\",\"long\"],\"default\":null} ]}"
}

2. POST URL: http://localhost:8282/subjects/IDX-Company/versions

Use Body request below:

{ "schema":
"{ \"type\": \"record\", \"name\": \"Company\", \"fields\": [ {\"name\": \"id\", \"type\": \"string\"}, {\"name\": \"ticker\", \"type\": \"string\"}, {\"name\": \"name\", \"type\": \"string\"}, {\"name\": \"logo\", \"type\": \"string\"}, {\"name\": \"createdtime\", \"type\": [ \"null\", \"string\" ]} ]}"
}
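
If you prefer to register the schemas from code rather than Postman, the same POST can be issued with Python's requests; a sketch for the IDX-Stock subject is shown below (the IDX-Company subject works the same way):

import json
import requests

# Same Avro definition as the Postman body above
stock_schema = {
    'type': 'record',
    'name': 'Stock',
    'fields': [
        {'name': 'id', 'type': ['null', 'string'], 'default': None},
        {'name': 'ticker', 'type': ['null', 'string'], 'default': None},
        {'name': 'date', 'type': ['null', 'string'], 'default': None},
        {'name': 'open', 'type': ['null', 'double'], 'default': None},
        {'name': 'high', 'type': ['null', 'double'], 'default': None},
        {'name': 'low', 'type': ['null', 'double'], 'default': None},
        {'name': 'close', 'type': ['null', 'double'], 'default': None},
        {'name': 'volume', 'type': ['null', 'long'], 'default': None},
    ],
}

# The registry expects the Avro schema as an escaped string inside a JSON envelope
response = requests.post('http://localhost:8282/subjects/IDX-Stock/versions',
                         headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'},
                         data=json.dumps({'schema': json.dumps(stock_schema)}),
                         timeout=10)
print(response.status_code, response.json())  # e.g. {'id': 1}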

Once you have posted the 2 schemas above, check the result of each schema using the GET method.

  1. GET URL: http://localhost:8282/subjects/IDX-Stock/versions/1

Body result:

{
"subject": "IDX-Stock",
"version": 1,
"id": 1,
"schema": "{\"type\":\"record\",\"name\":\"Stock\",\"fields\":[{\"name\":\"id\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"ticker\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"date\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"open\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"high\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"low\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"close\",\"type\":[\"null\",\"double\"],\"default\":null},{\"name\":\"volume\",\"type\":[\"null\",\"long\"],\"default\":null}]}"
}

2. GET URL: http://localhost:8282/subjects/IDX-Company/versions/1

Body result:

{
"subject": "IDX-Company",
"version": 1,
"id": 3,
"schema": "{\"type\":\"record\",\"name\":\"Company\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"ticker\",\"type\":\"string\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"logo\",\"type\":\"string\"},{\"name\":\"createdtime\",\"type\":[\"null\",\"string\"]}]}"
}

Kafka Topic Creation

We have registered our schemas, so let’s move on to topic creation. As mentioned before, we need to create topics with 9 partitions across 3 brokers. We will use a replication factor of 2, one less than the number of brokers, so each partition can survive the loss of one broker.

Navigate to the console command and run this command line interface (CLI):

docker exec -ti kafka1 /usr/bin/kafka-topics --bootstrap-server kafka1:19092,kafka2:19093,kafka3:19094 --create --replication-factor 2 --partitions 9 --config cleanup.policy=compact --topic streaming.goapi.idx.stock.json
docker exec -ti kafka1 /usr/bin/kafka-topics --bootstrap-server kafka1:19092,kafka2:19093,kafka3:19094 --create --replication-factor 2 --partitions 9 --config cleanup.policy=compact --topic streaming.goapi.idx.companies.json

Additionally, we set the cleanup policy to compact so that Kafka only stores the most recent value for each key in the topic. Setting the policy to compact only makes sense on topics whose events contain both a key and a value.

Run this CLI command to list our topics:

docker exec -ti kafka1 /usr/bin/kafka-topics --bootstrap-server kafka1:19092,kafka2:19093,kafka3:19094 --list
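
Since confluent_kafka is already on our requirements list, the same topics can also be created programmatically; here is a sketch with the AdminClient, using the same partition count, replication factor, and cleanup policy as the commands above:

from confluent_kafka.admin import AdminClient, NewTopic

admin = AdminClient({'bootstrap.servers': 'localhost:39092,localhost:39093,localhost:39094'})

topics = [
    NewTopic(name, num_partitions=9, replication_factor=2, config={'cleanup.policy': 'compact'})
    for name in ('streaming.goapi.idx.stock.json', 'streaming.goapi.idx.companies.json')
]

# create_topics() returns one future per topic; result() raises if creation failed
for topic, future in admin.create_topics(topics).items():
    try:
        future.result()
        print('Created ' + topic)
    except Exception as exc:
        print('Failed to create ' + topic + ': ' + str(exc))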

ksqlDB Stream and Materialized Table Creation

We will not use a plain Kafka consumer to retrieve the data from the brokers. Instead, we use ksqlDB as a presentation layer to automatically de-duplicate the data and store it in MongoDB.

Navigate to the console command and run this CLI:

docker exec -it ksqldb-cli ksql http://ksqldb-server:9088

Once you enter the ksqlDB CLI, create the streams and materialized tables from our source topics (streaming.goapi.idx.stock.json and streaming.goapi.idx.companies.json). You can create them with the following ksqlDB statements:

-- Create Stock Stream with Avro Format
CREATE OR REPLACE STREAM ksqlstreamstock (id VARCHAR, ticker VARCHAR, date VARCHAR, open DOUBLE, high DOUBLE, low DOUBLE, close DOUBLE, volume BIGINT)
WITH (KAFKA_TOPIC='streaming.goapi.idx.stock.json', VALUE_FORMAT='AVRO');

-- Create Company Stream with Avro Format
CREATE OR REPLACE STREAM ksqlstreamcompany (id VARCHAR, ticker VARCHAR, name VARCHAR, logo VARCHAR)
WITH (kafka_topic='streaming.goapi.idx.companies.json', value_format='AVRO');

-- Create Stock Materialized View to Remove Duplication
CREATE OR REPLACE TABLE ksqltablegroupstock AS
SELECT
id AS id,
AS_VALUE(id) AS stockid,
latest_by_offset(ticker) AS ticker,
latest_by_offset(date) AS date,
latest_by_offset(open) AS open,
latest_by_offset(high) AS high,
latest_by_offset(low) AS low,
latest_by_offset(close) AS close,
latest_by_offset(volume) AS volume
FROM ksqlstreamstock
WINDOW TUMBLING (SIZE 1 MINUTE, RETENTION 3 DAYS, GRACE PERIOD 15 MINUTES)
GROUP BY id EMIT CHANGES;

-- Create Company Materialized View to Remove Duplication
CREATE OR REPLACE TABLE ksqltablegroupcompany AS
SELECT
id AS id,
AS_VALUE(id) AS companyid,
latest_by_offset(ticker) AS ticker,
latest_by_offset(name) AS name,
latest_by_offset(logo) AS logo
FROM ksqlstreamcompany
WINDOW TUMBLING (SIZE 1 MINUTE, RETENTION 3 DAYS, GRACE PERIOD 15 MINUTES)
GROUP BY id EMIT CHANGES;

-- Create Join Stock and Company Stream Table
CREATE OR REPLACE STREAM ksqlstreamjoinstockcompany AS
SELECT
stream1.id AS id,
stream1.ticker AS ticker,
date AS date,
open AS open,
high AS high,
low AS low,
close AS close,
volume AS volume,
name AS name,
logo AS logo
FROM ksqlstreamstock stream1
INNER JOIN ksqlstreamcompany stream2
WITHIN 7 DAYS GRACE PERIOD 15 MINUTES
ON stream1.ticker = stream2.ticker EMIT CHANGES;

-- Create Join Stock and Company Materialized Table
CREATE OR REPLACE TABLE ksqltablejoinstockcompany AS
SELECT
id AS id,
AS_VALUE(id) AS stockid,
latest_by_offset(ticker) AS ticker,
latest_by_offset(date) AS date,
latest_by_offset(open) AS open,
latest_by_offset(high) AS high,
latest_by_offset(low) AS low,
latest_by_offset(close) AS close,
latest_by_offset(volume) AS volume,
latest_by_offset(name) AS name,
latest_by_offset(logo) AS logo
FROM ksqlstreamjoinstockcompany
WINDOW TUMBLING (SIZE 1 MINUTE, RETENTION 3 DAYS, GRACE PERIOD 15 MINUTES)
GROUP BY id EMIT CHANGES;

If you’re creating an application with Kafka Streams or ksqlDB that involves aggregations, you’ll likely use windowing. Aggregations accumulate over time, and without a limit they never stop accumulating; windowing defines how much data can accumulate. Tumbling windows are based on time intervals: they model fixed-size, non-overlapping, gapless windows, defined by a single property, the window’s duration. A tumbling window is a hopping window whose duration equals its advance interval. Since tumbling windows never overlap, a record belongs to one and only one window [source].

A grace period may be required to ensure late events are still accepted into the window. ksqlDB lets you configure this behavior for each window type and uses a default of 24 hours when no grace period is specified.

For each window type, you can configure the number of windows in the past that ksqlDB retains. This capability is very useful for interactive applications that use ksqlDB as their primary serving data store.

Python Kafka Requirements

We have a couple of options besides Python for building Kafka clients, such as Java and Scala, but we will use Python here because it is easy to work with and friendly for newcomers. Install the following Python prerequisites using pip, conda, or your preferred tool.

confluent_kafka
requests
fastavro
ksqldb
pymongo
motor
pandas
pyarrow
pyspark==3.4.1

Config File

You may need to update the server config values based on your own host names and IPs. Do not forget to replace the GoAPI API-Key with your own key.

import os

# Kafka Cluster Host
config = {
    'bootstrap.servers': 'localhost:39092,localhost:39093,localhost:39094'
}
# Kafka Schema Host
srConfig = {
    'url': 'http://localhost:8282'
}
# ksqlDB Host
ksqlConfig = {
    'url': 'http://localhost:9088',
    'mode': 'earliest'  # earliest/latest
}
# MongoDB Host
mongoConfig = {
    'url': 'mongodb://localhost:27017'
}
# Spark Host - Prepare this for next study case
sparkMasterConfig = {
    'url': 'spark://192.168.1.4:7077'
}
rootDirectory = os.path.dirname(os.path.abspath(__file__))  # This is Project Root
apiKey = '[your-goapi-key]'  # GoAPI Api-Key

Stock Class

We use this class to store stock data and convert it into a dict for the Avro serializer.

class Stock(object):
    def __init__(self, id, ticker, date, open, high, low, close, volume):
        self.id = id
        self.ticker = ticker
        self.date = date
        self.open = open
        self.high = high
        self.low = low
        self.close = close
        self.volume = volume


def stockToDict(Stock, ctx):
    return {"id": Stock.id,
            "ticker": Stock.ticker,
            "date": Stock.date,
            "open": Stock.open,
            "high": Stock.high,
            "low": Stock.low,
            "close": Stock.close,
            "volume": Stock.volume}


def dictToStock(dict, ctx):
    return dict
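
As a quick, purely illustrative check (the values below are made up), the dict produced by stockToDict must line up with the field names of the IDX-Stock Avro schema registered earlier:

# Hypothetical sample shaped like one entry of GoAPI's historical results
sample = Stock('BBCA_2023-09-01', 'BBCA', '2023-09-01', 9100.0, 9200.0, 9050.0, 9150.0, 1234567)

# Keys match the IDX-Stock Avro schema: id, ticker, date, open, high, low, close, volume
print(stockToDict(sample, None))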

Company Class

We use this class to store company data and convert it into a dict for the Avro serializer.

class Company(object):
    def __init__(self, id, ticker, name, logo, createdtime):
        self.id = id
        self.ticker = ticker
        self.name = name
        self.logo = logo
        self.createdtime = createdtime


def companyToDict(Company, ctx):
    return {"id": Company.id,
            "ticker": Company.ticker,
            "name": Company.name,
            "logo": Company.logo,
            "createdtime": Company.createdtime}


def dictToCompany(dict, ctx):
    return dict

API Request Function

To call the GoAPI endpoints, we use the “requests” Python package and pass all the necessary header parameters to it.

import requests
from Config import apiKey


def getAPIResults(apiURL):
    headers = {
        'Accept': '*/*',
        'X-API-KEY': apiKey
    }
    response = requests.get(apiURL, headers=headers)

    if response.status_code != 200:
        print('ERROR with Response Code: ' + str(response.status_code) + ' URL: ' + apiURL)
        return response.status_code, ""
    else:
        return response.status_code, response.json()['data']['results']

Kafka Producer Avro Serializer

The producer sends data to the Kafka brokers using the Avro serializer. The Schema Registry allows our schema to evolve when the data format changes, as long as the new version is compatible with the one already stored in the registry. Hence, we do not need to create a brand-new schema for every change; the registry handles the compatibility check for us.

from confluent_kafka.serialization import StringSerializer, SerializationContext, MessageField


def deliveryReport(err, event):
    if err is not None:
        print(f"Error ID: {event.key().decode('utf8')}: {err}")


def sendProducer(topic, object, producer, avroSerializer):
    producer.produce(topic=topic,
                     key=StringSerializer('utf_8')(str(object.id)),
                     value=avroSerializer(object, SerializationContext(topic, MessageField.VALUE)),
                     on_delivery=deliveryReport)
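
Because the registry enforces compatibility when a schema evolves, you can test a changed schema against the latest registered version before producing with it. Below is a sketch using the registry's REST compatibility endpoint; the extra currency field is only a hypothetical example of an evolved schema:

import json
import requests

# Evolved Stock schema: one optional field added, which should be a compatible change
evolved_schema = {
    'type': 'record',
    'name': 'Stock',
    'fields': [
        {'name': 'id', 'type': ['null', 'string'], 'default': None},
        {'name': 'ticker', 'type': ['null', 'string'], 'default': None},
        {'name': 'date', 'type': ['null', 'string'], 'default': None},
        {'name': 'open', 'type': ['null', 'double'], 'default': None},
        {'name': 'high', 'type': ['null', 'double'], 'default': None},
        {'name': 'low', 'type': ['null', 'double'], 'default': None},
        {'name': 'close', 'type': ['null', 'double'], 'default': None},
        {'name': 'volume', 'type': ['null', 'long'], 'default': None},
        {'name': 'currency', 'type': ['null', 'string'], 'default': None},  # hypothetical new field
    ],
}

response = requests.post('http://localhost:8282/compatibility/subjects/IDX-Stock/versions/latest',
                         headers={'Content-Type': 'application/vnd.schemaregistry.v1+json'},
                         data=json.dumps({'schema': json.dumps(evolved_schema)}),
                         timeout=10)
print(response.json())  # {'is_compatible': True} if the evolution is allowed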

Kafka Producer Main Function

Kafka Producer is a client application that publishes (writes) events to a Kafka cluster. Kafka Producer is conceptually much simpler than the consumer since it has no need for group coordination. A producer partitioner maps each message to a topic partition, and the producer sends a produce request to the leader of that partition. The partitioners shipped with Kafka guarantee that all messages with the same non-empty key will be sent to the same partition.

from Module.CallAPI import getAPIResults  # Custom Module
from Module.ClassStock import Stock, stockToDict # Custom Module
from Module.ClassCompany import Company, companyToDict # Custom Module
from Module.ConfluentKafkaAvroProducer import sendProducer # Custom Module
from Config import config, srConfig # Custom Module
from confluent_kafka import Producer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer
import datetime
import time
import urllib.parse

if __name__ == '__main__':
    topic1 = 'streaming.goapi.idx.stock.json'
    topic2 = 'streaming.goapi.idx.companies.json'
    schema1 = 'IDX-Stock'
    schema2 = 'IDX-Company'
    baseUrl = 'https://api.goapi.id/v1/stock/idx/'

    # Define Kafka Serializer and Schema
    string_serializer = StringSerializer('utf_8')
    schema_registry_client = SchemaRegistryClient(srConfig)
    stockSchema = schema_registry_client.get_latest_version("IDX-Stock")
    stockAvroSerializer = AvroSerializer(schema_registry_client,
                                         stockSchema.schema.schema_str,
                                         stockToDict)
    companySchema = schema_registry_client.get_latest_version("IDX-Company")
    companyAvroSerializer = AvroSerializer(schema_registry_client,
                                           companySchema.schema.schema_str,
                                           companyToDict)

    # Define Kafka Producer
    producer = Producer(config)

    # Query Company Trending
    print("Start extracting trending stock")
    apiUrl = baseUrl + 'trending'
    responseCode, listTrending = getAPIResults(apiUrl)
    print("End extracting trending stock")

    # URL List All Company
    print("Start extracting companies")
    apiUrl2 = baseUrl + 'companies'
    responseCode, listCompany = getAPIResults(apiUrl2)
    print("End extracting companies")

    # Encode the parameter values
    dateTimeFormat = '%Y-%m-%d'
    currentDate = datetime.datetime.now().strftime(dateTimeFormat)
    yesterdayDate = (datetime.datetime.now() - datetime.timedelta(1)).strftime(dateTimeFormat)
    # encodedParam1 = urllib.parse.quote_plus(yesterdayDate)  # Use this if you want a daily stock update
    encodedParam1 = urllib.parse.quote_plus('2023-08-01')
    encodedParam2 = urllib.parse.quote_plus(currentDate)

    try:
        Counter = 0
        while True:
            for row in listTrending:
                Counter += 1
                emitent = row['ticker']
                change = row['change']
                percent = row['percent']
                print('Counter: ' + str(Counter) + ' emitent: ' + emitent + ' change: ' + change +
                      ' percent: ' + percent)

                # Query Historical Stock Price
                apiUrl3 = baseUrl + emitent + '/historical'
                apiUrl3 = apiUrl3 + '?from=' + encodedParam1 + '&to=' + encodedParam2
                responseCode, historicalPrices = getAPIResults(apiUrl3)
                print('Counter: ' + str(Counter) + ' API Historical URL: ' + apiUrl3 +
                      ' Response Code: ' + str(responseCode))
                if responseCode == 200:
                    if len(historicalPrices) > 0 and len(listCompany) > 0:
                        for row2 in historicalPrices:
                            ticker = str(row2['ticker'])
                            date = str(row2['date'])
                            id = ticker + '_' + date
                            open = float(row2['open'])
                            high = float(row2['high'])
                            low = float(row2['low'])
                            close = float(row2['close'])
                            volume = int(row2['volume'])
                            print('Counter: ' + str(Counter) + ' ticker: ' + ticker + ' date: ' + date + ' open: ' +
                                  str(open) + ' high: ' + str(high) + ' low: ' + str(low) + ' close: ' + str(close) +
                                  ' volume: ' + str(volume))

                            # Send Avro Producer
                            stock = Stock(id, ticker, date, open, high, low, close, volume)
                            sendProducer(topic1, stock, producer, stockAvroSerializer)

                            filteredCompany = list(filter(lambda x: x['ticker'].lower() == emitent.lower(), listCompany))
                            compTicker = str(filteredCompany[0]['ticker'])
                            compName = str(filteredCompany[0]['name'])
                            compLogo = str(filteredCompany[0]['logo'])
                            print('Counter: ' + str(Counter) + ' name: ' + compName + ' logo: ' + compLogo)

                            # Send Avro Producer
                            company = Company(ticker, ticker, compName, compLogo,
                                              str(datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')))
                            sendProducer(topic2, company, producer, companyAvroSerializer)

                # Save and flush
                producer.flush()

                time.sleep(5)  # Wait 5 seconds to see the output
    except KeyboardInterrupt:
        print("Press Ctrl+C to terminate while statement")
        pass

The producer first gets the list of trending stocks and the list of companies, then iterates indefinitely until Ctrl+C is pressed. Inside the loop, it retrieves historical stock prices for each ticker and date range, then sends the data to the Stock and Company topics using our Avro serializer functions. Finally, the producer flushes the data to the Kafka cluster.

ksqlDB Get Data and Sink

Async/asyncio allows concurrency within a single thread. This gives you, as the developer, much more fine-grained control over task switching and can give much better performance for concurrent I/O-bound tasks than Python threading. Threading in Python is inefficient because of the GIL (Global Interpreter Lock), which means multiple threads cannot run in parallel as you would expect on a multi-processor system. In addition, you have to rely on the interpreter to switch between threads, which adds to the inefficiency.

Push queries (SELECT * FROM table EMIT CHANGES) enable you to subscribe to changes, which lets you react to new information in real time. They are a good fit for asynchronous application flows: the server pushes a continuous stream of updates from the ksqlDB stream or table. The result of such a statement is not persisted in a Kafka topic; it is only printed to the console or returned to the client.

One more thing: ksqlDB automatically deserializes the Avro format, so we do not need to write a deserialization function to retrieve our data.

from ksqldb import KSQLdbClient
from Config import ksqlConfig, mongoConfig
from datetime import datetime
import asyncio
import json
from motor.motor_asyncio import AsyncIOMotorClient
from pymongo.server_api import ServerApi

stockQueryID = ""
companyQueryID = ""
joinStockCompQueryID = ""


async def buildJsonValue(colNames, Rows):
    colNames = colNames
    if len(colNames) == len(Rows):
        global finalJson
        for i in range(len(Rows)):
            value = Rows[i]
            if 'str' in str(type(Rows[i])):
                value = '"' + str(Rows[i]) + '"'
            if colNames[i].lower() != 'windowstart' and colNames[i].lower() != 'windowend':  # Skip this column
                if i == 0:
                    finalJson = '{"' + colNames[i].lower() + '":' + str(value) + ''
                elif i == len(Rows) - 1:
                    finalJson = finalJson + ',"' + colNames[i].lower() + '":' + str(value) + '}'
                else:
                    finalJson = finalJson + ',"' + colNames[i].lower() + '":' + str(value)
    else:
        raise Exception("Total Column " + str(len(colNames)) + " is not match with Total Value " + str(len(Rows)))
    # print(finalJson)
    return finalJson


async def queryAsyncStock():
    global stockQueryID
    query = ksqlClient.query_async("select * from KSQLTABLEGROUPSTOCK emit changes;",
                                   stream_properties={"ksql.streams.auto.offset.reset": ksqlConfig['mode']},
                                   timeout=None)
    counter = 0
    global colNamesStock
    async for row in query:
        counter = counter + 1
        if counter == 1:
            stockQueryID = row['queryId']
            colNamesStock = row['columnNames']
            print("Stock Column Names: ", colNamesStock)
            print("Query ID: ", stockQueryID)
        else:
            jsonValue = await buildJsonValue(colNamesStock, row)
            jsonObject = json.loads(jsonValue)
            collectionName = 'ksql-stock-stream'
            query = {'id': row[0]}
            dateTime = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
            mongoResult = await mongoDB[collectionName].find_one(query)
            # print(mongoResult)
            if mongoResult is not None:
                updateResult = await mongoDB[collectionName].replace_one(query, jsonObject)
                print(dateTime + " Existing collection " + collectionName + " documentID " + str(
                    row[0]) + " modified document count: " + str(updateResult.modified_count))
            else:
                insertResult = await mongoDB[collectionName].insert_one(jsonObject)
                print(dateTime + " Inserted collection " + collectionName + " documentID " + str(
                    row[0]) + " with the following mongoID: " + str(insertResult.inserted_id))


async def queryAsyncCompany():
    global companyQueryID
    query = ksqlClient.query_async("select * from KSQLTABLEGROUPCOMPANY emit changes;",
                                   stream_properties={"ksql.streams.auto.offset.reset": ksqlConfig['mode']},
                                   timeout=None)
    counter = 0
    global colNamesCompany
    async for row in query:
        counter = counter + 1
        if counter == 1:
            companyQueryID = row['queryId']
            colNamesCompany = row['columnNames']
            print("Company Column Names: ", colNamesCompany)
            print("Query ID: ", companyQueryID)
        else:
            jsonValue = await buildJsonValue(colNamesCompany, row)
            jsonObject = json.loads(jsonValue)
            collectionName = 'ksql-company-stream'
            query = {'id': row[0]}
            dateTime = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
            mongoResult = await mongoDB[collectionName].find_one(query)
            # print(mongoResult)
            if mongoResult is not None:
                updateResult = await mongoDB[collectionName].replace_one(query, jsonObject)
                print(dateTime + " Existing collection " + collectionName + " documentID " + str(
                    row[0]) + " modified document count: " + str(updateResult.modified_count))
            else:
                insertResult = await mongoDB[collectionName].insert_one(jsonObject)
                print(dateTime + " Inserted collection " + collectionName + " documentID " + str(
                    row[0]) + " with the following mongoID: " + str(insertResult.inserted_id))


async def queryAsyncJoinStockCompany():
    global joinStockCompQueryID
    query = ksqlClient.query_async("select * from KSQLTABLEJOINSTOCKCOMPANY emit changes;",
                                   stream_properties={"ksql.streams.auto.offset.reset": ksqlConfig['mode']},
                                   timeout=None)
    counter = 0
    global colNamesJoinStockComp
    async for row in query:
        counter = counter + 1
        if counter == 1:
            joinStockCompQueryID = row['queryId']
            colNamesJoinStockComp = row['columnNames']
            print("Company Column Names: ", colNamesJoinStockComp)
            print("Query ID: ", joinStockCompQueryID)
        else:
            jsonValue = await buildJsonValue(colNamesJoinStockComp, row)
            jsonObject = json.loads(jsonValue)
            collectionName = 'ksql-join-stock-company'
            query = {'id': row[0]}
            dateTime = datetime.today().strftime('%Y-%m-%d %H:%M:%S')
            mongoResult = await mongoDB[collectionName].find_one(query)
            # print(mongoResult)
            if mongoResult is not None:
                updateResult = await mongoDB[collectionName].replace_one(query, jsonObject)
                print(dateTime + " Existing collection " + collectionName + " documentID " + str(
                    row[0]) + " modified document count: " + str(updateResult.modified_count))
            else:
                insertResult = await mongoDB[collectionName].insert_one(jsonObject)
                print(dateTime + " Inserted collection " + collectionName + " documentID " + str(
                    row[0]) + " with the following mongoID: " + str(insertResult.inserted_id))


def shutDown():
    print("Shutdown Query Stock ID: ", stockQueryID)
    ksqlClient.close_query(stockQueryID)
    print("Shutdown Query Company ID: ", companyQueryID)
    ksqlClient.close_query(companyQueryID)
    print("Shutdown Query Join Stock Company ID: ", joinStockCompQueryID)
    ksqlClient.close_query(joinStockCompQueryID)


if __name__ == '__main__':
    ksqlClient = KSQLdbClient(ksqlConfig['url'])
    mongoClient = AsyncIOMotorClient(mongoConfig['url'], server_api=ServerApi('1'))
    mongoDB = mongoClient["kafka"]  # Get Database Name
    loop = asyncio.get_event_loop()
    try:
        loop.create_task(queryAsyncStock())
        loop.create_task(queryAsyncCompany())
        loop.create_task(queryAsyncJoinStockCompany())
        loop.run_forever()
    except KeyboardInterrupt:
        print("Shutdown Starting...")
        shutDown()

So, this code uses async functions to run the ksqlDB push queries concurrently and sink the data into MongoDB. Additionally, we have a helper function that converts a ksqlDB result row into JSON by pairing each column name with its value and building a single JSON string.

Execution

First, run the ksqlDB Get Data and Sink program. If there is no error, it will show these results:

ksqlDB Get Data and Sink Initial Result

After that, run the producer program. If there is no error, it will show these results:

Producer Capturing Data

Normally, ksqlDB automatically captures the new stream records and performs the aggregations defined by our streams and materialized tables. Hence, our push queries return results from the materialized tables as soon as rows are added or updated there. Here is an example of the ksqlDB Get Data and Sink output when new rows arrive:

ksqlDB Get Data and Sink — Processing Push Queries

Alternatively, we can check the data in MongoDB directly by opening MongoDB Compass and filtering the data accordingly.

MongoDB Results

In the end, our ksqlDB Get Data and Sink program automatically creates one MongoDB database named kafka with 3 collections: ksql-stock-stream, ksql-company-stream, and ksql-join-stock-company. If this database and these collections haven't been created, check the MongoDB sink function: debug the sink process and inspect all variables before the data is saved to MongoDB.
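
A quick way to verify from code that the database and collections were created is to list them with pymongo, using the same connection string as our config file:

from pymongo import MongoClient

client = MongoClient('mongodb://localhost:27017')

# The sink program writes everything into the "kafka" database
db = client['kafka']
print(db.list_collection_names())
# Expected: ['ksql-stock-stream', 'ksql-company-stream', 'ksql-join-stock-company']

# Peek at one de-duplicated stock document
print(db['ksql-stock-stream'].find_one())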

Demo Video

Demo Data Streaming with Kafka

Summary

Kafka, a highly acclaimed data streaming platform, achieves its performance through a distributed, fault-tolerant, and scalable architecture. It excels at handling real-time data streams with low latency and high throughput, making it ideal for use cases like event sourcing, log aggregation, and data integration.

It boasts excellent durability with data replication across multiple brokers, ensuring data integrity and fault tolerance. Kafka’s ability to horizontally scale across clusters and partitions further enhances its performance, allowing it to handle enormous volumes of data with ease. Its compatibility with various programming languages and robust ecosystem of connectors and tools also contribute to Kafka’s reputation as a top choice for data streaming applications.

Repository

I will cover Kafka monitoring and machine learning for data streaming in the next topic, so stay tuned and see you…

