How to write a (completely lock-free) concurrent LRU Cache with ZIO STM

jorge.vasquez
Published in Scalac
Apr 27, 2020 · 21 min read

Introduction

Writing concurrent data structures using traditional tools — just like everything else under java.util.concurrent — is generally very complicated. You need to really think about what you are doing to avoid typical concurrency issues, such as deadlocks and race conditions. And let’s be honest, thinking about all the possible scenarios that could arise is not just hard, but also sometimes infeasible.

So, in this article, we are going to see how ZIO STM can make our lives a lot easier when it comes to writing concurrent data structures — such as a concurrent LRU Cache — in a completely lock-free fashion that is a lot simpler to reason about.

Requirements of an LRU Cache

A cache is a structure that stores data (which might be the result of an earlier computation or obtained from external sources such as databases) so that future requests for this data can be served faster.

Now, a Least Recently Used (LRU) Cache must fulfill these requirements:

  • Fixed capacity: This is for limiting memory usage.
  • Fast access: Insert and lookup operations should be executed in O(1) time.
  • An efficient eviction algorithm: The idea is that, when the cache capacity is reached and a new entry needs to be inserted into the cache, the Least Recently Used entry gets replaced.

More concretely, an LRU cache should support these two operations:

  • get(key): Get the value of a given key if it exists in the cache, otherwise return an error.
  • put(key, value): Put the given (key, value) into the cache. When the cache reaches its capacity, it should evict the Least Recently Used entry before inserting a new one.

So, for implementing an efficient LRU Cache (meaning get and put operations are executed in O(1) time), we could use two data structures:

  • Hash Map: containing (key, value) pairs.
  • Doubly linked list: which will contain the history of referenced keys. The Most Recently Used key will be at the start of the list, and the Least Recently Used one will be at the end.

In the following image, we can see an example of the status of an LRU Cache (with a capacity of 4) at a given moment:

So, the history of referenced keys (1, 3, 2 and 4) shows that Key 1 is the Most Recently Used one (because it’s at the start of the list), and that Key 4 is the Least Recently Used one (because it’s at the end of the list). So, if a new item needs to be stored into the cache, Item 4 would have to be replaced.
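To make the get/put/evict behavior concrete before bringing ZIO in, here is a minimal single-threaded sketch that uses only the Scala standard library. The name SimpleLRUCache is hypothetical and this is not the implementation developed in this article: it leans on mutable.LinkedHashMap (which keeps insertion order) instead of an explicit doubly linked list, so the Least Recently Used key sits at the front of the iteration order rather than at the end of a history list.

```scala
import scala.collection.mutable

// Hypothetical single-threaded sketch, NOT safe for concurrent use.
final class SimpleLRUCache[K, V](capacity: Int) {
  require(capacity > 0, "Capacity must be a positive number!")

  // LinkedHashMap iterates in insertion order, so the first entry is the
  // Least Recently Used one as long as every access re-inserts its key.
  private val entries = mutable.LinkedHashMap.empty[K, V]

  def get(key: K): Option[V] =
    entries.remove(key).map { value =>
      entries.put(key, value) // re-insert: the key becomes the Most Recently Used
      value
    }

  def put(key: K, value: V): Unit = {
    entries.remove(key)
    // Evict the Least Recently Used entry when the cache is full
    if (entries.size >= capacity) entries.remove(entries.head._1)
    entries.put(key, value)
    ()
  }

  // Keys ordered from Least to Most Recently Used
  def keys: List[K] = entries.keys.toList
}
```

With a capacity of 2, putting keys 1 and 2, reading key 1 and then putting key 3 evicts key 2, because key 1 was touched more recently.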

Quick introduction to ZIO

According to the ZIO documentation page, ZIO is a library for “Type-safe, composable asynchronous and concurrent programming for Scala”. This means ZIO allows us to build applications that are:

  • Highly composable: Because ZIO is based on functional programming principles, such as using pure functions and immutable values, it allows us to easily compose solutions to complex problems from simple building blocks.
  • 100% asynchronous and non-blocking.
  • Highly performant and concurrent: ZIO implements Fiber-based concurrency, and by the way, you can read more about ZIO Fibers in this really nice article written by Mateusz Sokół here.
  • Type-safe: ZIO leverages the Scala Type System so it can catch more bugs at compile time.

The most important data type in ZIO (and also the basic building block of ZIO applications) is also called ZIO:

ZIO[-R, +E, +A]

The ZIO data type is called a functional effect, which means it is a lazy, immutable value that contains a description of a series of interactions with the outside world (database interactions, calling external APIs, etc.). A nice mental model of the ZIO data type is the following:

R => Either[E, A]

This means that a ZIO effect needs an environment of type R to run (the environment could be anything: a database connection, a REST client, a configuration object, etc.), and it can either fail with an error of type E or succeed with a value of type A.
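The mental model above can be played with in plain Scala. The following is only a toy sketch: Effect, MentalModelDemo, Config and readCapacity are hypothetical names made up for illustration, and the real ZIO data type does far more (asynchrony, fibers, interruption, and so on).

```scala
// Hypothetical toy type mirroring the mental model R => Either[E, A].
// It is NOT how ZIO is implemented.
final case class Effect[-R, +E, +A](run: R => Either[E, A]) {
  def map[B](f: A => B): Effect[R, E, B] =
    Effect[R, E, B](r => run(r).map(f))

  def flatMap[R1 <: R, E1 >: E, B](f: A => Effect[R1, E1, B]): Effect[R1, E1, B] =
    Effect[R1, E1, B](r => run(r).flatMap(a => f(a).run(r)))
}

object MentalModelDemo {
  // The environment this effect needs in order to run
  final case class Config(capacity: Int)

  // A description only: nothing happens until `run` receives a Config
  val readCapacity: Effect[Config, String, Int] =
    Effect[Config, String, Int] { config =>
      if (config.capacity > 0) Right(config.capacity)
      else Left("Capacity must be a positive number!")
    }
}
```

Calling MentalModelDemo.readCapacity.run(MentalModelDemo.Config(2)) produces a Right, while a Config(0) produces a Left, which is the "fail with E or succeed with A" part of the model.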

Finally, it’s worth mentioning that ZIO provides some type aliases for the ZIO effect type which are very useful in representing some common use cases:

Task[+A] = ZIO[Any, Throwable, A]: This means a Task[A] is a ZIO effect that:

  • Doesn’t require an environment to run (that’s why the R type is replaced by Any, meaning the effect will run no matter what we provide to it as environment)
  • Can fail with a Throwable
  • Can succeed with an A

UIO[+A] = ZIO[Any, Nothing, A]: This means a UIO[A] is a ZIO effect that:

  • Doesn’t require an environment to run.
  • Can’t fail
  • Can succeed with an A

RIO[-R, +A] = ZIO[R, Throwable, A]: This means a RIO[R, A] is a ZIO effect that:

  • Requires an environment R to run
  • Can fail with a Throwable
  • Can succeed with an A

IO[+E, +A] = ZIO[Any, E, A]: This means an IO[E, A] is a ZIO effect that:

  • Doesn’t require an environment to run.
  • Can fail with an E
  • Can succeed with an A

URIO[-R, +A] = ZIO[R, Nothing, A]: This means a URIO[R, A] is a ZIO effect that:

  • Requires an environment R to run
  • Can’t fail
  • Can succeed with an A

Implementing the LRU Cache with ZIO Ref

First, we need to add some ZIO dependencies to our build.sbt:

val zioVersion = "1.0.0-RC18-2"

lazy val compileDependencies = Seq(
  "dev.zio" %% "zio" % zioVersion
) map (_ % Compile)

lazy val testDependencies = Seq(
  "dev.zio" %% "zio-test"     % zioVersion,
  "dev.zio" %% "zio-test-sbt" % zioVersion
) map (_ % Test)

Now, we can begin with the implementation of the LRUCache. An initial model could be:

final class LRUCache[K, V](
  private val capacity: Int,
  private var items: Map[K, CacheItem[K, V]],
  private var start: Option[K],
  private var end: Option[K]
)

So, the LRUCache should have:

  • A capacity (which should be a positive integer set on creation and shouldn’t change anymore, which is why it’s modeled as a val).
  • A Map containing the items. This will change all the time, which is why we are modeling it as a var. By the way, the model of a CacheItem would be like this:
final case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])

This means that each CacheItem should not just contain a value to be stored, but also references to the left and right keys in the history of referenced keys (remember we’ll use a doubly linked list for keeping a history of referenced keys). These are modeled as Options because, if an item is at the start of the history (meaning it’s the Most Recently Used item), there won’t be any item on its left. Something similar happens when an item is at the end of the history (meaning it’s the Least Recently Used item), there won’t be any item on its right.

  • References to the start and end keys, these will also change all the time, and that’s why they are vars.

There’s a problem with this implementation though: the fact that we are resorting to vars. In functional programming, we should model everything as immutable values. Also, using vars will make it harder to use the LRUCache in concurrent scenarios (using mutability in our applications instantly makes them prone to race conditions!).

So, what can we do? Well, ZIO has the answer! We can use its Ref[A] data type, which is a purely functional description of a mutable reference. The fundamental operations of a Ref are get and set, and both of them return ZIO effects which describe the operations of reading from and writing to the Ref.
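As a quick illustration of a single Ref being updated from many fibers, here is a minimal sketch. It assumes the same ZIO version used in this article (1.0.0-RC18-2, where App.run returns an Int exit code); RefCounterExample and program are hypothetical names.

```scala
import zio._

// Hypothetical example: 100 fibers increment the same Ref once each.
object RefCounterExample extends App {
  val program: UIO[Int] =
    for {
      counter <- Ref.make(0)
      // update is atomic on this single Ref, so no increment is lost
      fiber   <- ZIO.forkAll(List.fill(100)(counter.update(_ + 1)))
      _       <- fiber.join
      total   <- counter.get
    } yield total

  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    program.flatMap(total => console.putStrLn(s"Total: $total")).as(0)
}
```

Because every update is applied atomically to that one reference, the total ends up being 100 no matter how the fibers interleave.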

Then, a better (and purely functional) version of our LRUCache would be:

final class LRUCache[K, V](
  private val capacity: Int,
  private val itemsRef: Ref[Map[K, CacheItem[K, V]]],
  private val startRef: Ref[Option[K]],
  private val endRef: Ref[Option[K]]
)

Now, we can make the constructor private, and create a smart constructor in the companion object:

final class LRUCache[K, V] private (
  private val capacity: Int,
  private val itemsRef: Ref[Map[K, CacheItem[K, V]]],
  private val startRef: Ref[Option[K]],
  private val endRef: Ref[Option[K]]
)

object LRUCache {
  def make[K, V](capacity: Int): IO[IllegalArgumentException, LRUCache[K, V]] =
    if (capacity > 0) {
      for {
        itemsRef <- Ref.make(Map.empty[K, CacheItem[K, V]])
        startRef <- Ref.make(Option.empty[K])
        endRef   <- Ref.make(Option.empty[K])
      } yield new LRUCache[K, V](capacity, itemsRef, startRef, endRef)
    } else {
      ZIO.fail(new IllegalArgumentException("Capacity must be a positive number!"))
    }
}

The make function is our smart constructor. It expects to receive a capacity, and it returns an effect which can fail with an IllegalArgumentException (when a non-positive capacity is provided) or succeed with an LRUCache[K, V]. We also know that the LRUCache constructor expects to receive not just the capacity, but also the initial values for itemsRef, startRef and endRef. For creating these Refs, we can use the Ref.make function, which receives the initial value for the Ref and returns a UIO[Ref[A]]. And because ZIO effects are monads (meaning they have map and flatMap methods), we can combine the results of calling Ref.make using for-comprehension syntax to yield a new LRUCache.

Now, we can implement the get and put methods for the LRUCache. Let’s start with the get method first:

def get(key: K): IO[NoSuchElementException, V] =
  (for {
    items <- self.itemsRef.get
    item  <- ZIO.fromOption(items.get(key)).mapError(_ => new NoSuchElementException(s"Key does not exist: $key"))
    _     <- removeKeyFromList(key) *> addKeyToStartOfList(key)
  } yield item.value).refineToOrDie[NoSuchElementException]

As you can see, the method implementation looks really nice and simple: it’s practically just a description of what to do when getting an element from the cache:

  • Firstly, we need to get the items Map from the itemsRef
  • Next, we need to obtain the requested key from the items map. This key may or may not exist: if it does exist, the flow just continues and, if it doesn’t, the method fails with a NoSuchElementException and the flow execution stops.
  • After the item is obtained from the Map, we need to update the history of referenced keys, because the requested key becomes the Most Recently Used one. That’s why we need to call the auxiliary functions removeKeyFromList and addKeyToStartOfList
  • Finally, the item value is returned, and the error type is refined to be just NoSuchElementException (this is the only error we are expecting to happen and that should be handled when calling get). Any other errors should make the fiber execution die because they are bugs that need to be exposed at execution time and fixed.

Now, let’s see the put method implementation. Again it’s really simple:

def put(key: K, value: V): UIO[Unit] =
  (for {
    optionStart <- self.startRef.get
    optionEnd   <- self.endRef.get
    _ <- ZIO.ifM(self.itemsRef.get.map(_.contains(key)))(
          updateItem(key, value),
          addNewItem(key, value, optionStart, optionEnd)
        )
  } yield ()).orDie

We can see that:

  • The method checks whether the provided key is already present in the items map (again, we are accessing the Map by calling the get method on itemsRef):
  • If the key is already present, the updateItem auxiliary function is called.
  • Otherwise, a new item is added by calling the addNewItem auxiliary function.
  • Finally, the method just yields a unit value and dies in the case of an error. This is because this method should never fail, otherwise there’s a bug that needs to be exposed at runtime and fixed.

Now we can take a look at some auxiliary functions (we won’t go into the details of every auxiliary function, for more details you can take a look at the complete source code in the jorge-vasquez-2301/zio-lru-cache repository). First up, we have the removeKeyFromList function:

private def removeKeyFromList(key: K): IO[Error, Unit] =
  for {
    cacheItem      <- getExistingCacheItem(key)
    optionLeftKey  = cacheItem.left
    optionRightKey = cacheItem.right
    _ <- (optionLeftKey, optionRightKey) match {
          case (Some(l), Some(r)) =>
            updateLeftAndRightCacheItems(l, r)
          case (Some(l), None) =>
            setNewEnd(l)
          case (None, Some(r)) =>
            setNewStart(r)
          case (None, None) =>
            clearStartAndEnd
        }
  } yield ()

As you can see, the implementation is pretty straightforward, and it considers all the possible cases when removing a key from the history of referenced keys:

  • When the key to be removed has other keys to its left and right, the corresponding cache items have to be updated so they point to each other.
  • When the key to be removed has another key to its left, but not to its right, it means the key to be removed is at the end of the list, so the end has to be updated.
  • When the key to be removed has another key to its right, but not to its left, it means the key to be removed is at the start of the list, so the start has to be updated.
  • When the key to be removed has no keys to left nor right, that means the key to be removed is the only one, so the start and end references have to be cleared.

And here is the getExistingCacheItem function implementation:

private def getExistingCacheItem(key: K): IO[Error, CacheItem[K, V]] =
  ZIO.require(new Error(s"Key does not exist: $key"))(self.itemsRef.get.map(_.get(key)))

This function is named this way because the idea is that when we use it, we expect that the cache item we want to get exists. If the item does not exist, it means there’s some kind of problem, and we are signaling that with an Error. (By the way, if you look again at the get and put methods of LRUCache, you can see the application will die if an Error is produced).

Another interesting function to look at is updateLeftAndRightCacheItems, because it shows a use case of ZIO Ref.update, which atomically modifies a Ref with the specified function.

private def updateLeftAndRightCacheItems(l: K, r: K): IO[Error, Unit] =
  for {
    leftCacheItem  <- getExistingCacheItem(l)
    rightCacheItem <- getExistingCacheItem(r)
    _              <- self.itemsRef.update(_.updated(l, leftCacheItem.copy(right = Some(r))))
    _              <- self.itemsRef.update(_.updated(r, rightCacheItem.copy(left = Some(l))))
  } yield ()

Finally, let’s take a look at the addKeyToStartOfList function, which is also pretty straightforward. Something to notice is that we are using Ref.updateSome for updating the endRef value when it’s empty.

private def addKeyToStartOfList(key: K): IO[Error, Unit] =
  for {
    oldOptionStart <- self.startRef.get
    _ <- getExistingCacheItem(key).flatMap { cacheItem =>
          self.itemsRef.update(_.updated(key, cacheItem.copy(left = None, right = oldOptionStart)))
        }
    _ <- oldOptionStart match {
          case Some(oldStart) =>
            getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
              self.itemsRef.update(_.updated(oldStart, oldStartCacheItem.copy(left = Some(key))))
            }
          case None => ZIO.unit
        }
    _ <- self.startRef.set(Some(key))
    _ <- self.endRef.updateSome { case None => Some(key) }
  } yield ()

There is one more thing to do before testing: create an IntLRUCacheEnv module in the com.example.cache package object (this module will be used for testing, and for simplicity, it considers integer keys and values):

package object cache {
  type IntLRUCacheEnv = Has[IntLRUCacheEnv.Service]

  object IntLRUCacheEnv {
    trait Service {
      def getInt(key: Int): IO[NoSuchElementException, Int]
      def putInt(key: Int, value: Int): UIO[Unit]
      val getCacheStatus: UIO[(Map[Int, CacheItem[Int, Int]], Option[Int], Option[Int])]
    }

    object Service {
      val zioRefImpl: ZLayer[Has[Int], IllegalArgumentException, IntLRUCacheEnv] =
        ZLayer.fromFunctionM { hasInt: Has[Int] =>
          LRUCache.make[Int, Int](hasInt.get).map { lruCache =>
            new Service {
              override def getInt(key: Int): IO[NoSuchElementException, Int] = lruCache.get(key)
              override def putInt(key: Int, value: Int): UIO[Unit] = lruCache.put(key, value)
              override val getCacheStatus: UIO[(Map[Int, CacheItem[Int, Int]], Option[Int], Option[Int])] =
                lruCache.getStatus
            }
          }
        }
    }
  }

  def getInt(key: Int): ZIO[IntLRUCacheEnv, NoSuchElementException, Int] = ZIO.accessM(_.get.getInt(key))
  def putInt(key: Int, value: Int): ZIO[IntLRUCacheEnv, Nothing, Unit] = ZIO.accessM(_.get.putInt(key, value))
  val getCacheStatus: ZIO[IntLRUCacheEnv, Nothing, (Map[Int, CacheItem[Int, Int]], Option[Int], Option[Int])] =
    ZIO.accessM(_.get.getCacheStatus)
}

By the way, this module is defined based on the new ZLayer data type that comes with ZIO since 1.0.0-RC18, and you can see there’s a zioRefImpl that makes use of our LRUCache. I won’t go into the details of using ZLayer here, but you can read the ZIO documentation page to get more information, and you can also take a look at these really nice articles:

Testing implementation with a single fiber

It’s time to put our LRUCache to the test! First, we are going to test it under a single-fiber scenario. The testing code is the following (by the way, this testing code reflects the example shown on this link):

object UseLRUCacheWithOneFiber extends App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    (for {
      _ <- put(1, 1)
      _ <- put(2, 2)
      _ <- get(1)
      _ <- put(3, 3)
      _ <- get(2)
      _ <- put(4, 4)
      _ <- get(1)
      _ <- get(3)
      _ <- get(4)
    } yield 0)
      .provideCustomLayer(ZLayer.succeed(2) >>> IntLRUCacheEnv.Service.zioRefImpl)
      .catchAll(ex => putStrLn(ex.getMessage) *> ZIO.succeed(1))

  private def get(key: Int): URIO[Console with IntLRUCacheEnv, Unit] =
    (for {
      _ <- putStrLn(s"Getting key: $key")
      v <- getInt(key)
      _ <- putStrLn(s"Obtained value: $v")
    } yield ()).catchAll(ex => putStrLn(ex.getMessage))

  private def put(key: Int, value: Int): URIO[Console with IntLRUCacheEnv, Unit] =
    putStrLn(s"Putting ($key, $value)") *> putInt(key, value)
}

So, we are running the application with an IntLRUCacheEnv.Service.zioRefImpl, with a capacity of 2. After executing the above program, the following result is obtained:

Putting (1, 1)
Putting (2, 2)
Getting key: 1
Obtained value: 1
Putting (3, 3)
Getting key: 2
Key does not exist: 2
Putting (4, 4)
Getting key: 1
Key does not exist: 1
Getting key: 3
Obtained value: 3
Getting key: 4
Obtained value: 4

As we can see, the behavior is correct! So, our implementation looks good so far.

Testing implementation with multiple concurrent fibers

Now, let’s test the LRUCache again, but against multiple concurrent fibers this time. The testing code is the following:

object UseLRUCacheWithMultipleFibers extends App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    (for {
      fiberReporter  <- reporter.forever.fork
      fiberProducers <- ZIO.forkAll(ZIO.replicate(100)(producer.forever))
      fiberConsumers <- ZIO.forkAll(ZIO.replicate(100)(consumer.forever))
      _              <- getStrLn.orDie *> (fiberReporter <*> fiberProducers <*> fiberConsumers).interrupt
    } yield 0)
      .provideCustomLayer(ZLayer.succeed(3) >>> IntLRUCacheEnv.Service.zioRefImpl)
      .catchAll(ex => putStrLn(ex.getMessage) *> ZIO.succeed(1))

  val producer: URIO[Console with Random with IntLRUCacheEnv, Unit] =
    for {
      number <- nextInt(100)
      _      <- putStrLn(s"Producing ($number, $number)")
      _      <- putInt(number, number)
    } yield ()

  val consumer: URIO[Console with Random with IntLRUCacheEnv, Unit] =
    (for {
      key   <- nextInt(100)
      _     <- putStrLn(s"Consuming key: $key")
      value <- getInt(key)
      _     <- putStrLn(s"Consumed value: $value")
    } yield ()).catchAll(ex => putStrLn(ex.getMessage))

  val reporter: ZIO[Console with IntLRUCacheEnv, NoSuchElementException, Unit] =
    for {
      (items, optionStart, optionEnd) <- getCacheStatus
      _                               <- putStrLn(s"Items: $items, Start: $optionStart, End: $optionEnd")
    } yield ()
}

We can see that an IntLRUCacheEnv.Service.zioRefImpl with a capacity of 3 is provided. Also, 100 producers and 100 consumers of random integers are started in different fibers, and we have a reporter that just prints the current cache status to the console (stored items, plus the start and end keys of the history of recently used items). When we execute this, some ugly stuff happens:

  • First, more items than the defined capacity (a lot more) are being stored! Also, the stored items have a lot of inconsistencies. For example, you can see below that, at a given moment, the end key (the Least Recently Used key) is 97, but looking at the corresponding CacheItem we see it has other keys to its left and right (58 and 9 respectively). But if 97 is at the end of the list, it shouldn’t have an item to its right! Besides this, there are a lot more discrepancies among CacheItems:
Items: HashMap(5 -> CacheItem(5,Some(45),Some(6)), 84 -> CacheItem(84,Some(51),Some(91)), 69 -> CacheItem(69,Some(83),Some(36)), 0 -> CacheItem(0,None,Some(37)), 88 -> CacheItem(88,Some(82),Some(94)), 10 -> CacheItem(10,Some(37),Some(45)), 56 -> CacheItem(56,Some(54),Some(42)), 42 -> CacheItem(42,Some(6),Some(60)), 24 -> CacheItem(24,Some(30),Some(18)), 37 -> CacheItem(37,Some(0),Some(10)), 52 -> CacheItem(52,Some(70),Some(91)), 14 -> CacheItem(14,Some(72),Some(1)), 20 -> CacheItem(20,None,Some(46)), 46 -> CacheItem(46,Some(28),Some(70)), 93 -> CacheItem(93,Some(40),Some(6)), 57 -> CacheItem(57,Some(12),Some(45)), 78 -> CacheItem(78,None,Some(41)), 61 -> CacheItem(61,None,Some(26)), 1 -> CacheItem(1,Some(14),Some(2)), 74 -> CacheItem(74,None,Some(33)), 6 -> CacheItem(6,Some(5),Some(42)), 60 -> CacheItem(60,Some(42),Some(80)), 85 -> CacheItem(85,None,Some(99)), 70 -> CacheItem(70,Some(46),Some(52)), 21 -> CacheItem(21,None,Some(65)), 33 -> CacheItem(33,Some(77),Some(32)), 28 -> CacheItem(28,None,Some(46)), 38 -> CacheItem(38,Some(98),Some(68)), 92 -> CacheItem(92,Some(63),Some(0)), 65 -> CacheItem(65,Some(21),Some(51)), 97 -> CacheItem(97,Some(58),Some(9)), 9 -> CacheItem(9,Some(97),Some(99)), 53 -> CacheItem(53,None,Some(91)), 77 -> CacheItem(77,Some(27),Some(33)), 96 -> CacheItem(96,Some(3),Some(58)), 13 -> CacheItem(13,Some(14),Some(28)), 41 -> CacheItem(41,Some(78),Some(90)), 73 -> CacheItem(73,None,Some(41)), 2 -> CacheItem(2,Some(1),Some(92)), 32 -> CacheItem(32,Some(33),Some(98)), 45 -> CacheItem(45,Some(10),Some(5)), 64 -> CacheItem(64,None,Some(34)), 17 -> CacheItem(17,None,Some(35)), 22 -> CacheItem(22,None,Some(7)), 44 -> CacheItem(44,Some(79),Some(92)), 59 -> CacheItem(59,Some(15),Some(68)), 27 -> CacheItem(27,Some(4),Some(77)), 71 -> CacheItem(71,Some(46),Some(19)), 12 -> CacheItem(12,Some(75),Some(57)), 54 -> CacheItem(54,None,Some(56)), 49 -> CacheItem(49,None,Some(63)), 86 -> CacheItem(86,None,Some(43)), 81 -> CacheItem(81,Some(98),Some(1)), 76 -> 
CacheItem(76,None,Some(35)), 7 -> CacheItem(7,Some(22),Some(33)), 39 -> CacheItem(39,None,Some(4)), 98 -> CacheItem(98,Some(32),Some(81)), 91 -> CacheItem(91,Some(52),Some(75)), 66 -> CacheItem(66,None,Some(27)), 3 -> CacheItem(3,Some(94),Some(96)), 80 -> CacheItem(80,Some(60),Some(84)), 48 -> CacheItem(48,None,Some(9)), 63 -> CacheItem(63,Some(49),Some(3)), 18 -> CacheItem(18,Some(24),Some(26)), 95 -> CacheItem(95,None,Some(65)), 50 -> CacheItem(50,Some(68),Some(58)), 67 -> CacheItem(67,None,Some(21)), 16 -> CacheItem(16,None,Some(82)), 11 -> CacheItem(11,Some(5),Some(73)), 72 -> CacheItem(72,Some(99),Some(14)), 43 -> CacheItem(43,Some(86),Some(3)), 99 -> CacheItem(99,Some(9),Some(72)), 87 -> CacheItem(87,Some(36),Some(46)), 40 -> CacheItem(40,Some(11),Some(93)), 26 -> CacheItem(26,Some(18),Some(16)), 8 -> CacheItem(8,Some(3),Some(0)), 75 -> CacheItem(75,Some(91),Some(12)), 58 -> CacheItem(58,Some(96),Some(97)), 82 -> CacheItem(82,Some(16),Some(88)), 36 -> CacheItem(36,Some(69),Some(87)), 30 -> CacheItem(30,Some(11),Some(24)), 51 -> CacheItem(51,Some(65),Some(84)), 19 -> CacheItem(19,None,Some(83)), 4 -> CacheItem(4,Some(62),Some(27)), 79 -> CacheItem(79,None,Some(44)), 94 -> CacheItem(94,Some(88),Some(3)), 47 -> CacheItem(47,Some(35),Some(37)), 15 -> CacheItem(15,Some(68),Some(59)), 68 -> CacheItem(68,Some(38),Some(50)), 62 -> CacheItem(62,None,Some(4)), 90 -> CacheItem(90,Some(41),Some(33)), 83 -> CacheItem(83,Some(19),Some(69))), Start: Some(16), End: Some(97)
  • And, because of the issues mentioned above, we see fibers dying because of unexpected errors like this:
Fiber failed.
An unchecked error was produced.
java.lang.Error: Key does not exist: 35
	at io.scalac.LRUCache.$anonfun$getExistingCacheItem$1(LRUCache.scala:113)

So, it seems our current LRUCache implementation only works correctly in a single-fiber scenario, not in a concurrent one, and that is really bad. Now, let’s reflect on what’s happening.

Why doesn’t our implementation work in a concurrent scenario?

It may seem weird that our current implementation does not work as expected when multiple fibers use it concurrently. We have used immutable values everywhere, pure functions, and purely functional mutable references (Ref[A]) that provide atomic operations on them… But wait a moment: Ref[A] provides atomic operations on SINGLE VALUES, so what happens if we need to keep consistency across MULTIPLE VALUES? Remember that in our LRUCache implementation we have three Refs: itemsRef, startRef and endRef. So, it seems using Ref[A] is not a powerful enough solution for our use case:

  • Refs allow atomic operations on single values only.
  • Refs don’t compose! So you can’t compose two Refs to get a resulting Ref.

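To see why atomic operations on single values are not enough, we can replay one possible interleaving of two concurrent puts by hand. This is only a deterministic sketch with hypothetical names (CheckThenActInterleaving, replay), and it uses a plain AtomicReference from the JDK as a stand-in for a Ref: every individual call is atomic, but the "check the size, then insert" sequence as a whole is not.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical, hand-written replay of a check-then-act race.
object CheckThenActInterleaving {
  val capacity = 3

  def replay(): Set[Int] = {
    // Like a Ref, each single call on this reference is atomic
    val items = new AtomicReference(Set(1, 2))

    // Both "fibers" check for free room before either one inserts
    val fiberASeesRoom = items.get.size < capacity // fiber A reads: 2 items
    val fiberBSeesRoom = items.get.size < capacity // fiber B reads: 2 items

    if (fiberASeesRoom) items.updateAndGet(current => current + 3) // A inserts key 3
    if (fiberBSeesRoom) items.updateAndGet(current => current + 4) // B inserts key 4 based on a stale read

    items.get
  }
}
```

The replay ends with four keys in a structure that should hold at most three, which is the same kind of symptom the reporter showed above: each step was atomic, but the decision and the write were not part of one atomic unit.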
So, what can we do now?

Enter ZIO STM!

The solution to our problem is using ZIO STM (Software Transactional Memory). For that, ZIO provides two basic data types:

  • ZSTM[-R, +E, +A]: Represents an effect that can be performed transactionally, that requires an environment R to run and that may fail with an error E or succeed with a value A. Also, ZIO provides a type alias for when no environment R is required: STM[+E, +A], which is equivalent to ZSTM[Any, E, A].
  • TRef[A]: Represents a Transactional Ref, meaning a purely functional mutable reference that can be used in the context of a transaction.

So, basically, an STM describes a bunch of operations across several TRefs.

Important things to notice:

  • STMs are composable (we can use them in for-comprehensions!)
  • All methods in TRef are very similar to the ones in Ref, but they return STM effects instead of ZIO effects.
  • To convert an STM effect to a ZIO effect, you need to commit the transaction: When you commit a transaction, all of its operations are performed in an atomic, consistent and isolated fashion, very similar to how relational databases work.
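The classic way to illustrate these points is a transfer between two balances, where the check and both updates must happen together. The following is a minimal sketch assuming the ZIO version used in this article (1.0.0-RC18-2); StmTransferExample, transfer and program are hypothetical names, not part of the LRU cache.

```scala
import zio._
import zio.stm._

// Hypothetical example: composing operations on two TRefs in one transaction.
object StmTransferExample extends App {
  // Nothing runs here: this only describes a transaction over two TRefs
  def transfer(from: TRef[Int], to: TRef[Int], amount: Int): STM[String, Unit] =
    for {
      balance <- from.get
      _       <- if (balance < amount) STM.fail("Insufficient funds") else STM.unit
      _       <- from.update(_ - amount)
      _       <- to.update(_ + amount)
    } yield ()

  val program: IO[String, (Int, Int)] =
    for {
      a        <- TRef.make(1000).commit
      b        <- TRef.make(0).commit
      // commit turns each transaction into a ZIO effect that runs atomically
      fiber    <- ZIO.forkAll(List.fill(100)(transfer(a, b, 10).commit))
      _        <- fiber.join
      balances <- (for { x <- a.get; y <- b.get } yield (x, y)).commit
    } yield balances

  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    program
      .flatMap { case (a, b) => console.putStrLn(s"a = $a, b = $b") }
      .as(0)
      .catchAll(error => console.putStrLn(error).as(1))
}
```

Since the balance check and both updates belong to the same transaction, no fiber can observe the money "in flight", and after the 100 transfers of 10 the first balance is 0 and the second is 1000.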

It’s also worth mentioning that we could use other classic concurrency structures from java.util.concurrent, like Locks and Semaphores, for solving concurrency issues, but that’s really complicated, low-level and error-prone, and race conditions or deadlocks are very likely to happen. Instead, ZIO STM replaces all of this low-level machinery with a high-level concept: transactions in memory, and we have no race conditions and no deadlocks!

Finally, ZIO STM provides other nice data structures that can participate in transactions (all of them are based on TRef):

  • TArray
  • TQueue
  • TSet
  • TMap
  • TPromise
  • TReentrantLock
  • TSemaphore

Our LRU Cache goes concurrent! Moving from ZIO Ref to ZIO STM

The concurrent version of our LRUCache will be very similar to what we had before, but we are going to make some changes to use ZIO STM:

final class ConcurrentLRUCache[K, V] private (
  private val capacity: Int,
  private val items: TMap[K, CacheItem[K, V]],
  private val startRef: TRef[Option[K]],
  private val endRef: TRef[Option[K]]
)

As you can see, we are changing Refs to TRefs, and instead of having items: TRef[Map[K,CacheItem[K,V]]], we are using the more convenient and efficient TMap data type that ZIO STM provides.

The smart constructor will also be very similar to the one we had before:

object ConcurrentLRUCache {
  def make[K, V](capacity: Int): IO[IllegalArgumentException, ConcurrentLRUCache[K, V]] =
    if (capacity > 0) {
      (for {
        itemsRef <- TMap.empty[K, CacheItem[K, V]]
        startRef <- TRef.make(Option.empty[K])
        endRef   <- TRef.make(Option.empty[K])
      } yield new ConcurrentLRUCache[K, V](capacity, itemsRef, startRef, endRef)).commit
    } else {
      ZIO.fail(new IllegalArgumentException("Capacity must be a positive number!"))
    }
}

The biggest difference in this smart constructor is that the for-comprehension returns a STM[Nothing, ConcurrentLRUCache[K,V]], and we need to commit it for getting a ZIO effect, which is what we want to return.

Next, we have the get and put methods for the ConcurrentLRUCache:

def get(key: K): IO[NoSuchElementException, V] =
  (for {
    optionItem <- self.items.get(key)
    item       <- STM.fromOption(optionItem).mapError(_ => new NoSuchElementException(s"Key does not exist: $key"))
    _          <- removeKeyFromList(key) *> addKeyToStartOfList(key)
  } yield item.value).commitEither.refineToOrDie[NoSuchElementException]

def put(key: K, value: V): UIO[Unit] =
  (for {
    optionStart <- self.startRef.get
    optionEnd   <- self.endRef.get
    _           <- STM.ifM(self.items.contains(key))(updateItem(key, value), addNewItem(key, value, optionStart, optionEnd))
  } yield ()).commitEither.orDie

Again, you can see these methods are very similar to the ones we had before! The only difference is that the for-comprehensions in both methods return values of type STM, so we need to commit the transactions (we are using commitEither in this case, so transactions are always committed despite errors, and failures are handled at the ZIO level).

Now, we can take a look at the same auxiliary functions we’ve seen before, but this time with ZIO STM. First, we have the removeKeyFromList function:

private def removeKeyFromList(key: K): STM[Error, Unit] =
  for {
    cacheItem      <- getExistingCacheItem(key)
    optionLeftKey  = cacheItem.left
    optionRightKey = cacheItem.right
    _ <- (optionLeftKey, optionRightKey) match {
          case (Some(l), Some(r)) =>
            updateLeftAndRightCacheItems(l, r)
          case (Some(l), None) =>
            setNewEnd(l)
          case (None, Some(r)) =>
            setNewStart(r)
          case (None, None) =>
            clearStartAndEnd
        }
  } yield ()

As you may have realized, the implementation is practically the same! The difference is that the function returns an STM effect instead of a ZIO effect. In this case (and the same happens for all private methods) we are not committing the transaction yet, because we want to use these private functions in combination with others to form bigger transactions that are committed in the get and put methods.

And, here is the getExistingCacheItem function implementation, again it’s very similar to the one we had before, but now an STM effect is returned, and also getting an element from the items Map is a lot easier now, thanks to TMap:

private def getExistingCacheItem(key: K): STM[Error, CacheItem[K, V]] =
  STM.require(new Error(s"Key does not exist: $key"))(self.items.get(key))

And for updateLeftAndRightCacheItems, putting elements into the items Map is a lot easier now too:

private def updateLeftAndRightCacheItems(l: K, r: K): STM[Error, Unit] =
  for {
    leftCacheItem  <- getExistingCacheItem(l)
    rightCacheItem <- getExistingCacheItem(r)
    _              <- self.items.put(l, leftCacheItem.copy(right = Some(r)))
    _              <- self.items.put(r, rightCacheItem.copy(left = Some(l)))
  } yield ()

And, we have addKeyToStartOfList, which again is very similar to the previous version:

private def addKeyToStartOfList(key: K): STM[Error, Unit] =
  for {
    oldOptionStart <- self.startRef.get
    _ <- getExistingCacheItem(key).flatMap { cacheItem =>
          self.items.put(key, cacheItem.copy(left = None, right = oldOptionStart))
        }
    _ <- oldOptionStart match {
          case Some(oldStart) =>
            getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
              self.items.put(oldStart, oldStartCacheItem.copy(left = Some(key)))
            }
          case None => STM.unit
        }
    _ <- self.startRef.set(Some(key))
    _ <- self.endRef.updateSome { case None => Some(key) }
  } yield ()

Finally, before testing, let’s add a new zioStmImpl to the IntLRUCacheEnv module. This implementation should make use of the ConcurrentLRUCache we’ve just created:

object IntLRUCacheEnv {
  object Service {
    val zioStmImpl: ZLayer[Has[Int], IllegalArgumentException, IntLRUCacheEnv] =
      ZLayer.fromFunctionM { hasInt: Has[Int] =>
        ConcurrentLRUCache.make[Int, Int](hasInt.get).map { concurrentLruCache =>
          new Service {
            override def getInt(key: Int): IO[NoSuchElementException, Int] = concurrentLruCache.get(key)
            override def putInt(key: Int, value: Int): UIO[Unit] = concurrentLruCache.put(key, value)
            override val getCacheStatus: UIO[(Map[Int, CacheItem[Int, Int]], Option[Int], Option[Int])] =
              concurrentLruCache.getStatus
          }
        }
      }
  }
}

Testing implementation with multiple fibers

Now that we have our ConcurrentLRUCache, let’s put it to the test with the following testing code, which is practically the same one we had before (the only difference is that we are providing an IntLRUCacheEnv.Service.zioStmImpl now):

object UseConcurrentLRUCacheWithMultipleFibers extends App {
  def run(args: List[String]): ZIO[ZEnv, Nothing, Int] =
    (for {
      fiberReporter  <- reporter.forever.fork
      fiberProducers <- ZIO.forkAll(ZIO.replicate(100)(producer.forever))
      fiberConsumers <- ZIO.forkAll(ZIO.replicate(100)(consumer.forever))
      _              <- getStrLn.orDie *> (fiberReporter <*> fiberProducers <*> fiberConsumers).interrupt
    } yield 0)
      .provideCustomLayer(ZLayer.succeed(3) >>> IntLRUCacheEnv.Service.zioStmImpl)
      .catchAll(ex => putStrLn(ex.getMessage) *> ZIO.succeed(1))

  val producer: URIO[Console with Random with IntLRUCacheEnv, Unit] =
    for {
      number <- nextInt(100)
      _      <- putStrLn(s"Producing ($number, $number)")
      _      <- putInt(number, number)
    } yield ()

  val consumer: URIO[Console with Random with IntLRUCacheEnv, Unit] =
    (for {
      key   <- nextInt(100)
      _     <- putStrLn(s"Consuming key: $key")
      value <- getInt(key)
      _     <- putStrLn(s"Consumed value: $value")
    } yield ()).catchAll(ex => putStrLn(ex.getMessage))

  val reporter: ZIO[Console with IntLRUCacheEnv, NoSuchElementException, Unit] =
    for {
      (items, optionStart, optionEnd) <- getCacheStatus
      _                               <- putStrLn(s"Items: $items, Start: $optionStart, End: $optionEnd")
    } yield ()
}

When we run this, everything works as it should, and the best part is that we didn’t need to use Locks at all! There are no more unexpected errors, and the reporter shows that our cache keeps its internal consistency. Here is an example of what is printed to the console for two executions of the reporter:

Items: Map(43 -> CacheItem(43,Some(16),None), 16 -> CacheItem(16,Some(32),Some(43)), 32 -> CacheItem(32,None,Some(16))), Start: Some(32), End: Some(43)
Items: Map(30 -> CacheItem(30,None,Some(69)), 53 -> CacheItem(53,Some(69),None), 69 -> CacheItem(69,Some(30),Some(53))), Start: Some(30), End: Some(53)
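What "internal consistency" means for a snapshot like the ones above can be spelled out as a small check: starting at Start and following the right pointers, every left pointer should name the previous key, the walk should finish at End, and every item should be visited exactly once. The following plain-Scala sketch does that. CacheStatusCheck and isConsistent are hypothetical names, and the CacheItem shape is assumed to match the article's definition.

```scala
object CacheStatusCheck {
  // Assumed shape of the article's CacheItem: a value plus left/right neighbour keys.
  final case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])

  // Walks from `start` along `right` pointers and checks that:
  //  - each visited item's `left` pointer names the previously visited key,
  //  - no key is visited twice (no cycles),
  //  - the last visited key equals `end`,
  //  - every key in `items` was visited.
  def isConsistent[K, V](
    items: Map[K, CacheItem[K, V]],
    start: Option[K],
    end: Option[K]
  ): Boolean = {
    @annotation.tailrec
    def walk(current: Option[K], previous: Option[K], visited: Set[K]): Boolean =
      current match {
        case None => previous == end && visited.size == items.size
        case Some(key) =>
          items.get(key) match {
            case Some(item) if item.left == previous && !visited(key) =>
              walk(item.right, current, visited + key)
            case _ => false
          }
      }
    walk(start, None, Set.empty[K])
  }

  def main(args: Array[String]): Unit = {
    // First snapshot from the reporter output above.
    val items = Map(
      43 -> CacheItem[Int, Int](43, Some(16), None),
      16 -> CacheItem[Int, Int](16, Some(32), Some(43)),
      32 -> CacheItem[Int, Int](32, None, Some(16))
    )
    println(isConsistent(items, Some(32), Some(43)))
  }
}
```

A check like this could be run against getCacheStatus results in a test; that wiring is not part of the original code.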

Writing unit tests for the Concurrent LRU Cache using ZIO Test

Finally, we can write some unit tests for our ConcurrentLRUCache using zio-test:

object ConcurrentLRUCacheTest extends DefaultRunnableSpec {
  def spec = suite("ConcurrentLRUCache")(
    testM("can't be created with non-positive capacity") {
      assertM(ConcurrentLRUCache.make[Int, Int](0).run)(
        fails(hasMessage(equalTo("Capacity must be a positive number!")))
      )
    },
    testM("works as expected") {
      val expectedOutput = Vector(
        "Putting (1, 1)\n",
        "Putting (2, 2)\n",
        "Getting key: 1\n",
        "Obtained value: 1\n",
        "Putting (3, 3)\n",
        "Getting key: 2\n",
        "Key does not exist: 2\n",
        "Putting (4, 4)\n",
        "Getting key: 1\n",
        "Key does not exist: 1\n",
        "Getting key: 3\n",
        "Obtained value: 3\n",
        "Getting key: 4\n",
        "Obtained value: 4\n"
      )
      for {
        lruCache <- ConcurrentLRUCache.make[Int, Int](2)
        _        <- put(lruCache, 1, 1)
        _        <- put(lruCache, 2, 2)
        _        <- get(lruCache, 1)
        _        <- put(lruCache, 3, 3)
        _        <- get(lruCache, 2)
        _        <- put(lruCache, 4, 4)
        _        <- get(lruCache, 1)
        _        <- get(lruCache, 3)
        _        <- get(lruCache, 4)
        output   <- TestConsole.output
      } yield {
        assert(output)(equalTo(expectedOutput))
      }
    }
  )

  private def get[K, V](concurrentLruCache: ConcurrentLRUCache[K, V], key: K): ZIO[Console, Nothing, Unit] =
    (for {
      _ <- putStrLn(s"Getting key: $key")
      v <- concurrentLruCache.get(key)
      _ <- putStrLn(s"Obtained value: $v")
    } yield ()).catchAll(ex => putStrLn(ex.getMessage))

  private def put[K, V](concurrentLruCache: ConcurrentLRUCache[K, V], key: K, value: V): ZIO[Console, Nothing, Unit] =
    putStrLn(s"Putting ($key, $value)") *> concurrentLruCache.put(key, value)
}

The first test asserts that trying to create a ConcurrentLRUCache with a non-positive capacity results in a failure.

The second test asserts that the cache works as expected. For that, we use the TestConsole module provided by zio-test to check that the expected messages are printed to the console.
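One way to convince yourself that the expected console output in the second test really follows from LRU semantics is to replay the same operations against a tiny sequential reference model. The sketch below is such a model in plain Scala. It is deliberately naive (linear-time, not thread-safe) and is not how ConcurrentLRUCache is implemented. ReferenceLru, Model, Op, Put, Get and trace are hypothetical names introduced for this example.

```scala
object ReferenceLru {
  // Naive immutable LRU model: entries are ordered most recently used first.
  final case class Model[K, V](capacity: Int, entries: Vector[(K, V)] = Vector.empty[(K, V)]) {
    // A hit moves the entry to the front; a miss leaves the model unchanged.
    def get(key: K): (Option[V], Model[K, V]) =
      entries.find(_._1 == key) match {
        case Some(hit) => (Some(hit._2), copy(entries = hit +: entries.filterNot(_._1 == key)))
        case None      => (None, this)
      }

    // Inserts (or refreshes) the entry at the front and drops the
    // least recently used entries beyond `capacity`.
    def put(key: K, value: V): Model[K, V] =
      copy(entries = ((key, value) +: entries.filterNot(_._1 == key)).take(capacity))
  }

  sealed trait Op
  final case class Put(key: Int, value: Int) extends Op
  final case class Get(key: Int) extends Op

  // Replays the operations and records the same messages the test helpers print.
  def trace(capacity: Int, ops: List[Op]): Vector[String] =
    ops.foldLeft((Model[Int, Int](capacity), Vector.empty[String])) {
      case ((model, out), Put(k, v)) =>
        (model.put(k, v), out :+ s"Putting ($k, $v)\n")
      case ((model, out), Get(k)) =>
        val (result, next) = model.get(k)
        val outcome = result.fold(s"Key does not exist: $k\n")(v => s"Obtained value: $v\n")
        (next, out :+ s"Getting key: $k\n" :+ outcome)
    }._2

  def main(args: Array[String]): Unit = {
    // Same sequence of operations as the "works as expected" test, with capacity 2.
    val ops = List(Put(1, 1), Put(2, 2), Get(1), Put(3, 3), Get(2), Put(4, 4), Get(1), Get(3), Get(4))
    trace(2, ops).foreach(print)
  }
}
```

Reading key 1 before inserting key 3 is what makes key 2 the least recently used entry, which is why the test expects key 2 to be evicted first and key 1 only after key 4 is inserted.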

I won’t go into more details about how zio-test works, but you can read about it in the ZIO documentation page.

Summary

In this article, we’ve seen a concrete example of writing concurrent data structures with ZIO: a concurrent LRU cache. Because ZIO is based on functional programming principles, such as using pure functions and immutable values, it was really easy and painless to evolve our initial implementation, which was based on ZIO Ref and didn’t support concurrency, into a fully concurrent version based on ZIO STM. We did this without all the complicated stuff that comes with lower-level concurrency structures such as Locks, and with no deadlocks or race conditions at all.

In addition, this was just a very specific example of what you can do with ZIO STM for writing concurrent data structures, so there’s a lot more you can do with it in your own projects, in a totally async, non-blocking and thread-safe fashion. So make sure to give it a try!

