Working with Redis Streams in Python-Basic

Anirudh Pratap Singh
Subex AI Labs
Published in
5 min readAug 8, 2023

Optional:

What is redis? Follow this link and get started with some python basics on different types of Redis datatypes.

Redis Streaming data
Just an image for data storage :)

A Redis database has many data-types to store information in. A few of these include strings, lists, hashes, streams, etc. In this article we will spend some considerable time with Redis streams.

Setting-up Redis

We will use Docker for our case to setup Redis server and database.

# Runs a docker container with lightweight redis 
$ docker run -d -p 6005:6379 --name your_redis_name redis:alpine3.17 redis-server --bind 0.0.0.0

# This gets us inside the container where we can connect via CLI
$ docker exec -it your_redis_name redis-cli

The first code snippet above spins off a Redis container with the image redis:alpine3.17. The image being used is lightweight, feel free to use any image of your choosing. The second code snippet is to get into the Redis container and get into the command line interface(CLI) to communicate with the Redis database

Why Use Redis Stream

Redis stream is a unique data type that is used to tackle the deadlock situation. The deadlock situation arises when two or more transactions access the database at the same time. This can happen when multiple process are producing data and want to store in the same database at the same time instance. To circumvent this problem Redis streams comes to the rescue. The Redis stream gives a unique ID to each entry of the stream (more will be talked about later).

For example, suppose there are multiple weather data sensors that access the same database to dump their data into the same key at the same time. In this situation a deadlock occurs and this may lead to data loss. The same example of temperature and humidity sensors will be used to simulate weather in Bangalore.

Working With Redis Streams

Having setup the Redis database an object needs to be created to work with it in a python script. For this the following command needs to be used to make an object.

import redis

# Note the port 6005 has been arbitrarily chosen. Default port is 6379.
# Can use any port number

redisCli = redis.Redis(host=host_ip, port=6005)

For demonstration we will add some fake data for weather in Bangalore. Data to be added to Redis stream is done with XADD command.

# Example of adding data using Redis CLI

[In] 127.0.0.1:6379> XADD weather_blr * temperature 24 humidity 62

The ‘*’ after the stream name refers to unique ID generation from Redis. One can give own ID but, it is recommended to let Redis do it. The same data can be added to the stream using python as follows(used loop to add mutiple data in same milli-second).

# Simulating a multiple producers

[In]for _ in range(10):
data = {"temperature":random.randint(20,25),
"humidity": random.randint(60,80)}
redisCli.xadd("weather_blr",data)

This adds stream key “weather_blr” to the Redis database. “temperature” and “humidity” are the fields for each of the stream.

How to consume the data added to the stream

Moving forward two code snippets will be given, one for Redis CLI and the other for python command. The output will be common for both.

XLEN (returns the length of the stream)

[In]127.0.0.1:6379> XLEN weather_blr
[Out](integer) 10
[In]print(redisCli.xlen("weather_blr"))
[Out] 10

XRANGE (query the stream data)

[In] 127.0.0.1:6379> XRANGE weather_blr - + COUNT 2
[Out] 1) 1) "1690179048588-0"
2) 1) "temperature"
2) "24"
3) "humidity"
4) "65"
2) 1) "1690179048588-1"
2) 1) "temperature"
2) "20"
3) "humidity"
4) "64"
[In] print(redisCli.xrange("weather_blr", max="+", min="-", count=2))
[Out] [(b'1690179048588-0', {b'temperature': b'24', b'humidity': b'65'}), (b'1690179048588-1', {b'temperature': b'20', b'humidity': b'64'})]

The First ID “1690179048588–0” is the unique ID generated for each entry of the stream. The part before the hyphen is the timestamp in millisecond. It can be seen that there are two entries with same millisecond stamp. The number after hyphen is the serial ID which can be 64bit number, practically making it impossible to have deadlock situation. The ‘b’ before each entry in the output refers to bit storage type in Redis database.

The ‘-’ and ‘+’ denotes the smallest and largest ID in the “weather_blr” stream. The COUNT gives number of entries to show in the stream starting from the first one.

XREVRANGE(query the data in reverse)

Queries the data in the reverse order. This is very useful especially in the case of accessing the last item in the stream.

[In] 127.0.0.1:6379> XREVRANGE weather_blr + - COUNT 1
[Out] 1) 1) "1690179048589-6"
2) 1) "temperature"
2) "25"
3) "humidity"
4) "68"
[In] print(redisCli.xrevrange("weather_blr", max="+", min="-", count=1))
[Out] [(b'1690179048589-6', {b'temperature': b'25', b'humidity': b'68'})]

The command is very similar to XRANGE and carry the same meaning except in reverse order.

XREAD(consuming the streaming data)

This is the best way to consume streaming data. This is extremely useful when producer continuously produces data. To read the latest entry into the stream the following command needs to be written.

[In] 127.0.0.1:6379> XREAD COUNT 2 BLOCK 5000 STREAMS weather_blr $
[Out](nil)
(5.06s)
[In] print(redisCli.xread({"weather_blr":'$'}, count = None, block = 5000))
[Out] []

The XREAD command reads the streaming data.

The parameter COUNT reads that many entries that come in the same millisecond after the command is executed. If only 1 entry comes from the producer side then it reads the single entry. If multiple entries do come

If the producer doesn’t produce information in the next millisecond after the command goes then it waits for 5s for the stream to be updated using the BLOCK parameter. Since, in our case the producer wasn’t working thus, it shows nil output.

The ‘$’ sign after the stream name is to indicate after which timestamp does the consumer needs to get the information.

Working with the gathered data

This small section will help with working with the gathered data from Redis database

[In] data =redisCli.xrevrange("weather_blr", max="+", min="-", count=1))

# To get the value for humidity the following code needs to be given
[In] print(data[0][1][b'humidity']
[Out] b'68'

The ‘b’ refers to the bit encoding of the data that is stored in Redis Database. To convert it for normal usage we want to decode it. The stream data is stored as a list. The first element of the list contains the unique stream ID while the second contains key value pair of data stored. We need to decode this for our usage.

# Without decoding the data
[In] print(data[0][1])
[Out] {b'temperature': b'25', b'humidity': b'68'}

# After decoding the data
[In] data_decoded = {key.decode(): value.decode() for key, value in data[0][1].items()}
[In] print(data_decoded)
[Out] {'temperature': '25', 'humidity': '68'}

Now, one can use the data in normal way without need of using ‘b’ as prefix.

To know more about complex features used in Redis stream watch the informative video from the Redis creator Salvatore Sanfilippo by following the link.

--

--