Using Redis Streams To Implement Near Cache Invalidation

Shekhar Gulati
Dec 16, 2019

Early last year I was working on an application that kept data in a near cache. I will not delve into the reasons why we used a near cache, other than that we had strict performance requirements that required keeping data close to the compute.

A near cache, for those who have not heard the term, is a smaller local cache (in the same process as your application) that stores the most recently used or most frequently accessed data. So, if you are running a Java application, a near cache could be as simple as a ConcurrentHashMap, or you can use libraries like Cache2k or Caffeine to implement one.

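To make the idea concrete, below is a minimal sketch of such a near cache built on a plain ConcurrentHashMap. The class name and the loader function are illustrative and not from the application described in this post.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Illustrative near cache: an in-process map with read-through loading.
public class NearCache<K, V> {

    private final Map<K, V> store = new ConcurrentHashMap<>();
    private final Function<K, V> loader; // loads the value from the backing datasource on a miss

    public NearCache(Function<K, V> loader) {
        this.loader = loader;
    }

    public V get(K key) {
        return store.computeIfAbsent(key, loader);
    }

    public void invalidate(K key) {
        store.remove(key);
    }
}

The hard part is not the data structure itself but keeping many such in-process copies consistent, which is the topic of the rest of this post.
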
We had a global Redis cache, but the network latency and the number of Redis calls we had to make in our worst-case request made it unsuitable for this use case.

As most software engineers know,

There are only two hard things in Computer Science: cache invalidation and naming things. — Phil Karlton

Invalidating a cache is a hard problem, and with a near cache it becomes much harder. Near cache invalidation is hard because you have to invalidate the cache in all your application processes. It is O(N), where N is the number of application servers or containers. It is also difficult to guarantee that all near caches will have the same state.

One way software developers invalidate a near cache is by using the Publish/Subscribe messaging paradigm. All application servers subscribe to a channel where changes are published.

Redis has built-in support for Pub/Sub that we can use to invalidate the near cache. Our applications can subscribe to the changeset channel on startup.

redis:6379> SUBSCRIBE changeset

Then a producer will publish messages on the changeset channel.

redis:6379> PUBLISH changeset "{'key':'k123', 'operation': 'insert', 'value': 'serialized_object'}"

The subscriber will receive the message published on the changeset channel and print it to the console.

redis:6379> SUBSCRIBE changeset
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "changeset"
3) (integer) 1
1) "message"
2) "changeset"
3) "{'key':'k123', 'operation': 'insert', 'value': 'serialized_object'}"

In the publish/subscribe pattern, publishers can publish messages to one or many subscribers.

However, there are a couple of limitations of Redis Pub/Sub that you should watch out for if you use it to implement a cache invalidation solution:

  1. Pub/Sub messages are fire-and-forget, i.e. they are not stored in Redis. This means you don't have the capability to replay messages and bring your local state up to date. This becomes important if you implement the near cache using an embeddable persistent solution like RocksDB. RocksDB keeps data on disk and moves it to RAM depending on usage. If your server dies and comes back after some time, you would want to apply only the changes that were made while your server was unavailable. This is not feasible with Pub/Sub, as you can't replay messages that a server might have missed.
  2. Pub/Sub only allows you to publish a String message, as shown in the example above. This means you have to parse the message before you can do anything with it. For example, if the message is JSON, you will have to parse the JSON. If the producer publishes only the key of the cached object, you will have to fetch the actual object from the main datasource. This could lead to high lock times, as caching libraries like Cache2k use locks to avoid the thundering herd problem (see the sketch after this list).

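To illustrate the second point, here is a rough sketch, not from the original application, of what a Pub/Sub subscriber's message handler ends up doing: parse the String payload with Jackson and, if only the key was published, reload the object from the primary datasource. The class name ChangesetMessageHandler, the near cache map, and the datasource function are placeholders; the actual Pub/Sub transport (a Jedis or Lettuce subscriber) is omitted.

import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.ObjectMapper;

import java.util.Map;
import java.util.function.Function;

// Illustrative Pub/Sub handler: every message arrives as a String and must be parsed
// before the near cache can be patched.
public class ChangesetMessageHandler {

    private final ObjectMapper objectMapper = new ObjectMapper();
    private final Map<String, String> nearCache;        // placeholder for the near cache
    private final Function<String, String> datasource;  // placeholder: loads a value by key

    public ChangesetMessageHandler(Map<String, String> nearCache, Function<String, String> datasource) {
        this.nearCache = nearCache;
        this.datasource = datasource;
    }

    public void onMessage(String payload) throws Exception {
        Map<String, String> event = objectMapper.readValue(payload, new TypeReference<Map<String, String>>() {});
        String key = event.get("key");
        String value = event.containsKey("value")
                ? event.get("value")           // value shipped in the message
                : datasource.apply(key);       // otherwise reload from the datasource
        nearCache.put(key, value);
    }
}

With Pub/Sub, every subscriber pays this parse-and-reload cost on every message, and a subscriber that was offline never sees the message at all.
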
A couple of weeks back I became aware of Redis Streams. In my opinion, they overcome both of the limitations of Redis Pub/Sub that I outlined above, so to me they are a superior way to implement cache invalidation.

Redis Streams is a new data type introduced in Redis 5.0. As per Redis creator Salvatore Sanfilippo, it is the most complex data type introduced in the Redis core. In this post, I will not cover all aspects of the Redis Streams API, but only the parts that are useful for the cache invalidation problem discussed above.

Redis Streams implement an append-only log data structure. It supports basic operations to add elements to a stream and read elements from it.

If we have a changeset stream, we will use the following syntax to append an element to it.

redis:6379> XADD changeset * key k123 operation insert value serialized_object

The command above means that we want to add the key-value pairs key:k123, operation:insert, value:serialized_object to the stream named changeset. The * tells the Redis server that it should generate a new ID for the entry.

The command will return a response like the one shown below.

1576324472387-0

The format of the response is millisecondsTime-sequenceNumber. The milliseconds time is the Unix timestamp, in milliseconds, of the Redis server. The sequence number is used to differentiate between two entries created in the same millisecond.

Now that we know how to add an entry to a stream, let's learn how to read entries back.

There are two ways we can read data from the stream. The first way, shown below, returns all the entries in the stream whose ID is greater than the one we pass. Since we have used 0, it will read all the stream entries.

redis:6379> XREAD STREAMS changeset 0

Instead of 0 we can pass $ or any other ID. $ means that we should receive only new messages, starting from the time we started listening; it works by using the ID of the last message already in the stream. Any other ID means we receive only the entries with IDs greater than it.

The second variant, shown below, will block forever and wait for a new message to be published on the changeset stream.

redis:6379> XREAD BLOCK 0 STREAMS changeset $

Let’s publish another message to our changeset stream.

redis:6379> XADD changeset * key k124 operation update value serialized_object
"1576325388820-0"

You will notice that the client running XREAD will print the message on the console.

redis:6379> XREAD BLOCK 0 STREAMS changeset $
1) 1) "changeset"
   2) 1) 1) "1576325388820-0"
         2) 1) "key"
            2) "k124"
            3) "operation"
            4) "update"
            5) "value"
            6) "serialized_object"
(33.58s)

Now, if another client connects to the changeset stream, it can read messages from the start, from a specific ID onwards, or only new messages.

redis:6379> XREAD BLOCK 0 STREAMS changeset 0
1) 1) "changeset"
   2) 1) 1) "1576324472387-0"
         2) 1) "key"
            2) "k123"
            3) "operation"
            4) "insert"
            5) "value"
            6) "serialized_object"
      2) 1) "1576325388820-0"
         2) 1) "key"
            2) "k124"
            3) "operation"
            4) "update"
            5) "value"
            6) "serialized_object"

As you can see above, the new client was able to consume both messages appended to the changeset stream.

All the clients connected to a stream will receive the message. Streams support the same fan-out access pattern as Pub/Sub, with the additional benefit that you can replay older messages because they are stored in Redis.

Using Redis Streams in Java

In this section, we will build a simple Spring Boot application to show how we can build a solution for the cache invalidation problem outlined above.

Go to https://start.spring.io/ and create a new Spring Boot application with the spring-boot-starter-data-redis-reactive and spring-boot-starter-webflux dependencies. We will use Spring's reactive support as we are working with streams.

We will start by creating a RedisConfig configuration class.

package com.xebia.cacheinvalidationexample;

import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.ReactiveRedisConnectionFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.stream.StreamReceiver;

import java.time.Duration;

@Configuration
public class RedisConfig {

    @Bean
    public StreamReceiver<String, MapRecord<String, String, String>> streamReceiver(
            ReactiveRedisConnectionFactory factory) {
        return StreamReceiver.create(factory,
                StreamReceiver.StreamReceiverOptions.builder().pollTimeout(Duration.ofMillis(100)).build());
    }

    @Bean
    public NearCacheUpdater updater(StreamReceiver<String, MapRecord<String, String, String>> streamReceiver, Cache cache) {
        return new NearCacheUpdater(streamReceiver, cache);
    }
}

In the code shown above, we have done the following:

  1. We created a bean of type StreamReceiver. It provides a reactive variant of a message listener and is responsible for consuming messages from a Redis stream. It is configured using StreamReceiverOptions, which you use to define settings like the polling timeout, batch size, etc.
  2. Next, we created our custom NearCacheUpdater that will use the StreamReceiver created above to patch the near cache. An interesting thing to note is that StreamReceiver is of type <String, MapRecord<String, String, String>>. The first type parameter is the stream key type; in our case it is String. The second type parameter is MapRecord<String, String, String>, a record within a stream backed by a collection of key-value pairs. The Cache type injected into the updater bean is our own near cache abstraction; a sketch of it follows below.

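The Cache class referenced in the configuration above is not part of Spring; it is the application's own near cache abstraction and is not shown in this post. Below is a minimal, hypothetical sketch of what it could look like, assuming a ConcurrentHashMap-backed store and the event fields (key, operation, value) used throughout this post; a real implementation could just as well wrap Cache2k, Caffeine, or RocksDB. It would also need to be registered as a bean (for example with @Component or an @Bean method) so that it can be injected into the updater bean.

package com.xebia.cacheinvalidationexample;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical near cache: a simple in-process map patched from changeset events.
public class Cache {

    private final Map<String, String> store = new ConcurrentHashMap<>();

    public String get(String key) {
        return store.get(key);
    }

    // Applies a single changeset event of the form {key=..., operation=..., value=...}.
    public void patch(Map<String, String> event) {
        String key = event.get("key");
        String operation = event.get("operation");
        switch (operation) {
            case "insert":
            case "update":
                store.put(key, event.get("value"));
                break;
            case "delete":
                store.remove(key);
                break;
            default:
                // Unknown operations are ignored in this sketch.
                break;
        }
    }
}
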
Next, we will create NearCacheUpdater that will subscribe to events published on the stream and patch the near cache.

package com.xebia.cacheinvalidationexample;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.connection.stream.MapRecord;
import org.springframework.data.redis.connection.stream.StreamOffset;
import org.springframework.data.redis.stream.StreamReceiver;
import reactor.core.Disposable;
import reactor.core.publisher.Flux;

import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import java.util.Map;

public class NearCacheUpdater {

    private final Logger logger = LoggerFactory.getLogger(NearCacheUpdater.class);

    private final StreamReceiver<String, MapRecord<String, String, String>> streamReceiver;
    private final Cache cache;
    private Disposable subscription;

    public NearCacheUpdater(StreamReceiver<String, MapRecord<String, String, String>> streamReceiver, Cache cache) {
        this.streamReceiver = streamReceiver;
        this.cache = cache;
    }

    @PostConstruct
    private void postConstruct() {
        Flux<MapRecord<String, String, String>> stream = streamReceiver
                .receive(StreamOffset.fromStart("changeset"));
        subscription = stream.doOnNext(it -> {
            Map<String, String> event = it.getValue();
            logger.info("event = {}", event);
            cache.patch(event);
        }).subscribe();
    }

    @PreDestroy
    private void preDestroy() {
        if (subscription != null) {
            subscription.dispose();
            subscription = null;
        }
    }
}

In the code shown above, we have done the following:

  1. We started a Redis Stream consumer that consumes from the changeset stream. It returns a Flux of MapRecord.
  2. Next, we subscribed to the Flux. For each event published on the stream, we call the cache to update itself.
  3. Finally, we dispose of the subscription before the bean is destroyed, using @PreDestroy.

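The post only shows the consuming side in Java; entries can be appended to the stream with XADD from redis-cli as shown earlier. As a rough sketch (not from the original post), and assuming the ReactiveStringRedisTemplate bean that Spring Boot auto-configures for the reactive Redis starter, a Java producer could look like this:

package com.xebia.cacheinvalidationexample;

import org.springframework.data.redis.core.ReactiveStringRedisTemplate;
import org.springframework.stereotype.Component;
import reactor.core.publisher.Mono;

import java.util.Map;

// Hypothetical producer: appends a changeset entry that NearCacheUpdater instances consume.
@Component
public class ChangesetPublisher {

    private final ReactiveStringRedisTemplate redisTemplate;

    public ChangesetPublisher(ReactiveStringRedisTemplate redisTemplate) {
        this.redisTemplate = redisTemplate;
    }

    public Mono<String> publish(String key, String operation, String value) {
        // Equivalent to: XADD changeset * key <key> operation <operation> value <value>
        return redisTemplate.opsForStream()
                .add("changeset", Map.of("key", key, "operation", operation, "value", value))
                .map(recordId -> recordId.getValue());
    }
}

Calling publish("k125", "update", "serialized_object").subscribe() then has the same effect as the XADD commands used earlier.
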
Run the application and publish events; you will see the events printed in the console.

2019-12-14 19:36:15.293  INFO 14987 --- [ioEventLoop-4-2] c.x.c.ProductNearCacheUpdater : event = {key=k123, operation=insert, value=serialized_object}
2019-12-14 19:36:15.295  INFO 14987 --- [ioEventLoop-4-2] c.x.c.ProductNearCacheUpdater : event = {key=k124, operation=update, value=serialized_object}
2019-12-14 19:36:25.414  INFO 14987 --- [ioEventLoop-4-1] c.x.c.ProductNearCacheUpdater : event = {key=k125, operation=update, value=serialized_object}

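One refinement worth noting: StreamOffset.fromStart replays the whole stream on every restart. If the near cache is persistent (the RocksDB case from the limitations above), you may want to resume from the last entry the instance has already applied. The sketch below is a hypothetical variation of the @PostConstruct method shown earlier; loadLastSeenId and saveLastSeenId are placeholder helpers, and it additionally needs the org.springframework.data.redis.connection.stream.ReadOffset import.

@PostConstruct
private void postConstruct() {
    // Resume from the last applied entry ID instead of replaying the whole stream.
    String lastSeenId = loadLastSeenId(); // e.g. "1576325388820-0", or "0" for a cold start
    Flux<MapRecord<String, String, String>> stream = streamReceiver
            .receive(StreamOffset.create("changeset", ReadOffset.from(lastSeenId)));
    subscription = stream.doOnNext(it -> {
        cache.patch(it.getValue());
        saveLastSeenId(it.getId().getValue()); // persist the offset alongside the cache data
    }).subscribe();
}
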
Conclusion

Redis Streams is a powerful data type that makes it possible to solve difficult problems with ease. There is much more Streams can do than I have covered in this article. Read the Redis documentation on Streams if you want to know more and to learn about features such as consumer groups, which I have not covered here.
