How to use Apache Kafka to guarantee message ordering?

Chidambaram Kulandaian
Better Data Platforms
12 min readJan 8, 2021

LatentView has rich experience of building scalable data platforms with Kafka. Here, we address one of the pressing questions while deploying Kafka.

A Retail Scenario

Imagine that you are the VP of Data Platforms for a retailer. Your teams have built an analytics-ready data lake that delivers data for merchandizing, inventory, order management & fulfillment analytics. Your engineers & business analysts collaborated with consumption teams to build downstream, domain-specific applications leveraging the data platform, and deliver actionable insights to end users. Data scientists can build ML models and deploy them using automated workflows. You think everything is perfect.

However, you have a feeling that things could be better, and you start to look for improvements. You understand that your users are generally satisfied, but after the pandemic struck in early 2020, they desperately wanted more real-time information to adjust to rapidly changing consumer preferences. Your infrastructure was simply not equipped to deliver this at scale. However, you somehow managed to help them get what they want by hacking around your architecture.

Another issue is the cascading effects of source changes, data quality problems or job failures. Source systems are constantly changing. The ETL processing would break due to constantly changing schemas. Due to tight coupling in the ETL, changing the source schemas require significant co-ordination between the source systems and the analytics teams. In case of ETL job failures or data quality issues, downstream users continue to run their models and their analyses without being notified about things that can impact their work.

Recently, you met with the VP of your info sec team who wants to tap into your data management expertise to help analyze logs for suspicious activity and proactively identify fraud in real-time. But this requires collecting data much more granular and more real-time than what the current data platform allows.

All of these need a foundational architectural shift. It’s not that the existing architecture is a dud; after all, any architecture involves trade-offs. You need an architecture that supports granular data, low latency and high throughput processing, and decoupled consumers that have varying characteristics.

You need Apache Kafka!

Enter Apache Kafka

Apache Kafka is a messaging software used to transfer data and information between systems without coupling them too tightly. Invented by LinkedIn, Kafka is a staple of many modern distributed data platforms — both operational and analytical¹.

¹ If you have not already done so, please read this brilliant article by Jay Krepps on what every data engineer should know about the distributed log.

Given Kafka’s capabilities, it can support a wide range of use cases. At LinkedIn, it was built to collect metrics around application usage. In the retailer scenario above, Kafka can serve as the foundational platform that ingests data from multiple sources such as transactions, inventory, shipping, online customer tracking, supply chain IoT devices, etc. A variety of consumers can consume this data at real-time. For example, data lake can aggregate this data using batch and real-time processing. Security teams can do real-time streaming analytics to look for early warning signs of security-related issues or fraud.

The rest of this blog helps you to get better understanding on messaging and distributed logs and defines important Kafka concepts. Then, we finish this post with an explanation of how Kafka handles order of messages with multiple partitions / brokers.

The Basics of Kafka

Before exploring more on the basic understanding of Kafka and its components.

A message queue allows many disparate applications to communicate by sending messages to each other. A queue is a line of things waiting to be handled — in sequential order starting at the beginning of the line. The applications that read data from the message queue are called “consumers”. The applications that send messages to Kafka are called “producers”.

In a data platform, the messages can be the data, and the consumers can be the data processing programs. These processing programs can be batch oriented, or they can be real-time stream processing programs.

Illustration of a message queue

Why Kafka?

Kafka is highly scalable. It is very easy to add large number of consumers without affecting performance or reliability. That’s because Kafka does not track which messages in the topic have been consumed by consumers. It simply keeps all messages in the topic within a configurable period.

Kafka also supports different consumption models. You can have one consumer processing the messages at real-time and another consumer processing the messages in batch mode in a totally decoupled fashion.

Kafka supports the ability to source messages from a wide range of producers.

Message durability is high in Kafka. It persists the messages/events for the specified time period.

Kafka is linearly scalable for high volume of data. Adding more brokers / clusters will increase the throughput or decrease latency.

Kafka supports excellent integration with other processing frameworks. These include as Apache Storm, Spark, NiFi, Flume etc. to complete the job.

Kafka handles spike of the events more efficiently. This is where Kafka truly shines because it acts as a “shock absorber” between the producers and consumers.

Kafka Use Cases

Before we move on to our content let us discuss few more real-time applications of Kafka with its use cases.

Messaging

Kafka works well as a replacement for a more traditional message broker. Message brokers are used for a variety of reasons (to decouple processing from data producers, to buffer unprocessed messages, etc.).

Website Activity Tracking

Kafka can be used to build a user activity tracking pipeline as a set of real-time publish-subscribe feeds. Site activity (page views, searches, or other actions users may take) can be published to central topics with one topic per activity type.

Log Aggregation

Kafka is a replacement for a log aggregation solution. Log aggregation typically collects physical log files off servers and puts them in a central place (a file server or HDFS perhaps) for processing. In comparison to log-centric systems like Scribe or Flume, Kafka offers equally good performance, stronger durability guarantees due to replication, and much lower end-to-end latency.

Kafka Architecture and its Components

Drilling down below, here is a graphic that describes a Kafka set up.

Kafka Architecture

Kafka Brokers

A Kafka broker is a server running in a Kafka cluster (or, put another way: a Kafka cluster is made up of a number of brokers). Typically, multiple brokers work in concert to form the Kafka cluster and achieve load balancing and reliable redundancy and failover.

Apache ZooKeeper

Kafka brokers use ZooKeeper to manage and coordinate the Kafka cluster. ZooKeeper notifies all nodes when the topology of the Kafka cluster changes, including when brokers and topics are added or removed.

Kafka Producers

A Kafka producer serves as a data source that optimizes, writes, and publishes messages to one or more Kafka topics. Kafka producers also serialize, compress, and load balance data among brokers through partitioning.

Kafka Consumers

Consumers read data by reading messages from the topics to which they subscribe. Consumers will belong to a consumer group. Each consumer within a particular consumer group will have responsibility for reading a subset of the partitions of each topic that it is subscribed to.

Kafka Topics

A Kafka topic defines a channel through which data is streamed. Producers publish messages to topics, and consumers read messages from the topic they subscribe to.

Kafka Partitions

Within the Kafka cluster, topics are divided into partitions, and the partitions are replicated across brokers. From each partition, multiple consumers can read from a topic in parallel.

Consumer Group

A Kafka consumer group includes related consumers with a common task. Kafka sends messages from partitions of a topic to consumers in the consumer group. At the time it is read, each partition is read by only a single consumer within the group.

Topic Replication Factor

Topic replication is essential to designing resilient and highly available Kafka deployments. When a broker goes down, topic replicas on other brokers will remain available to ensure that data remains available and that the Kafka deployment avoids failures and downtime.

The graphic below provides an overview of various Kafka components and how they interact.

Kafka components and how they interact

How to Ensure the Order of Messages

In Kafka, order can only be guaranteed within a partition. This means that if messages were sent from the producer in a specific order, the broker will write them to a partition and all consumers will read from that in the same order. So naturally, single-partition topic is easier to enforce ordering compared to its multiple-partition siblings.

However, with a single partition, it is difficult to achieve parallelism and load balancing.

There are multiple ways in which we can achieve order of messages, parallelism and load balancing in a single way. Let’s see how Kafka handles order of messages with single broker/single partition and multiple brokers/multiple partitions.

Order of Messages with a Single Broker

The order of message in Kafka works well for a single partition. But with a single partition, parallelism and load balancing is difficult to achieve.

Let’s create a topic (with one replica and one partition) with topic name as “atm”:

kafka-topics.bat — create — zookeeper localhost:2181 — topic atm — replication-factor 1 — partitions 1

Consider a sample dataset with schema as row_no, name, transaction_type and amount as below:

0001,Robert,credit,100000002,Thomas,debit,200000003,Hari,credit,4000000004,John,debit,3000000005,peter,debit,398700

Kafka comes with a command line client that will take input from a file or from standard input and send it out as messages to the Kafka cluster. By default, each line will be sent as a separate message.

Let’s publish the above sample dataset to our new topic atm:

kafka-console-producer.bat — topic atm — bootstrap-server localhost:9092>0001,Robert,credit,10000>0002,Thomas,debit,20000>0003,Hari,credit,400000>0004,John,debit,300000>0005,peter,debit,398700

Let’s consume the same message which we created above:

The order will be obtained same as above for a single partition.

kafka-console-consumer.bat –bootstrap-server localhost:9092 — topic atm — from-beginning>0001,Robert,credit,10000>0002,Thomas,debit,20000>0003,Hari,credit,400000>0004,John,debit,300000>0005,peter,debit,398700

Order of Messages with Multiple Brokers

There are three methods in which we can retain the order of messages within partitions in Kafka. Each method has its own pros and cons.

Method 1: Round Robin or Spraying

Method 2 : Hashing Key Partition

Method 3 : Custom Partitioner

Method 1: Round Robin or Spraying (Default)

In this method, the partitioner will send messages to all the partitions in a round-robin fashion, ensuring a balanced server load. Over loading of any partition will not happen.

To illustrate, let’s create a topic (with three replicas and three partitions) with topic name as “atm2”:

kafka-topics.bat — create — zookeeper localhost:2181 — topic atm2 — replication-factor 3 — partitions 3

Let’s publish the same sample dataset to our new topic atm2:

kafka-console-producer.bat — topic atm2 — bootstrap-server localhost:9092>0001,Robert,credit,10000>0002,Thomas,debit,20000>0003,Hari,credit,400000>0004,John,debit,300000>0005,peter,debit,398700

Let’s create a consumer that consumes the messages from Kafka:

kafka-console-consumer.bat –bootstrap-server localhost:9092 — topic atm2 — from-beginning>0004,John,debit,300000>0001,Robert,credit,10000>0003,Hari,credit,400000>0002,Thomas,debit,20000>0005,peter,debit,398700

Let’s dive into what’s happening under the hood. There are three partitions (A, B & C). Partition B works fast because of low network and system latency, and the messages sent to it have been consumed first. Then comes Partition C, followed by A.

Illustration for Round Robin method

By this method parallelism and load balancing is achieved but it fails to maintain the overall order but the order within the partition will be maintained. This is a default method and it is not suitable for some business scenarios. If debit transaction happens before credit then it will be vague to the business users who consume the messages.

In order to overcome the above scenarios and to maintain message ordering, let’s try another approach.

Method 2: Hashing Key Partition

In this method we can create a ProducerRecord, specify a message key, by calling new ProducerRecord (topic name, message key, message).

The default partitioner will use the hash of the key to ensure that all messages for the same key go to same producer. This is the easiest and most common approach. This is the same method which has been used for hive bucketing as well. It uses modulo operation for hashing.

Hash(Key) % Number of partitions -> Partition number

However, simply sending lines of text will result in messages with null keys. In order to send messages with both keys and values we must set the parse.key and key.separator properties on the command line when running the producer.

The below is the code snippet for hashing method which sets the parse.key property to true, and specifies the key.separator as “:”.

The keys in the sample messages below are key1, key2 with the values being value1, value2.

— broker-list localhost:9092 \— topic my-topic \— property “parse.key=true” \— property “key.separator=:”key1:value1key2:value2

Let’s create a topic (with three replicas and three partitions) with topic name as atm1:

kafka-topics.bat — create — zookeeper localhost:2181 — topic atm1 — replication-factor 3 — partitions 3

Let’s publish a few messages to our new topic atm1 with a key value for all the records:

kafka-console-producer.bat — topic atm1 — bootstrap-server localhost:9092 — property “parse.key=true” — property “key.separator=:”>0001:0001,Robert,Credit,20000>0002:0002,Albert,Credit,5000>0001:0001,Robert,Debit,10000>0002:0002,Albert,Debit,3000>0001:0001,Robert,Credit,10000

Let’s consume the same message which we created above:

As seen below, the order of the messages within the keys is maintained.

kafka-console-consumer.bat –bootstrap-server localhost:9092 — topic atm1 — from-beginning>0002:0002,Albert,Credit,5000>0002:0002,Albert,Debit,3000>0001:0001,Robert,Credit,20000>0001:0001,Robert,Debit,10000>0001:0001,Robert,Credit,10000
Illustration for Hashing method

From the above diagram, we can see there are two partitions, producer produces the messages from message 1 to message 5 with keys and values.

The broker assigns key 0001 to partition A (the high latency partition) and key 0002 to Partition B (the low latency partition), using the hashing key method. The consumer consumes the message based on the key value within the order.

With this method we can maintain the order of messages within the key.

But, the drawback with this method is as it uses random hashing value to pull the data to assigned partition, and it follows overloading of data to single partition.

Method 3: Custom Partitioner

We can write our own business logic to decide which message need to be send to which partition. With this approach, we can make ordering of messages as per our business logic and achieve parallelism at the same time.

For example, consider the previous ATM transaction dataset. Let’s say we want all the credit transaction should go to Partition A and the remaining debit transaction should go to Partition B with topic named as ATM.

Illustration for Custom Partitioner method

We can achieve above logic by writing a Scala code both at the producer level and consumer level.

A sample Scala code is given here: https://gist.github.com/chidambaram005/719e76c46b86f26c5ca11d76203b43f8. We can tweak our code based on our requirements.

Here are the key takeaways:

  1. One of the most important features of Kafka is to do load balancing of messages and guarantee ordering in a distributed cluster to achieve parallelism.
  2. Using a higher number of partitions leads to higher throughput and latency. However, it may lead to maintenance overheads (not addressed in this article).
  3. Using a hashing key partition, we can deliver messages with the same key in order, by sending it to the same partition. Data within a partition will be stored in the order in which it is written. Therefore, data read from a partition will be read in order for that partition with producer key.
  4. Using a Custom Partitioner, we can route messages using arbitrary business rules.

--

--