Start your real-time pipeline with Apache Kafka

By Alex Kam, Data Engineer

HK01 Product & Technology team
HK01 Tech Blog
10 min readFeb 8, 2019

--

For both data engineers and DevOps, setting up a real-time data pipeline is no trivial task, yet, such architectures are immensely useful, for instance, real-time recommendation engine, real-time geo-location visualization.

One of the most commonly used solution for such pipeline is Apache Kafka. Developed by Linkedin in 2011, Kafka has been slowly growing into a mature platform for distributed streaming with good scalability, data persistence, fault-tolerance.

Thanks to the announcement of Amazon Managed Streaming for Kafka (AWS MSK in short) in November last year, setting up a production-grade Kafka cluster that on AWS is not as challenging as it used to be. In this article, I will go over some of the basics on starting a bare-bone Kafka cluster both locally and on AWS cloud.

Firstly, let’s go over some brief overview of the basic components of Kafka.

Apache Kafka consists of six core components, namely: Broker, Topic, Consumer, Producer, Connector, Streamer.

Relationship between Kafka Components

Kafka topic is a category/grouping for messages, allowing for:

  • data partitioning, with an immutable ordered sequence;
  • data persistence (retention period is configurable);
  • multiple-consumer subscription;

Kafka broker (sometimes referred as Kafka server/node) is the server node in the cluster, mainly responsible for:

  • hosting partitions of Kafka Topics;
  • transferring messages from Kafka Producer to Kafka Consumer;
  • providing data replication and partitioning within a Kafka Cluster;

Kafka consumer subscribes to the broker, and consumes data from a topic. It actively pulls data from the specified Kafka Topic with offset management.

Kafka consumer group is a set of consumers consuming data from some topics. Each partition in the topic is assigned to exactly one member in the group. Changes of consumer group members result in partitions re-assignment to consumers.

On the other hand, Kafka producer is responsible for writing data to a Kafka topic, in the following ways:

  • send message to a topic partition, then inform the partition leader;
  • allow batching and compressing configuration;
  • allow optimization between throughput and data guarantees;

To read from or write to various data sources (e.g. databases, distributed storage), rather than writing a producer or consumer for each data sources, Apache Kafka provides us a simpler abstraction namely Kafka Connect.

Mainly contributed by Confluent.io (a company founded by the creators of Kafka), Kafka Connect is an open source connector for connecting to various commonly used data sources, for instance, HDFS, JDBC or NoSQL databases (e.g. Cassandra, MongoDB, Elasticsearch).

The Kafka Connect component that reads from data sources is called Source Connector, and the component that write to data source is called Sink Connector. Both connectors make use of Kafka Connect API, which is much handier than the Kafka Producer API and Kafka Consumer API.

“Same same but different”, Kafka Connect makes stream handling easier.

Hence, using Kafka Connect over Producer and Consumer is recommended, unless there are no official connectors for your data source (e.g. RabbitMQ, InfluxDB).

Now with our data sources and sinks ready, we introduces the final component in Kafka. Kafka Stream is the component that allow us to filter, transform, and aggregate the data from all the data streams.

There are many streaming processing operations available in the Kafka Stream APIs, including mapping, reducing, filtering, and window aggregation. It is also possible to use other frameworks to process Kafka data, including Apache Spark, Apache Flink, Apache Beam, or even AWS Lambda.

Still, if you are looking for more details, you can always refer to these resources:

In this section, I will go over the steps to setup a Kafka cluster locally. All the sample codes are available in this repository:

The local cluster architecture is defined in the file docker/docker-compose.yml, which is as follows:

Components of the local Kafka Cluster
  • kafka1, kafka2 serve as two separate broker nodes. With the help of Apache Zookeeper, having two nodes provides us higher availability via redundancy;
  • schema_registry serves as the sole schema server, managing the schema (to be explained below) of all Kafka topics;
  • kafdrop, schema_registry_ui are the optional web UI for the Kafka cluster and the schema registry server respectively;
  • postgres represents the external databases containing the application data, which in this demo, the mock data;
  • connect-shell is the Kafka Connect App for us to interact through in the terminal;

To start with, please make sure you have both Docker and Docker Compose already installed on your machine. Let’s clone the repository and enter our project directory:

Start services by executing the command make up and services are expected to be running:

Output from the command “docker ps” — you can see the Kafka cluster is now ready.

After the cluster’s ready, we can now read and write to Kafka topics. Let’s warm up with Kafka Topic CRUD (Create, Read, Update, Delete) in the command line interface.

Using the Kafka Shell Client, we can instantiate a producer / a consumer application within the terminal. Opening two terminal shells, and type each of the commands in each separate shell:

Starting a producer with Kafka Shell.

Starting a consumer with Kafka Shell.

For real-time usage, source data would be coming from different external sources, rather than manual input from the terminal. To automate the data connection to data sources, as a proof of concept, we make use of Kafka Connect to connect to a mock PostgresSQL database.

  1. Mock data is ready in the initialization script. Confirm the data by:
    $ make db-preview
  2. Execute Kafka Connect to the PostgresSQL database, with Kafka Connect docker image provided by Confluent. We can use Kafka Connect on a local standalone cluster with the following command:
  • worker1.properties is a configuration file for the Kafka Connect Worker, which contains settings such as the Kafka cluster location, serialization format, etc.
  • connector{1/2}.properties is a configuration file for Kafka Connection, the data sources to be connected could be a data source or a data sink.

In the connect/dev/ directory, some predefined configuration files are already provided. Hence, we can use the following command to start a Kafka Connect with JSON serialization:

Now, Kafka Connect should be able to poll records in a database with detected schemas and perform JSON serialization. As illustrated above, we can see that the messages produced are of the following format, in which schema describes the message format and payload contains the actual message content:

You may notice that the same schema object is actually embedded in every single messages, which lead to a huge waste of storage space. Moreover, there is no schema validation upon the arrival of each message. To address the above problem, we will need some more components — enters Schema Registry and Apache Avro. Avro serialization and schema registry pair is not a must for Kafka database connection, but is highly recommended.

Apache Avro is a standard data serialization system. All the Avro schema are stored in a metadata server with versioning called Schema Registry.

Let’s try Kafka Connect with Avro serialization:

To verify if the new data schema is generated, let us access the registry schema server through its REST API:

Finally, to put the icing on the cake, we can visit two GUI interfaces to visualize the Kafka cluster and schema registry.

Kafdrop, which is a web interface for monitoring a Kafka cluster is available at http://localhost:9000.

Kafdrop Interface, showing the status of the Kafka cluster.

Schema Registry UI, a web interface for the schema registry, is available at http://localhost:8082.

Schema Registry UI, showing the details of available schemas.
  1. We have deployed the Kafka clusters as Docker containers;
  2. We have performed CRUD on Kafka topics;
  3. We have started a Kafka producer-consumer setup;
  4. We have connected a PostgresSQL database using Kafka Connect;
  5. We have added the Schema Registry;
  6. We have visualized the setup using web-based interfaces, namely Kafdrop and schema-registry-ui;

A round of applause for our progress! So far so good, but it isn’t a complete demo if it’s just running on our local machine. Now, we will deploy the entire architecture to AWS!

Especially when we have managed clusters as services.

While AWS MSK is still in its early preview stage (as of January, 2019), we will deploy an Apache Kafka cluster with the following components:

Before setting up any services, a VPC is required as a network interface. On top of that, we will need 6 subnets: 3 pairs of public and private subnet in 3 available zones.

In case you are not familiarize with AWS VPC and subnets, please follow this official AWS tutorial to complete the setup. You may use the below suggested values for the required configuration:

Suggested parameter for creating VPC and subnets
Created VPC and subnets can be viewed in the VPC dashboard

After creating the VPC and and its subnets, we then create the AWS MSK cluster through the MSK dashboard.

The AWS MSK cluster will be running in the private subnets of the newly created VPC. For the minimal, demonstration purpose, we have only 1 broker per availability zone (AZ, for short).

Configuration of creating AWS MSK cluster.

Since AWS MSK is still in preview stage, we can only retrieve the created MSK cluster’s information via the AWS CLI but not MSK dashboard:

Retrieve brokers and zookeeper information to proceed

Now, we have a Kafka cluster with three broker nodes managed by AWS MSK. Next, we are deploying the following components:

  • PostgresSQL database with mock data,
  • Kafdrop, and
  • An EC2 instance (namely containers server), for hosting the docker image.

We will deploy the above components using AWS CloudFormation. All the YAML file for this tutorial are created in aws-setup. Simply deploy the two services and containers server via CLI:

The deployment may take a few minutes. You can check the latest progress in the AWS CloudFormation Dashboard.

CloudFormation dashboard shows deployment stack’s status and details
CloudFormation dashboard also shows deployment progress and events

When the deployment has completed, we then check the containers server instance created and copy the public IP (containers_server_ip). We shall successfully see the broker statistics via Kafdrop in http://<containers_server_ip>:9000.

MSK cluster information in Kafdrop

Now, the environment is ready, and we can try out the data connection on the cluster by a Kafka Connect task. First, we wrap our Kafka Connect worker properties into a docker container using the script connect/stg/build.sh:

Similar to what we did in Step 3, we deploy the Kafka Connect service to AWS ECS using AWS CloudFormation:

We shall see created topics with messages via Kafdrop for the production environment.

Finally, database records are saved in MSK cluster!

To clean up the demo setup, we remove the CloudFormation stacks via CLI (as the second stack is serving in the first stack, we suggest you to remove the second stack first):

As in a simplified demonstration environment, we are not showcasing the best deployment practice. You are welcome to extend our example with:

  • Schema Registry integration,
  • Distributed Kafka Connect workers,
  • etc

As you can observe, it is not trivial to deploy a Kafka cluster on AWS despite the availability of AWS MSK. Yet, the deployment process should be more and more streamlined as such a new AWS product matures.

Thanks for reading this long tutorial. Hope this gives you more understandings on Apache Kafka and guidelines for application setups. For any questions or thoughts, feel free to contact me/create issues @https://github.com/kavimaluskam/kafka-starter 😺

Originally published at medium.com on February 8, 2019.

--

--