Rendezvous with Kafka : A simple guide to get started

Suman Pattnaik
Walmart Global Tech Blog
16 min read · Feb 17, 2019


What is Kafka?

The last decade saw technology evolve in every field humans could imagine. From nanotechnology to high-speed gaming on GPUs, technology expanded and changed faster than ever. The one attribute that grew with all of these advances was data. Enterprises created digital data at an enormous rate, and they expected to extract even more insight from it, whether static or streaming. That posed a great challenge for companies that had historically relied on message queues to move data from one system to another. For example, data emanating from point-of-sale systems could be used by finance, by accounts, and to build customer insights through analytics. Doing this with queues would require multiple queues, each holding exactly the same data. The complexity grows further with other data sources such as order data and website data, and so does the challenge of maintaining that infrastructure while keeping the system resilient and the data reliable and consistent. The engineering solution that emerged to simplify this architecture is called Kafka.

Kafka is a publish-subscribe streaming platform that can run in distributed mode and handle millions of messages per minute. Here are its key capabilities:

  • Publish and subscribe to streams of records, similar to an enterprise messaging system.
  • Store streams of records in a fault-tolerant durable way for a pre-configured amount of time.
  • Process streams of records as they occur or after the fact.

Genesis of Kafka

Originally created at LinkedIn in 2010

LinkedIn’s installation of Kafka processes over 1.4 trillion messages per day

In use at many organizations (2000+)

For example: LinkedIn, Walmart, Netflix, Goldman Sachs, etc.

Why Kafka?

· Publish once and subscribe for many use cases

· Designed to support batch and real-time stream processing

· Performs extremely well at very large scale and is horizontally scalable

· Fault Tolerant

In the example discussed above, the sales data from the point-of-sale systems could simply be streamed to a “sales” topic in Kafka, and three independent consumers could read it separately for finance, accounts, and customer insights.

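So you could move from an architecture with one queue per consuming system, each holding a copy of the same sales data, to one where a single “sales” topic is read independently by every consumer.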
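To make “publish once, subscribe many” concrete, here is a toy, in-memory Python model of the sales example. It is a sketch under simplifying assumptions (a single partition, no brokers, no persistence), and ToyTopic, publish and poll are hypothetical names rather than the Kafka client API. It shows that each consumer group keeps its own read position over one shared log, so the data is stored once and read independently by finance, accounts and customer insights.

```python
from collections import defaultdict


class ToyTopic:
    """In-memory stand-in for a one-partition topic (hypothetical, not the Kafka API)."""

    def __init__(self, name):
        self.name = name
        self.log = []                      # append-only list of records
        self.positions = defaultdict(int)  # consumer group -> next offset to read

    def publish(self, record):
        self.log.append(record)
        return len(self.log) - 1           # offset assigned to the record

    def poll(self, group, max_records=10):
        start = self.positions[group]
        batch = self.log[start:start + max_records]
        self.positions[group] = start + len(batch)  # each group moves independently
        return batch


sales = ToyTopic("sales")
for sale in ["sale-1", "sale-2", "sale-3"]:
    sales.publish(sale)                    # published once

finance = sales.poll("finance")
accounts = sales.poll("accounts")
insights = sales.poll("customer-insights")
```

In real Kafka, these per-group positions correspond to the committed offsets of each consumer group (group.id).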

Use-Cases

  • Messaging System
  • Website Activity Tracking
  • Metrics and Log Aggregation
  • Stream Processing
  • Event Sourcing
  • Commit Log
  • Decouple system dependencies etc

[Figure: Kafka simplified ecosystem]

[Figure: Kafka extensive ecosystem]

Topics and Partitions

Topics

A topic is a subject or feed name to which messages are published and in which they are stored. Messages are byte arrays, so they can carry any object in any format. All Kafka messages are organized into topics: to send a message, you send it to a specific topic, and to read a message, you read it from a specific topic. Applications that write data to topics are called producers, and applications that read from topics are called consumers. Here are some key points to remember:

  • Topics are identified by a unique “name”
  • A topic holds a stream of published records
  • Data in topics lives in the cluster until a preconfigured retention period (retention.ms) or size limit (retention.bytes) is reached, much like a TTL (Time to Live)
  • A topic can have zero, one, or more subscribers (multi-subscriber)
  • You can have multiple topics in a broker

Partitions

Kafka topics are divided into a number of partitions, each holding messages in an immutable, ordered sequence (first in, first out). Each message in a partition is assigned, and identified by, a unique position called its offset. Because a topic can have multiple partition logs, multiple consumers can read from a topic in parallel, which helps with horizontal scaling. Here are some key points to remember:

  • Each partition is ordered
  • Records in the partitions are assigned a sequential id number called the offset
  • Messages within a partition are immutable
  • Offsets are meaningful only within the context of a specific partition
  • Order of messages is guaranteed within a partition
  • Data is retained for a configurable retention period
  • Records without a key are distributed in round-robin fashion (clients since Kafka 2.4 use a “sticky” variant that fills a batch for one partition before switching); records with the same key go to the same partition as long as the partition count does not change
  • Each partition contains a subset of the topic’s messages
  • More partitions mean more parallelism
  • It’s important to choose the right number of partitions. Here is a guide to doing so: https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/
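The partitioning rules above can be sketched as a toy Python model. It assumes a topic held entirely in memory and uses zlib.crc32 as a stand-in hash; Kafka’s Java client actually hashes keys with murmur2, and ToyPartitionedTopic is a hypothetical name. It shows that a keyed record always lands in the same partition, that offsets count up independently per partition, and that unkeyed records are spread round-robin.

```python
import itertools
import zlib


class ToyPartitionedTopic:
    """Toy partitioned topic (hypothetical; not Kafka's real partitioner)."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]
        self._next_unkeyed = itertools.cycle(range(num_partitions))

    def choose_partition(self, key):
        if key is None:
            return next(self._next_unkeyed)   # no key: round-robin
        # Stand-in hash: Kafka's Java client uses murmur2, not crc32.
        return zlib.crc32(key.encode("utf-8")) % len(self.partitions)

    def append(self, key, value):
        p = self.choose_partition(key)
        self.partitions[p].append((key, value))
        return p, len(self.partitions[p]) - 1  # offsets are per partition


topic = ToyPartitionedTopic(num_partitions=3)
# Same key -> same partition, so these three sales keep their relative order.
keyed = [topic.append("store-42", f"sale-{i}") for i in range(3)]
# No key -> spread across partitions 0, 1, 2 in turn.
unkeyed = [topic.append(None, f"click-{i}")[0] for i in range(3)]
```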

Brokers

A broker is a single server running Kafka; a Kafka cluster is made up of one or more brokers. Topic partitions are stored on brokers. Here are some key points to remember:

  • Brokers receive and store messages when they are sent by the Producers
  • A Kafka cluster will typically have multiple Brokers
  • Each can handle hundreds of thousands, or millions, of messages per second
  • Each Broker is identified with its id (broker.id)
  • After connecting to one or more brokers (bootstrap.servers), a client discovers, and can connect to, all the brokers in the cluster
  • A minimum of 3 (often 5) brokers per cluster is recommended for high availability and throughput
  • Partitions are distributed across the Kafka cluster
  • Each partition is replicated across a configurable number of servers for fault tolerance
  • At any given time, each partition has a single leader broker
  • Typically, a broker handles multiple partitions
  • Example — Two topics with 3 and 2 partitions
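The two-topic example (3 and 2 partitions) can be sketched with a toy placement function, assuming three brokers with hypothetical ids 101 to 103 and no replication. Kafka’s own assignment logic is more involved (it also places replicas and can take racks into account); this only illustrates how partitions end up spread over brokers, with one broker typically handling several.

```python
def place_partitions(topics, broker_ids):
    """Toy placement: spread every topic partition over the brokers round-robin."""
    placement = {}
    i = 0
    for topic, num_partitions in topics.items():
        for p in range(num_partitions):
            placement[(topic, p)] = broker_ids[i % len(broker_ids)]
            i += 1
    return placement


layout = place_partitions({"topic-1": 3, "topic-2": 2}, broker_ids=[101, 102, 103])

# Group by broker to see that one broker typically handles several partitions.
per_broker = {}
for (topic, p), broker in layout.items():
    per_broker.setdefault(broker, []).append(f"{topic}/{p}")
```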

Replication Factor

To provide resiliency and mitigate data loss, Kafka, like many other distributed technologies, keeps copies of data on multiple servers. The replication factor is the number of copies that must exist for the same data. A replication factor of 3 means there are 3 copies: one on the leader and two on followers. The in-sync replicas (ISR) are the replicas, leader included, that are fully caught up with the leader, so with a replication factor of 3 and both followers caught up, the ISR contains all 3 replicas. Here are some key points to remember:

  • Every topic partition in Kafka is replicated n times, where n is the replication factor of the topic
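  • The RF should be greater than 1 for fault tolerance, and it cannot exceed the number of brokers
  • Out of the n replicas, one is designated as the leader and the others are followers; the ISR (In-Sync Replicas) is the leader plus the followers that are fully caught up with it
  • Only the leader handles read and write requests for the partition (Kafka 2.4 and later can optionally let consumers fetch from a nearby follower)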
  • Example: Topic 1 with replication-factor 3 and 1 partition
  • Example: Topic 2 with replication-factor 3 and 2 partitions
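A toy Python sketch of replica placement and leader failover, assuming three brokers with ids 1 to 3 and a replication factor of 3 as in the Topic 2 example. assign_replicas and elect_leader are hypothetical helpers, not Kafka code. Replicas of a partition are placed on distinct brokers (which is why RF cannot exceed the broker count), the first replica acts as leader, and when the leader’s broker fails, a new leader is chosen from the replicas that are both alive and in the ISR.

```python
def assign_replicas(num_partitions, brokers, replication_factor):
    """Toy replica placement: a partition's replicas go on distinct brokers."""
    if not 1 <= replication_factor <= len(brokers):
        raise ValueError("replication factor must be between 1 and the number of brokers")
    assignment = {}
    for p in range(num_partitions):
        # Shift the starting broker per partition so leaders are spread out.
        assignment[p] = [brokers[(p + r) % len(brokers)] for r in range(replication_factor)]
    return assignment


def elect_leader(replicas, in_sync, alive):
    """Pick the first replica that is both alive and in the ISR."""
    for broker in replicas:
        if broker in in_sync and broker in alive:
            return broker
    return None  # no eligible replica: the partition is offline


topic2 = assign_replicas(num_partitions=2, brokers=[1, 2, 3], replication_factor=3)
leaders = {p: replicas[0] for p, replicas in topic2.items()}
# Broker 1 fails: partition 0 needs a new leader from its in-sync replicas.
new_leader = elect_leader(topic2[0], in_sync={1, 2, 3}, alive={2, 3})
```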

Kafka Producer

A Kafka Producer is an application that sends messages to a Kafka topic.

Here are some key Kafka Producer configurations to take note of :

Core Configuration

bootstrap.servers : A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. The client will make use of all servers irrespective of which servers are specified here for bootstrapping — this list only impacts the initial hosts used to discover the full set of servers. This list should be in the form host1:9092,host2:9092,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down).

key.serializer : Serializer class for key that implements the org.apache.kafka.common.serialization.Serializer interface.

value.serializer : Serializer class for value that implements the org.apache.kafka.common.serialization.Serializer interface

client.id : An id string to pass to the server when making requests. The purpose of this is to be able to track the source of requests beyond just ip/port by allowing a logical application name to be included in server-side request logging.
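The core producer settings can be collected into a Java-style .properties file. This Python sketch only builds the text; producer_properties is a hypothetical helper, the host names and client id are hypothetical, and StringSerializer is one of the serializers that ship with the Kafka client. A file like this can be passed to the console producer with --producer.config, or loaded into a Properties object in a Java application.

```python
def producer_properties(bootstrap_servers, client_id):
    """Render the core producer settings as Java .properties text (hypothetical helper)."""
    config = {
        "bootstrap.servers": ",".join(bootstrap_servers),
        "key.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "value.serializer": "org.apache.kafka.common.serialization.StringSerializer",
        "client.id": client_id,  # shows up in broker-side request logs
    }
    return "".join(f"{key}={value}\n" for key, value in config.items())


text = producer_properties(["host1:9092", "host2:9092"], client_id="pos-sales-producer")
```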

Message Durability (choose a setting based on how sensitive you are to data loss and on the overall processing speed you need)

acks=0 (no acknowledgement, fire and forget, possible data loss)

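acks=1 (the default at the time of writing; wait for the leader’s acknowledgement only, limited data loss)

acks=2 (wait for the leader’s and one other replica’s acknowledgement, limited data loss; behaves like a quorum in various other distributed and replicated platforms. Note that the current Java producer accepts only 0, 1, and all (or -1), so this quorum-like behavior is usually configured as acks=all together with min.insync.replicas=2)

acks=all (wait for the leader and all in-sync replicas; no loss of acknowledged data as long as at least min.insync.replicas replicas remain in sync)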

Note: The acks setting trades performance against the chance of losing data. The stronger the setting (acks=all), the slower the processing, because the client waits for the leader and the in-sync replicas to acknowledge each write. Use it when losing even one message is not tolerable and there is no other process to reprocess failed messages. If acks is set to 0, the producer does not wait for any acknowledgment from the server at all: the record is immediately added to the socket buffer and considered sent. In that case there is no guarantee that the server received the record, the retries configuration has no effect (the client generally won’t learn of failures), and the offset returned for each record is always -1. For a production application, a quorum-style guarantee (acknowledgement from the leader plus at least one follower) is preferred; with the Java producer this is configured as acks=all together with min.insync.replicas=2 on the topic or broker. For non-critical applications, or where losing a message would trigger an immediate resend anyway (a failed chat message, for example), acks=0 can be used to gain speed.
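A toy Python model of what each acks value waits for, assuming a partition whose leader is broker 1 and whose ISR includes the leader. replicas_to_wait_for is a hypothetical helper; the validation mirrors the values the Java producer accepts (0, 1, all, -1), and the min.insync.replicas check mirrors the broker rejecting acks=all writes when too few replicas are in sync (NotEnoughReplicasException in the Java client).

```python
VALID_ACKS = {"0", "1", "all", "-1"}  # values the Java producer accepts


def replicas_to_wait_for(acks, leader, in_sync, min_insync_replicas=1):
    """Toy model: replicas that must confirm a write before the producer gets an ack."""
    acks = str(acks)
    if acks not in VALID_ACKS:
        raise ValueError(f"acks={acks} is not a valid producer setting")
    if acks == "0":
        return set()              # fire and forget: nothing is awaited
    if acks == "1":
        return {leader}           # the leader's local write only
    # acks=all / -1: every replica currently in the ISR, leader included.
    if len(in_sync) < min_insync_replicas:
        # The broker rejects the write (NotEnoughReplicasException in the Java client).
        raise RuntimeError("fewer in-sync replicas than min.insync.replicas")
    return set(in_sync)


isr = {1, 2, 3}  # leader (broker 1) plus two caught-up followers
quorum_write = replicas_to_wait_for("all", leader=1, in_sync=isr, min_insync_replicas=2)
```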

Message Ordering

Messages are written to the broker in the same order that they are received by producer client

retries > 0 can cause reordering (the default was 0 before Kafka 2.1; newer clients retry by default)

max.in.flight.requests.per.connection=1 with retries enabled: The maximum number of unacknowledged requests the client will send on a single connection before blocking. Note that if this setting is set to be greater than 1 and there are failed sends, there is a risk of message re-ordering due to retries (i.e., if retries are enabled).
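Why retries can reorder messages is easiest to see in a toy simulation. This Python sketch assumes one partition, batches named A, B and C, and a single transient failure of batch A; deliver is a hypothetical helper, not producer code. With several requests in flight, B and C are appended before A’s retry arrives; with max.in.flight.requests.per.connection=1, A is retried before anything else is sent.

```python
from collections import deque


def deliver(batches, max_in_flight, fail_once):
    """Toy retry model: returns the order in which batches land in the partition."""
    queue = deque(batches)
    already_failed = set()
    partition_log = []
    while queue:
        # Send up to max_in_flight requests before waiting for their responses.
        in_flight = [queue.popleft() for _ in range(min(max_in_flight, len(queue)))]
        retries = []
        for batch in in_flight:
            if batch in fail_once and batch not in already_failed:
                already_failed.add(batch)
                retries.append(batch)        # transient error: resend later
            else:
                partition_log.append(batch)  # broker appends in arrival order
        queue.extendleft(reversed(retries))  # retried batches go first next round
    return partition_log


reordered = deliver(["A", "B", "C"], max_in_flight=5, fail_once={"A"})
in_order = deliver(["A", "B", "C"], max_in_flight=1, fail_once={"A"})
```

Newer clients can also preserve ordering with up to five in-flight requests by enabling the idempotent producer (enable.idempotence=true).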

Batching and Compression

batch.size — (size based batching) : The producer will attempt to batch records together into fewer requests whenever multiple records are being sent to the same partition. This helps performance on both the client and the server. This configuration controls the default batch size in bytes. The default size is 16384

No attempt will be made to batch records larger than this size.

Requests sent to brokers will contain multiple batches, one for each partition with data available to be sent.

A small batch size will make batching less common and may reduce throughput (a batch size of zero will disable batching entirely). A very large batch size may use memory a bit more wastefully, as the producer always allocates a buffer of the specified batch size in anticipation of additional records. When receiving compressed messages, 0.10.0 brokers avoid recompressing the messages, which in general reduces latency and improves throughput. In certain cases, however, this may reduce the batching size on the producer, which could lead to worse throughput. If this happens, users can tune linger.ms and batch.size of the producer for better throughput.

linger.ms (time-based batching) : The producer groups together any records that arrive in between request transmissions into a single batched request. Normally this occurs only under load, when records arrive faster than they can be sent out. However, in some circumstances the client may want to reduce the number of requests even under moderate load. This setting accomplishes that by adding a small amount of artificial delay: rather than immediately sending out a record, the producer waits for up to the given delay to allow other records to be sent so that the sends can be batched together. This setting gives the upper bound on the delay for batching: once a partition has accumulated batch.size worth of records, the batch is sent immediately regardless of this setting, but if fewer bytes have accumulated, the producer will ‘linger’ for the specified time waiting for more records to show up. This setting defaults to 0 (i.e. no delay). Setting linger.ms=5, for example, would reduce the number of requests sent but would add up to 5 ms of latency to records sent in the absence of load.
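The interplay of batch.size and linger.ms can be sketched as a toy accumulator for a single partition. It is a simplified Python model with hypothetical names, times in milliseconds and record sizes taken as string lengths; the real producer also starts a fresh batch when a record does not fit, which this sketch ignores. A batch is sent as soon as it reaches batch.size, otherwise once it has lingered for linger.ms.

```python
class ToyAccumulator:
    """Toy batch.size / linger.ms model for one partition (times in ms)."""

    def __init__(self, batch_size, linger_ms):
        self.batch_size = batch_size
        self.linger_ms = linger_ms
        self.sent = []  # (send_time_ms, records) for each request sent
        self._reset()

    def _reset(self):
        self.batch, self.batch_bytes, self.opened_at = [], 0, None

    def _send(self, now):
        self.sent.append((now, self.batch))
        self._reset()

    def tick(self, now):
        # linger.ms: a partial batch goes out once it has waited long enough.
        if self.batch and now - self.opened_at >= self.linger_ms:
            self._send(now)

    def append(self, now, record):
        self.tick(now)
        if self.opened_at is None:
            self.opened_at = now
        self.batch.append(record)
        self.batch_bytes += len(record)
        # batch.size: a full batch is sent at once, whatever linger.ms says.
        if self.batch_bytes >= self.batch_size:
            self._send(now)


acc = ToyAccumulator(batch_size=30, linger_ms=5)
for t, record in [(0, "x" * 10), (1, "y" * 10), (2, "z" * 10)]:
    acc.append(t, record)   # third record fills the batch at t=2
acc.append(3, "w" * 10)     # starts a new batch
acc.tick(7)                 # only 4 ms old: keep lingering
acc.tick(8)                 # 5 ms old: sent
```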

compression.type (larger batches mean higher compression) : On the producer, this sets the codec used to compress each batch: none (the default), gzip, snappy, lz4, or zstd (zstd needs Kafka 2.1 or later). The topic-level setting of the same name specifies the final compression type for a given topic; it accepts the same codecs plus ‘uncompressed’, which is equivalent to no compression, and ‘producer’, which means retain the original compression codec set by the producer.
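The claim that larger batches compress better can be checked with the Python standard library. This sketch uses gzip (one of the supported codecs) on hypothetical JSON sale records and compares the compressed-to-raw size ratio of a one-record batch with that of a 200-record batch. It illustrates the effect rather than measuring Kafka itself, which compresses each batch as a unit.

```python
import gzip
import json


def compression_ratio(records):
    """Compressed size divided by raw size when a batch is compressed as one unit."""
    raw = b"".join(json.dumps(r).encode("utf-8") for r in records)
    return len(gzip.compress(raw)) / len(raw)


sale = {"store": 42, "register": 7, "sku": "ABC-123", "qty": 1, "currency": "USD"}
one_record = compression_ratio([sale])
large_batch = compression_ratio([dict(sale, qty=i) for i in range(200)])
print(f"1 record: {one_record:.2f}, 200 records: {large_batch:.2f}")
```

The repeated field names and values across records are what the codec exploits, which is why a lone record barely compresses (or even grows, because of format overhead).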

Queuing Limits

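buffer.memory : total memory, in bytes, available to the Java client for collecting unsent messages

max.block.ms : how long send() may block, for example while buffer.memory is full or topic metadata is unavailable, before raising an exception

request.timeout.ms : how long the client waits for a broker’s response to a request before retrying or failing it. In clients before Kafka 2.1 this also bounded how long a batch could wait in the queue; since 2.1 that overall limit is set by delivery.timeout.ms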
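The queuing limits can be sketched as a bounded buffer. In this toy Python model (hypothetical names, record sizes in bytes), append blocks while buffer.memory is exhausted and raises once max.block.ms has elapsed, and release stands in for the sender thread freeing space after a batch is sent.

```python
import threading
import time


class ToyRecordBuffer:
    """Toy model of buffer.memory plus max.block.ms (not the producer's internals)."""

    def __init__(self, buffer_memory, max_block_ms):
        self.capacity = buffer_memory
        self.max_block_s = max_block_ms / 1000.0
        self.used = 0
        self.cond = threading.Condition()

    def append(self, record):
        deadline = time.monotonic() + self.max_block_s
        with self.cond:
            while self.used + len(record) > self.capacity:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError("buffer full for longer than max.block.ms")
                self.cond.wait(remaining)  # block until space is released
            self.used += len(record)

    def release(self, nbytes):
        """Stand-in for the sender thread freeing space after a batch is sent."""
        with self.cond:
            self.used -= nbytes
            self.cond.notify_all()


buf = ToyRecordBuffer(buffer_memory=100, max_block_ms=50)
buf.append(b"x" * 80)
try:
    buf.append(b"y" * 40)          # does not fit: blocks ~50 ms, then gives up
    blocked = False
except TimeoutError:
    blocked = True
buf.release(80)                    # the first batch has been sent
buf.append(b"y" * 40)              # now fits immediately
```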

Kafka Consumer

An application that reads data from a Kafka topic is called a Kafka consumer. Here are some key points to remember:

  • A consumer reads data from a topic
  • A consumer reads data in order within each partition
  • A consumer reads data in parallel across partitions

Here are some key Kafka Consumer configurations to take note of :

Core Configuration

bootstrap.servers : As with the producer, a list of host/port pairs (host1:9092,host2:9092,....) used only for the initial connection. The client discovers the full, possibly changing, set of brokers from these, so the list need not be complete, although more than one entry helps in case a server is down.

key.deserializer : Deserializer class for the key that implements the org.apache.kafka.common.serialization.Deserializer interface. You can create a custom deserializer if needed.

value.deserializer : Deserializer class for the value that implements the org.apache.kafka.common.serialization.Deserializer interface. You can create a custom deserializer if needed.

client.id : This value is specified by the kafka consumer client and is used to distinguish between different clients. This is different from group.id

Group Configuration

group.id : A unique string that identifies the consumer group a consumer belongs to. A new consumer that joins with the same group.id shares the topic’s partitions, and therefore its messages, with the existing consumers in that group.

session.timeout.ms : The timeout used to detect consumer failures when using Kafka’s group management facility. The consumer sends periodic heartbeats to indicate its liveness to the broker. If no heartbeats are received by the broker before this session timeout expires, the broker removes the consumer from the group and initiates a rebalance. Note that the value must be in the allowable range configured on the broker by group.min.session.timeout.ms and group.max.session.timeout.ms.

max.poll.records : The maximum number of records returned in a single call to poll(). When not specified, the default value is 500.

heartbeat.interval.ms : The expected time between heartbeats to the group coordinator when using Kafka’s group management facilities. Heartbeats are used to ensure that the worker’s session stays active and to facilitate rebalancing when new members join or leave the group. The value must be set lower than session.timeout.ms, but typically should be set no higher than 1/3 of that value. It can be adjusted even lower to control the expected time for normal rebalances.
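A minimal sketch of assembling consumer settings while checking the timeout rules described above. consumer_properties and its arguments are hypothetical; the broker’s group.min.session.timeout.ms and group.max.session.timeout.ms are passed in as example values rather than read from a cluster, and the sketch treats the one-third heartbeat guideline as a hard rule, which is stricter than the “must be lower than session.timeout.ms” requirement.

```python
def consumer_properties(bootstrap_servers, group_id, session_timeout_ms,
                        heartbeat_interval_ms, group_min_session_ms, group_max_session_ms):
    """Build consumer settings and check the timeout rules (hypothetical helper)."""
    if not group_min_session_ms <= session_timeout_ms <= group_max_session_ms:
        raise ValueError("session.timeout.ms is outside the broker's allowed range")
    if heartbeat_interval_ms * 3 > session_timeout_ms:
        raise ValueError("heartbeat.interval.ms should be at most 1/3 of session.timeout.ms")
    return {
        "bootstrap.servers": ",".join(bootstrap_servers),
        "group.id": group_id,
        "key.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "value.deserializer": "org.apache.kafka.common.serialization.StringDeserializer",
        "session.timeout.ms": str(session_timeout_ms),
        "heartbeat.interval.ms": str(heartbeat_interval_ms),
        "max.poll.records": "500",
    }


# Example broker bounds (group.min/max.session.timeout.ms) passed in by hand.
finance = consumer_properties(["host1:9092", "host2:9092"], "finance",
                              session_timeout_ms=10000, heartbeat_interval_ms=3000,
                              group_min_session_ms=6000, group_max_session_ms=300000)
```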

Offset Management

enable.auto.commit : This allows for consumer’s offset to be periodically committed in the background if set true.

auto.commit.interval.ms : The frequency in milliseconds at which consumer offsets are auto-committed to Kafka if enable.auto.commit is set to true. The default value is 5000.

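auto.offset.reset : This setting determines where a consumer starts reading when its group has no committed offset for a partition, or when the committed offset no longer exists on the server (for example, because the data was deleted). Here are the options: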

  • earliest: automatically reset the offset to the earliest offset
  • latest: automatically reset the offset to the latest offset
  • none: throw exception to the consumer if no previous offset is found for the consumer’s group
  • anything else: throw exception to the consumer.
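The decision auto.offset.reset makes can be sketched as a small Python function. It is a toy model with hypothetical names: committed is the group’s committed offset for one partition (None if there is none), and log_start/log_end are the first retained offset and the next offset to be written. A valid committed offset always wins; the policy only applies when there is none or it has fallen out of the retained log.

```python
def starting_offset(committed, log_start, log_end, auto_offset_reset="latest"):
    """Toy model of where a consumer group starts reading one partition."""
    # A committed offset that still exists in the retained log always wins.
    if committed is not None and log_start <= committed <= log_end:
        return committed
    # Otherwise (no committed offset, or it is out of range) apply the policy.
    if auto_offset_reset == "earliest":
        return log_start
    if auto_offset_reset == "latest":
        return log_end
    # "none" or anything else: surface an error to the application.
    raise LookupError(f"no usable offset and auto.offset.reset={auto_offset_reset!r}")


new_group = starting_offset(None, log_start=100, log_end=250, auto_offset_reset="earliest")
resumed = starting_offset(180, log_start=100, log_end=250, auto_offset_reset="earliest")
```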

Consumer group

A consumer group is a set of consumers which cooperate to consume data from some topics

  • Each consumer within a group is assigned a set of partitions to read from. For any given partition, only one consumer will read from it.
  • If you have more consumers than partitions, some consumers will be idle
  • Consumer uses a group coordination protocol built into Kafka
  • One of the brokers is selected as the group coordinator
  • Coordinator mediates partition assignment
  • Each member in the group must send heartbeats to the coordinator in order to remain a member of the group
  • Consumer must commit the offsets corresponding to the messages it has read
  • In case of consumer shutdown or crash, its partitions will be re-assigned to another member
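Partition assignment within a consumer group can be sketched with a toy round-robin assignor. Kafka ships several assignment strategies (selected with partition.assignment.strategy), and the real assignment is computed during the group rebalance protocol; assign_round_robin is a hypothetical helper that only shows the outcome: each partition goes to exactly one consumer, surplus consumers sit idle, and a rebalance hands a departed consumer’s partitions to the remaining members.

```python
def assign_round_robin(partitions, consumers):
    """Toy assignment: every partition goes to exactly one consumer in the group."""
    members = sorted(consumers)
    assignment = {c: [] for c in members}
    for i, p in enumerate(sorted(partitions)):
        assignment[members[i % len(members)]].append(p)
    return assignment


partitions = [0, 1, 2]
two = assign_round_robin(partitions, ["c1", "c2"])
four = assign_round_robin(partitions, ["c1", "c2", "c3", "c4"])  # more consumers than partitions
after_crash = assign_round_robin(partitions, ["c1"])             # c2 left: rebalance
```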

Kafka Log Store (Segments and Indexes)

  • Kafka’s storage unit is a partition
  • A partition cannot be split across multiple brokers or even multiple disks
  • Topics are split into partitions
  • Partitions are split into segments
  • Segments are named by their base offset
  • Only one segment is active
  • Segment settings

log.segment.bytes : Maximum size of a single log segment file

log.roll.hours : Maximum time Kafka waits before rolling a new segment file, even if the active segment has not reached log.segment.bytes

  • A partition is a directory, and each segment consists of an index file, a time index file, and a log file
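The segment layout can be sketched as a toy partition log that rolls a new segment when log.segment.bytes would be exceeded. ToySegmentedLog is a hypothetical Python model (sizes are string lengths, and time-based rolling via log.roll.hours is left out). It shows segments named after their base offset, zero-padded to 20 digits as in Kafka’s .log, .index and .timeindex files, and how an offset is located by searching the base offsets.

```python
import bisect


class ToySegmentedLog:
    """Toy partition log that rolls segments by size (hypothetical model)."""

    def __init__(self, segment_bytes):
        self.segment_bytes = segment_bytes   # plays the role of log.segment.bytes
        self.base_offsets = [0]              # one per segment; the last is active
        self.active_size = 0
        self.next_offset = 0

    def append(self, record):
        # Roll a new active segment if this record would push it past the limit.
        if self.active_size > 0 and self.active_size + len(record) > self.segment_bytes:
            self.base_offsets.append(self.next_offset)
            self.active_size = 0
        self.active_size += len(record)
        self.next_offset += 1
        return self.next_offset - 1

    @staticmethod
    def segment_files(base_offset):
        # Files are named after the segment's base offset, zero-padded to 20 digits.
        name = f"{base_offset:020d}"
        return [f"{name}.log", f"{name}.index", f"{name}.timeindex"]

    def segment_for(self, offset):
        # The segment holding an offset is the one with the largest base offset <= it.
        i = bisect.bisect_right(self.base_offsets, offset) - 1
        return self.base_offsets[i]


log = ToySegmentedLog(segment_bytes=100)
for i in range(10):
    log.append("x" * 40)
```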

What is ZooKeeper?

  • ZooKeeper is a distributed, open-source coordination service for distributed applications
  • Keeps the list of brokers
  • Helps in leader election for partitions
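  • Sends notifications to Kafka about changes such as a new topic, a broker failure, or a topic deletion
  • Kafka needs ZooKeeper to work (newer Kafka releases can instead run in KRaft mode, without ZooKeeper)
  • An ensemble of 3 to 5 servers works very well (an odd number is preferred because of majority quorums)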
  • Only one server is leader and rest are followers
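The odd-number recommendation follows from majority quorums, as this small Python sketch shows: an ensemble of n servers needs n // 2 + 1 of them to agree, so growing from three servers to four raises the quorum size without tolerating any extra failure. tolerated_failures is a hypothetical helper written for illustration.

```python
def tolerated_failures(ensemble_size):
    """How many servers can fail while a strict majority (quorum) remains."""
    quorum = ensemble_size // 2 + 1
    return ensemble_size - quorum


for n in (3, 4, 5, 6):
    print(n, "servers: quorum", n // 2 + 1, "tolerates", tolerated_failures(n), "failure(s)")
```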

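It’s Time to Get Your Hands Dirty (Setting Up and Running Kafka Locally)

You need ZooKeeper and Kafka running before you can run the local test. The commands below use the Confluent Platform layout; in the Apache Kafka download, the scripts end in .sh and the properties files live under config/ (for example, bin/zookeeper-server-start.sh config/zookeeper.properties).

  • Start ZooKeeper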

bin/zookeeper-server-start etc/kafka/zookeeper.properties

[2019-02-16 23:37:10,683] INFO Using org.apache.zookeeper.server.NIOServerCnxnFactory as server connection factory (org.apache.zookeeper.server.ServerCnxnFactory)

[2019-02-16 23:37:10,709] INFO binding to port 0.0.0.0/0.0.0.0:2181 (org.apache.zookeeper.server.NIOServerCnxnFactory)

  • Start Kafka

bin/kafka-server-start etc/kafka/server.properties

[2019-02-16 23:40:44,452] INFO Cluster ID: cqdE3HyPTQ-O6uGW9hKwVQ (org.apache.kafka.clients.Metadata)

[2019-02-16 23:40:44,557] INFO [Log partition=__confluent.support.metrics-0, dir=/tmp/kafka-logs] Loading producer state till offset 0 with message format version 2 (kafka.log.Log)

[2019-02-16 23:40:44,558] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 3 : {__confluent.support.metrics=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2019-02-16 23:40:44,575] INFO [Log partition=__confluent.support.metrics-0, dir=/tmp/kafka-logs] Completed load of log with 1 segments, log start offset 0 and log end offset 0 in 141 ms (kafka.log.Log)

Log messages when a topic is created

[2019-02-16 23:40:44,580] INFO Created log for partition __confluent.support.metrics-0 in /tmp/kafka-logs with properties {compression.type -> producer, message.format.version -> 2.1-IV2, file.delete.delay.ms -> 60000, max.message.bytes -> 1000012, min.compaction.lag.ms -> 0, message.timestamp.type -> CreateTime, message.downconversion.enable -> true, min.insync.replicas -> 1, segment.jitter.ms -> 0, preallocate -> false, min.cleanable.dirty.ratio -> 0.5, index.interval.bytes -> 4096, unclean.leader.election.enable -> false, retention.bytes -> -1, delete.retention.ms -> 86400000, cleanup.policy -> [delete], flush.ms -> 9223372036854775807, segment.ms -> 604800000, segment.bytes -> 1073741824, retention.ms -> 31536000000, message.timestamp.difference.max.ms -> 9223372036854775807, segment.index.bytes -> 10485760, flush.messages -> 9223372036854775807}. (kafka.log.LogManager)

[2019-02-16 23:40:44,584] INFO [Partition __confluent.support.metrics-0 broker=0] No checkpointed highwatermark is found for partition __confluent.support.metrics-0 (kafka.cluster.Partition)

[2019-02-16 23:40:44,597] INFO Replica loaded for partition __confluent.support.metrics-0 with initial high watermark 0 (kafka.cluster.Replica)

[2019-02-16 23:40:44,608] INFO [Partition __confluent.support.metrics-0 broker=0] __confluent.support.metrics-0 starts at Leader Epoch 0 from offset 0. Previous Leader Epoch was: -1 (kafka.cluster.Partition)

[2019-02-16 23:40:44,679] WARN [Producer clientId=producer-1] Error while fetching metadata with correlation id 4 : {__confluent.support.metrics=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

[2019-02-16 23:40:44,993] INFO [Producer clientId=producer-1] Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms. (org.apache.kafka.clients.producer.KafkaProducer)

[2019-02-16 23:40:45,003] INFO Successfully submitted metrics to Kafka topic __confluent.support.metrics (io.confluent.support.metrics.submitters.KafkaSubmitter)

[2019-02-16 23:40:47,185] INFO Successfully submitted metrics to Confluent via secure endpoint (io.confluent.support.metrics.submitters.ConfluentSubmitter)

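[Figure: the log folder before any topic is created]

  • Create Topic

bin/kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic my-test

In Kafka 2.2 and later, kafka-topics also accepts --bootstrap-server localhost:9092 in place of --zookeeper localhost:2181, and newer releases require it.

[Figure: the log folder after the topic is created]

The necessary log files are created within a folder named after the topic and its partition number (for example, my-test-0).

Messages are stored in the partition’s segment log file.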

Sending a message:

bin/kafka-console-producer --broker-list localhost:9092 --topic my-test

Listening for and reading messages from the topic:

bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic my-test --from-beginning

Alternatively, topic creation, sample message generation, publishing, and listening can be done through Confluent Control Center.

Conclusion

Kafka is one of the most widely used messaging systems, leveraged across many industries. This article is my effort to simplify getting started with Kafka. In the coming days, I will try to publish similarly simplified articles on getting started with Kafka Connect and Kafka Streams.


MBA, Innovator, Distinguished Software Engineer at Walmart Labs