Your first data pipeline with Kafka

Using Docker, Kafka, Python and Postgres

Wesley Bos
Aug 3, 2020

Intro

With this post, I want to share my (first) experience using Kafka to create a simple pipeline and eventually a streaming data pipeline.

Kafka

Apache Kafka is an open-source stream-processing software platform that was initially developed by LinkedIn and became open-sourced in 2011. Kafka has various use cases, but it is primarily used to build real-time streaming data pipelines and applications.


Kafka producer & consumer

Let’s begin with creating a pipeline where we write (produce) and read (consume) data to and from Kafka. Before we start, some concepts need to be covered:

  • Topic: a topic is a category to which records are published. These topics are multi-subscriber, meaning one topic can have multiple consumers that subscribe to it.
  • Kafka Broker: a broker holds the topics to which producers publish messages and from which consumers fetch them.
  • Message/Record: in Kafka, a message* is a unit of data, represented as a record that consists of a key and a value.
    *The terms ‘message’ and ‘record’ are used interchangeably.
  • Producer: a producer writes messages to one or more topics of its choice.
  • Consumer: a consumer reads messages from one or more topics of its choice.

With these concepts clarified, let’s set up a pipeline.

Prerequisites:
This project uses Docker and docker-compose. View this link to find out how to install them for your OS.

Clone my git repo:

git clone https://github.com/Wesley-Bos/kafka-pipeline-examples.git

Note: depending on your pip and Python version, the commands differ a little:

  • pip becomes pip3
  • python becomes python3

Before we begin, create a new environment. I use Anaconda to do this but feel free to use any tool of your liking. Activate the environment and install the required libraries by executing the following commands:

sudo apt-get install python-psycopg2
pip install -r requirements.txt

1. Run the docker-compose.yml file

docker-compose -f docker-compose.yml up -d

The docker-compose file contains the following services:

  • Zookeeper: a centralised service for maintaining Kafka clusters.
  • Kafka Broker: manages the storage of messages which are sent to and fetched from a topic.
  • Schema Registry: the Confluent Schema Registry provides a RESTful interface for storing and retrieving schemas.
  • Kafka Connect: a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems.
  • Control Center: the Confluent Control Center is a web-based tool for managing and monitoring Apache Kafka®.
  • ksqlDB: ksqlDB is a streaming database for Apache Kafka®.
  • Rest Proxy: the Confluent REST Proxy provides a RESTful interface to a Kafka cluster.

Wait a moment while Docker downloads the images and the services start up.
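If you want to check from Python that the main services are reachable before moving on, a small port check will do. This is a minimal sketch; the ports 9092 (broker) and 9021 (Control Center) are the defaults used by Confluent's example compose files, so adjust them if your docker-compose.yml maps different ones.

import socket
import time

def wait_for(host, port, timeout=120):
    # Poll a TCP port until it accepts connections or the timeout expires.
    deadline = time.time() + timeout
    while time.time() < deadline:
        try:
            with socket.create_connection((host, port), timeout=2):
                return True
        except OSError:
            time.sleep(2)
    return False

# Assumed ports: 9092 for the broker, 9021 for the Control Center.
for name, port in [("broker", 9092), ("control-center", 9021)]:
    print(name, "is up" if wait_for("localhost", port) else "is not reachable")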

2. Create a producer

python producer.py
Producer writing data to the Kafka topic.
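The repository’s producer.py is not reproduced here, but the sketch below gives a rough idea of what a minimal producer can look like, assuming the confluent-kafka client, a broker on localhost:9092 and a hypothetical topic called my_topic; the actual script may use a different client, topic name and message format.

from confluent_kafka import Producer

producer = Producer({"bootstrap.servers": "localhost:9092"})

def delivery_report(err, msg):
    # Called once per message to report delivery success or failure.
    if err is not None:
        print(f"Delivery failed: {err}")
    else:
        print(f"Delivered to {msg.topic()} [{msg.partition()}]")

for i in range(10):
    # Each record consists of a key and a value, both sent as strings here.
    producer.produce("my_topic", key=str(i), value=f"message {i}", callback=delivery_report)
    producer.poll(0)  # serve delivery callbacks

producer.flush()  # wait until all outstanding messages are delivered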

3. Create a consumer

python consumer.py
Consumer reading data from the Kafka topic.
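Likewise, a minimal consumer could look something like the sketch below, assuming the same client, broker address and topic name as above; consumer.py in the repository may differ in the details.

from confluent_kafka import Consumer

consumer = Consumer({
    "bootstrap.servers": "localhost:9092",
    "group.id": "demo-group",         # hypothetical consumer group
    "auto.offset.reset": "earliest",  # start reading from the beginning of the topic
})
consumer.subscribe(["my_topic"])

try:
    while True:
        msg = consumer.poll(1.0)  # wait up to one second for a record
        if msg is None:
            continue
        if msg.error():
            print(f"Consumer error: {msg.error()}")
            continue
        print(f"key={msg.key()}, value={msg.value().decode('utf-8')}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()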

In the terminal, you can see the messages that the producer has published and the data being consumed from the Kafka topic. The same data can also be inspected through the Control Center’s web UI.

Great, we now have a pipeline where data is being published to and consumed from Kafka.

Source and Sink with Kafka using PostgreSQL

In other cases, you might want to use Kafka in combination with a database, to store data, to retrieve it, or both.


To establish a connection between Kafka and the database, we use Kafka Connect: a framework for connecting Kafka with external systems such as key-value stores, search indexes, file systems and, in our case, databases.

Before we view the code, there are a few concepts that need some explaining:

  • Connector: specifies where data should be copied to or from.
  • Source Connector: copies data from a system into Kafka.
  • Sink Connector: copies data from one or multiple Kafka topics into a system.
  • Task: the actual implementation of how data is copied to or from Kafka; a connector instance coordinates a set of tasks.
  • Converters: handle the serialisation and deserialisation of data, so that Kafka Connect supports a particular data format when writing to or reading from Kafka.

Now let’s get started by setting up the Kafka environment.

1. Run the docker-compose.yml file:

docker-compose -f docker-compose.yml up -d

2. Set up the Postgres database:

docker-compose -f docker-compose-pg.yml up -d

3. Write data to the database:

python generate_data.py
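generate_data.py comes from the repository; as a rough idea of what such a script can do, the sketch below inserts rows into a "NUMBER" table with psycopg2. The connection details (host, port, password) and the exact table schema are assumptions based on the psql commands and queries used later, so check the repository for the real values.

import random
import psycopg2

# Assumed connection settings: the compose file maps Postgres to localhost:5432;
# the password is not shown here -- replace it with the value from the repo.
conn = psycopg2.connect(host="localhost", port=5432, dbname="numbers",
                        user="pxl", password="<password>")
cur = conn.cursor()

# Assumed schema: an incrementing "index" column plus an integer value column.
cur.execute('CREATE TABLE IF NOT EXISTS "NUMBER" (index SERIAL PRIMARY KEY, value INTEGER)')

for _ in range(100):
    cur.execute('INSERT INTO "NUMBER" (value) VALUES (%s)', (random.randint(0, 1000),))

conn.commit()
cur.close()
conn.close()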

View the data in the database by executing the following commands (optional):

  • Connect to the database:
docker exec -it postgres psql -U pxl numbers
  • View the databases:
numbers=# \l
  • View the tables:
numbers=# \dt
  • View the data:
numbers=# SELECT * FROM "NUMBER" LIMIT 1;

4. Create a source connector:

./source_connector.sh

When the connector is successfully created, the message “HTTP/1.1 201 Created” will be shown in the terminal. The status of the connectors can also be viewed in the Control Center.
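source_connector.sh is essentially a REST call to Kafka Connect. As an illustration, the Python sketch below posts a JDBC source connector configuration to the Connect REST API; the port 8083, the connector name and the config values are assumptions, and the real script may pass different settings (for instance a different mode or topic prefix).

import requests

# Hypothetical JDBC source connector config -- adjust to match source_connector.sh.
connector = {
    "name": "postgres-source",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/numbers",
        "connection.user": "pxl",
        "connection.password": "<password>",
        "table.whitelist": "NUMBER",
        "mode": "incrementing",
        "incrementing.column.name": "index",
        "topic.prefix": "P_",
    },
}

# Kafka Connect's REST API usually listens on port 8083.
response = requests.post("http://localhost:8083/connectors", json=connector)
print(response.status_code, response.text)  # 201 means the connector was created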

5. Create a sink connector:

./sink_connector.sh

Again, if the connector is successfully created, the message “HTTP/1.1 201 Created” will be shown in the terminal.
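sink_connector.sh works the same way, only with a JDBC sink configuration. Again a sketch with assumed values: the sink reads from the topic produced by the source connector and auto-creates the target table.

import requests

# Hypothetical JDBC sink connector config -- adjust to match sink_connector.sh.
connector = {
    "name": "postgres-sink",
    "config": {
        "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url": "jdbc:postgresql://postgres:5432/numbers",
        "connection.user": "pxl",
        "connection.password": "<password>",
        "topics": "P_NUMBER",
        "auto.create": "true",
        "insert.mode": "insert",
    },
}

response = requests.post("http://localhost:8083/connectors", json=connector)
print(response.status_code, response.text)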

Now, go back to Postgres and view all tables in the numbers database. The table ‘P_NUMBER’ should now be listed.

To see if the data is streaming, execute the queries below:

SELECT COUNT(index) FROM "NUMBER";
SELECT COUNT(index) FROM "P_NUMBER";
Source (left) — Sink (right)

By inspecting both tables, we can see that the data is being streamed from one table into the other. And there you have it. You just created a streaming pipeline.

Conclusion

We started with a simple pipeline and eventually formed a streaming data pipeline. With these two examples, you should have a basic understanding of how to set up a data pipeline in Kafka.

With this knowledge of Kafka, you can start building some real-world applications. Perhaps these awesome projects can give you some inspiration:

Lastly, I want to thank you for reading until the end! As this is my first blog post, ever, feel free to leave me any feedback on where and how I can improve — it will be very much appreciated!

References:

Many thanks to Tuan Nguyen for his post. It helped me out tremendously! Be sure to check out his article:
