Caching Distributed Work Output in Redis with ASP.NET Core

Choosing an appropriate persistence medium for caching interim work output in distributed systems built on .NET Core has come up a number of times in the past few weeks.

Let’s say you’ve built a large distributed system. Part of this system is composed of microservices that operate on an event stream. Your system is reactive and modeled after some common best practices associated with Event Sourcing (ES) and Command Query Responsibility Segregation (CQRS).

You’ve got a service that handles converting incoming commands to events that go on a queue. Then, workers respond to the hundreds of thousands of incoming events and perform some unit of work per event. This work could be the processing of data, the calculation of financial tables, the execution of stock trades, or any other unit of work that applies to your business domain.

If your domain is the same as my favorite domain, then your microservices are taking in reports/sightings of zombies and performing some “big data” analytics and predictive modeling to determine who to notify in the field about herd movements and potential destinations. While processing all these events, the zombie sighting service keeps a cache of all current zombie locations based on the most recent sighting.

Assuming your business domain is not the zombie apocalypse, then at the end of your large distributed work process, you need to go around and gather up all of the interim work produced by the worker microservices and do some cleanup or post-processing. I realize that there are other solutions available to do map-reduce work and stream processing, and this blog post isn’t making any statements about when those may or may not be appropriate solutions instead of Redis. I had a need to use Redis and felt like others might be interested in what I found.

On to the nitty gritty details. We’ve got a class called Workoutput that we want to serialize to our cache. As each individual worker finishes up, we don’t want to have to deal with optimistic or pessimistic locking, or the potential for “last write wins” data clobbering. We don’t want to attempt any kind of remote or distributed transaction. In short, we need to isolate the work output so that the workers can maintain that data safely and on their own schedule.

This is an ideal place to use Redis’s hashes. We can create a hash that corresponds to a Job ID (a Job ID might represent a single run of distributed work containing n units of work), and then have each field within that hash correspond to some unique identifier for the unit of work being done. If we’ve designed our event sourcing system properly, then the information to constitute a unique ID for work output should be entirely contained within the message/event delivered to the worker. I’ve seen this key called an Event ID or more generic terms like Dispatch ID as well as domain-specific terms like PolicyID or ZombieSightingID, etc.

We can create a RedisWorkoutputCache class that wraps our access to Redis in a nicely testable fashion:

using System;
using System.Collections.Generic;
using Microsoft.Extensions.Logging;
using StackExchange.Redis;

namespace Samples.Redis
{
    public class RedisWorkoutputCache : IWorkoutputCache
    {
        private ILogger logger;
        private IConnectionMultiplexer connection;

        public RedisWorkoutputCache(
            ILogger<RedisWorkoutputCache> logger,
            IConnectionMultiplexer connectionMultiplexer)
        {
            this.logger = logger;
            this.connection = connectionMultiplexer;
        }

        // DI hack - shouldn't always need this
        public RedisWorkoutputCache(
            ILogger<RedisWorkoutputCache> logger,
            ConnectionMultiplexer connectionMultiplexer)
            : this(logger, (IConnectionMultiplexer)connectionMultiplexer) {}

        public IList<Workoutput> GetAllWorkoutput(Guid jobId)
        {
            IDatabase db = connection.GetDatabase();
            RedisValue[] vals = db.HashValues(jobId.ToString());

            return ConvertRedisValsToWorkoutputList(vals);
        }

        public void Put(Guid jobId, Workoutput workOutput)
        {
            IDatabase db = connection.GetDatabase();

            db.HashSet(jobId.ToString(),
                workOutput.DispatchID.ToString(),
                workOutput.ToJsonString());
        }

        private IList<Workoutput> ConvertRedisValsToWorkoutputList(
            RedisValue[] vals)
        {
            List<Workoutput> workOutputs = new List<Workoutput>();

            for (int x = 0; x < vals.Length; x++)
            {
                string val = (string)vals[x];
                Workoutput wo = Workoutput.FromJsonString(val);
                workOutputs.Add(wo);
            }

            return workOutputs;
        }
    }
}
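The Workoutput class itself isn’t shown in this post. A minimal sketch might look like the following; only DispatchID, ToJsonString, and FromJsonString are assumed by the cache code above, and the Payload property is a hypothetical placeholder for whatever your domain produces. This sketch uses Newtonsoft.Json for serialization, which is just one reasonable choice:

```csharp
using System;
using Newtonsoft.Json;

namespace Samples.Redis
{
    // Minimal sketch of the work output type. Only DispatchID,
    // ToJsonString, and FromJsonString are required by the cache
    // class; Payload is a hypothetical stand-in for real domain data.
    public class Workoutput
    {
        public Guid DispatchID { get; set; }
        public string Payload { get; set; }

        // Round-trip the object through JSON with no data loss.
        public string ToJsonString() =>
            JsonConvert.SerializeObject(this);

        public static Workoutput FromJsonString(string json) =>
            JsonConvert.DeserializeObject<Workoutput>(json);
    }
}
```

Keeping the serialization logic on the type itself is what lets the cache class stay ignorant of JSON details, and it’s also the seam you’d generalize later if you wanted the cache to work with any round-trippable type.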

You may have noticed by looking at this code that we could further generalize it to work on any type of object that can be round-tripped through a JSON serialization with no data loss. That’s a good optimization to make after all our tests pass, but isn’t really relevant for this blog post.

The crux of the code is this:

  • Create a Redis hash for every Job ID
  • Create a Redis field within the hash for each Dispatch/Work Item/Event ID
  • This field contains a JSON serialization of the work output
  • Use the HashValues method to get a list of all work items within a parent job.
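To make the flow concrete, here is a sketch of a hypothetical consumer (JobPostProcessor is my name, not part of the original code) showing both halves: each worker writing its own isolated field, and the post-processing step gathering everything back in one call:

```csharp
using System;
using System.Collections.Generic;

namespace Samples.Redis
{
    // Hypothetical consumer of IWorkoutputCache, illustrating
    // the write-per-worker / read-all-at-cleanup pattern.
    public class JobPostProcessor
    {
        private readonly IWorkoutputCache cache;

        public JobPostProcessor(IWorkoutputCache cache)
        {
            this.cache = cache;
        }

        // Each worker calls this as it finishes its unit of work.
        // The HSET underneath writes only that worker's field, so
        // no locking or distributed transaction is needed.
        public void RecordResult(Guid jobId, Workoutput output) =>
            cache.Put(jobId, output);

        // The cleanup/post-processing step pulls every field of
        // the job's hash back in a single HVALS round trip.
        public IList<Workoutput> GatherResults(Guid jobId) =>
            cache.GetAllWorkoutput(jobId);
    }
}
```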

To get access to our Redis instance, we can make use of a Steeltoe connector to detect the presence of a Redis service binding in Cloud Foundry or on our workstation. If it finds one, it’ll create a connection for us and make it available through dependency injection. Unfortunately, at the moment, the Steeltoe connector only registers the concrete, sealed connection multiplexer. This means that if our class is built against the IConnectionMultiplexer interface (which it should be, for testing purposes), we have to add a second constructor to support the right constructor injection parameter.

Adding the connection multiplexer to DI is as simple as using the AddCloudFoundry method in our Startup class constructor and the AddRedisConnectionMultiplexer method in the ConfigureServices method.
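A Startup class wired up this way might look roughly like the sketch below. Treat this as an approximation: the exact using namespaces and extension method signatures have shifted between Steeltoe releases, so check them against the version you’re pulling in.

```csharp
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Configuration;
using Microsoft.Extensions.DependencyInjection;
using Steeltoe.Extensions.Configuration;
using Steeltoe.CloudFoundry.Connector.Redis;

namespace Samples.Redis
{
    public class Startup
    {
        public IConfigurationRoot Configuration { get; }

        public Startup(IHostingEnvironment env)
        {
            var builder = new ConfigurationBuilder()
                .SetBasePath(env.ContentRootPath)
                .AddJsonFile("appsettings.json", optional: false)
                .AddEnvironmentVariables()
                .AddCloudFoundry(); // parses VCAP_SERVICES (or the
                                    // local stand-in in appsettings)
            Configuration = builder.Build();
        }

        public void ConfigureServices(IServiceCollection services)
        {
            // Registers a ConnectionMultiplexer built from the
            // Redis service binding the connector discovered.
            services.AddRedisConnectionMultiplexer(Configuration);

            // Our cache wrapper rides on top of that registration.
            services.AddSingleton<IWorkoutputCache, RedisWorkoutputCache>();

            services.AddMvc();
        }
    }
}
```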

To get this working locally, you just need to set up an appsettings.json file with JSON that matches what you would see in the VCAP_SERVICES environment variable for a Redis binding (and use IP addresses, not host names). To test this code, my file looked like this:

{
  "vcap:services": {
    "rediscloud": [
      {
        "name": "redislocationcache",
        "label": "rediscloud",
        "plan": "30mb",
        "credentials": {
          "port": "6379",
          "hostname": "127.0.0.1",
          "password": ""
        }
      }
    ]
  }
}

You should have the following dependencies in your csproj file (using Preview3 of the .NET Core CLI) to work with the Steeltoe connector. The StackExchange Redis client comes with it, so you’ll get that dependency transitively.

<PackageReference Include="Steeltoe.Extensions.Configuration.CloudFoundry">
  <Version>1.0.0-rc2</Version>
</PackageReference>
<PackageReference Include="Steeltoe.CloudFoundry.Connector.Redis">
  <Version>1.0.0-rc2</Version>
</PackageReference>

While the use of Redis as a cache for interim work output storage is a largely architectural concern and not usually something considered difficult from a code perspective, I felt like I should share some of this code because of how new everything is with .NET Core.

The code in this article shows us a couple of things:

  • We can use a well-known Redis client in our ASP.NET Core apps (Stack Exchange)
  • We can use Steeltoe connectors in ASP.NET Core on Linux/Mac/Windows to pre-configure and instantiate a Redis client
  • We can use more advanced Redis concepts on the connection multiplexer like hashes in addition to the simple key-value pair pattern available on the distributed cache interface.

Hopefully you were searching for “how do I cache work output in Redis in ASP.NET Core??? HELP ME” and this article showed up just in time.

[Shameless self-promotion alert]: If you’re looking for more code and tips like this on building Microservices with ASP.NET Core, then make sure you pre-order my upcoming book, appropriately titled Building Microservices with ASP.NET Core.