How we improved reliability of Kafka consumers

Vivek N
Open House
Published in
9 min readOct 31, 2022

By: Vivek Nagaraju, Software Engineer and Bastian Wieck, Senior Software Engineer

Buying and selling a home can be one of the most important, and sometimes stressful, moments in our customers’ lives which is why we want to make the experience as smooth as possible. Building reliable and robust engineering systems are instrumental to that goal.

At Opendoor, Kafka serves as our event streaming platform that lets different parts of our systems know when certain events happen. For example, we emit an “event” when the repair assessment is complete for a home. If that event capture is missed, customers do not receive their final repair numbers. Even just a small mistake, or loss of an event, can lead to greater problems down the road. We noticed events were being managed disparately and so to mediate any potential issues in the transaction we prioritized improving reliability for Kafka.

In this blog post we’ll discuss how we improved the reliability of our Golang Kafka SDK which is a wrapper around the Confluent Go SDK. We use code generation to simplify the lives of our application developers, so we wanted to continue using the Confluent Go SDK (some of our Kafka code generation is coupled with the library’s SDK ) and have our consumer be a drop-in replacement.

Basics of Kafka

Kafka allows us to emit and consume events asynchronously. It’s great for fanout architecture (e.g. broadcasting important events to multiple services), which allows applications to work together without depending on a synchronous interface. That is beneficial because if one application goes down, it doesn’t mean others have to stop working.

These events are organized and durably stored in “topics” within Kafka. Think of it this way: a topic is akin to a folder in a filesystem, and the events are like the files in that folder.

Topics are partitioned, meaning a topic is spread over a number of buckets located on different Kafka brokers. In the example below you can see that topic 1 is spread across Broker 1 and 3.

This distributed placement of data is very important for scalability because it allows client applications to both read from, and write to, many brokers at the same time. Events with the same key are sent to the same partition. It’s important to note that Kafka guarantees event ordering only within a single partition and not across partitions.

Image Source: https://docs.confluent.io/5.5.1/kafka/introduction.html#:~:text=Apache%20Kafka%C2%AE%20is%20a,of%20records%20as%20they%20occur.

Topics can have any number of consumers that subscribe to the data written to it. Each consumer can read the records at their own pace (as long as they consume an event within the retention period). Kafka uses offsets to determine which records to read next. An offset is a position within a partition for the next event to be sent to a consumer. Since each partition stores the events in order, a consumer uses the offset to ensure they’ve read all events. A consumer commits an event offset to indicate it does not need to read it anymore. For example, a consumer may try to read offset 9, but it crashes. When it comes back it is still at the same offset and can resume reading events without missing anything.

Image Source: https://docs.confluent.io/5.5.1/kafka/introduction.html#:~:text=Apache%20Kafka%C2%AE%20is%20a,of%20records%20as%20they%20occur.

Delivery Semantics in Kafka

Kafka supports at-least-once delivery semantics, but it’s important to understand the nuances of what this actually means.

With the default configuration, events which are polled are auto-committed every 5 seconds. This means events can be committed even before they’re processed leading to at-most-once delivery semantics.

At-most-once delivery: The publisher will send events at most one time, and will not try sending them again if it receives an error or timeout event from the broker.

It’s important to note that in Kafka, much of the consumer logic is handled by the clients and different clients can support different delivery semantics depending on the implementation. For instance, in the Java SDK at-least-once delivery semantics are possible with the default configuration (in a single threaded consumer) because the offsets are committed only during the poll call and not in a background thread.

It’s common for many applications to introduce multithreading to scale the consumers and this exacerbates the problem of achieving at-least-once delivery semantics.

The solution to this problem is to turn off auto-commit and manually commit the offsets when events are successfully processed.

An interesting question is what delivery semantics should you aim for? The answer is that it depends on how you plan to use Kafka! We started using Kafka for non critical background tasks and hence the default configuration with at-most-once delivery semantics has worked for us for a long time. We recently started investing in our event driven architecture and plan to use Kafka to orchestrate microservices. This means event reliability and ordering of events will be important, hence our focus on making our SDK more reliable.

Note that there’s a notion of exactly-once-delivery semantics in Kafka using transactional producers and consumers, but they have their own set of caveats which is a good topic for another blog post.

In general, if you want to build a reliable distributed system, your time is well spent on ensuring at-least-once delivery semantics with idempotent consumers.

Old Consumer Implementation

Let’s look at how our old consumer was implemented and discuss the limitations of that implementation.

Our Golang SDK is a wrapper around the confluent Go SDK. For context, multithreading in Golang is accomplished by using goroutines.

Each topic was processed in a separate goroutine and polled events were sent to a buffered channel (which was configured per topic). We had ‘auto-commit’ turned on (with default configuration) so offsets were committed every 5 seconds. There are limitations of this implementation:

  1. There’s no at-least-once delivery guarantee because of the issues we discussed above.
  2. Slow processing can block the entire topic since each topic is processed in its own thread. In the worst case, it can block processing events from all the topics if the buffered channel is full and we’re not able to send events to the topic.
  3. No built-in retry support. If there was an error processing the event, we just move on to the next event.

New Consumer Implementation

These were the goals we set for the new consumer:

  1. Should provide at-least-once processing guarantee.
  2. Should maintain high throughput.
  3. Should provide built-in retry support.
  4. Should be a drop-in replacement for the old consumer.

Let’s discuss how we solved the above limitations with our new implementation

With the new implementation, we process each partition in a separate goroutine. This is possible because Kafka only guarantees event ordering within a partition and not across the entire topic. This means slow processing of events will now only block the respective partition and other partitions can continue processing events successfully.

  1. We start a commit handler which periodically commits offsets for the events which have been successfully processed in a background thread. To guarantee delivery we only commit offsets after the event handler has returned successfully. Batching commits is important because committing offsets is an expensive operation.
  2. The events which are polled are sent to the respective partitions through a buffered channel. If the buffer is full, we pause the partition and continue polling for new events. This will ensure we don’t get events for the paused partitions in our subsequent poll calls until the partition is resumed.

We periodically check for paused partitions and resume them if the buffered channel has capacity to accept new events.

  1. If the event is successfully processed, we send the offset to the commit handler which will commit the offset based on the configured commit frequency.
  2. If the event is not successfully processed, we retry the event. Our SDK provides implementation for fixed and exponential backoff and also the ability to define custom backoff logic.
  3. Our SDK also provides an option for the application to recover the event which has exhausted all retry attempts. Applications can use this to perform clean up actions, send events to a dead letter queue, etc.

Consumer Rebalancing

Consumer rebalancing in Kafka means reassignment of partitions between different consumers. This usually happens when consumers join/leave the consumer group. A common scenario for this is when deployments happen. It’s important that we handle these events properly to keep our at-least-once delivery guarantee. When partitions are revoked:

  1. We remove the revoked partitions from the paused partition list.
  2. Gracefully shutdown all the threads processing the revoked partitions.
  3. Commit the offsets of successfully processed events for the revoked partitions. Although it is not our guarantee, we still prefer to minimize the possibility of duplicate events being processed by the applications.

Results and Future Improvements

The new consumer has been working smoothly, and we no longer see mismanaged events during deployments. Additionally, built-in retry support has helped us to not lose events during transient network failures.

That being said, there’s always room for improvement. These are some of our current focus areas we’re working on to continue making our consumers more reliable.

  1. When retry attempts are exhausted we let the application know and they can decide what to do with the error. Usually the application will record the failed event, but it continues processing subsequent events. We are exploring automatically pushing those errors to a retry topic. Any event for a topic with the same key after an error would go to the retry topic. That way ordering of events is maintained.
  2. Currently we retry every time the consumer returns an error. Sometimes an error is not retryable. It would be nice if non-retryable errors are sent directly to the retry topic.
  3. We will potentially explore different Kafka libraries to simplify our code.

We’d also like to explore replacing the Confluent Go SDK with kafka-go or sarama which provide better support for per partition consumption of events. This could simplify our code and make it more maintainable.

Conclusion

Kafka is a great tool for building event driven architecture but it’s important to understand the nuances of introducing multithreading when using Kafka.

Our Opendoor systems drive many important decisions that impact our partners, internal team and ultimately the end consumer. That means reliability, even down to the smallest events, is essential to ensure our operations run flawlessly to make any and all home transactions as seamless as possible.

We’d like to thank Rob Clemons who has been instrumental in making these improvements with us, alongside Troy Fendall, Mark Beilfuss and Eric Elmore for their valuable suggestions during the design phase of this project.

Interested in joining our team? Opendoor is hiring across all teams, including Engineering, Product Management and Data Science. Check out our current open positions!

Want to learn more about working at Opendoor? Check out our Product Management and Engineering and Data Science blogs.

--

--