An Opinionated Approach to Developing Event-Driven Microservice Applications with Kafka and Web-Sockets (Part 3 of 4)

Orhan Tuncer
Nov 4, 2019 · 9 min read

This is the third part of a four-part series. The topics covered in the series are:

Part 1

  • RabbitMQ vs. Kafka
  • Delayed Executions
  • Distributed Tracing

Part 2

  • Message-Based Request/Response Structure
  • WebSocket-Kafka Gateway
  • Broadcasting Mechanisms

Part 3 — (this part)

  • Reusable Generic Commands
  • Message Guarantees
  • Automatic Retries (In-Order and Out-of-Order) with Exponential Backoffs

Part 4

  • Large Payload Support
  • Choreography-Based Sagas
  • Fault Injection
  • Authorization and Authentication
  • Automated Command API Generation

We now have a working application that real users can use. It is time to think about improvements and resiliency.


Reusable Generic Commands

With REST, you can have two APIs with the same path, “api/create/”; one of them accepts an apple, the other an orange, and Spring Boot routes each request to the correct function.

Now think in terms of CQRS. When you want to create an apple, you use a CreateApple command. When it is an orange, you use a CreateOrange command. That generally means a new Command class for each new API you add. Maybe not always, but most of the time, that is the way to go. Our team felt this was too cumbersome and wanted a usage similar to REST.

Before we continue, let’s address the elephant in the room: decoupling. Having distinct commands gives you a good amount of decoupling between services and between different versions, while shared libraries and entities create coupling. Decoupling is often one of the differences between success and failure in a microservice application. It was a hot debate, and the nature of the project and its development and production lifecycles were also inputs. After a lot of consideration, we decided to proceed with reusable commands. It is a trade-off: we get less code and configurable usage in exchange for some coupling between services.

All in all, it was a fun thing to develop. Let me walk you through it. WebSocket messages use a specific path structure:

111.222.333.444/socket/[KafkaTopicAlias]/[P1]/[P2]/../[Pn]/[CommandAlias]

The first parameter is always the alias of the Kafka topic we want to send to. The last parameter is always the alias of the Command we are using. Everything in between is the ContextPath. The idea is that you can send the same Command to different paths, or different Command classes to the same path.

myTopic/fresh/fruit/apple/ → create: Apple

myTopic/fresh/fruit/orange/ → create: Orange

myTopic/fresh/fruit/orange/ → update: Orange
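A minimal sketch of how the gateway could split such a path (the SocketPath name is hypothetical, not the project’s actual class):

```java
import java.util.Arrays;

// Hypothetical sketch: splitting "myTopic/fresh/fruit/apple/create" into
// the topic alias, the context path, and the command alias.
class SocketPath {
    final String topicAlias;
    final String contextPath;
    final String commandAlias;

    SocketPath(String topicAlias, String contextPath, String commandAlias) {
        this.topicAlias = topicAlias;
        this.contextPath = contextPath;
        this.commandAlias = commandAlias;
    }

    static SocketPath parse(String rawPath) {
        String[] parts = rawPath.replaceAll("^/+|/+$", "").split("/");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Need at least a topic alias and a command alias");
        }
        String topicAlias = parts[0];                              // first segment
        String commandAlias = parts[parts.length - 1];             // last segment
        String contextPath = String.join("/",
                Arrays.copyOfRange(parts, 1, parts.length - 1));   // everything in between
        return new SocketPath(topicAlias, contextPath, commandAlias);
    }
}
```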

On startup, services create a <ContextPath, Command, Payload>-to-<Class, Function> mapping. When a service receives a message that it can deserialize and has a mapping for, it forwards it to the correct handler method. If not, it ignores the message, because it is possibly some other service’s responsibility. A consequence is that it is hard to figure out whether a message has no handlers at all.

Now, CommandAlias-to-Command mappings can be done on the services or the gateways.

In the service case, after the Kafka listener receives the message, it tries to map the message payload to a Command. Remember, we do not have fine-grained topics, so we will receive messages that we aren’t interested in. If the mapping fails, the service again ignores the message, because it is possibly some other service’s responsibility. RecordFilterStrategy can help here, so consider using it.

Gateway mappings fail faster, but the gateway needs to know all command classes. During startup, it scans the modules and creates a CommandAlias-to-Command mapping. If there is no such Command, or it cannot deserialize the sent value into the Command, it can return an error right away. You can go one step further and let it scan the command handler classes too. In that case, it can conclude that a request won’t be handled anywhere and return an error for that as well. Of course, this couples the gateway to the other services via command and handler classes.

Again after some debates, we went with the gateway. For this to work, we decorated the Command classes with an alias annotation, added a ContextPath property to PlatformMessage, and of course, implemented the command handlers at the service level:
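A sketch of what these pieces could look like (all names below are illustrative, not the project’s exact classes):

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;

// Marks a Command class with the alias that appears as the last path segment.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
@interface CommandAlias {
    String value();
}

// Marks a handler method with the <ContextPath, CommandAlias> pair it serves.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface HandlesCommand {
    String contextPath();
    String commandAlias();
}

@CommandAlias("create")
class CreateCommand { }

// The envelope that travels over Kafka; the generic payload is carried as a
// raw map under "data" and bound to a concrete type per handler.
class PlatformMessage {
    public String contextPath;        // e.g. "fresh/fruit/apple"
    public String commandAlias;       // e.g. "create"
    public Map<String, Object> data;  // raw payload
}

class Apple {
    public String variety;
}

class AppleService {
    @HandlesCommand(contextPath = "fresh/fruit/apple", commandAlias = "create")
    public void createApple(Apple apple) {
        // business logic for creating an apple
    }
}
```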

The generic part of the commands is handled more loosely. We serialize the raw payload as a map; in the above example, it is inside the “data” property. The Kafka listener tries to deserialize the map into whatever type the handler function expects. This approach proved to be quite powerful. For example, while we handle concrete classes in Service A, we can handle abstract classes in Service B and have a single handler for all fruit types.
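One way to implement this binding, assuming Jackson is the serializer (a sketch, not the project’s exact code):

```java
import com.fasterxml.jackson.databind.ObjectMapper;
import java.lang.reflect.Method;
import java.util.Map;

class PayloadBinder {
    private final ObjectMapper mapper = new ObjectMapper();

    // Converts the raw "data" map into whatever type the handler method
    // expects. Binding to an abstract type, as in the Service B example,
    // would additionally need Jackson's polymorphic type handling
    // (e.g. @JsonTypeInfo on the base class).
    Object bind(Map<String, Object> data, Method handler) {
        Class<?> expectedType = handler.getParameterTypes()[0];
        return mapper.convertValue(data, expectedType);
    }
}
```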

In the end, in addition to reusable generic commands, we had a single access point to the application via a WebSocket connection. Kafka topics, context paths, and command aliases are parsed generically; therefore, the gateway APIs do not change as the application grows. The gateway still requires a rebuild when its command and handler class dependencies change. We have continuous deployment processes in place to mitigate that, but we won’t go into DevOps topics here.


Message Guarantees

It is about syncing the business transaction with the Kafka transaction. Consider the scenarios where the two do not succeed or fail together; for example, the business transaction commits but the Kafka messages are never sent, or the messages are sent but the business transaction rolls back.

These scenarios leave us in an inconsistent state. Spring has methods to chain transactions but does not offer any guarantees, so solving this problem required some work. We created a proxy method for sending messages to Kafka. This method works in the same transaction scope as the parent method; Spring provides a powerful transaction model that helps a lot with this.

It writes all the information about the Kafka messages to be sent into a separate table called “ProducedMessages”, marked as “not sent”. After the transaction is committed, it fires an application event for the messages to be sent. The event is handled by a method that invokes the Kafka producer and publishes the messages in a separate transaction scope.
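A sketch of this flow using Spring’s transaction model (all names here are illustrative; the real implementation persists full message metadata):

```java
import org.springframework.context.ApplicationEventPublisher;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Propagation;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.event.TransactionPhase;
import org.springframework.transaction.event.TransactionalEventListener;

// Illustrative row in the "ProducedMessages" table (a JPA entity in practice).
class ProducedMessage {
    Long id;
    String topic;
    String payload;
    boolean sent;
}

// Assumed repository over the ProducedMessages table.
interface ProducedMessageRepository {
    ProducedMessage save(ProducedMessage row);
    ProducedMessage find(Long id);
    void markSent(Long id);
}

class MessagePersistedEvent {
    final Long rowId;
    MessagePersistedEvent(Long rowId) { this.rowId = rowId; }
}

@Service
class KafkaMessageProxy {

    private final ProducedMessageRepository outbox;
    private final ApplicationEventPublisher events;
    private final KafkaTemplate<String, String> kafka;

    KafkaMessageProxy(ProducedMessageRepository outbox,
                      ApplicationEventPublisher events,
                      KafkaTemplate<String, String> kafka) {
        this.outbox = outbox;
        this.events = events;
        this.kafka = kafka;
    }

    // Joins the caller's business transaction: the "not sent" row only
    // exists if the business changes actually commit.
    @Transactional(propagation = Propagation.MANDATORY)
    public void send(String topic, String payload) {
        ProducedMessage row = new ProducedMessage();
        row.topic = topic;
        row.payload = payload;
        row.sent = false;
        events.publishEvent(new MessagePersistedEvent(outbox.save(row).id));
    }

    // Runs only after the business transaction committed; publishes in a
    // new, separate transaction and marks the row as sent.
    @TransactionalEventListener(phase = TransactionPhase.AFTER_COMMIT)
    @Transactional(propagation = Propagation.REQUIRES_NEW)
    public void publish(MessagePersistedEvent event) {
        ProducedMessage row = outbox.find(event.rowId);
        kafka.send(row.topic, row.payload);
        outbox.markSent(row.id);
    }
}
```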

The new flow is like this: the outgoing messages are persisted in ProducedMessages inside the business transaction; after the commit, the application event fires, the messages are published to Kafka, and the rows are marked as sent. The possible failure points are:

Fail option A: The business transaction rolls back, and no event gets fired. We stay in a consistent state. The message will be retried; we will cover that later.

Fail option B: Business transaction commits, but messages cannot be sent. We have Kafka producer retries, which we will cover later, but let’s say, in the end, we could not publish the messages.

For these cases, a scheduled job periodically checks for unsent messages and tries to send them. Eventually (embrace the eventual consistency, right?), the messages will be sent, and we will reach a consistent state.

Fail option C: Same as Fail option B.

Fail option D: The messages were sent, but the ProducedMessages table was not updated. In this case, the scheduled job will send the same messages again. To handle this, we also keep the ids of the messages we consume in a ConsumedMessages table as one level of idempotency. This way, if a consumer receives the same message more than once, it ignores the later ones.
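A sketch of that idempotency check, assuming each message carries a unique id (add an id field to the PlatformMessage sketch above; names are illustrative):

```java
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;

// Assumed table of processed message ids; a unique constraint on the id
// column rejects duplicate inserts even under concurrent consumers.
interface ConsumedMessageRepository {
    boolean exists(String messageId);
    void save(String messageId);
}

@Service
class IdempotentListener {

    private final ConsumedMessageRepository consumed;

    IdempotentListener(ConsumedMessageRepository consumed) {
        this.consumed = consumed;
    }

    @Transactional
    @KafkaListener(topics = "myTopic")
    public void onMessage(PlatformMessage message) {
        // Drop duplicates: the scheduled job may republish a message whose
        // ProducedMessages row was never marked as sent.
        if (consumed.exists(message.id)) {
            return;
        }
        consumed.save(message.id);
        // ...business handling runs in the same transaction, so the dedup
        // record and the business changes commit or roll back together.
    }
}
```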

As I said, it required a bit of coding, but in the end, we achieved a satisfactory result. You should probably check the Listen-to-Yourself pattern too; in some cases it provides a more straightforward solution, but our complexity steered us to the method above.


Automatic Retries (In-Order and Out-of-Order) with Exponential Backoffs

The point is that you don’t want to break a business operation over possibly momentary problems. Retry mechanisms are one way of handling this: if something goes wrong, try it again. How often, and until when, is up to you to decide. As long as it’s not the user who’s doing the retrying, you are on the right path.

Kafka offers retry mechanisms for both producers and consumers, with retry policies and backoff policies. But performance optimizations and retry strategies can break message ordering. Let me explain:

When aiming for performance, you do things in bulk and in parallel. Let’s say we retrieved five messages in a single poll. Processing them one by one limits our throughput, but handling them in async threads means we cannot control the order. So we chose the first option. But if a message fails, we still have a decision to make. We can keep retrying the message, in which case all the remaining messages wait and performance suffers. Or we can skip it to try later, and again lose ordering.

There is more. Let’s say we successfully processed all the messages. Now we send the acknowledgment to commit our new offset and poll the next batch. These two steps can also be done asynchronously to increase performance. But if the commit fails, we may need to reprocess messages we have already handled; even worse, we may have already finished the second batch by the time the previous commit’s retries give up.

It is the same with producers. You have five messages to send. You can send them one by one and wait for each acknowledgment, or send them in bulk or in parallel. But if one fails and is retried, the messages may be written out of order.

As you can see, strict message ordering and high performance do not go hand in hand. You need to understand your requirements in this area well and design accordingly. But first, let’s talk about how we can keep a strict message ordering with Kafka.

For consumers, if you are using max.poll.records=1 and your concurrency is set to 1, the next message will not be processed until the one at hand is completed. This configuration practically makes your retries in-order. For producers, set max.in.flight.requests.per.connection=1 to keep your retries in-order.
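A sketch of these two settings using the plain Kafka client configuration constants:

```java
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.producer.ProducerConfig;

class StrictOrderingConfig {

    // Consumer side: one record per poll (combined with a listener
    // concurrency of 1) means the next message is not touched until the
    // current one succeeds, so retries stay in order.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1);
        return props;
    }

    // Producer side: a single in-flight request per connection means a
    // retried send cannot be overtaken by a later one.
    static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 1);
        return props;
    }
}
```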

As I said, these settings are not good for performance; they dictate that services handle only one request at a time. Increasing the number of Kafka topic partitions and service replicas will help, but under high load it may not be sufficient. In that case, if you are still interested in message ordering, consider having separate topics for the business cases that require it. Then have two (or more) Kafka listeners with different settings, one optimized for performance and one optimized for ordering, each subscribed to the relevant topics.

The retry policy has a StopExceptions property, which excludes certain exception types from the retry mechanism. It is useful for validation exceptions and similar situations where you know you don’t have to retry. I recommend inheriting those exceptions from a NonRetriableException base class, because the retry policy can traverse causes, which makes them a lot easier to handle.
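With Spring Retry, something similar can be expressed through SimpleRetryPolicy, which supports cause traversal out of the box (a sketch under that assumption; the article’s StopExceptions property may be a project-specific abstraction):

```java
import java.util.Map;
import org.springframework.retry.policy.SimpleRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

// Base class for failures that retrying cannot fix (validation errors etc.).
class NonRetriableException extends RuntimeException {
    NonRetriableException(String message) {
        super(message);
    }
}

class RetryPolicyConfig {
    static RetryTemplate retryTemplate() {
        // Retry everything up to 3 attempts by default, but never retry a
        // NonRetriableException. traverseCauses=true makes the policy walk
        // the cause chain, so wrapped exceptions are classified correctly.
        SimpleRetryPolicy policy = new SimpleRetryPolicy(
                3,
                Map.of(NonRetriableException.class, false),
                true,   // traverseCauses
                true);  // defaultValue: retry unlisted exceptions
        RetryTemplate template = new RetryTemplate();
        template.setRetryPolicy(policy);
        return template;
    }
}
```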

Out-of-order retries require a little more tooling. We again rely on our Delayed Execution feature. When in-order retries are exhausted, the retry policy’s RecoveryCallback function handles the message. After checking for NonRetriableException, we make a copy of the message and create a new future job from it. This way, we can acknowledge the message at hand and move on to the next one. When the future job executes, the same message is inserted into Kafka again, and we can try to process it once more.

With this method, we lose message ordering, but for cases that do not require it, we gain resiliency. For finer control, we include an OutOfOrderRetriable attribute in our PlatformMessage; this way, we can separate scenarios that need ordering from those that don’t. By also adding an OutOfOrderRetryCount attribute, we can calculate the scheduled time of the future job exponentially, as sketched below.
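Putting it together, a sketch of the recovery callback, assuming the PlatformMessage sketch gains outOfOrderRetriable, outOfOrderRetryCount, and a copy() helper, and that the listener stashes the failing message as a retry context attribute:

```java
import java.time.Duration;
import java.time.Instant;
import org.springframework.retry.RecoveryCallback;
import org.springframework.retry.RetryContext;

// Assumed interface over the series' Delayed Execution feature (Part 1).
interface FutureJobScheduler {
    void schedule(PlatformMessage copy, Instant executeAt);
}

class OutOfOrderRetryRecoverer implements RecoveryCallback<Void> {

    private static final int MAX_OUT_OF_ORDER_RETRIES = 5; // assumed cap

    private final FutureJobScheduler scheduler;

    OutOfOrderRetryRecoverer(FutureJobScheduler scheduler) {
        this.scheduler = scheduler;
    }

    @Override
    public Void recover(RetryContext context) {
        Throwable cause = context.getLastThrowable();
        // Assumes the listener put the failing message into the context.
        PlatformMessage message = (PlatformMessage) context.getAttribute("message");

        if (cause instanceof NonRetriableException
                || !message.outOfOrderRetriable
                || message.outOfOrderRetryCount >= MAX_OUT_OF_ORDER_RETRIES) {
            return null; // give up: dead-letter, alert, etc.
        }

        // Copy the message, bump the counter, and back off exponentially:
        // 2^n seconds before the copy is republished to the same topic.
        PlatformMessage copy = message.copy();
        copy.outOfOrderRetryCount = message.outOfOrderRetryCount + 1;
        Duration delay = Duration.ofSeconds((long) Math.pow(2, copy.outOfOrderRetryCount));
        scheduler.schedule(copy, Instant.now().plus(delay));
        return null;
    }
}
```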

If more than one service handles a message, republishing it could cause it to be consumed more than once. But our ConsumedMessages table helps us here too, so each service processes only one copy of the message.


This is the end of the third part of the series. You can check Part 4 here.
