Designing a Streaming Data Pipeline Using Kafka

Mohammed Ragab · Published in Nerd For Tech · Apr 25, 2021

In this article I will explain some data pipeline use cases, show how Kafka can be useful there, and walk through a proof of concept for building a streaming data pipeline.

In many medium- and large-scale projects you need several kinds of data systems, each suited to a different case. For example, you need a SQL database to handle transactions and guarantee data consistency in your software, and you need different models for reporting, user events, CDC (change data capture), and so on. You can do this job in many different ways; some SQL database systems, such as Postgres, support real-time sync for CDC. In this article, however, I will focus on how far Kafka, Kafka Streams, and Kafka Connect can be useful in this context. Consider a piece of software such as a product rating system: a SQL database stores customer ratings in a relational structure, and on the other hand we need to send this data to our reporting data system, send user events to Cassandra, and send logs to Elasticsearch or a time-series database such as InfluxDB. We also have other systems, for example a notification system that listens for ratings lower than 3 so we can contact those customers, and many other use cases.

The traditional way to solve this problem is to write some sort of data-transfer application that reads from SQL and writes to the Elasticsearch API, with or without Kafka in between, and so on.

So, what is Kafka?

Kafka itself is not the focus of this article, but, in a nutshell, Apache Kafka is an open-source distributed event streaming platform used by thousands of companies for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications.

Why Kafka?

Kafka is scalable, durable, and fault-tolerant, and it can work with Spark Streaming, Storm, HBase, Flink, and Spark for real-time ingestion, analysis, and processing of streaming data. Kafka is often the data stream used to feed Hadoop big data lakes, and Kafka brokers support massive message streams for low-latency follow-up analysis in Hadoop or Spark.

I recommend reading my earlier article to understand Kafka's key concepts.

So how can we reduce the effort of building services that transfer data between data systems? As you can see in the design at the top, I used Kafka Connect.

What is Kafka Connect?

Kafka Connect is a tool for scalably and reliably streaming data between Apache Kafka and other data systems. It makes it simple to quickly define connectors that move large data sets into and out of Kafka. Kafka Connect can ingest entire databases or collect metrics from all your application servers into Kafka topics, making the data available for stream processing with low latency. An export connector can deliver data from Kafka topics into secondary indexes like Elasticsearch or into batch systems such as Hadoop for offline analysis.

How does Kafka Connect work?

You can deploy Kafka Connect as a standalone process that runs jobs on a single machine (for example, log collection), or as a distributed, scalable, fault-tolerant service supporting an entire organization. Kafka Connect provides a low barrier to entry and low operational overhead. You can start small with a standalone environment for development and testing, and then scale up to a full production environment to support a large organization’s data pipeline.

Kafka Connect includes two types of connectors:

  • Source connector: ingests entire databases and streams table updates to Kafka topics. A source connector can also collect metrics from all your application servers and store them in Kafka topics, making the data available for stream processing with low latency.
  • Sink connector: delivers data from Kafka topics into secondary indexes such as Elasticsearch, or into batch systems such as Hadoop for offline analysis.

So, as you can see from the architecture at the top, we used a SQL source connector (for Postgres, MSSQL, MySQL, or others) and another sink connector to push data from Kafka to Elasticsearch, and so on.

What do we need?

1- A Kafka cluster with ZooKeeper; you can use Docker for the proof of concept.

2- The Debezium Kafka connector. In this article I take Postgres as the example, so I use the Debezium Postgres connector to capture data changes (insert/update/delete) from Postgres into Kafka, from where we later send them to Elasticsearch using the Elasticsearch sink connector.

3- Schema Registry. Confluent Schema Registry provides a serving layer for your metadata. It provides a RESTful interface for storing and retrieving your Avro, JSON Schema, and other schemas. It stores a versioned history of all schemas based on a specified subject-name strategy, provides multiple compatibility settings, and allows schemas to evolve according to the configured compatibility settings. It also provides serializers that plug into Apache Kafka clients and handle schema storage and retrieval for Kafka messages sent in any of the supported formats. In this article we will use Avro (see the serializer sketch after this list).

4- Java (optional). Since we are talking about streaming applications, the Kafka Streams API is only fully supported by the Java Kafka client. You may find clients in other programming languages that offer some streaming APIs, but I do not think they are as strong or complete as the Java client. We only need Java here if we write a streaming application such as the notification system.

5- KSQL (optional). Confluent KSQL is the streaming SQL engine that enables real-time data processing against Apache Kafka. KSQL is scalable, elastic, and fault-tolerant, and it supports a wide range of streaming operations, including data filtering, transformations, aggregations, joins, windowing, and more.
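To make point 3 concrete, here is a minimal sketch of how the Confluent Avro serializer and Schema Registry plug into a plain Java producer. The bootstrap server address, the topic name, and the Avro-generated Rate class are assumptions for illustration, not part of the actual setup.

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;
import io.confluent.kafka.serializers.KafkaAvroSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// The Avro serializer registers the schema in Schema Registry and writes values in the Avro binary format
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);
props.put("schema.registry.url", "http://schema-registry:8081");

// Rate is assumed to be an Avro-generated class; populate its fields as needed
Rate rate = new Rate();
KafkaProducer<String, Rate> producer = new KafkaProducer<>(props);
producer.send(new ProducerRecord<>("rates", "customer-1", rate));
producer.close();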

What is Kafka Streams?

Kafka Streams is a library for building streaming applications, specifically applications that transform input Kafka topics into output Kafka topics (or calls to external services, updates to databases, or whatever). It lets you do this with concise code in a way that is distributed and fault-tolerant. Stream processing is a programming paradigm, related to data-flow programming, event stream processing, and reactive programming, that allows some applications to exploit a limited form of parallel processing more easily. Kafka Streams offers a stateless API (filter, map, flatMap, and so on, like any stream library) and, on the other hand, a stateful API (count, windowing, groupBy, and so on).
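As a quick illustration of the stateful side, a hedged sketch that counts ratings per country over one-hour windows could look like the following. The topic name, the Rate class, and its countryName field are assumptions, and default String/Avro serdes are assumed to be set in the streams configuration.

import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, Rate> rates = builder.stream("rates");

// Stateless step (filter), then a stateful windowed count grouped by country
KTable<Windowed<String>, Long> ratesPerCountry = rates
        .filter((key, rate) -> rate != null)
        .groupBy((key, rate) -> rate.countryName)
        .windowedBy(TimeWindows.of(Duration.ofHours(1)))
        .count();

// Print each continuously updated count together with its window start time
ratesPerCountry.toStream().foreach((windowedCountry, count) ->
        System.out.println(windowedCountry.key() + " @ "
                + windowedCountry.window().startTime() + " -> " + count));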

So, after all the components we need are running (using Docker or another way), we need to do the following:

1- Create a job on the SQL source connector that captures changes from the table(s) into Kafka. To do this we just call the Connect REST API:

POST http://localhost:8083/connectors

{
  "name": "rating-system-connector",
  "config": {
    "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
    "database.hostname": "localhost",
    "database.port": "5433",
    "database.user": "postgres",
    "database.password": "P@ssw0rd",
    "database.dbname": "rating_system_demo",
    "database.server.name": "ratingserver",
    "table.whitelist": "rating.rates",
    "key.converter.schemas.enable": "true",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "value.converter": "io.confluent.connect.avro.AvroConverter",
    "value.converter.schema.registry.url": "http://schema-registry:8081",
    "value.converter.schemas.enable": "true"
  }
}

We called the Connect API to create a background job and told Connect the database connection information, such as server, user name, password, DB name, and table, as well as the Schema Registry URL and the converter type, Avro, as explained above in the Schema Registry section. (Avro is an open-source data serialization system that helps with data exchange between systems, programming languages, and processing frameworks. Avro defines a binary format for your data and maps it to the programming language of your choice.)

Once the job is created, every change made to the table is pushed to Kafka using a before/after pattern. For example, if you insert a new record, before is null and after contains the inserted record as a JSON object; on update you get both before and after; on delete you get only before, and so on. Also, if your use case is just to take a snapshot of the data from time to time, for example every 7 days, you can use the Confluent JDBC source connector instead; in that case you call that connector's endpoint to create a job as well (a Java consumer sketch for the Debezium change events follows this config):

{"name": "jdbc_source_mysql_01","config": {"connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector","connection.url": "jdbc:postgres://localhost:5432/customers","value.converter.schema.registry.url": "http://schema-registry:8081","connection.user": "root","connection.password": "P@ssw0rd","topic.prefix": "customer-events-","table.whitelist" : "customer_events","timestamp.column.name": "created_on","transforms":"createKey,extractInt","transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey","transforms.createKey.fields":"id","transforms.extractInt.type":"org.apache.kafka.connect.transforms.ExtractField$Key","transforms.extractInt.field":"id","mode":"bulk"}}

2- We need a sink connector to consume data from Kafka and push it to the Elasticsearch API, a reporting database, etc. Call the Connect endpoint to create the Elasticsearch sink job:

{
  "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
  "tasks.max": "1",
  "topics": "example.elasticsearch.data",
  "name": "example-elasticsearch-connector",
  "connection.url": "http://elasticsearch:9200",
  "type.name": "_doc"
}

We just told Connect our Elasticsearch API URL and the index type, _doc (or maybe log).
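To verify that documents are actually landing, you can query Elasticsearch directly; a minimal sketch with the JDK HTTP client follows. It assumes Elasticsearch is reachable on localhost:9200 from your machine and that the sink writes each topic into an index with the same name.

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// Search the index that the sink connector fills from the example.elasticsearch.data topic
HttpRequest search = HttpRequest.newBuilder(
                URI.create("http://localhost:9200/example.elasticsearch.data/_search"))
        .GET()
        .build();

HttpResponse<String> response = HttpClient.newHttpClient()
        .send(search, HttpResponse.BodyHandlers.ofString());
System.out.println(response.body()); // JSON hits written by the sink connector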

Now we have a data pipeline. We can use KSQL to run continuous queries over the streams, or build a Java streams application. For example, if we need a count of ratings per region, country, or time window, we first create a stream on top of the Kafka topic:

CREATE STREAM RATES_STREAM (
  id BIGINT,
  customer_name VARCHAR,
  Country_name VARCHAR,
  rate INT)
WITH (KAFKA_TOPIC='RATES',
      VALUE_FORMAT='AVRO');

First we created a stream on top of our topic; then we can create a table that continuously aggregates it (an aggregation with GROUP BY produces a table in KSQL):

CREATE TABLE RATES_COUNT AS
SELECT Country_name, COUNT(*) AS rates_count
FROM RATES_STREAM
GROUP BY Country_name
EMIT CHANGES;

Then you can easily query the result with a push query:

SELECT * FROM RATES_COUNT EMIT CHANGES;

Or you can create your own streams application. For example, if we need to capture customers who give our product a low rating, we can use the Kafka Streams API in Java:

import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

Properties config = getConfig();
StreamsBuilder streamsBuilder = new StreamsBuilder();

// Read the rates topic as a stream of customer ratings
KStream<String, Rate> stream = streamsBuilder.stream("rates");
stream.peek(OrderServiceApplication::PrintAll)          // log every rating
      .filter((k, v) -> v.rate < 3)                     // keep only low ratings
      .peek(OrderServiceApplication::SendNotification); // trigger the notification system

Topology topology = streamsBuilder.build();
KafkaStreams streams = new KafkaStreams(topology, config);
streams.start();
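The getConfig() helper is not shown above; a hedged sketch of what it might contain looks like this (the application id and serde choices are assumptions for this example):

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsConfig;
import io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde;

static Properties getConfig() {
    Properties config = new Properties();
    config.put(StreamsConfig.APPLICATION_ID_CONFIG, "low-rate-notifier"); // also the consumer group id
    config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    config.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
    // The Avro serde resolves the Rate schema through the Schema Registry
    config.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, SpecificAvroSerde.class);
    config.put("schema.registry.url", "http://schema-registry:8081");
    return config;
}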

And more; I will explain Kafka Streams and KSQL tricks in depth in another article.

In the end, I hope this article helps you design your streaming data pipeline system.
