Faster and safer event stream processing

Shani Cohen
Published in Israeli Tech Radar
Dec 8, 2020

“Experience is knowledge. All the rest is information.”

There are many ways to learn. I believe that experience is the most effective one.

When I face new challenges, do research, get hands-on experience, and reach conclusions, the knowledge sticks better, and I can readily apply it to solve further problems. The more experience I gain, the better problem solver I become.

People say that a year in Tikal is equivalent to five years of experience elsewhere, and after only three months I’m beginning to understand why. My role is to provide solutions for diverse challenges. When encountering a problem, I explore, test, and implement my solution. Every few months I am given a new assignment, so my learning curve keeps progressing.

A Gentle Introduction to Stream Processing

The first challenge I encountered in my current assignment led me to learn about event stream processing.

A data stream, or event stream, is a representation of an unbounded dataset: one that is infinite and ever-growing. The opposite of event-stream processing is batch processing, where the dataset is bounded.

The event-stream model can represent almost any business activity we want to analyze, e.g. financial transactions, updating user information, and gaming data.

Event streams have three additional characteristics:

  1. The events of the stream are immutable. Once an event is recorded, it cannot be changed.
  2. The stream can be processed more than once (it is replayable).
  3. Event streams are ordered.

One By One

We’ll focus mainly on the third characteristic: event stream ordering. Essentially, this means that I must handle events in the same order in which they occurred.

Let’s take bank transactions as an example. If a deposit event happened before a withdrawal event, it’s important that the bank updates my account balance in that time order.

If the order of the recorded transactions is not maintained, the bank may refuse the withdrawal because, until the deposit is applied, there isn’t enough money in my account.

Typically, a dataset of events is stored in a messaging queue. This is in contrast to database storage, where the order can be lost: records in DB tables represent only the current state. This is one of the essential differences between a database and an event stream.

How to safely process ordered events?

To maintain order, only a single consumer can handle ordered events. In my example, if one consumer handles my deposit request while another consumer handles the withdrawal request, it cannot be guaranteed that the first consumer thread will finish before the second.

In the simplest form of a solution, I would have one process read the events in order and execute them one after the other.
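As a minimal sketch of this naive single-consumer approach (plain Python, not tied to any broker):

```python
import queue

def run_single_consumer(events):
    """Drain a FIFO queue with a single consumer, one event at a time.

    Order is trivially preserved because there is exactly one reader.
    """
    q = queue.Queue()
    for event in events:           # the producer side enqueues in order
        q.put(event)
    processed = []
    while not q.empty():           # the single consumer drains in order
        processed.append(q.get())
    return processed
```

With one reader, `processed` always matches the arrival order; the price is that throughput is capped by that single reader.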

So, what’s the problem?

This can work under a low load with short processing times. But if there are many messages, or the processing takes long, events will accumulate in the queue faster than they are consumed, which will overload memory and may slow down the producer.

By the definition of this naive solution, I cannot add consumers, even though adding consumers is the best way to scale throughput in a messaging system.

Let’s scale!

The solution lies in the fact that there are usually events that can be processed in parallel. I don’t want the bank to process my information in the wrong order. But I have no problem with the bank processing other people’s information at the same time!

If two different people perform operations on their own accounts, one process can always track the first person’s events while another tracks the second’s. In messaging terms, the two independent sequences of events can be read by two different consumers.

But a bank has many customers! True, and there isn’t a dedicated consumer for each customer. Instead, I’ll use a hashing function to map many customers onto a few consumers.

Let’s say that, according to my system requirements, I need three consumers to handle the load. Each customer has an ID number or another unique identifier. I’ll use modulo as the hash function to map customer IDs to consumers. For example, customer number 5 will be processed by consumer number 2, because 5 mod 3 is 2. By the definition of this hash function, all the transactions from a particular customer will always arrive at the same consumer, which means that customer’s events will be processed in order. Transactions from another customer may also reach the same consumer, which is fine. What matters is that only one consumer processes all the events of a specific customer.
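A minimal sketch of this modulo routing (the names here are illustrative, not taken from the original implementation):

```python
NUM_CONSUMERS = 3

def consumer_for(customer_id: int) -> int:
    """Map a customer id onto one of a fixed pool of consumers.

    Every event for a given customer always lands on the same consumer,
    so per-customer ordering is preserved.
    """
    return customer_id % NUM_CONSUMERS

# Customer 5 goes to consumer 2, because 5 mod 3 == 2.
```

Two different customers may map to the same consumer (e.g. customers 2 and 5 both map to consumer 2), which is exactly the "fine" case described above.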

In my specific implementation, I managed the events on an ActiveMQ server.

I used the message grouping concept to control which consumer each message reaches.

Message groups are sets of messages that have the following characteristics:

  • Messages in a message group share the same group id, i.e. they have the same group identifier property (the JMSXGroupID header).
  • Messages in a message group are always consumed by the same consumer, even if there are many consumers on a queue. The broker pins all messages with the same group id to one consumer. If that consumer closes, another consumer is chosen and will receive all further messages with that group id.

So, when a message carries a JMSXGroupID header set to the hashed customer ID, I can be sure that messages from a specific customer will be handled by the same consumer.

Define groupId in ActiveMQ producer
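The embedded producer snippet is not reproduced here; in JMS terms it boils down to setting the JMSXGroupID string property on each message before sending. As a rough, broker-free simulation of the pinning behavior described above (a toy model, not ActiveMQ's actual implementation):

```python
class GroupingQueue:
    """Toy model of ActiveMQ message groups: the first time a group id
    is seen, it is pinned to a consumer, and every later message with
    that group id is dispatched to the same consumer."""

    def __init__(self, consumers):
        self.consumers = consumers   # e.g. a list of lists acting as inboxes
        self.assignment = {}         # group id -> consumer index
        self.next_slot = 0

    def dispatch(self, group_id, message):
        if group_id not in self.assignment:
            # Pin a new group to the next consumer, round-robin style.
            self.assignment[group_id] = self.next_slot % len(self.consumers)
            self.next_slot += 1
        idx = self.assignment[group_id]
        self.consumers[idx].append((group_id, message))
        return idx
```

Dispatching two messages with the same group id will always land them in the same consumer's inbox, in order, which is the guarantee the real broker provides.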

Anyone who uses Kafka will probably use a message key for the same purpose. Messages with the same key reach the same partition, and eventually the same consumer in the consumer group.
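A sketch of the key-to-partition idea (Kafka's default partitioner hashes the key with murmur2; crc32 stands in here as any stable hash, so the exact partition numbers differ from a real broker's):

```python
import zlib

NUM_PARTITIONS = 3

def partition_for(key: bytes, num_partitions: int = NUM_PARTITIONS) -> int:
    """Deterministically map a message key to a partition.

    Any stable hash works for the guarantee that matters here:
    the same key always lands on the same partition.
    """
    return zlib.crc32(key) % num_partitions
```

Because the mapping is deterministic, all events keyed by the same customer ID end up in one partition and are consumed in order by a single member of the consumer group.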

Don’t let me down

One last tip before we finish. In general, consumer code is very sensitive. If the consumer performs I/O operations, you should always set a timeout and treat its expiry as an error. If a consumer is alive but stuck, all the messages that are supposed to reach it will pile up in the queue, blow up the memory, and slow down the system.
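A sketch of the timeout-as-error pattern, with a hypothetical stand-in I/O client (`FakeIO` is invented for illustration; real clients such as sockets or HTTP libraries usually accept a timeout parameter directly):

```python
class FakeIO:
    """Hypothetical stand-in for a real I/O client that enforces a timeout."""

    def call(self, event, timeout):
        if event.get("slow"):                      # simulate a hung remote service
            raise TimeoutError(f"gave up after {timeout}s")
        return f"ok:{event['id']}"

def consume(events, io, timeout=2.0):
    """Process events in order; a timed-out call is recorded as an error
    instead of blocking the rest of the queue."""
    results = []
    for event in events:
        try:
            results.append(io.call(event, timeout=timeout))
        except TimeoutError:
            # Treat the timeout as an error and keep draining the queue.
            results.append(f"error:{event['id']}")
    return results
```

The key point is that a stuck call fails fast and is handled like any other error, so the consumer keeps draining its queue instead of silently blocking everything behind it.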

In conclusion: how did I scale an ordered event stream?

Ordered event stream processing can be done by one consumer to maintain order. When the load grows, there is no choice and consumers must be added. This is done by partitioning tasks that do not depend on each other across different consumers.

References

  1. The great book Kafka: The Definitive Guide, Chapter 11, Stream Processing: https://www.oreilly.com/library/view/kafka-the-definitive/9781491936153/
  2. ActiveMQ message groups: https://activemq.apache.org/message-groups


Shani Cohen
Software Developer at Tikal - Fullstack as a service