Ordering of events in Kafka

Siddhartha Bhattacharya · Published in Nerd For Tech · Apr 19, 2020

Certain use cases require strict ordering of events (messages/records with a data payload and/or state) to be maintained across producers and consumers in a data pipeline. For example, in financial institutions the order of transactions needs to be preserved to compute account balances correctly. This article focuses on aspects that producers and consumers can consider to maintain such ordering.

Producer:

As per Kafka guarantees:

Messages sent by a producer to a particular topic partition will be appended in the order they are sent.

To ensure strict ordering across all events, the topic would have to have a single partition. That, however, does not scale well. Moreover, the sequence usually does not need to be preserved across all events: for the use case above, we want to preserve the order of transactions within each account, not across accounts. That is where partitioning by key comes in when publishing events.

Kafka chooses which partition within the topic each event is assigned to; however, we can choose the partitioning key. Each event in Kafka consists of a key, a value and a timestamp. Kafka's default partitioner uses a hash of the event's key to choose the partition, i.e., records with the same key get assigned to the same partition. Here is how to implement this in Python:

a) First, import the libraries; we plan to use a JSON event for this example.
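
A minimal set of imports, assuming the kafka-python client (confluent_kafka works as well, see the note further down):

import json
from kafka import KafkaProducer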

b) Sample event to post:
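
A sample JSON event; key1 carries the account identifier we will later partition on, and the remaining fields are made up purely for illustration:

event = {
    "key1": 1,                  # account identifier, used later as the partitioning key
    "transaction_id": "T100",   # illustrative fields only
    "amount": 250.00,
    "type": "DEBIT"
}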

c) Initialize the producer: acks is set to 'all' to guarantee that records will not be lost as long as at least one in-sync replica remains alive.
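
A minimal sketch with kafka-python; the broker address is an assumption, and the serializers simply convert the key to bytes and the value to JSON:

producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],               # assumed broker address
    acks='all',                                         # wait for all in-sync replicas
    key_serializer=lambda k: str(k).encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8')
)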

Note: If you are using the confluent_kafka library instead, I recommend also setting 'enable.idempotence': True, which ensures that exactly one copy of the message is written to the broker.
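
With confluent_kafka, the equivalent producer configuration could look roughly like this (broker address assumed):

from confluent_kafka import Producer

producer = Producer({
    'bootstrap.servers': 'localhost:9092',   # assumed broker address
    'acks': 'all',
    'enable.idempotence': True               # exactly one copy written to the broker
})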

d) Define the key and post the event; here we choose key1 as the partitioning key:
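
A sketch of publishing the event, assuming a topic named 'transactions':

key = event['key1']                              # all events for the same account share a key
producer.send('transactions', key=key, value=event)
producer.flush()                                 # block until the event is acknowledged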

In the above example, Kafka will send all events with key1 = 1 to the same partition. This is simple to implement in any programming language of our choice. There are a few other considerations to keep in mind on the producer side:

  1. The choice of keys may lead to “hot partitions”, i.e., some partitions may be busier than others depending on how the keys are distributed.
  2. Event processing should preserve the order in which the events were created; multi-threaded processing within the producer program, for example, may disturb that ordering.
  3. Certain edge conditions may lead to the Kafka guarantee being violated; Kamil Charłampowicz explains these (and the fix) here.

Consumer:

Kafka guarantees:

A consumer instance sees records in the order they are stored in the log.

So why should the consumer program bother about ordering if Kafka does the job? Well, it doesn't, until it encounters an exception. On an exception, we may want to pause consumption at the point of failure and resume once the exception is resolved, so that strict ordering is maintained. The consumer client exposes pause() and resume() methods for exactly this; if the pipeline runs on Kafka Connect, the equivalent REST endpoints are:

PUT /connectors/{name}/pause - pause the connector and its tasks, which stops message processing until the connector is resumed

PUT /connectors/{name}/resume - resume a paused connector (or do nothing if the connector is not paused)

An implementation in Python would include the following steps (which, needless to say, can be written in any other language of our choice):

a) Include libraries:
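
Again assuming the kafka-python client:

import json
import time
from kafka import KafkaConsumer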

b) Initialize the consumer:
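
A minimal sketch; the topic, broker and group id are assumptions, and auto-commit is disabled so that offsets are committed only after successful processing:

consumer = KafkaConsumer(
    'transactions',                                        # assumed topic name
    bootstrap_servers=['localhost:9092'],                  # assumed broker address
    group_id='transaction-consumers',                      # assumed consumer group
    auto_offset_reset='earliest',
    enable_auto_commit=False,                              # commit manually after processing
    value_deserializer=lambda v: json.loads(v.decode('utf-8'))
)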

c) Poll for events:
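
poll() returns a dictionary keyed by TopicPartition; each record is handed to the process() function sketched in the next step:

while True:
    records = consumer.poll(timeout_ms=1000)
    for tp, messages in records.items():
        for message in messages:
            process(tp, message)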

d) Process events:
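
A minimal sketch of the processing logic examined below; fnPostDB, fnPostException and fnResume are placeholders for application-specific code and are assumed to be defined elsewhere:

def process(tp, message):
    data = message.value
    try:
        fnPostDB(data)                 # post the event to the database
        consumer.commit()              # commit offsets after a successful post
    except Exception:
        fnPostException(data)          # notify that this event failed
        consumer.pause(tp)             # stop fetching from this partition
        while not fnResume():          # wait until the endpoint is available again
            time.sleep(30)
        fnPostDB(data)                 # retry the failed event
        consumer.commit()
        consumer.resume(tp)            # continue from the next offset, in order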

Let’s examine the above snippet. The consumer tries to post every event to the database using fnPostDB(data). If the database endpoint is not available, it reports the exception using fnPostException(data) and pauses consumption from that topic partition.

The next step is to check whether the exception has been resolved (in this case, whether the endpoint is available again). fnResume() may check the endpoint at fixed intervals and return True when it is. Once the exception is resolved, we retry the failed fnPostDB(data) call and resume consumption from the partition.

Additional considerations for the consumer:

  1. Not all exceptions are interface related. Though less likely in production applications, we may encounter payload-related exceptions (for example, a failure to deserialize or a missing required data element). In such cases there is no point in retrying the failed task; instead, we may want to read a corrected event from a replay topic before resuming the consumer.
  2. The consumer instance within the consumer group stops processing the topic partition once pause is issued. That may be acceptable for our example. However, if different account numbers used different endpoints (for the sake of argument), we would end up pausing consumption for every account number in that partition. The impact can be reduced by increasing the number of topic partitions and consumer instances, but it may not be possible to match the cardinality of the partitioning key. As a result, the proposed solution will increase latency within the data pipeline.

Summary:

There are several techniques for addressing the problem of event ordering in distributed systems; the one above is “Kafka-native” and arguably needs the least amount of coding.
