The Redis Streams We Have Known and Loved

Oleksii (Alex) Zakharchenko
bitso.engineering
Published in
8 min readOct 5, 2023

Context

On our way to better architecture, better scalability, better performance — well, better everything — we made a big step from a monolithic all-in-one to microservices.

Our key service quality KPIs are the throughput and latency of our heart — the Order Engine.

Hence, we are making our engine as fast and powerful as possible, and one of the paths is to make the order matching loop lightweight: anything that is not really required to match an order should not be there.

Thus, we extract code that could be run separately and wrap it into dedicated microservices.

The Need

Most of our services mentioned above should be called asynchronously (otherwise, what’s the point, we are winning neither performance nor reliability). Along with the necessity to be resilient and survive traffic spikes, that leads us to the need to have queues in front.

Using a message queue has a number of benefits: enhances system scalability by allowing asynchronous processing, enabling workloads to be distributed and balanced efficiently. It promotes system decoupling, reducing dependencies between components hence facilitating easier maintenance and evolution.

And reliability: we cannot afford to lose a single message — it’s our users’ money! So a message must survive consumer crash, queue instance crash, COVID and crypto winter… and be reprocessed afterwards.

Hence, the queue itself should be reliable, highly available and durable, and never lose a message.

The Initial Solution.

At Bitso, we extensively utilize Redis for its exceptional performance, reliability, and durability, if employed with persistence and replication. Surely, there are trade-offs to consider, such as the eventual consistency (which, in our specific case, is maintained at a single-digit millisecond level, but nonetheless is a factor we pay particular attention to)

Consequently, our natural choice for a messaging queue was a Redis queue (which is, essentially, a Redis list). In order to guarantee failure-free message retention, we have implemented the Reliable Queue pattern. Redis provides a wonderful BLMOVE atomic operation. This operation effectively transfers a message from the primary list to a custom secondary list (referred to as an “in-flight queue”) before returning the message to the consumer. In the event of a system crash during this process, the message remains intact and can be reprocessed.

When a consumer processes a message successfully, it removes it from the in-flight queue. Surely, we can face consumer crashes after successful processing right before the removal, leaving the message “unprocessed” while it’s actually processed. The message will be processed twice — yet that’s not an issue as long as we adhere to the idempotency processing principle (which is a separate big story to tell). Attempts to re-process an already processed message will be identified and lead to no changes.

And as a message is removed from the main queue after BLMOVE, it naturally becomes unavailable to other concurrent consumers, eliminating the extra buzz of maintaining consumer pointers etc.

Then, we can add a periodic task to return messages that resided in-flight for too long.

Sounds good.

So what’s wrong?

If you worked with queues for some time, you could already notice a leak in this approach. What if a message is really broken? Like, unparsable or trying to do something prohibited? As implemented, that will lead to an infinite loop. Which, at its best, will consume capacity — and if due to some bad mistake, there’s a number of such messages, they could just DoS the service.

The common answer is a Dead Letter Queue (DLQ). If a message fails to be processed N times, give up and move it to a stash for manual review (and ring a bell, of course).

Another thing is, that we’d like to update message meta information when moving from a main queue to in-flight. E.g. we want to set the exact time it’s moved — but that’s not feasible with BLMOVE. We have to inject a custom function to that on the Redis side (Lua script), and that’s another thing to code and test.

Can we avoid this burden, please?

Options

Let’s look around at what we have in the mysterious universe of message queues.

Everyone loves Kafka, it’s like a magic pill, just use it and all your problems are gone (or, they are moved to DevOps side having fun maintaining Kafka cluster)

But we fight for every millisecond digit. Introducing queues significantly helped us with throughput, but latency became even more problematic. And when talking about Kafka, it’s fast. But not fast enough for us.

Another thing is adoption, or the learning curve. Although some teams at Bitso use Kafka, the trading engine team is far more familiar with Redis and its nuances.

SQS is another mature market solution, a managed one, and they have cookies support DLQs out of the box! But, essentially, it’s an even worse story about latency, plus vendor lock-in, which we are trying to avoid whenever possible.

The Choice

And after looking around for some more alternatives, we finally came back to Redis, specifically to Redis Streams. We didn’t consider them from the very beginning, as Streams were a relatively new feature in Redis at that time, and we just missed that from our radars.

Redis Streams are advertised to be fast, and have two “modes” of operations: fan-out mode via XREAD or concurrent processing using consumer groups. The latter is what we need, and in addition, it has the in-flight tracking out of the box — that’s called Pending Entries List (PEL) and works in the following way: when a message is read from the main stream, it’s automatically “moved” to PEL by Redis, and is not available until a configurable timeout is passed. Nothing extra to do for us!

Message from PEL is not returned to the main stream after timeout, consumers have to call an extra method for that — but it’s not a big deal. We just need to call it once in a while to get non-processed messages.

And we still have to implement DLQ on our own. Unfortunately.

Adoption

First, we ran performance tests to double-check that the throughput of a Redis Stream was sufficient for our needs (that was thousands of messages per second per single stream, at our setup). And yes, to ensure the latency is really small, yay.

That’s our live number under a moderate load

As a next step, we did a proof-of-concept implementation for a single service. Testing again, and here we go, canary deployment to live: we started to redirect portions of traffic to the new service via brand new Reliable Stream, from a tiny bit to larger and larger, until we caught all the pitfalls and reached 100% of traffic going through the queue.

And of course, everything was covered with metrics. So we watched all the numbers constantly to ensure we were on the correct path.

That led us to play with different settings’ values to achieve the best performance, e.g. finding the optimal combination of read batch size, block timeout, internal thread executor queue size and number of threads in the pool.

The next step was an evolution: after the technology proved itself in a single service, we abstracted the core logiс, added some nice configuration, and called it the Reliable Redis Streams framework (fancy name, isn’t it?).

So how does it look?

Essentially, it consists of several layers:

  • StreamRedisOperations. It is an adapter abstraction on top of existing Redis client libraries, to define stream-specific endpoints.
  • ReliableStream. That’s where the reliable lifecycle of a message is defined. It’s short and illustrates the main steps, so let’s put it here fully.
/**
* Interface to define ReliableStream operations.
*/
public interface ReliableStream {
/**
* Reads a message from <code>config.getStreamName()</code> stream via long polling:
* if no messages are present, blocks for
* <code>redisQueueBlockedMs</code> milis.
* If no messages arrived during that interval, returns empty optional.
* @return Optional of <code>List&lt;StreamIncomingMessage&gt;</code>,
* or empty optional if stream is empty for a long time
*/
Optional<List<StreamIncomingMessage>> readMessages();

/**
* Reads a message from PEL (Pending List).
* Does not block if no messages are in PEL (returns immediately)
* @param excludedMessageIds Set of ids to exclude from result. If nothing to exclude, pass an empty set
* @return Optional of <code>List&lt;StreamPendingMessage&gt;</code>, or empty optional if nothing is present.
*/
Optional<List<StreamPendingMessage>> readPendingMessages(Set<String> excludedMessageIds);


/**
* Acknowledges the given message Ids are processed successfully, and removes them from the input stream.
* @param messageIds Any number of message Ids
* @see StreamIncomingMessage
*/
void acknowledge(String... messageIds);

/**
* Moves message with the given Id to DLQ.
* That consists of: acknowledge (to remove from PEL), deleted (to be removed from main stream),
* converted to JSON and pushed to <code>DLQ_LIST_NAME</code>
*
* @param messages Any number of StreamIncomingMessages
* @see StreamIncomingMessage
*/
void moveToDeadLetterQueue(StreamIncomingMessage... messages);
}
  • StreamMessageConsumptionPipeline. It encapsulates the steps which a stream message undergoes, along with defining an abstract Consumer.
  • MessageReadingLoop. That’s a ready-to-use class which you just need to configure using your framework configuration way and run, and it will run all the pipeline steps for you in a loop, including reading messages from main stream, checking PEL, acknowledging and removing processed messages, and moving intruders to DLQ. Easy peasy!

The communication diagram looks like this. The boxes with blue headers show the most important properties which should be configured.

Some Results

At the moment, Reliable Redis Streams have been successfully used in several mission-critical services for more than half a year under heavy load, well-survived several BTC price storms and other traffic spikes.

We also applied some tricks to better cope with uneven traffic distribution, but that’s probably another story to tell later.

But…

It’s still far from being perfect. Just while writing this post, we’ve got feedback with a couple of suggestions on what could be done really, really better.

And that’s awesome. We keep evolving and constantly looking for enhancements.

That’s what Bitso is about.

--

--