How to Build a Real-Time Data Streaming Pipeline

Haq Nawaz
Mar 15, 2023 · 7 min read

using Apache Kafka, Debezium and Postgres

Data Streaming Workflow

Stream processing is a method of continuously collecting and processing data in real time or near real time. We covered stream processing in the previous article. Today we kick off the data streaming series with PySpark and Apache Kafka. Apache Kafka is a distributed event streaming platform that enables applications to manage billions of events efficiently. It is a publish-subscribe (pub/sub) messaging system that accepts data streams from several sources, allows real-time analysis of big data streams, and scales quickly with minimal downtime. We will use Apache Kafka as a source and develop a data streaming application in PySpark. We covered PySpark basics and developed a batch processing data pipeline in this session here.

This tutorial will enable you to:

  • Set up Apache Kafka and Debezium, and configure Postgres for data streaming
  • Set up and start the Docker containers
  • Create a Python Kafka producer and consumer to write to and read from a Kafka topic

If you are a visual learner, I have an accompanying video on YouTube with a walk-through of the complete code.

We can stream data into Spark from a log file or from distributed engines like Kafka or Kinesis. However, we want to stream data from a relational database, which is not supported out of the box. Therefore, we will stream database changes to Kafka, and for this we will use the Debezium Postgres connector, with Kafka as the source for our data streaming application. The Debezium Postgres connector captures row-level database changes and streams them to Kafka via Kafka Connect. It works as a source connector, so we can stream database records without writing additional code. Hold on, how is this actually possible?

Starting with PostgreSQL 10, there is a logical replication streaming mode, called pgoutput, that is natively supported by PostgreSQL. Logical replication is a method of replicating data objects and their changes based on their replication identity (usually a primary key). It uses the publish-subscribe (pub/sub) pattern, similar to Kafka: pub/sub provides a framework for exchanging messages between publishers and subscribers. With this mode, the Debezium PostgreSQL connector can subscribe to the logical replication stream and receive database changes without the need for additional plug-ins or code. The connector consumes the PostgreSQL replication stream in order to capture the database changes. All we have to do is deploy it in our Kafka environment and configure a connector.
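To make that last step concrete, here is a minimal sketch of registering such a connector against the Kafka Connect REST API (port 8083, set up later in this article). The connector name, database credentials and table list below are placeholder assumptions; the configuration keys are the standard Debezium Postgres connector options, with plugin.name set to pgoutput so no extra database plug-in is needed.

# Minimal sketch: register a Debezium Postgres connector via Kafka Connect's REST API.
# Hostnames, credentials, database and table names below are placeholders.
import requests

connector = {
    "name": "inventory-connector",  # hypothetical connector name
    "config": {
        "connector.class": "io.debezium.connector.postgresql.PostgresConnector",
        "plugin.name": "pgoutput",                      # native logical decoding, no extra plug-in
        "database.hostname": "host.docker.internal",    # Postgres as seen from the Debezium container
        "database.port": "5432",
        "database.user": "debezium",
        "database.password": "dbz",
        "database.dbname": "inventory",
        "database.server.name": "pg",                   # logical name, used as the topic prefix
        "table.include.list": "public.customers",
    },
}

resp = requests.post("http://localhost:8083/connectors", json=connector)
print(resp.status_code, resp.json())

Once registered, row-level changes from the listed table start flowing into a Kafka topic named after the server name and table.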

Database Setup

Let’s start with configuring our environment. First, we will go over the Postgres setup. We installed and configured PostgreSQL in this video here; feel free to pause and install Postgres following the step-by-step guide provided there. We will use this installation for the series. Let’s launch pgAdmin, a GUI Postgres database management tool, and check the write-ahead log level, known as wal_level. We can check this setting with the following query.

select * from pg_settings where name ='wal_level';

This setting is located in the postgresql.conf file. We can open this file from our install directory, search for the wal_level setting, and make sure it is set to logical. Save the updated file and be sure to restart the Postgres service on the server. The default value is replica, which writes enough data to support archiving and replication; logical adds the additional information necessary to capture row-level changes. If you are running PostgreSQL in a Docker container, you can instead run the following SQL statement to set the wal_level (a restart is still required for the change to take effect).

ALTER SYSTEM SET wal_level = logical;

For the Debezium connector to work as expected, the database user it connects as must have the following privileges (a sketch of the corresponding SQL is shown after the list):

  • User must have Replication privileges in the database to add the table to a publication. A publication is essentially a group of tables whose data changes are intended to be replicated through logical replication.
  • User must have CREATE privileges on the database to add publications.
  • User must have SELECT privileges on the tables to copy the initial table data. Table owners automatically have SELECT permission for the table.
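As a rough sketch of that SQL, and assuming a database named inventory and a connector user named debezium (both placeholders), the privileges above could be granted as follows, executed here from Python with psycopg2 as a superuser:

# Rough sketch: grant the privileges listed above.
# Database name "inventory", user "debezium" and all credentials are placeholders.
import psycopg2

conn = psycopg2.connect(host="localhost", port=5432, dbname="inventory",
                        user="postgres", password="postgres")  # superuser connection
conn.autocommit = True
cur = conn.cursor()

# Login role with REPLICATION so the connector can stream logical changes
cur.execute("CREATE ROLE debezium WITH LOGIN REPLICATION PASSWORD 'dbz'")
# CREATE on the database so the connector can add publications
cur.execute("GRANT CREATE ON DATABASE inventory TO debezium")
# SELECT on the tables so the connector can copy the initial table data
cur.execute("GRANT SELECT ON ALL TABLES IN SCHEMA public TO debezium")

cur.close()
conn.close()

The same statements can of course be run directly from pgAdmin’s query tool.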

Kafka and Debezium Setup

We will install Apache Kafka and Debezium as Docker containers. Docker Desktop is a prerequisite, so download and install the Docker Desktop application with the defaults. Docker containers make this otherwise tedious process a breeze. I cover why Docker should be on your list of technologies to learn here. Feel free to check it out.

First we need a ZooKeeper instance, which is a Kafka requirement. ZooKeeper is primarily used to track the status of nodes in the Kafka cluster and to maintain the list of Kafka topics and their metadata. Therefore, we spin up a ZooKeeper container, which runs on port 2181. For ZooKeeper and Kafka we use the Confluent images.

  zookeeper:
    image: confluentinc/cp-zookeeper:5.5.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

Next we set up a Kafka container, which depends on the ZooKeeper service. Kafka runs on port 9092. KAFKA_ADVERTISED_LISTENERS advertises the broker on port 9092 for clients inside the Docker network and on port 29092 for clients outside it. All clients internal to Docker use port 9092 to reach the Kafka broker, and we expose port 29092 for external connectivity.

  kafka:
    image: confluentinc/cp-enterprise-kafka:5.5.3
    depends_on:
      - zookeeper
    ports:
      - 29092:29092
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT

Following Kafka, we define the Debezium container, based on the Debezium Connect image. It needs the Kafka server details; since this connection is internal to Docker, we provide the Kafka container name and port 9092. Debezium also needs a message serializer and deserializer, and we provide the Avro converter, so our messages will be serialized with Avro. Avro is a data serialization system with a schema model quite similar to JSON. Finally, Debezium needs the schema registry details; the schema registry stores and retrieves the Avro schemas of our messages, and we define it next.

  debezium:
    image: debezium/connect:1.4
    environment:
      BOOTSTRAP_SERVERS: kafka:9092
      GROUP_ID: 1
      CONFIG_STORAGE_TOPIC: connect_configs
      OFFSET_STORAGE_TOPIC: connect_offsets
      KEY_CONVERTER: io.confluent.connect.avro.AvroConverter
      VALUE_CONVERTER: io.confluent.connect.avro.AvroConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: http://schema-registry:8081
    depends_on: [kafka]
    ports:
      - 8083:8083

Our schema registry container is responsible for storing and retrieving the schemas of our messages. It also keeps track of schema versions as our schemas evolve over time. It needs the ZooKeeper and Kafka server details.

  schema-registry:
    image: confluentinc/cp-schema-registry:5.5.3
    environment:
      - SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL=zookeeper:2181
      - SCHEMA_REGISTRY_HOST_NAME=schema-registry
      - SCHEMA_REGISTRY_LISTENERS=http://schema-registry:8081,http://localhost:8081
    ports:
      - 8081:8081
    depends_on: [zookeeper, kafka]

The last service is optional: Kafka Manager, which provides a GUI to manage our Kafka cluster. It runs on port 9000 and depends on the ZooKeeper and Kafka services.

  kafka_manager:
    image: hlebalbau/kafka-manager:stable
    restart: always
    ports:
      - "9000:9000"
    depends_on:
      - zookeeper
      - kafka
    environment:
      ZK_HOSTS: "zookeeper:2181"
      APPLICATION_SECRET: "random-secret"
    command: -Dpidfile.path=/dev/null

The complete docker compose file is available on GitHub. We start all these containers with the following command.

docker-compose up -d

If this is the first time you run this command, Docker will pull these images from Docker Hub, then build and start the containers. Once all the containers are up, we can open Docker Desktop to view them. We can also ping the various API endpoints, such as Kafka Manager, the schema registry and Kafka Connect, to make sure the services are up.
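As a quick sanity check, a small script like the one below pings the exposed endpoints; the ports match the compose file above, and an empty list from Kafka Connect simply means no connectors have been registered yet.

# Quick sanity check of the services exposed by the compose file above.
import requests

endpoints = {
    "Kafka Connect": "http://localhost:8083/connectors",  # registered connectors
    "Schema Registry": "http://localhost:8081/subjects",  # registered schemas
    "Kafka Manager": "http://localhost:9000",              # management UI
}

for name, url in endpoints.items():
    try:
        resp = requests.get(url, timeout=5)
        print("%s: HTTP %s" % (name, resp.status_code))
    except requests.ConnectionError:
        print("%s: not reachable yet" % name)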

We launch Kafka Manager at the following URL: localhost:9000. In Kafka Manager we can add our Kafka cluster: click the Cluster dropdown in the top nav bar and select the Add Cluster option. We provide a name for the cluster, datastreaming. Kafka Manager works through ZooKeeper, so we provide the ZooKeeper URL, zookeeper:2181. We select the following options, leave the rest as defaults, and click Save to add our Kafka cluster.

  • Enable JMX Polling
  • Poll consumer information
  • Enable Active OffsetCache

We create a test topic from this UI by clicking the Topic option in the nav bar, providing a topic name, and leaving the rest of the options at their defaults. We now have a Kafka topic; a code-based alternative is sketched below. Let’s send some messages to this topic and consume them.
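If you prefer to create the topic from code rather than the UI, here is a minimal sketch using kafka-python’s admin client (the library is installed in the next section); the topic name test and the single-partition, single-replica settings match our one-broker cluster.

# Code-based alternative to creating the topic in the Kafka Manager UI.
from kafka.admin import KafkaAdminClient, NewTopic

admin = KafkaAdminClient(bootstrap_servers=['localhost:29092'])
admin.create_topics([NewTopic(name='test', num_partitions=1, replication_factor=1)])
admin.close()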

Kafka Producer and Consumer

We will use Python to create a producer and a consumer. We will code these in a Jupyter notebook, and we will need the kafka-python library installed.

pip install kafka-python

The producer needs the Kafka server, so we define a local variable and access Kafka from outside Docker on port 29092. It also needs the topic name. We create an instance of KafkaProducer, passing it the Kafka server variable, and save the instance in a local variable. We then call the send function on this instance, which takes two arguments: the topic name and the message. Because send is asynchronous, we also flush the producer so the message is actually delivered before we print that it was sent. Let’s execute this cell to send a message to the topic.

# Import KafkaProducer from Kafka library
from kafka import KafkaProducer

# Define server with port
bootstrap_servers = ['localhost:29092']

# Define topic name where the message will publish
topicName = 'test'

# Initialize producer variable
producer = KafkaProducer(bootstrap_servers = bootstrap_servers)

# Publish a message to the defined topic (send is asynchronous)
producer.send(topicName, b'Hello Kafka...')

# Block until the buffered message is actually delivered
producer.flush()

# Print confirmation
print("Message Sent")

In another notebook (a separate kernel), we define a Kafka consumer. It is very similar to the producer: we import KafkaConsumer, define our Kafka server and topic, and create a consumer instance with the topic name and Kafka server details, saving it in a local variable. We then iterate over this variable and print the topic and the value of each message, which prints the topic name and our message. If we go back to the previous notebook and send another message, it is received instantly by our consumer.

# Import KafkaConsumer from Kafka library
from kafka import KafkaConsumer

# Define server with port
bootstrap_servers = ['localhost:29092']

# Define topic name from which messages will be received
topicName = 'test'

# Initialize consumer variable, reading from the earliest offset
consumer = KafkaConsumer(topicName, auto_offset_reset='earliest', bootstrap_servers=bootstrap_servers)

# Read and print messages from the consumer
for msg in consumer:
    print("Topic Name=%s, Message=%s" % (msg.topic, msg.value))

As soon as messages arrive in this topic, the consumer reads and displays them. If we open Kafka Manager, we should see activity on this topic: the offset should be at the last message, and we should see the consumer group IDs that are reading from the topic.

Conclusion

  • We showcased how easy it is to set up Apache Kafka and Debezium containers for real-time data streaming.
  • We created the Apache Kafka, Debezium, Schema Registry and ZooKeeper Docker containers.
  • We created a Python Kafka producer and consumer to write messages to and read messages from a topic.
  • The full code can be found here.


Haq Nawaz

I am a business intelligence developer and data science enthusiast. In my free time I like to travel and code, and I enjoy landscape photography.