How to create Redis Pub/Sub

Saurabh Singh
4 min readNov 13, 2019

--

Publish-Subscribe is a messaging pattern where senders of the messages, called publishers and receiver of messages called subscribers.

But messages are not sent directly by publisher to subscriber(s), rather publisher just characterized into channels without the knowledge of any subscriber. And subscriber can register to one or more channels to listen to those messages of their interest. Subscriber can unsubscribe from these channels anytime.

In Redis, N number of publishers can publish messages to a particular channel and M number of subscribers can subscribe to receive the published messages.

Single subscriber can also subscribe to receive messages from multiple channels.

I implemented this pattern in C#/.Net Core using StackExchange package. In our application we maintain the user’s detail in memory cache, but every time the user information is updated, we need to update the memory cache in all load balanced instances. So, every time we update a user, we publish this change to redis channel and all participating application subscribe to this event and update their respective memory cache.

Register Long Running Task

We need to start a long running task that will subscribe and listen to messages published to Redis channels.

One way of doing it is to start that task in Startup.cs. I created and called this function from public void Configure(IApplicationBuilder app, IHostingEnvironment env)

// create message subscribersvar messageSubscribersCancellation = new CancellationTokenSource();List<Task> subscriberTasks = null;subscriberTasks = app.CreateMessageSubscribers(messageSubscribersCancellation);public static List<Task> CreateMessageSubscribers(CancellationTokenSource cancellation)
{
List<Task> tasks = new List<Task>
{
Task.Factory.StartNew(async () =>
{
var userChangeSubscriber = app.ApplicationServices.GetService<UserChangeManager>() as UserChangeManager;
await userChangeSubscriber.Start(cancellation);
}, cancellation.Token, TaskCreationOptions.LongRunning, TaskScheduler.Default).Unwrap()
};
return tasks;
}

UserChangeManager Constructor

public UserChangeManager()
{
_redis = ConnectionMultiplexer.Connect(REDIS_IP);
InitUserChangeSubscriber();
}

Publisher

Use this method to publish messages to Redis channels. It accepts 3 parameters:

  1. RedisKey
  2. Object/Message
  3. Dispatch mode — depending on business need it can accept 8 different options. You can find more details in Redis official documents. I used FireAndForget, because it is not business critical.
private async Task PublishUserChange(string userChangeMessage)
{
try
{
if (userChangeMessage == null) return;
var sub = _redis.GetSubscriber();
await sub.PublishAsync("urn:redissubscriber:userchange", userChangeMessage, CommandFlags.FireAndForget);
}
catch (Exception ex)
{
Log.ApplicationError(ex);
}
}

Subscriber

Register the subscriber handler by calling InitUserChangeSubscriber in constructor. Please check the constructor above.

private void InitUserChangeSubscriber()
{
var subscriber = _redis.GetSubscriber();
subscriber.Subscribe(RedisCacheSubscriberUserChange, async (channel, message) =>
{
if (string.IsNullOrEmpty(message)) return;
try
{
var eventMessage = JsonConvert.DeserializeObject<UserChangeMessage>(message);
if (eventMessage == null) return;
await _commonPlayerManager.RefreshUserCache(eventMessage);
}
catch (Exception ex)
{
Log.ApplicationError(ex);
}
});
}

This is very simple and basic implementation of Redis Pub/Sub pattern in .Net. This is just for reference but it can be further customized. In my project I used the combination of RabbitMQ and Redis Pub/Sub to maintain/refresh memory cache in load balanced multiple instances of the same service and also in difference services.

I will explain that in another article. The design is very sophisticated and efficient in micro-service architecture.

Cheers.

--

--