Scaling reactive APIs

A scalable solution for processing partitioned data and exposing it in realtime through an event-streaming API

Realtime applications are becoming more and more popular, with streams of data travelling continuously from one service to another. We want our entire microservices architecture to behave like a closed electronic circuit, always reacting to input changes, triggers, or interruptions, with everything working together like a symphony orchestra. And if this is not complex enough, we have to keep growing our businesses, so more and more data has to go through our reactive circuit. Services have to process more without appearing slower to the end consumer.

Frontend applications are becoming real-time mirrors that reflect any change our orchestrated backend services make. We can achieve this by streaming data from backend services to frontend applications in realtime, keeping the end-user updated with all the changes the system is going through.

It is an impressive evolution of cloud applications.

But growth always comes with challenges. One of them is scaling an API service that exposes a reactive API: a service that opens an event stream on each HTTP request and keeps it open for a long time, continuously updating the customer with any state change for many minutes or even hours.

At JustEat, the Tracker application uses this reactive API to inform millions of customers about the state of their orders every day in realtime.

How do reactive APIs work?

A client makes an HTTP request to the server and a data stream is opened through which the server can communicate with the frontend. Think WebSockets but one way only: only the server emits data to the frontend. Either side can close the stream, and a heartbeat is present too. The client dictates how much data the backend sends through the stream based on how fast it can process it, a feature known as backpressure.

So what is the problem?

On the backend side, the data that is published through the API towards the frontend also has to come from a source. In our case, it comes from a number of different partitioned Kafka topics. Because we are dealing with high-volume data streams, the input has to be distributed across multiple Kafka partitions so that multiple events can be processed in parallel.

In the diagram below I describe the architecture we started with:

  1. We receive the events from different partitions (Kafka Topic A: Partition 1, Partition 2, Partition 3). Each partition is sending data to one Service instance, ensuring each input event is processed once, by one Service instance only. In this diagram example, DataUpdatedEvent is being received through Kafka Topic A, Partition 3 and processed by Service Instance 2.
  2. In our service, for each input event, we create and update projections, our Customer Aggregate (Service: Instance 1, Instance 2). The projection is persisted in the DB that is shared between all instances. After the projection is updated successfully, we emit the update event to the reactive API so any listening customer gets updated too. This can be the same event or, better, a new event for the query side, but for our example it does not matter. Each customer should receive updates only from their own Aggregate, so we have to make sure we channel each update only to the customer it belongs to.
  3. A gateway sits in front of the API, takes all Customer requests and directs each of them to exactly one Service instance. In this example the API Gateway is directing the /customer1 request to Service Instance 1.

And here is the problem: if the data coming from the input Kafka topics is partitioned and we scale our service horizontally so that each instance processes only a part of the entire data, how can we ensure that the same service instance that received the HTTP request also processes the corresponding input partition?

disconnected events between input partitions and API

For example, a customer opens an event stream by making an HTTP request on /customer1 to the API Gateway. The gateway assigns Service Instance 1 to process this request and set up the stream publisher.

On the other end, the Customer1 DataUpdatedEvent comes through Kafka Topic A, Partition 3, which is processed by Service Instance 2. But this instance has no open streams on the API, so there is no listener for the updates on the API side. Only Instance 1 knows about the open event stream listener for /customer1.


So it is clear that we need a solution that aligns input and output so that each DataUpdatedEvent has a single, clear path through the entire pipeline: partition -> service instance -> event stream listener.

I have tested 3 possible solutions to find the optimal one:

#1 Quick Solution

One quick solution would be to share the DataUpdatedEvent between all the Service instances, after the projection is updated. We don’t know which Service Instance has the customer listener (request) but if we take each DataUpdatedEvent and do a fan-out to all the existing services, eventually, the instance with the stream listener will get the event and send it through.

In this case Customer1 DataUpdatedEvent would travel like this:

sharing data between instances using internal Kafka topics

Sharing the events between the instances can be done easily using another internal Kafka topic. Each Service instance will process its own events from Topic A and publish the resulting events for the Query side to the internal Kafka topic. Also, each instance will listen to all the events from all the partitions of the internal Kafka topic (using an asynchronous Kafka consumer-group).

This would work well up to the point where the internal Kafka topic becomes the bottleneck, because we lose part of the partitioning benefits. Even if updating the projections is still done by only one Service instance, each DataUpdatedEvent still has to be received by all instances from the internal Kafka topic, published to the listener by one instance and ignored by all the others.
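To make the cost of the fan-out concrete, here is a minimal in-memory sketch. It uses no Kafka at all: the FanOutInstance and FanOutDemo names, and the plain loop that stands in for the internal topic, are assumptions made for illustration only.

```java
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical stand-in for one service instance (illustrative names only).
class FanOutInstance {
    // Open API listeners held by THIS instance, keyed by customerId.
    final Map<String, Consumer<String>> listeners = new ConcurrentHashMap<>();
    int delivered = 0; // events forwarded to a local listener
    int ignored = 0;   // events received but dropped: no local listener

    // Called for every event read from the (simulated) internal topic.
    void onInternalEvent(String customerId, String event) {
        Consumer<String> listener = listeners.get(customerId);
        if (listener == null) {
            ignored++;
            return;
        }
        listener.accept(event);
        delivered++;
    }
}

class FanOutDemo {
    // Simulates the internal topic: every instance sees every event.
    static void fanOut(List<FanOutInstance> instances, String customerId, String event) {
        for (FanOutInstance instance : instances) {
            instance.onInternalEvent(customerId, event);
        }
    }
}
```

With three instances and a single listener, one instance delivers the event and the other two receive it only to drop it, which is the wasted work described above.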

#2 Simple Solution

This was my first approach. Use event sourcing to store the events which form the projection (the Aggregate) and let Reactive-Mongo emit it on each save.

So on a /customer1 HTTP request, we should open an event stream directly using the Reactive-Mongo Java driver by doing a query to get all events representing /customer1. This stream would be kept open. On each save of another Customer1 DataUpdatedEvent or another event representing the state update, Mongo would also run it against any open stream queries to publish the DataUpdatedEvent through the existing open stream.

In this case Customer1 DataUpdatedEvent would travel like this:

sharing data between instances using Reactive-Mongo

You can check my GitHub code example.

Nice and simple. This would be a good solution for internal private dashboards that have a limited number of users.

Unfortunately, this will not scale much. On each HTTP request, we create a new Reactive Mongo query that opens a separate Mongo connection. This database connection stays open for as long as the HTTP event stream is open. So the number of customers and the number of Mongo connections will grow 1 to 1. Not scalable at all.

We need a better solution that can handle multiplexing.

#3 Best scalable solution

So I realized I needed a solution that can multiplex to the data source using a limited connection pool, with a way to add and remove queries on the fly on the existing multiplexed connections. Of course, it also has to keep the partitioning benefits all the way from the input Kafka topics to the reactive API.

After investigating further, I came up with a viable solution using Redis-Streams (newly released with Redis v5) with a self-customized redis-streams-client.

A little about Redis-Streams first:

Streams arrived in Redis with v5 and are a powerful feature: you can create a queue based on a Key, where items are inserted and retrieved in a FIFO manner. On the other end, queries can be made on the Stream Keys, and if there are no relevant old items in the Stream, the listener will wait until a new item is published.

Redis can handle a huge number of different Stream keys. I have tested with more than 1 million streams queried at the same time and it worked very fast.

You can read more about it directly on the Redis website.

The design of the Service

sharing data between instances using Redis Streams

Part1: Saving the events

The first part is persisting the events that will be served later by the API query. To do this we simply create a Kafka listener that applies each incoming event to the Business Logic. To explain how the scalable API works, we don't need to care how the Business Logic works. In the end, the Business Logic will emit an event for the Query side of the application (as in a CQRS architecture). This event will be saved in Redis to a particular Stream.

We need to define one stream for each customer’s data. To do this we will use the customerId as a stream Key and the event itself as the value. Every time a record is saved, Redis will create a new stream if the Key doesn’t exist already. If the stream exists, the event will be appended to the stream events queue.
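The append behaviour described here can be modelled without Redis. The class below is only an in-memory stand-in for the semantics (create the stream on first write, append afterwards). InMemoryStreams and its methods are hypothetical names, not part of any Redis client.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// In-memory model of "one stream per customerId" (not a Redis client).
class InMemoryStreams {
    private final Map<String, List<byte[]>> streams = new ConcurrentHashMap<>();

    // Appends the event to the customer's stream, creating the stream
    // on the first write. Returns the new length of the stream.
    int append(String customerId, byte[] event) {
        List<byte[]> stream = streams.computeIfAbsent(customerId, k -> new ArrayList<>());
        synchronized (stream) {
            stream.add(event);
            return stream.size();
        }
    }

    boolean exists(String customerId) {
        return streams.containsKey(customerId);
    }

    // Returns a copy of the stream in insertion (FIFO) order.
    List<byte[]> read(String customerId) {
        List<byte[]> stream = streams.get(customerId);
        if (stream == null) {
            return List.of();
        }
        synchronized (stream) {
            return new ArrayList<>(stream);
        }
    }
}
```

In Redis itself the equivalent write is the XADD command, which also creates the stream when the key does not exist yet.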

There will be a collection of open API streams for all customers. Each instance will have its own streams collection, perfect for partitioning.

Also, there will be a RedisPuller, one per instance, that will pull events from Redis streams, querying multiple Redis streams in the same query. This puller will run continuously, taking the streams to query from the API streams collections. Also, the API collection would change dynamically without interrupting the puller, each new API request would add a stream to the collection and each time a customer closes the stream, it would remove it from the collection.

Let’s see this implementation in detail:

The listener uses a reactive approach based on Reactive Flux from Spring (spring-cloud-streams-reactive). Each event is parsed and applied to the business logic using the doBusinessLogic method. This method returns a CustomerUpdatedEvent, an internal event for the Query side that represents the information we want to send to the API later. The CustomerUpdatedEvent is then passed to the CustomerRepository.

CustomerRepository will save the CustomerUpdatedEvent to Redis into a Stream. The stream name will be the id of the customer, defined by the getStringId() method. The Query side actually starts here; this part is completely separated from the business logic.

The event will be saved as a byte array in Redis, so there is no need for any special encoding. Since these Redis streams are internal to this application, both saving and querying are done in the same Repository, and a standardized encoding protocol would just waste processing resources. The only downside is that when inspecting Redis manually, it is not possible to read the events directly as text. If that is important, a JSON encoder/decoder would do fine here.

And finally, the CustomerUpdatedEvent implements an Identifiable interface to guarantee the string id that we need to be able to save it in Redis.

Part2: Building the Redis puller

Next, we build an engine that will allow us to subscribe to many reactive streams and keep them open, limited by the allocated memory.

Then as soon as new data arrives in Redis, it pulls the new data and sends it to the correct streams. We will call this engine RedisPuller.

There are 2 sides to this puller engine. One to add and remove the API event streams and the second to pull real-time data regularly from Redis and publish it to the correct API event stream. Both of these sides will use a Map data structure to keep all open API event streams:

It uses a ConcurrentHashMap instead of a simple HashMap to be thread-safe, since multiple customers can connect to the live RedisPuller at the same time.

The Key of the Map is the customerId as String. The value is an EventStream which is a wrapper over a collection of FluxSink<Object> that are the open API streams and some metadata.

  • String eventStreamId: the customerId, same as the Map key.
  • Long offsetTimeMs: offset to know which was the last event pulled from Redis. This is stored as milliseconds from Epoch.
  • Long offsetCount: counter offset, in case there are multiple events at the same time offset.
  • Set<FluxSink<Object>> fluxSinks: the open API stream collection that is used to publish data to the customer.
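As a rough sketch of this structure, the class below uses the field names from the list above but replaces Reactor's FluxSink<Object> with a plain Consumer<Object> so that it runs on the JDK alone. The EventStreamRegistry class and the redisOffset() helper are assumptions added for illustration. The helper relies on Redis stream entry ids having the form "<milliseconds>-<sequence>", which is why a time offset and a counter are both needed.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Sketch of the EventStream wrapper.
// Consumer<Object> is a stand-in for Reactor's FluxSink<Object>.
class EventStream {
    final String eventStreamId;   // the customerId, same as the Map key
    volatile long offsetTimeMs;   // time part of the last pulled entry id
    volatile long offsetCount;    // sequence part of the last pulled entry id
    final Set<Consumer<Object>> fluxSinks = ConcurrentHashMap.newKeySet();

    EventStream(String eventStreamId, long fromMs) {
        this.eventStreamId = eventStreamId;
        this.offsetTimeMs = fromMs;
        this.offsetCount = 0L;
    }

    // Hypothetical helper: formats the offset as "<ms>-<seq>".
    String redisOffset() {
        return offsetTimeMs + "-" + offsetCount;
    }
}

// Hypothetical holder for the map of open streams, keyed by customerId.
class EventStreamRegistry {
    final Map<String, EventStream> streams = new ConcurrentHashMap<>();

    // Several API requests for the same customer share one EventStream.
    EventStream register(String customerId, long fromMs, Consumer<Object> sink) {
        EventStream stream = streams.computeIfAbsent(customerId, id -> new EventStream(id, fromMs));
        stream.fluxSinks.add(sink);
        return stream;
    }
}
```

Sharing one EventStream between all sinks of the same customer means that customer's stream is queried once per pull, no matter how many browser tabs are open.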

Pulling data from Redis-Streams in real-time and publishing to the correct API event stream

Now that we have a list of API event streams opened by the customers, we can use this to pull the data from Redis for all these customers.

First, we build an array (streamArray) of StreamOffset objects, that the Redis client needs to pull data from Redis.

If there are no API streams, we stop the execution here without making any Redis connection.

Next, it does one Redis-Stream query to pull the first batch of events. The most important part is that the Redis client accepts a list of Streams to pull from, so it pulls one batch of data from all the streams in one network request. If the Redis connection fails, it closes all customer API streams to inform the customers about the error.

Once it has the new events batch from Redis, it iterates over them and publishes one by one to the correct stream based on customerId. For each published event, it will also update offsets in the main EventStream collection. This way the next time we call this method it will continue to pull from the last offset it left off.
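The pull cycle can be sketched against an in-memory store. All names below (StoredEntry, OpenStream, PullCycleSketch, pullOnce) are hypothetical, Consumer<Object> again stands in for FluxSink<Object>, and the loop over the map only simulates what a real Redis client does in a single multi-stream read. The sketch is single-threaded on purpose.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// One stored entry; (timeMs, seq) mirrors a stream entry id "<ms>-<seq>".
final class StoredEntry {
    final long timeMs;
    final long seq;
    final Object event;
    StoredEntry(long timeMs, long seq, Object event) {
        this.timeMs = timeMs;
        this.seq = seq;
        this.event = event;
    }
}

// Per-customer state: last delivered offset plus the open sink.
final class OpenStream {
    long offsetTimeMs;
    long offsetCount;
    final Consumer<Object> sink;
    OpenStream(long fromMs, Consumer<Object> sink) {
        this.offsetTimeMs = fromMs;
        this.offsetCount = 0L;
        this.sink = sink;
    }
}

class PullCycleSketch {
    final Map<String, List<StoredEntry>> store = new ConcurrentHashMap<>();
    final Map<String, OpenStream> openStreams = new ConcurrentHashMap<>();

    void save(String key, long timeMs, long seq, Object event) {
        store.computeIfAbsent(key, k -> new ArrayList<>()).add(new StoredEntry(timeMs, seq, event));
    }

    // One pull cycle; returns the number of events published.
    int pullOnce() {
        if (openStreams.isEmpty()) {
            return 0; // no listeners, so no query at all
        }
        int published = 0;
        for (Map.Entry<String, OpenStream> e : openStreams.entrySet()) {
            OpenStream open = e.getValue();
            for (StoredEntry entry : store.getOrDefault(e.getKey(), List.of())) {
                // Only entries strictly after the stored offset are new.
                boolean newer = entry.timeMs > open.offsetTimeMs
                        || (entry.timeMs == open.offsetTimeMs && entry.seq > open.offsetCount);
                if (!newer) {
                    continue;
                }
                open.sink.accept(entry.event);
                open.offsetTimeMs = entry.timeMs; // remember where we left off
                open.offsetCount = entry.seq;
                published++;
            }
        }
        return published;
    }
}
```

Because the offsets are updated per published event, a second call right after the first publishes nothing, and streams without an open listener are never read.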


The runPuller() method could measure its execution time and, if it took less than 1 second for example, wait for the remainder before the next execution, just to avoid choking the network with unnecessary Redis queries. It is better to have fewer requests with more events than more requests with fewer events. This could cause a delay of at most 1 second, which is totally acceptable.
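A minimal sketch of that time limit, assuming a hypothetical PullThrottle helper that is not part of the original puller: measure how long one cycle took and sleep only for what is left of the interval.

```java
// Hypothetical helper for spacing out pull cycles.
class PullThrottle {
    // How long to wait so that cycles start at most once per minIntervalMs.
    static long remainingDelayMs(long elapsedMs, long minIntervalMs) {
        return Math.max(0L, minIntervalMs - elapsedMs);
    }

    // Runs one cycle, then sleeps for the remainder of the interval.
    // Returns the delay that was applied, in milliseconds.
    static long runThrottled(Runnable cycle, long minIntervalMs) throws InterruptedException {
        long start = System.nanoTime();
        cycle.run();
        long elapsedMs = (System.nanoTime() - start) / 1_000_000L;
        long delay = remainingDelayMs(elapsedMs, minIntervalMs);
        if (delay > 0) {
            Thread.sleep(delay);
        }
        return delay;
    }
}
```

A cycle that took 200 ms with a 1000 ms interval waits another 800 ms, while a cycle that already took longer than the interval starts the next one immediately.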

There is one last important part here. This method needs to run continuously. And to build a resilient system, it will run in a separate thread and recover itself in case of a failure. For example, restarting Redis should not crash this system, just temporarily interrupt the events publishing.

The class now also implements AutoCloseable so it can stop the Thread when the puller is no longer needed, for example on a system restart.
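One way to get both properties, self-recovery and a clean stop, is sketched below. ResilientLoop is a hypothetical name and the Runnable stands in for one runPuller() cycle. A failure inside a cycle is swallowed and the loop simply tries again, while close() ends the thread.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical wrapper that keeps running a cycle on its own thread.
class ResilientLoop implements AutoCloseable {
    private final AtomicBoolean running = new AtomicBoolean(true);
    private final Thread thread;

    ResilientLoop(Runnable cycle, long pauseMs) {
        thread = new Thread(() -> {
            while (running.get()) {
                try {
                    cycle.run();
                } catch (RuntimeException e) {
                    // A failed cycle (e.g. Redis restarting) must not kill
                    // the loop; the next iteration retries.
                }
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return; // close() was called
                }
            }
        }, "redis-puller");
        thread.setDaemon(true);
        thread.start();
    }

    boolean isAlive() {
        return thread.isAlive();
    }

    @Override
    public void close() {
        running.set(false);
        thread.interrupt();
        try {
            thread.join(1000L); // give the current cycle time to finish
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
```

Because the class is AutoCloseable it can be used in a try-with-resources block or closed from a shutdown hook. A production version would at least log the swallowed exception.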

The existing RedisPuller class will be updated with the continuous pulling thread:


Part3: Building the API

The last part is about opening an event stream on each API request and adding that stream to the RedisPuller, so the puller can pull events from Redis and send them to the stream.

When the customer interrupts the API request or there is a network issue, the stream will be closed and will trigger the removal of the stream from the RedisPuller. This way customers can open and close streams dynamically without interrupting the puller thread.

The method will have 2 arguments:

  • String eventStreamId: the customerId, sent by the customer on the API request
  • fromMs: the starting Redis-Streams offset, represented by the time in milliseconds when to start querying from. To receive only real-time events, current time in milliseconds can be passed here.

The method returns a Flux of CustomerUpdatedEvent; the cast() method does the conversion directly, so minimal conversion resources are needed.
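The add-on-request, remove-on-close lifecycle can be sketched without Reactor as follows. StreamRegistry and the returned Runnable handle are assumptions for illustration. In the real service the removal would presumably be hooked to the sink's dispose or cancel callback instead of being called by hand.

```java
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;

// Hypothetical registry of open API streams, keyed by eventStreamId.
// Consumer<Object> stands in for Reactor's FluxSink<Object>.
class StreamRegistry {
    final Map<String, Set<Consumer<Object>>> open = new ConcurrentHashMap<>();

    // Registers a sink; running the returned handle removes it again.
    Runnable add(String eventStreamId, Consumer<Object> sink) {
        open.compute(eventStreamId, (id, sinks) -> {
            if (sinks == null) {
                sinks = ConcurrentHashMap.newKeySet();
            }
            sinks.add(sink);
            return sinks;
        });
        return () -> remove(eventStreamId, sink);
    }

    private void remove(String eventStreamId, Consumer<Object> sink) {
        open.computeIfPresent(eventStreamId, (id, sinks) -> {
            sinks.remove(sink);
            // Drop the key when the last sink closes, so the puller
            // stops querying this customer's stream.
            return sinks.isEmpty() ? null : sinks;
        });
    }
}
```

Both operations go through the map's atomic compute methods, so a customer connecting while another disconnects cannot leave an empty or stale entry behind.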

On the controller side, it is pretty straightforward: it injects the RedisPuller singleton and calls this getStreamEvents() method, then just returns the newly created Flux stream.


The circuit is complete. Saving and querying events using Redis-Streams work independently. Adding/removing customer event streams and continuously pulling data from Redis also work independently. The code can be updated to allow any of these parts to run in parallel.

Load testing was a bit challenging, but I managed to simulate over 250 000 simultaneously open connections on an AWS production environment, so with no networking latency. I was continuously sending and reading events in realtime and measuring whether any delay appeared between input time and output time. The system could definitely handle more connections.

For load testing only the part of the RedisPuller that integrates with Redis, I managed to simulate pulling data from 1 million Redis streams in one single query. I am very impressed with how fast Redis-Streams is.

The architecture of the RedisPuller was inspired by kafka-clients, which also pulls data from a source continuously. Unfortunately, Redis does not handle offsets with consumer-groups the way Kafka does, which is why I added the offsets to the EventStream class.

So far this has been running in production for more than 6 months and it has been very stable.

The application services can be scaled horizontally to allow data to be distributed. All services will use the same Redis server. The next bottleneck point of the system will be the Redis connection, for sure. To improve this, sharding the Redis server would be the next step. I would suggest adding monitoring/alerting to be prepared when the business grows to that point.

API event streams can handle backpressure. This can be connected to the RedisPuller, which can be configured to pull data from Redis more slowly when a customer stream signals that the publishing rate should slow down. Of course, this has to be done independently for each EventStream object.

The implementation I explained here is a simplified version of what I use in production and the Customer domain is made up. Even so, this example is production ready but it needs some validations, monitoring, and logging. There should be a query limiter to not allow more than one query per second. Also, Redis needs a size limit cleanup to not consume memory forever. A graceful shutdown protection would be useful to allow the RedisPuller thread to finish pulling before interrupting. Gateways and firewalls also need to be updated to manage long-running requests.

I hope you enjoyed reading this as much as I enjoyed developing it. It would be great to have this feature implemented directly in the Redis client. Maybe one day I can work on adding it, when time has a different dimension.

Thanks for reading. :)

Thanks to the Tracker team, the CloudOps team and the entire Scoober department of JustEat for all the great support offered over time.

