Start your real-time pipeline with Apache Kafka
By Alex Kam, Data Engineer
For both data engineers and DevOps, setting up a real-time data pipeline is no trivial task. Yet such architectures are immensely useful, for instance in real-time recommendation engines and real-time geo-location visualization.
One of the most commonly used solutions for such pipelines is Apache Kafka. Originally developed at LinkedIn and open-sourced in 2011, Kafka has grown into a mature platform for distributed streaming, with good scalability, data persistence, and fault tolerance.
Thanks to the announcement of Amazon Managed Streaming for Kafka (AWS MSK for short) in November 2018, setting up a production-grade Kafka cluster on AWS is not as challenging as it used to be. In this article, I will go over the basics of starting a bare-bones Kafka cluster, both locally and on the AWS cloud.
First, let’s briefly go over the basic components of Kafka.
Apache Kafka consists of six core components: Broker, Topic, Consumer, Producer, Connector, and Streamer.
A Kafka topic is a category or grouping of messages, allowing for:
- data partitioning, where each partition is an immutable, ordered sequence of messages;
- data persistence (retention period is configurable);
- multiple-consumer subscription;
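To make the partition idea a bit more concrete, here is a toy in-memory model in Python. It is only a sketch of the concept, not how Kafka is implemented: the class `ToyTopic`, its methods, and the sample messages are all made up for illustration.

```python
class ToyTopic:
    """In-memory stand-in for a topic: a set of append-only partitions."""

    def __init__(self, name, num_partitions):
        self.name = name
        # Each partition is an ordered list; the list index acts as the offset.
        self.partitions = [[] for _ in range(num_partitions)]

    def append(self, partition, message):
        """Append a message to one partition and return its offset."""
        log = self.partitions[partition]
        log.append(message)
        return len(log) - 1

    def read(self, partition, offset):
        """Return all messages from `offset` onwards; nothing is removed."""
        return list(self.partitions[partition][offset:])


topic = ToyTopic("app-users", num_partitions=2)
first = topic.append(0, "alice signed up")
second = topic.append(0, "alice logged in")
other = topic.append(1, "bob signed up")

# Two independent readers can consume the same partition from different offsets.
reader_a = topic.read(0, 0)
reader_b = topic.read(0, 1)
```

Because reading never removes anything, several consumers can each keep their own offset into the same partition, which is what makes multiple subscriptions possible.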
A Kafka broker (sometimes referred to as a Kafka server or node) is a server node in the cluster, mainly responsible for:
- hosting partitions of Kafka Topics;
- transferring messages from Kafka Producer to Kafka Consumer;
- providing data replication and partitioning within a Kafka Cluster;
A Kafka consumer subscribes to a topic through the brokers and consumes data from it. It actively pulls data from the specified Kafka topic and tracks its position using offsets.
A Kafka consumer group is a set of consumers that jointly consume data from one or more topics. Each partition of a topic is assigned to exactly one member of the group. When members join or leave the group, the partitions are reassigned among its consumers.
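The assignment rule can be sketched in a few lines of Python. This uses a plain round-robin strategy as an assumption; Kafka ships several assignment strategies and the real logic is more involved. The function name and consumer names are hypothetical.

```python
def assign_partitions(partitions, consumers):
    """Round-robin assignment: each partition goes to exactly one consumer."""
    assignment = {consumer: [] for consumer in consumers}
    ordered = sorted(consumers)
    for index, partition in enumerate(sorted(partitions)):
        owner = ordered[index % len(ordered)]
        assignment[owner].append(partition)
    return assignment


partitions = [0, 1, 2, 3]

# Two members share four partitions.
before = assign_partitions(partitions, ["consumer-a", "consumer-b"])

# A third member joins, so the partitions are reassigned across the group.
after = assign_partitions(partitions, ["consumer-a", "consumer-b", "consumer-c"])
```

Note that every partition still has exactly one owner after the change; only the mapping moves.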
On the other hand, a Kafka producer is responsible for writing data to a Kafka topic, in the following ways:
- sending each message to the leader broker of the target topic partition;
- allowing batching and compression to be configured;
- allowing a trade-off between throughput and delivery guarantees;
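As a rough illustration of how a producer might pick a partition and group messages into batches, here is a small Python sketch. It is an assumption-laden toy: CRC32 stands in for the hash function (Kafka's own client uses a different one), and the function names and sample messages are hypothetical.

```python
import zlib


def choose_partition(key, num_partitions):
    """Map a message key to a partition so equal keys land together.

    Assumption: CRC32 is only a stand-in; Kafka's client uses a different hash.
    """
    return zlib.crc32(key.encode("utf-8")) % num_partitions


def batch_by_partition(messages, num_partitions):
    """Group (key, value) pairs into one batch per target partition."""
    batches = {}
    for key, value in messages:
        partition = choose_partition(key, num_partitions)
        batches.setdefault(partition, []).append(value)
    return batches


messages = [("user-1", "login"), ("user-2", "login"), ("user-1", "logout")]
batches = batch_by_partition(messages, num_partitions=3)
```

Keeping all messages with the same key in one partition is what preserves their relative order for consumers.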
To read from or write to various data sources (e.g. databases, distributed storage), rather than writing a producer or consumer for each data source, Apache Kafka provides a simpler abstraction: Kafka Connect.
Mainly contributed by Confluent.io (a company founded by the creators of Kafka), Kafka Connect is an open-source framework for connecting Kafka to various commonly used data sources, for instance HDFS, JDBC, or NoSQL databases (e.g. Cassandra, MongoDB, Elasticsearch).
The Kafka Connect component that reads from a data source is called a Source Connector, and the component that writes to a data sink is called a Sink Connector. Both connectors make use of the Kafka Connect API, which is much handier than the Kafka Producer API and Kafka Consumer API.
Hence, using Kafka Connect over Producer and Consumer is recommended, unless there are no official connectors for your data source (e.g. RabbitMQ, InfluxDB).
Now with our data sources and sinks ready, we introduce the final component in Kafka. Kafka Streams is the component that allows us to filter, transform, and aggregate the data from all the data streams.
There are many stream processing operations available in the Kafka Streams API, including mapping, reducing, filtering, and window aggregation. It is also possible to use other frameworks to process Kafka data, including Apache Spark, Apache Flink, Apache Beam, or even AWS Lambda.
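To give a feel for these operations without touching the Kafka Streams API itself, here is a plain-Python sketch of filtering, mapping, and a fixed-size (tumbling) window count. The click data and the function name are hypothetical, and a real stream processor would work on unbounded input rather than a list.

```python
from collections import Counter


def tumbling_window_counts(events, window_seconds):
    """Count events per (window start, key) using fixed, non-overlapping windows."""
    counts = Counter()
    for timestamp, key in events:
        window_start = timestamp - (timestamp % window_seconds)
        counts[(window_start, key)] += 1
    return dict(counts)


# (timestamp in seconds, page) pairs; a hypothetical click stream.
clicks = [(0, "home"), (12, "home"), (45, "cart"), (61, "home"), (75, "bot")]

# Filtering: drop events we do not care about.
filtered = [(ts, page) for ts, page in clicks if page != "bot"]

# Mapping: transform each event, here by upper-casing the page name.
mapped = [(ts, page.upper()) for ts, page in filtered]

# Window aggregation: count events per page in 60-second windows.
counts = tumbling_window_counts(mapped, window_seconds=60)
```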
Still, if you are looking for more details, you can always refer to these resources:
In this section, I will go over the steps to set up a Kafka cluster locally. All the sample code is available in this repository: https://github.com/kavimaluskam/kafka-starter
The local cluster architecture is defined in the file docker/docker-compose.yml, which is as follows:
- kafka1 and kafka2 serve as two separate broker nodes. Coordinated by Apache ZooKeeper, the two nodes give us higher availability through redundancy;
- schema_registry serves as the sole schema server, managing the schema (to be explained below) of all Kafka topics;
- kafdrop and schema_registry_ui are optional web UIs for the Kafka cluster and the schema registry server respectively;
- postgres represents the external database containing the application data, which in this demo is mock data;
- connect-shell is the Kafka Connect container that we interact with from the terminal;
To start with, please make sure you have both Docker and Docker Compose already installed on your machine. Let’s clone the repository and enter our project directory:
$ cd ~
$ git clone https://github.com/kavimaluskam/kafka-starter
$ cd kafka-starter
Start the services by executing the command make up. The services should then be running:
$ make up # remember, you need to have docker installed
$ docker ps
Once the cluster is ready, we can read from and write to Kafka topics. Let’s warm up with Kafka Topic CRUD (Create, Read, Update, Delete) in the command line interface.
Using the Kafka Shell Client, we can start a producer or a consumer application within the terminal. Open two terminal shells and type one of the commands in each:
Starting a producer with Kafka Shell.
Starting a consumer with Kafka Shell.
In real-time usage, source data comes from various external sources rather than from manual input in the terminal. To automate the connection to data sources, as a proof of concept, we use Kafka Connect to connect to a mock PostgreSQL database.
- Mock data is ready in the initialization script. Confirm the data by:
$ make db-preview
- Run Kafka Connect against the PostgreSQL database, using the Kafka Connect Docker image provided by Confluent. We can run Kafka Connect in standalone mode with the following command:
$ connect-standalone worker1.properties connector1.properties [connector2.properties...]
- worker1.properties is a configuration file for the Kafka Connect Worker, which contains settings such as the Kafka cluster location, serialization format, etc.
- connector{1/2}.properties is a configuration file for a Kafka connector; the system it connects to can be either a data source or a data sink.
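To give an idea of what these two files might contain, here is a Python sketch that renders and parses .properties text. The property keys are standard Kafka Connect and Confluent JDBC source connector settings, but every value (host names, ports, database name, connector name, topic prefix) is an assumption for illustration; the files shipped in the repository may well differ.

```python
def render_properties(settings):
    """Render a dict as the key=value lines of a .properties file."""
    return "\n".join(f"{key}={value}" for key, value in settings.items()) + "\n"


def parse_properties(text):
    """Parse key=value lines, skipping blanks and '#' comments."""
    settings = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        settings[key.strip()] = value.strip()
    return settings


# Hypothetical worker settings: cluster location and serialization format.
worker = {
    "bootstrap.servers": "kafka1:9092",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "offset.storage.file.filename": "/tmp/connect.offsets",
}

# Hypothetical source connector settings for a PostgreSQL table.
connector = {
    "name": "demo-postgres-source",
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:postgresql://postgres:5432/demo",
    "mode": "incrementing",
    "incrementing.column.name": "id",
    "topic.prefix": "app-",
}

worker_text = render_properties(worker)
connector_text = render_properties(connector)
```

The split mirrors the command above: the worker file describes how Kafka Connect talks to the cluster, while each connector file describes one external system.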
In the connect/dev/ directory, some predefined configuration files are already provided. Hence, we can use the following command to start Kafka Connect with JSON serialization:
Now, Kafka Connect should be able to poll records in a database with detected schemas and perform JSON serialization. As illustrated above, we can see that the messages produced are of the following format, in which schema describes the message format and payload contains the actual message content:
{
"schema":
{"type":"struct","fields":[{"type":"int32",...]...},
"payload":
{"id":101,"first_name":...}
}
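To get a feel for how much of each message the embedded schema takes up, here is a small Python sketch. The message below is hypothetical: it follows the schema/payload envelope shown above, but the exact field definitions and the sample values are made up for this example.

```python
import json

# Hypothetical message in the schema/payload envelope; the field
# definitions and values are made up for this example.
message = {
    "schema": {
        "type": "struct",
        "fields": [
            {"type": "int32", "optional": False, "field": "id"},
            {"type": "string", "optional": True, "field": "first_name"},
        ],
    },
    "payload": {"id": 101, "first_name": "Alex"},
}


def envelope_sizes(envelope):
    """Return (total bytes, payload bytes) of the JSON-encoded envelope."""
    total = len(json.dumps(envelope).encode("utf-8"))
    payload = len(json.dumps(envelope["payload"]).encode("utf-8"))
    return total, payload


total_bytes, payload_bytes = envelope_sizes(message)

# Fraction of the message that is not actual payload.
schema_share = 1 - payload_bytes / total_bytes
```

Even for this tiny two-field record, most of the bytes describe the schema rather than the data.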
You may notice that the same schema object is embedded in every single message, which leads to a huge waste of storage space. Moreover, there is no schema validation upon the arrival of each message. To address these problems, we need some more components: enter Schema Registry and Apache Avro. The Avro serialization and schema registry pair is not a must for a Kafka database connection, but it is highly recommended.
Apache Avro is a standard data serialization system. All the Avro schemas are stored, with versioning, in a metadata server called Schema Registry.
Let’s try Kafka Connect with Avro serialization:
To verify that the new data schema has been generated, let us access the schema registry server through its REST API:
- To query the list of schemas, visit http://localhost:8081/subjects;
- To query an individual schema (for example, app-users-value), visit http://localhost:8081/subjects/app-users-value/versions/latest;
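If you prefer a script over the browser, the same two queries can be sketched in Python with only the standard library. The helper names are hypothetical, and fetch_json only works while the local cluster is running.

```python
import json
from urllib.request import urlopen

REGISTRY_URL = "http://localhost:8081"


def subjects_url(base=REGISTRY_URL):
    """URL listing every registered subject."""
    return f"{base}/subjects"


def latest_version_url(subject, base=REGISTRY_URL):
    """URL of the latest schema version for one subject."""
    return f"{base}/subjects/{subject}/versions/latest"


def fetch_json(url):
    """GET a URL and decode the JSON body (needs the local cluster running)."""
    with urlopen(url, timeout=5) as response:
        return json.load(response)
```

For example, fetch_json(latest_version_url("app-users-value")) should return the latest schema registered for that subject.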
Finally, to put the icing on the cake, we can visit two web interfaces to visualize the Kafka cluster and the schema registry.
Kafdrop, a web interface for monitoring a Kafka cluster, is available at http://localhost:9000.
Schema Registry UI, a web interface for the schema registry, is available at http://localhost:8082.
- We have deployed the Kafka cluster as Docker containers;
- We have performed CRUD on Kafka topics;
- We have started a Kafka producer-consumer setup;
- We have connected a PostgreSQL database using Kafka Connect;
- We have added the Schema Registry;
- We have visualized the setup using web-based interfaces, namely Kafdrop and schema-registry-ui;
A round of applause for our progress! So far so good, but it isn’t a complete demo if it’s just running on our local machine. Now, we will deploy the entire architecture to AWS!
While AWS MSK is still in its early preview stage (as of January 2019), we will deploy an Apache Kafka cluster with the following components:
- Three broker nodes on AWS MSK,
- PostgreSQL database,
- Kafdrop, and
- Kafka Connect running on AWS ECS (Elastic Container Service).
- A Linux / MacOS machine,
- An AWS account, with root privilege,
- The latest AWS CLI installed on your machine with pre-configured credentials (using aws configure),
- A Docker Hub account for hosting the Kafka Connect image,
- And again, clone https://github.com/kavimaluskam/kafka-starter 😺.
Before setting up any services, we need a VPC to provide the network. On top of that, we will need 6 subnets: 3 pairs of public and private subnets in 3 availability zones.
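One way to plan such a layout is to carve the VPC address range into equal blocks. The Python sketch below does this with the standard ipaddress module; the 10.0.0.0/16 range, the /24 subnet size, and the zone names are assumptions chosen for illustration, not required values.

```python
import ipaddress

# Assumption: a 10.0.0.0/16 VPC and three zone names chosen for illustration.
vpc = ipaddress.ip_network("10.0.0.0/16")
zones = ["us-east-1a", "us-east-1b", "us-east-1c"]

# Carve the VPC into /24 blocks and take the first six.
blocks = list(vpc.subnets(new_prefix=24))[:6]

# One public and one private subnet per availability zone.
plan = []
for index, zone in enumerate(zones):
    plan.append({"zone": zone, "kind": "public", "cidr": str(blocks[2 * index])})
    plan.append({"zone": zone, "kind": "private", "cidr": str(blocks[2 * index + 1])})
```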
In case you are not familiar with AWS VPC and subnets, please follow this official AWS tutorial to complete the setup. You may use the suggested values below for the required configuration:
After creating the VPC and its subnets, we then create the AWS MSK cluster through the MSK dashboard.
The AWS MSK cluster will be running in the private subnets of the newly created VPC. To keep this demonstration minimal, we run only 1 broker per availability zone (AZ for short).
Since AWS MSK is still in preview, we can only retrieve the created MSK cluster’s information via the AWS CLI, not via the MSK dashboard:
Retrieve brokers and zookeeper information to proceed
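For reference, the same information can also be fetched from Python. This is a sketch that assumes the boto3 library with its MSK ("kafka") client; it is not the CLI approach used in this tutorial, the helper names are hypothetical, and the host names in the sample are made up.

```python
def extract_endpoints(brokers_response, cluster_response):
    """Pull the broker and ZooKeeper connection strings out of MSK responses."""
    return {
        "brokers": brokers_response["BootstrapBrokerString"],
        "zookeeper": cluster_response["ClusterInfo"]["ZookeeperConnectString"],
    }


def fetch_endpoints(cluster_arn, region="us-east-1"):
    """Query MSK for one cluster (needs boto3 and AWS credentials)."""
    import boto3  # imported lazily so the helper above works without it

    client = boto3.client("kafka", region_name=region)
    return extract_endpoints(
        client.get_bootstrap_brokers(ClusterArn=cluster_arn),
        client.describe_cluster(ClusterArn=cluster_arn),
    )


# Fake responses with made-up host names, shaped like the fields used above.
sample = extract_endpoints(
    {"BootstrapBrokerString": "b-1.example:9092,b-2.example:9092"},
    {"ClusterInfo": {"ZookeeperConnectString": "z-1.example:2181"}},
)
```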
Now we have a Kafka cluster with three broker nodes managed by AWS MSK. Next, we deploy the following components:
- PostgreSQL database with mock data,
- Kafdrop, and
- An EC2 instance (the containers server), for hosting the Docker images.
We will deploy the above components using AWS CloudFormation. All the YAML files for this tutorial are in aws-setup. Simply deploy the two services and the containers server via the CLI:
aws cloudformation create-stack \
--stack-name data-kafka-starter \
--template-body file://`pwd`/aws-setup/cloudformation.yml \
--parameters ParameterKey=BootstrapServer,ParameterValue=\"$STG_BROKERS\" ParameterKey=ZookeeperServer,ParameterValue=\"$STG_ZK\" ParameterKey=SubnetID,ParameterValue=\"$SUBNET_ID\" \
--region us-east-1
The deployment may take a few minutes. You can check the latest progress in the AWS CloudFormation Dashboard.
When the deployment has completed, we check the containers server instance that was created and copy its public IP (containers_server_ip). We should then see the broker statistics via Kafdrop at http://<containers_server_ip>:9000.
Now the environment is ready, and we can try out the data connection on the cluster with a Kafka Connect task. First, we wrap our Kafka Connect worker properties into a Docker container using the script connect/stg/build.sh:
DOCKER_HUB=${your docker hub account}
DOMAIN=${domain for the containers server created}
sh connect/stg/build.sh -h $DOCKER_HUB -b $STG_BS -d $DOMAIN
Similar to what we did in Step 3, we deploy the Kafka Connect service to AWS ECS using AWS CloudFormation:
aws cloudformation create-stack \
--stack-name data-kafka-starter-connect \
--template-body file://`pwd`/aws-setup/connect-cloudformation.yml \
--parameters ParameterKey=ImageName,ParameterValue=\"$DOCKER_HUB/kafka-connect-demo\" \
--region us-east-1
We should now see the created topics, with messages, via Kafdrop for the production environment.
To clean up the demo setup, we remove the CloudFormation stacks via the CLI (as the second stack depends on the first stack, we suggest removing the second stack first):
aws cloudformation delete-stack --stack-name data-kafka-starter-connect
aws cloudformation delete-stack --stack-name data-kafka-starter
As this is a simplified demonstration environment, we are not showcasing deployment best practices. You are welcome to extend our example with:
- Schema Registry integration,
- Distributed Kafka Connect workers,
- etc
As you can see, deploying a Kafka cluster on AWS is not trivial despite the availability of AWS MSK. Yet the deployment process should become more streamlined as this new AWS product matures.
Thanks for reading this long tutorial. I hope it gives you a better understanding of Apache Kafka and some guidelines for setting up your own applications. For any questions or thoughts, feel free to contact me or create an issue at https://github.com/kavimaluskam/kafka-starter 😺
Originally published at medium.com on February 8, 2019.