Redis client-side cache with async Python

Dmitry Kryukov
Published in PandaDoc Tech Blog
6 min read · May 30, 2022

It is hard to imagine a modern application that caches data without Redis: it is fast, supports many kinds of data structures, and provides everything you need to build effective caching. Here I will highlight one feature that was released with Redis 6.0, client-side caching, and explain how you can use it with async Python.

What is client-side caching?

Imagine that you have an application that uses Redis to store cached values. To get a value from Redis, you connect to the server, build and pack the command, send the data to a socket, wait a bit, receive the response, and unpack it. This is fast, but what if you want to get the data even faster?
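To make the "build command and pack it" step concrete, here is a minimal sketch of how a command can be encoded in RESP, the Redis wire protocol, before it is written to the socket. The pack_command helper is a hypothetical name used for illustration; it is not how redis-py implements packing internally.

```python
def pack_command(*args: str) -> bytes:
    """Encode a command as a RESP array of bulk strings."""
    # Array header: "*<number of arguments>\r\n"
    parts = [b"*%d\r\n" % len(args)]
    for arg in args:
        data = arg.encode("utf-8")
        # Each argument is a bulk string: "$<byte length>\r\n<bytes>\r\n"
        parts.append(b"$%d\r\n%s\r\n" % (len(data), data))
    return b"".join(parts)


packed = pack_command("GET", "my_key")
print(packed)  # b'*2\r\n$3\r\nGET\r\n$6\r\nmy_key\r\n'
```

Every read pays for this encoding, a network round trip, and the decoding of the reply, which is the cost a local copy avoids.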

What if you could have a copy of some values in your application’s memory? This means that when you write to Redis or get values from it you also store the key and the value in memory. However, you need to keep a consistent copy, because typically you will have more than one copy of the application or you can have another application that can change a value in Redis storage. You need a way to know when a value has expired or someone has rewritten it.
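The consistency problem is easy to reproduce without a server. The sketch below is a toy model: a plain dict stands in for Redis, and NaiveCachedClient is a hypothetical class that caches reads locally with no invalidation at all.

```python
class NaiveCachedClient:
    """Toy client: caches values locally and never invalidates them."""

    def __init__(self, store: dict):
        self._store = store       # stands in for the shared Redis server
        self._local_cache = {}    # private to this application instance

    def set(self, key, value):
        self._local_cache[key] = value
        self._store[key] = value

    def get(self, key):
        if key in self._local_cache:
            return self._local_cache[key]
        value = self._store.get(key)
        if value is not None:
            self._local_cache[key] = value
        return value


shared_store = {}
app_a = NaiveCachedClient(shared_store)
app_b = NaiveCachedClient(shared_store)

app_a.set("my_key", "old")
app_b.get("my_key")          # app_b now holds a local copy of "old"
app_a.set("my_key", "new")   # app_a rewrites the value in the shared store

print(app_b.get("my_key"))   # old (app_b answers from its stale local copy)
```

The second instance keeps returning the old value because nothing tells it that the key has changed.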

Starting from version 6.0, Redis provides a mechanism that makes this simple. The application can subscribe to changes and then receive messages when values are modified or expire. Two strategies are available. You can:

  • receive messages for keys that the connection has requested (default mode)
  • receive messages for keys that match a particular prefix (broadcasting mode)

You can choose a strategy based on how your application uses Redis connections and on its workload. If you want more details on how the modes work, the Redis documentation explains them well.

A Python application usually talks to Redis through a connection pool, so any given command may run on a different connection. Because the default mode tracks keys per connection, broadcasting mode is simpler to use in this case.

Let’s get started

First you’ll need a Redis server running on your machine:

> docker run --name redis -p 6379:6379 -d --rm redis:6-alpine

Start with a simple wrapper for a Redis client that can only set and get a key:

import asyncio

from redis.asyncio import BlockingConnectionPool, Redis


class ClientSideCache:
    def __init__(self, redis_host):
        self.__pool = BlockingConnectionPool(host=redis_host, decode_responses=True)

    def __await__(self):
        # Makes `await ClientSideCache(...)` run init()
        return self.init().__await__()

    async def init(self):
        self._pool = await Redis(connection_pool=self.__pool)
        return self

    async def set(self, key, value):
        await self._pool.set(key, value)

    async def get(self, key):
        return await self._pool.get(key)


async def main():
    client = await ClientSideCache("localhost")
    await client.set("my_key", "my_value")
    print(await client.get("my_key"))


if __name__ == "__main__":
    asyncio.run(main())

Check to make sure it uses Redis:

> docker exec -it redis redis-cli monitor
OK
1652658570.800467 [0 172.17.0.1:64786] "SET" "my_key" "my_value"
1652658570.804626 [0 172.17.0.1:64786] "GET" "my_key"

Now add your local storage and collect keys and their values on read and write:

class ClientSideCache:
    def __init__(self, redis_host):
        self._local_cache = {}

        ...

    async def set(self, key, value):
        self._local_cache[key] = value
        await self._pool.set(key, value)

    async def get(self, key):
        # Serve from memory when possible, otherwise fall back to Redis
        if key in self._local_cache:
            return self._local_cache[key]
        value = await self._pool.get(key)
        if value is not None:
            self._local_cache[key] = value
        return value

This _local_cache will be your client-side cache storage. Let’s check that no GET requests reach Redis:

async def main():
    client = await ClientSideCache("localhost")
    await client.set("my_key", "my_value")
    await client.get("my_key")

> docker exec -it redis redis-cli monitor
OK
1652659187.314014 [0 172.17.0.1:64830] "SET" "my_key" "my_value"

Next, to use broadcasting mode you need to:

  1. open a connection to the Redis server
  2. execute the CLIENT ID command to get the ID of the connection
  3. execute CLIENT TRACKING on REDIRECT {client_id} BCAST PREFIX '' to tell Redis that you want invalidation messages for changes to any key delivered to that connection
  4. execute SUBSCRIBE __redis__:invalidate to subscribe to invalidation messages
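The tracking command from step 3 can be assembled as a list of arguments. This is a minimal sketch: tracking_command is a hypothetical helper, and the prefixes in the usage example are made up. It relies only on the CLIENT TRACKING options named above (REDIRECT, BCAST, PREFIX); PREFIX is only meaningful together with BCAST.

```python
def tracking_command(redirect_id: int, bcast: bool = True, prefixes: tuple = ()) -> list:
    """Build the argument list for CLIENT TRACKING on REDIRECT <id> [BCAST [PREFIX p ...]]."""
    args = ["CLIENT", "TRACKING", "on", "REDIRECT", str(redirect_id)]
    if bcast:
        args.append("BCAST")
        # Each prefix gets its own PREFIX option; no prefix means "all keys"
        for prefix in prefixes:
            args += ["PREFIX", prefix]
    return args


print(tracking_command(1361))
# ['CLIENT', 'TRACKING', 'on', 'REDIRECT', '1361', 'BCAST']
print(tracking_command(1361, prefixes=("user:", "session:")))
# ['CLIENT', 'TRACKING', 'on', 'REDIRECT', '1361', 'BCAST', 'PREFIX', 'user:', 'PREFIX', 'session:']
```

Restricting the prefixes reduces the number of invalidation messages the application receives for keys it never caches.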

Now you can open a second connection and set a key with a value; the first connection receives an invalidation message on the __redis__:invalidate channel:

1) "message"
2) "__redis__:invalidate"
3) 1) "key"
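A received message can be applied to the local copy along these lines. This is a sketch under assumptions: apply_invalidation is a hypothetical helper, the message is assumed to be a dict with type, channel, and data fields (the shape redis-py's get_message returns when decode_responses=True), and a payload of None is treated as "everything was flushed", which is how Redis reports FLUSHALL and FLUSHDB to tracking clients.

```python
INVALIDATE_CHANNEL = "__redis__:invalidate"


def apply_invalidation(local_cache: dict, message: dict) -> None:
    """Drop the locally cached entries named in one invalidation message."""
    if message.get("type") != "message" or message.get("channel") != INVALIDATE_CHANNEL:
        return  # not an invalidation message, e.g. a subscribe confirmation
    keys = message.get("data")
    if keys is None:
        # Assumption: a null payload means the whole keyspace was flushed
        local_cache.clear()
        return
    for key in keys:
        # Broadcasting mode also reports keys that were never cached locally
        local_cache.pop(key, None)


cache = {"key_5": "value_5", "key_6": "value_6"}
apply_invalidation(cache, {"type": "message", "channel": INVALIDATE_CHANNEL, "data": ["key_5"]})
print(cache)  # {'key_6': 'value_6'}
```

Using pop with a default keeps the handler from failing on keys that are not in the local cache.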

Basically, this should be enough to keep your in-memory copy consistent. Now implement this in your wrapper:

class ClientSideCache:
    ...

    async def init(self):
        self._pool = await Redis(connection_pool=self.__pool)
        # Keep a reference so the background task is not garbage-collected
        self._listen_task = asyncio.create_task(self._listen_invalidate())
        return self

    async def _listen_invalidate(self):
        # The PubSub object holds one dedicated connection for all steps below
        pubsub = self._pool.pubsub()
        await pubsub.execute_command(b"CLIENT", b"ID")
        client_id = await pubsub.connection.read_response()
        await pubsub.execute_command(f"CLIENT TRACKING on REDIRECT {client_id} BCAST")
        await pubsub.connection.read_response()
        await pubsub.subscribe("__redis__:invalidate")

        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
            if message is None or not message.get("data"):
                continue
            key = message["data"][0]
            # Messages also arrive for keys that were never cached locally
            self._local_cache.pop(key, None)

    ...

In the init method, start a background task with asyncio.create_task(self._listen_invalidate()).

In this task, create a PubSub object and keep using it, so that a single connection both turns on client tracking and subscribes to the invalidation messages. That’s basically it.
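A background task started this way should also be stopped when the application shuts down. The sketch below shows one common asyncio pattern for that, using a stand-in loop instead of a real Redis connection. BackgroundListener and its ticks counter are hypothetical names for illustration; they are not part of the wrapper above.

```python
import asyncio


class BackgroundListener:
    def __init__(self):
        self._task = None
        self.ticks = 0

    async def start(self):
        # Store the task: the event loop only keeps a weak reference to it
        self._task = asyncio.create_task(self._listen())

    async def _listen(self):
        # Stand-in for the loop that reads invalidation messages
        while True:
            self.ticks += 1
            await asyncio.sleep(0.01)

    async def close(self):
        if self._task is None:
            return
        self._task.cancel()
        try:
            await self._task
        except asyncio.CancelledError:
            pass  # expected: the task was cancelled on purpose
        self._task = None


async def demo():
    listener = BackgroundListener()
    await listener.start()
    await asyncio.sleep(0.05)   # let the listener run for a moment
    await listener.close()
    return listener.ticks


ticks = asyncio.run(demo())
```

The same start and close pair could wrap the invalidation listener, with close also releasing the PubSub connection.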

Here is the complete wrapper code:

import asyncio

from redis.asyncio import BlockingConnectionPool, Redis


class ClientSideCache:
    def __init__(self, redis_host):
        self._local_cache = {}
        self.__pool = BlockingConnectionPool(host=redis_host, decode_responses=True)

    def __await__(self):
        return self.init().__await__()

    async def init(self):
        self._pool = await Redis(connection_pool=self.__pool)
        # Keep a reference so the background task is not garbage-collected
        self._listen_task = asyncio.create_task(self._listen_invalidate())
        return self

    async def _listen_invalidate(self):
        # The PubSub object holds one dedicated connection for all steps below
        pubsub = self._pool.pubsub()
        await pubsub.execute_command(b"CLIENT", b"ID")
        client_id = await pubsub.connection.read_response()
        await pubsub.execute_command(
            f"CLIENT TRACKING on REDIRECT {client_id} BCAST"
        )
        await pubsub.connection.read_response()
        await pubsub.subscribe("__redis__:invalidate")

        while True:
            message = await pubsub.get_message(ignore_subscribe_messages=True, timeout=0.1)
            if message is None or not message.get("data"):
                continue
            key = message["data"][0]
            # Messages also arrive for keys that were never cached locally
            self._local_cache.pop(key, None)

    async def set(self, key, value):
        self._local_cache[key] = value
        await self._pool.set(key, value)

    async def get(self, key):
        # Serve from memory when possible, otherwise fall back to Redis
        if key in self._local_cache:
            return self._local_cache[key]
        value = await self._pool.get(key)
        if value is not None:
            self._local_cache[key] = value
        return value

To check how it works, first set 10 values:

for i in {1..10}
do
    docker exec redis redis-cli set key_${i} value_${i}
done

In a separate terminal session run a Redis monitor:

docker exec -it redis redis-cli monitor

Then run the script:

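async def main():
    client = await ClientSideCache("localhost")
    # first block: fetch the values and store them in the local cache
    for i in range(1, 11):
        print(await client.get(f"key_{i}"))

    # Wait for input in a worker thread (Python 3.9+): a plain input() call
    # would block the event loop, so the listener task could not process
    # invalidation messages before the second block runs
    await asyncio.to_thread(input, "Stop! Press enter to continue")
    # second block: only invalidated keys should reach Redis
    for i in range(1, 11):
        print(await client.get(f"key_{i}"))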

While Python waits for your input, change a few values in Redis. Set a new value for key_5 and set an expiration time for key_6:

> docker exec -it redis redis-cli set key_5 new
OK
> docker exec -it redis redis-cli expire key_6 10
(integer) 1

Return to your script and press enter to let the program finish its execution. After that you will see the following output in your Redis monitor:

> docker exec -it redis redis-cli monitor
OK
"CLIENT" "ID"
"CLIENT" "TRACKING" "on" "REDIRECT" "1361" "BCAST"
"SUBSCRIBE" "__redis__:invalidate"
"GET" "key_1"
"GET" "key_2"
"GET" "key_3"
"GET" "key_4"
"GET" "key_5"
"GET" "key_6"
"GET" "key_7"
"GET" "key_8"
"GET" "key_9"
"GET" "key_10"
# here we did manual commands to change 2 keys
"set" "key_5" "new"
"expire" "key_6" "10"
# here we pressed enter for our script
"GET" "key_5"
"GET" "key_6"

You can see that only two keys were requested from Redis the second time, because those keys were invalidated. The other keys were served from your in-memory storage.

Other variables to consider

There are a few problems that this simple implementation doesn’t cover:

  • it doesn’t handle disconnects of the connection used for invalidation messages
  • it should not serve values from the local cache while the connection to Redis is broken
  • in the example, the key values were changed using redis-cli; if you set a key with the wrapper, you get an invalidation message for your own write and immediately remove that key from the local cache
  • it doesn’t limit the size of the in-memory storage
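As an illustration of the last point, the plain dict could be swapped for a size-bounded store. The sketch below uses least-recently-used eviction, which is one possible policy among several. BoundedLocalCache and its max_size parameter are hypothetical names; this is not taken from the wrapper above or from any library.

```python
from collections import OrderedDict


class BoundedLocalCache:
    """In-memory store that evicts the least recently used key when full."""

    def __init__(self, max_size: int = 1000):
        self._max_size = max_size
        self._data = OrderedDict()

    def __contains__(self, key):
        return key in self._data

    def __len__(self):
        return len(self._data)

    def get(self, key, default=None):
        if key not in self._data:
            return default
        self._data.move_to_end(key)   # mark as most recently used
        return self._data[key]

    def set(self, key, value):
        self._data[key] = value
        self._data.move_to_end(key)
        if len(self._data) > self._max_size:
            self._data.popitem(last=False)   # evict the oldest entry

    def invalidate(self, key):
        self._data.pop(key, None)

    def clear(self):
        # Could be called when the invalidation connection is lost
        self._data.clear()


local = BoundedLocalCache(max_size=2)
local.set("key_1", "value_1")
local.set("key_2", "value_2")
local.get("key_1")               # key_1 becomes the most recently used
local.set("key_3", "value_3")    # evicts key_2
print("key_2" in local)          # False
```

Calling clear() when the listener loses its connection is one simple way to avoid serving values whose invalidation messages may have been missed.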

Cashews to the rescue!

To solve these problems and more, I wrote a simple-to-use library, cashews: a cache wrapper with multiple backend options and full support for the client-side cache feature:

from cashews import cache

cache.setup("redis://localhost", client_side=True)

# use a decorator-based API
@cache(ttl="3h", key="user:{request.user.uid}")
async def long_running_function(request):
    ...

await cache.set("key", 1.12)
await cache.get("key")

Cashews — https://github.com/Krukov/cashews
