An Opinionated Approach to Developing Event-Driven Microservice Applications with Kafka and Web-Sockets (Part 2 of 4)
This is the second part of a four-part series. The topics covered in the series are:
- RabbitMQ vs. Kafka
- Delayed Executions
- Distributed Tracing
Part 2 — (this part)
- Message-Based Request/Response Structure
- WebSocket-Kafka Gateway
- Broadcasting Mechanisms
- Reusable Generic Commands
- Message Guarantees
- Automatic Retries (In-Order and Out-of-Order) with Exponential Backoffs
- Large Payload Support
- Choreography-Based Sagas
- Fault Injection
- Authorization and Authentication
- Automated Command API Generation
With the decision to use Kafka made and the necessary capabilities in place, it is time to move on to the next problem: how to turn this system into an actual application.
Message-Based Request/Response Structure && WebSocket-Kafka Gateway && Broadcasting Mechanisms
These three topics are closely tied together, so I will explain them together.
The convenience of a synchronous RESTful API is hard to beat. You make a request, some service processes it, and it serves you the result. Event-based microservices disappoint you big time on that front. Async messaging is fine when machines are talking to each other, but when a user is sitting in front of a screen waiting for something to happen after clicking that button, it is a headache. You need to consider:
- How to know when a business process has concluded: is it a success, a validation error, or an unexpected exception?
- Time-to-Live: if some services are not available, how long will you wait before telling the user, well, not to wait?
- Time-to-Live, Part 2: how will you tell the user not to wait?
Let me explain the architecture further. The frontend connects to a gateway via WebSocket. We chose web sockets over other technologies because we have high message traffic flowing in both directions. The gateway is a thin layer that converts socket messages to Kafka messages (PlatformMessage) and vice versa. It has a few more responsibilities that we will mention later.
We use three main classes as payloads, all of which are generic:
Command<T>: The main request that starts the business process.
CommandResult<T>: The response that is generated at the end of the business process.
Event<T>: Information about state changes that have already happened, so that other services can do their own thing.
The main differences between commands and events are:
- Commands have an address while events don’t.
- Events have an EventType property for easy filtering. (Created, Deleted, etc.)
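As a rough illustration, the three payload classes could be sketched like this. Only the class names, the EventType property, and the fact that commands carry an address come from the description above; every other field name (body, success, error) is an assumption made for the example.

```python
from dataclasses import dataclass
from typing import Generic, Optional, TypeVar

T = TypeVar("T")


@dataclass
class Command(Generic[T]):
    """The request that starts a business process."""
    address: str  # commands are addressed; the exact format is hypothetical
    body: T


@dataclass
class CommandResult(Generic[T]):
    """The response generated at the end of the business process."""
    success: bool                # assumed field
    body: Optional[T] = None     # assumed field
    error: Optional[str] = None  # assumed field, e.g. a validation message


@dataclass
class Event(Generic[T]):
    """A state change that has already happened; note there is no address."""
    event_type: str  # e.g. "Created", "Deleted"
    body: T
```

A command would then be built as Command(address="orders.create", body={...}), while an event only says what happened and leaves it to any interested service to react.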
The frontend generates a random RequestId for each request. It then sends the request, including the RequestId and additional client information, to the gateway. The gateway converts it to a command (we will visit this later), wraps it in a PlatformMessage, and publishes it to a Kafka topic.
All services subscribed to that topic receive it; the one that has a matching handler processes it, while the others discard it. We didn’t want to increase our topic count too much, so our topics are not very fine-grained. Receiving a message and discarding it is cheap enough for us that maintaining a high number of topics isn’t worth the cost.
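The "process it or discard it" behaviour can be sketched as a small handler registry. This is a simplified illustration, not our actual service code: the Service class, the message shape (a dict with address and body), and the address strings are all hypothetical.

```python
from typing import Callable, Dict, Optional


class Service:
    """Receives every message on a coarse topic and handles only what it knows."""

    def __init__(self) -> None:
        self._handlers: Dict[str, Callable[[dict], dict]] = {}

    def register(self, address: str, handler: Callable[[dict], dict]) -> None:
        self._handlers[address] = handler

    def on_message(self, message: dict) -> Optional[dict]:
        handler = self._handlers.get(message["address"])
        if handler is None:
            # No matching handler: discarding is cheap, so just drop the message.
            return None
        return handler(message["body"])
```

Two services subscribed to the same topic both receive a command addressed to "orders.create", but only the one that registered a handler for that address returns anything.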
Since our services are allowed to get a little bit bigger, most of the time the business process concludes as a single transaction. In that case, a CommandResult object is created as a follow-up message and published. We publish it to the same topic on which we received the Command, but you can publish it to any topic.
In the case of distributed transactions, the service that handled the Command generates the CommandResult at the end of the process. There is no automated way to do this; it is a design rule that we follow.
During these processes, services publish events about state changes of entities that may be of common interest. Determining these is a DDD process that we won’t go into. The one important rule is that all messages, events or otherwise, generated during a business process have to be created as follow-up messages. This ensures that subsequent messages carry the original RequestId and other client information as part of their BreadCrumbs attribute. So when the CommandResult is published, it contains the identifying information the gateway needs to send it to the correct web-socket channel and client. The client can check the RequestId of the message and understand which of the operations it started has ended.
The gateway also receives Events and, after some filtering, broadcasts the event info to all clients. While the result goes only to the client who sent the Command, events go to all clients, so they all have the updated information.
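The follow-up rule is easier to see in code. Below is a minimal sketch, assuming the breadcrumbs are a simple dictionary; the follow_up method name and the breadcrumb keys are invented for the example, and the real PlatformMessage carries more than this.

```python
from dataclasses import dataclass, field
from typing import Any, Dict


@dataclass(frozen=True)
class PlatformMessage:
    payload: Any
    # Client information that must survive the whole business process.
    breadcrumbs: Dict[str, str] = field(default_factory=dict)

    def follow_up(self, payload: Any) -> "PlatformMessage":
        """Create the next message in the chain, copying the breadcrumbs."""
        return PlatformMessage(payload=payload, breadcrumbs=dict(self.breadcrumbs))


# The gateway stamps the first message; services only ever call follow_up().
command = PlatformMessage(
    payload={"address": "orders.create"},
    breadcrumbs={"requestId": "req-1", "clientSessionId": "client-1"},
)
event = command.follow_up({"eventType": "Created"})
result = event.follow_up({"success": True})
```

Because every message descends from the original command, the result still knows its requestId and clientSessionId no matter how many hops the process took.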
That’s the general flow. Now let’s get into a little more detail. Web socket connections can have sessions, but we are not using them in our topology. Instead, when a client is loaded (in our case, a Single Page Application), it generates a random ClientSessionId and a public/private key pair. It then subscribes to two kinds of web-socket topics: a client-specific topic keyed by its ClientSessionId, and a shared broadcast topic.
Every request contains the RequestId, the ClientSessionId, and the PublicKey. WebSocket-Kafka gateways are auto-scaled and live behind a load balancer. Web-socket connections can drop because of a network issue, rebalancing, or a container failure. In that case, the client reconnects, but it may connect to a different gateway instance. It still subscribes to the same topics, because the ClientSessionId doesn’t change. Because we don’t know which gateway a client will be connected to at any given time, the gateways are not configured as a single consumer group. Within a consumer group, each Kafka partition is assigned to only one consumer, so a message goes to only one instance in the group. Instead, all gateways receive all messages and forward them to the designated channels. Only one of them will have the target client connected; the other channels will be dead ends. It is a little wasteful, but it allows us to have fully stateless gateways and handle connection changes gracefully.
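Here is a toy simulation of that fan-out, with Kafka and the sockets replaced by plain Python objects. The Gateway class and fan_out function are hypothetical. With a real Kafka client, one common way to make every instance see every message is to give each gateway its own unique consumer group id, but that is an assumption about one possible setup rather than a description of ours.

```python
from typing import Dict, List


class Gateway:
    """Stateless apart from the sockets that happen to be connected right now."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.connected: Dict[str, List[str]] = {}  # ClientSessionId -> delivered payloads

    def connect(self, client_session_id: str) -> None:
        self.connected.setdefault(client_session_id, [])

    def disconnect(self, client_session_id: str) -> None:
        self.connected.pop(client_session_id, None)

    def on_kafka_message(self, client_session_id: str, payload: str) -> bool:
        inbox = self.connected.get(client_session_id)
        if inbox is None:
            return False  # dead end: the client is not connected to this instance
        inbox.append(payload)
        return True


def fan_out(gateways: List[Gateway], client_session_id: str, payload: str) -> List[str]:
    """Every gateway receives the message; return the names of those that delivered it."""
    delivered_by = []
    for gateway in gateways:
        if gateway.on_kafka_message(client_session_id, payload):
            delivered_by.append(gateway.name)
    return delivered_by
```

If a client moves from one gateway to another between two results, nothing has to be migrated: the next message simply finds the client on the new instance.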
The gateway sends the CommandResult to the “socket/[some-topic]/[ClientSessionId]” web-socket topic. The content is encrypted with the client’s public key, so even if someone guesses the ClientSessionId, the content cannot be decrypted.
Similarly, Events are sent to the “socket/[some-topic]/broadcast” web-socket topic. They are sent openly, without encryption. At this point, you have to make a distinction: some information can be received by all clients regardless of their roles, while some must be subject to authorization. To handle this, we include only event metadata in broadcasts and expect clients to make subsequent calls, which go through authorization, to get the full data. We also have some cases where we put all the data inside the broadcast message to reduce round trips.
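The two destinations and the metadata-only broadcast can be sketched as follows. The topic patterns are the ones quoted above; the function names and the event fields (eventType, entity, entityId, body) are assumptions for illustration, and encryption of the CommandResult is left out.

```python
def result_topic(topic: str, client_session_id: str) -> str:
    """Private channel: only the client that sent the Command subscribes to it."""
    return f"socket/{topic}/{client_session_id}"


def broadcast_topic(topic: str) -> str:
    """Shared channel: every client subscribes to it."""
    return f"socket/{topic}/broadcast"


def to_broadcast(event: dict, include_body: bool = False) -> dict:
    """Strip an event down to metadata unless the full body is safe for everyone."""
    message = {
        "eventType": event["eventType"],
        "entity": event["entity"],
        "entityId": event["entityId"],
    }
    if include_body:
        # Only for data that any client may see; saves a round trip.
        message["body"] = event["body"]
    return message
```

A client that receives a metadata-only broadcast knows which entity changed and can fetch the details through a regular, authorized request.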
The downside of this approach is that the gateways do not share the workload efficiently. Since they all receive all the messages from Kafka, they all have to do some excess processing. But socket connections are distributed, and client request processing is balanced. Our system serves a limited number of users but is connected to a large number of external systems. State changes resulting from external system integrations are broadcast to everyone, and they do not affect the overall situation. So we can get away with this inefficiency. Keeping an external registry to store and manage socket client information can mitigate some of these problems, but again, we are trying to keep our complexity to a minimum. We are going with good enough for now. If gateway scalability becomes an issue, we will add the extra features.
When a client reconnects to the web socket, it may have missed the messages sent in between. Since gateways are stateless, they don’t keep track. To solve this, gateways append the topic/partition/offset info of the Kafka message to the socket messages they send. Clients keep track of the last offsets by topic/partition. When clients reconnect, they send the last offsets they received and ask for all the missing messages. In return, the gateway creates a temporary Kafka consumer, records the current offsets, seeks to the requested offsets, reads all messages between the two, and sends the appropriate ones to the client. This way, during reconnects, clients are updated to the current state. The log nature of Kafka makes this approach possible.
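The replay logic can be illustrated with an in-memory stand-in for a Kafka topic. InMemoryLog and replay are hypothetical names, and the sketch assumes the client reports the last offset it actually received per partition, so reading starts one past it. Filtering down to "the appropriate ones" is omitted.

```python
from collections import defaultdict
from typing import Dict, List, Tuple


class InMemoryLog:
    """Stand-in for one Kafka topic: an append-only list per partition."""

    def __init__(self) -> None:
        self.partitions: Dict[int, List[str]] = defaultdict(list)

    def append(self, partition: int, value: str) -> int:
        self.partitions[partition].append(value)
        return len(self.partitions[partition]) - 1  # offset of the new message

    def end_offset(self, partition: int) -> int:
        return len(self.partitions[partition])

    def read(self, partition: int, start: int, end: int) -> List[Tuple[int, str]]:
        return list(enumerate(self.partitions[partition]))[start:end]


def replay(log: InMemoryLog, last_seen: Dict[int, int]) -> List[Tuple[int, int, str]]:
    """Return (partition, offset, value) for everything the client missed."""
    missed = []
    for partition, last_offset in last_seen.items():
        end = log.end_offset(partition)  # record the current end before reading
        for offset, value in log.read(partition, last_offset + 1, end):
            missed.append((partition, offset, value))
    return missed
```

Recording the end offset first gives the temporary consumer a fixed stopping point; anything published after that arrives through the normal live stream.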
Now, we also have to consider TTL (Time-to-Live). User experience requires that we give some feedback to the user after a specific time. The frontend sends messages with a TTL value. If the request has not been completed by that time, we must give the user a time-out error. And after that point, we shouldn’t process the request. In our solution, we are leaving a margin of error here, but I will come to that.
To solve this problem, we rely on our Delayed Execution feature. When the WebSocket-Kafka Gateway receives a command request with a TTL value set, it creates two messages. It first creates a message saying that the operation has timed out, and sends it to Redis as a future job to be processed after the TTL has elapsed. Then it adds the Redis key to the second message and sends that as a Command to Kafka.
When a service processes the Command, it removes the Redis entry using the key and sends the CommandResult. If, for some reason, the message waits too long in Kafka, the future job executes, and the client receives the time-out error. When the service finally receives the Command, it sees that the TTL has expired and sends it to the Dead-Letter-Queue (DLQ) topic.
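The two paths can be traced with a small simulation. This is only a sketch: DelayedJobs is an in-memory stand-in for the Redis-backed delayed execution, time is passed in explicitly instead of read from a clock, and the expiresAt field is an assumption about how a service could tell that the TTL has passed.

```python
import uuid
from typing import Dict, List, Tuple


class DelayedJobs:
    """In-memory stand-in for future jobs stored in Redis."""

    def __init__(self) -> None:
        self.jobs: Dict[str, Tuple[float, dict]] = {}  # key -> (due_at, message)

    def schedule(self, due_at: float, message: dict) -> str:
        key = str(uuid.uuid4())
        self.jobs[key] = (due_at, message)
        return key

    def cancel(self, key: str) -> bool:
        return self.jobs.pop(key, None) is not None

    def run_due(self, now: float) -> List[dict]:
        """Pop and return the messages of all jobs whose time has come."""
        due_keys = [k for k, (due_at, _) in self.jobs.items() if due_at <= now]
        return [self.jobs.pop(k)[1] for k in due_keys]


def gateway_send(jobs: DelayedJobs, request_id: str, ttl: float, now: float) -> dict:
    # First message: the time-out error, parked as a future job.
    key = jobs.schedule(now + ttl, {"requestId": request_id, "error": "timeout"})
    # Second message: the Command itself, carrying the key of the future job.
    return {"requestId": request_id, "timeoutKey": key, "expiresAt": now + ttl}


def service_handle(jobs: DelayedJobs, command: dict, now: float) -> dict:
    if now >= command["expiresAt"]:
        return {"dlq": command}  # too late: route to the dead-letter topic
    jobs.cancel(command["timeoutKey"])  # done in time: remove the pending time-out
    return {"result": {"requestId": command["requestId"], "success": True}}
```

In the happy path the service cancels the job and run_due later finds nothing to send. In the slow path the job fires first, the client gets the time-out, and the command ends up in the DLQ when a service finally picks it up.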
Now there is the case where a message has been received and is being processed when the TTL expires. This is a case that we are leaving unhandled. It is not a breaking problem, just a little confusing, since the user receives both the time-out error and the result. We mitigate the problem to some degree with buffer intervals, but at the edge cases, the problem persists. Remember the CAP Principle? We decided that solving this edge case was too troublesome.
Alternatively, the frontend could keep a timer for each request and display a time-out message all by itself. It would have to keep the state during context changes and implement some logic, which is not a big deal for a SPA. But that does not solve the edge case we discussed earlier. It is also client-specific logic. The gateway-based approach described above helps when there are multiple client technologies. It also makes it easier to send time-out messages to integration endpoints if there is such a use case.
In this case, both approaches are equally acceptable. We have chosen the first one because we already had all the moving parts in working order and it was an easy enough implementation after that. And if all backend services are down, showing the TTL message is the least of our problems :)
This is the end of the second part of the series. You can check Part 3 here.