How to write a (completely lock-free) concurrent LRU Cache with ZIO STM
DISCLAIMER: THIS BLOG WAS FIRST PUBLISHED ON SCALAC BLOG.
Introduction
Writing concurrent data structures using traditional tools, like everything under java.util.concurrent,
is generally a very complicated task. You really need to think about what you are doing to avoid typical concurrency issues, such as deadlocks and race conditions. And let’s be honest: predicting all the possible scenarios that could arise is not just hard, but sometimes infeasible.
Therefore, in this article, we are going to see how ZIO STM makes our lives a lot easier when it comes to writing concurrent data structures, such as a concurrent LRU Cache, in a completely lock-free fashion that is a lot simpler to reason about.
Requirements of an LRU Cache
A cache is a structure that stores data (which might be the result of an earlier computation or obtained from external sources such as databases) so that future requests for that data can be served faster.
Now, a Least Recently Used (LRU) Cache must fulfill these requirements:
- Fixed capacity: This is for limiting memory usage.
- Fast access: Insert and lookup operations should be executed in O(1) time.
- An efficient eviction algorithm: The idea is that, when the cache capacity is reached and a new entry needs to be inserted into the cache, the Least Recently Used entry gets replaced.
More concretely, an LRU cache should support the two following operations:
- get(key): Get the value of a given key if it exists in the cache, otherwise return an error.
- put(key, value): Put the given (key, value) into the cache. When the cache reaches its capacity, it should evict the Least Recently Used entry before inserting a new one.
That means that, to implement an efficient LRU Cache (one where get and put execute in O(1) time), we can combine two data structures:
- Hash Map: containing (key, value) pairs.
- Doubly linked list: which will contain the history of the referenced keys. The Most Recently Used key will be at the start of the list, and the Least Recently Used will be at the end.
As an example, consider the status of an LRU Cache (with a capacity of 4) at a given moment, where the history of the referenced keys is (1, 3, 2, 4). Key 1 is the Most Recently Used (because it’s at the start of the list), and Key 4 is the Least Recently Used (because it’s at the end of the list). So, if a new item needs to be stored into the cache, Item 4 would have to be replaced.
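To make these semantics concrete, here’s a hypothetical sequence of operations on a smaller cache with a capacity of 2 (it’s the same sequence we’ll use later to test the implementation):

put(1, 1) // cache: {1}; history: 1
put(2, 2) // cache: {1, 2}; history: 2, 1
get(1)    // returns 1; history: 1, 2 (Key 1 becomes the Most Recently Used)
put(3, 3) // capacity reached: Key 2 (the Least Recently Used) is evicted; history: 3, 1
get(2)    // fails: Key 2 is no longer in the cache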
Quick introduction to ZIO
According to the ZIO documentation page, ZIO is a library for “Type-safe, composable asynchronous and concurrent programming for Scala”. This means ZIO allows us to build applications that are:
- Highly composable: Because ZIO is based on functional programming principles, such as using pure functions and immutable values, it allows us to easily compose solutions to complex problems from simple building blocks.
- 100% asynchronous and non-blocking.
- Highly performant and concurrent: ZIO implements Fiber-based concurrency. By the way, you can read more about ZIO Fibers in this really nice article written by Mateusz Sokół here.
- Type-safe: ZIO leverages the Scala Type System so it can catch more bugs at compile time.
The most important data type in ZIO (and also the basic building block of ZIO applications) is also called ZIO:
ZIO[-R, +E, +A]
The ZIO data type is called a functional effect, which means it is a lazy, immutable value which contains a description of a series of interactions with the outside world (database interactions, calling external APIs, etc.). A nice mental model of the ZIO data type is the following:
ZEnvironment[R] => Either[E, A]
That means a ZIO effect needs an environment of type ZEnvironment[R] to run (the environment could contain anything: a database connection, a REST client, a configuration object, etc.), and it can either fail with an error of type E or succeed with a value of type A.
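For example, here is a small effect that needs a hypothetical Config service in its environment and cannot fail (Config and greet are illustrative names, not something we’ll use later):

import zio._

final case class Config(greeting: String)

// An effect that requires a Config in its environment, can't fail,
// and succeeds with a String
val greet: ZIO[Config, Nothing, String] =
  ZIO.serviceWith[Config](config => s"${config.greeting}, ZIO!")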
Finally, it’s worth mentioning that ZIO provides some type aliases for the ZIO effect type which are very useful to represent some common use cases:
Task[+A] = ZIO[Any, Throwable, A]: This means a Task[A] is a ZIO effect that:
- Doesn’t require an environment to run (that’s why the R type is replaced by Any, meaning the effect will run no matter what we provide to it as an environment).
- Can fail with a Throwable.
- Can succeed with an A.

UIO[+A] = ZIO[Any, Nothing, A]: This means a UIO[A] is a ZIO effect that:
- Doesn’t require an environment to run.
- Can’t fail.
- Can succeed with an A.

RIO[-R, +A] = ZIO[R, Throwable, A]: This means a RIO[R, A] is a ZIO effect that:
- Requires an environment R to run.
- Can fail with a Throwable.
- Can succeed with an A.

IO[+E, +A] = ZIO[Any, E, A]: This means an IO[E, A] is a ZIO effect that:
- Doesn’t require an environment to run.
- Can fail with an E.
- Can succeed with an A.

URIO[-R, +A] = ZIO[R, Nothing, A]: This means a URIO[R, A] is a ZIO effect that:
- Requires an environment R to run.
- Can’t fail.
- Can succeed with an A.
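As a quick illustration of some of these aliases, here are a few tiny effects (a sketch, with arbitrary values):

import zio._

val task: Task[Int]     = ZIO.attempt("42".toInt) // parsing may throw, so the error type is Throwable
val uio: UIO[Int]       = ZIO.succeed(42)         // cannot fail
val io: IO[String, Int] = ZIO.fail("boom")        // fails with a custom error type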
Implementing the LRU Cache with ZIO Ref
First, we need to add some ZIO dependencies to our build.sbt:
val scalaVer = "2.13.10"
val zioVersion = "2.0.13"
lazy val compileDependencies = Seq(
"dev.zio" %% "zio" % zioVersion,
"dev.zio" %% "zio-macros" % zioVersion
) map (_ % Compile)
lazy val testDependencies = Seq(
"dev.zio" %% "zio-test" % zioVersion,
"dev.zio" %% "zio-test-sbt" % zioVersion
) map (_ % Test)
lazy val settings = Seq(
name := "zio-lru-cache",
version := "2.0.0",
scalaVersion := scalaVer,
scalacOptions += "-Ymacro-annotations",
libraryDependencies ++= compileDependencies ++ testDependencies,
testFrameworks := Seq(new TestFramework("zio.test.sbt.ZTestFramework"))
)
lazy val root = (project in file("."))
.settings(settings)
Now, we can begin with the implementation of the LRU Cache. An initial model could be:
final class LRUCache[K, V](
private val capacity: Int,
private var items: Map[K, CacheItem[K, V]],
private var start: Option[K],
private var end: Option[K]
)
Now, the LRUCache should have:
- A capacity, which should be a positive integer set on creation that shouldn’t change afterwards; that’s why it’s modeled as a val.
- A Map containing items, which will change all the time; that’s why we are modeling it as a var. By the way, the model of a CacheItem would be like this:
final case class CacheItem[K, V](value: V, left: Option[K], right: Option[K])
This means each CacheItem should not just contain a value to be stored, but also references to the left and right keys in the history of referenced keys (remember we’ll use a doubly linked list for keeping that history). These are modeled as Option because, if an item is at the start of the history (meaning it’s the Most Recently Used item), there won’t be any item on its left. Something similar happens when an item is at the end of the history (meaning it’s the Least Recently Used item): there won’t be any item on its right. A concrete instance is sketched right after this list.
- References to the start and end keys, which will also change all the time; that’s why they are vars.
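For instance, going back to the capacity-4 example above (history 1, 3, 2, 4), the entry for Key 3 would hold its value plus pointers to Key 1 on its left and Key 2 on its right (the value here is made up, just for illustration):

val item3 = CacheItem[Int, String]("some value", left = Some(1), right = Some(2))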
There’s a problem with this implementation though: the fact that we are resorting to vars. In functional programming, we should model everything as immutable values, and besides, vars will make it harder to use the LRUCache in concurrent scenarios (mutability in our applications instantly makes them prone to race conditions!).
So, what can we do? Well, ZIO has the answer! We can use its Ref[A] data type, which is a purely functional description of a mutable reference. The fundamental operations of a Ref are get and set, and both of them return ZIO effects which describe the operations of reading from and writing to the Ref.
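To get a feel for Ref before applying it to the cache, here is a minimal, self-contained counter sketch (names are illustrative):

import zio._

object RefCounterExample extends ZIOAppDefault {
  val run =
    for {
      counterRef <- Ref.make(0)              // create a mutable reference with initial value 0
      _          <- counterRef.update(_ + 1) // atomically apply a function to the current value
      value      <- counterRef.get           // read the current value
      _          <- Console.printLine(s"Counter: $value") // prints "Counter: 1"
    } yield ()
}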
Then, a better (and purely functional) version of our LRUCache would be:
import zio._
final case class LRUCacheRef[K, V](
private val capacity: Int,
private val itemsRef: Ref[Map[K, CacheItem[K, V]]],
private val startRef: Ref[Option[K]],
private val endRef: Ref[Option[K]]
)
In addition, let’s define the LRUCache trait that LRUCacheRef should implement:
import zio._
import zio.macros.accessible
@accessible
trait LRUCache[K, V] {
  def get(key: K): IO[NoSuchElementException, V]
  def put(key: K, value: V): UIO[Unit]
  def getStatus: UIO[(Map[K, CacheItem[K, V]], Option[K], Option[K])]
}
By the way, you can see we are using the @accessible annotation from the zio-macros library, which just generates the following accessor methods in the LRUCache companion object:
object LRUCache {
  def get[K, V](key: K): ZIO[LRUCache[K, V], NoSuchElementException, V]
  def put[K, V](key: K, value: V): URIO[LRUCache[K, V], Unit]
  def getStatus[K, V]: URIO[LRUCache[K, V], (Map[K, CacheItem[K, V]], Option[K], Option[K])]
}
If you want to understand more about accessors (and also ZLayers), you can take a look at the “Mastering Modularity in ZIO with ZLayer” ebook in the Scalac blog.
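To see what these accessors buy us: a program can talk to the cache purely through the environment, deferring the choice of implementation until a layer is provided. A small sketch, assuming the types defined above:

import zio._

// Puts a value and immediately reads it back; the concrete cache
// implementation is chosen later, when a ZLayer is provided.
val program: ZIO[LRUCache[Int, String], NoSuchElementException, String] =
  LRUCache.put(1, "one") *> LRUCache.get[Int, String](1)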
Now we can do the following:
- Make the constructor private.
- Make LRUCacheRef extend the LRUCache trait.
- Create a ZLayer in the LRUCacheRef companion object that will contain the description of how to instantiate an LRUCacheRef.
import zio._
final case class LRUCacheRef[K, V] private (
  private val capacity: Int,
  private val itemsRef: Ref[Map[K, CacheItem[K, V]]],
  private val startRef: Ref[Option[K]],
  private val endRef: Ref[Option[K]]
) extends LRUCache[K, V]

object LRUCacheRef {
  def layer[K: Tag, V: Tag](capacity: Int): ULayer[LRUCache[K, V]] =
    ZLayer {
      if (capacity > 0) {
        for {
          itemsRef <- Ref.make(Map.empty[K, CacheItem[K, V]])
          startRef <- Ref.make(Option.empty[K])
          endRef   <- Ref.make(Option.empty[K])
        } yield LRUCacheRef(capacity, itemsRef, startRef, endRef)
      } else ZIO.die(new IllegalArgumentException("Capacity must be a positive number!"))
    }
}
We can see the LRUCacheRef.layer method expects to receive a capacity, and it returns a ZLayer which can die with an IllegalArgumentException (when a non-positive capacity is provided) or can succeed with an LRUCache[K, V]. We also know that the LRUCacheRef constructor expects to receive not just the capacity, but also the initial values for itemsRef, startRef and endRef. For creating these Refs, we can use the Ref.make function, which receives the initial value for the Ref and returns a UIO[Ref[A]]. And, because ZIO effects are monads (meaning they have map and flatMap methods), we can combine the results of calling Ref.make using for-comprehension syntax, yielding a new LRUCacheRef.
Next, we can implement the get and put methods for LRUCacheRef. Let’s start with the get method:
def get(key: K): IO[NoSuchElementException, V] =
for {
items <- self.itemsRef.get
item <- ZIO.from(items.get(key)).orElseFail(new NoSuchElementException(s"Key does not exist: $key"))
_ <- removeKeyFromList(key) *> addKeyToStartOfList(key)
} yield item.value
As you can see, the method implementation looks really nice and simple: it’s practically just a description of what to do when getting an element from the cache:
- Firstly, we need to get the items Map from the itemsRef.
- Next, we need to obtain the requested key from the items Map. This key may or may not exist: if it exists, the flow just continues; if it doesn’t, the method fails with a NoSuchElementException and the flow execution stops.
- After the item is obtained from the Map, we need to update the history of referenced keys, because the requested key becomes the Most Recently Used. That’s why we call the auxiliary functions removeKeyFromList and addKeyToStartOfList.
- Finally, the item value is returned.
Now, let’s see the put method implementation. Again, it’s really simple:
def put(key: K, value: V): UIO[Unit] =
ZIO
.ifZIO(self.itemsRef.get.map(_.contains(key)))(
updateItem(key, value),
addNewItem(key, value)
)
We can see that the method checks whether the provided key is already present in the items Map (again, we are accessing the Map by calling the get method on itemsRef):
- If the key is already present, the updateItem auxiliary function is called.
- Otherwise, a new item is added, by calling the addNewItem auxiliary function.
Now we can take a look at some auxiliary functions (we won’t go into the details of every one of them; for more details, you can take a look at the complete source code in the jorge-vasquez-2301/zio-lru-cache repository). First off, we have the removeKeyFromList function:
private def removeKeyFromList(key: K): UIO[Unit] =
for {
cacheItem <- getExistingCacheItem(key)
optionLeftKey = cacheItem.left
optionRightKey = cacheItem.right
_ <- (optionLeftKey, optionRightKey) match {
case (Some(l), Some(r)) =>
updateLeftAndRightCacheItems(l, r)
case (Some(l), None) =>
setNewEnd(l)
case (None, Some(r)) =>
setNewStart(r)
case (None, None) =>
clearStartAndEnd
}
} yield ()
As you can see, the implementation is pretty straightforward, and it considers all the possible cases when removing a key from the history of referenced keys:
- When the key to be removed has other keys to its left and right, the corresponding cache items have to be updated so they point to each other.
- When the key to be removed has another key to its left, but not to its right, it means the key to be removed is at the end of the list, so the end has to be updated.
- When the key to be removed has another key to its right, but not to its left, it means the key to be removed is at the start of the list, so the start has to be updated.
- When the key to be removed has no keys to its left or its right, that means the key to be removed is the only one, so the start and end references have to be cleared.
And here is the getExistingCacheItem function implementation:
private def getExistingCacheItem(key: K): UIO[CacheItem[K, V]] =
self.itemsRef.get.map(_.get(key)).someOrElseZIO(ZIO.dieMessage(s"Key does not exist: $key, but it should!"))
This function is named like this because the idea is that, when we use it, we expect the cache item we want to get to exist. If the item does not exist, it means there’s some kind of problem, and we signal that with ZIO.dieMessage.
Another interesting function to look at is updateLeftAndRightCacheItems, because it shows a use case of Ref#update, which atomically modifies a Ref with the specified function:
private def updateLeftAndRightCacheItems(l: K, r: K): UIO[Unit] =
for {
leftCacheItem <- getExistingCacheItem(l)
rightCacheItem <- getExistingCacheItem(r)
_ <- self.itemsRef.update(_.updated(l, leftCacheItem.copy(right = Some(r))))
_ <- self.itemsRef.update(_.updated(r, rightCacheItem.copy(left = Some(l))))
} yield ()
Finally, let’s take a look at the addKeyToStartOfList function, which is also pretty straightforward. Something to notice is that we are using Ref#updateSome for updating the endRef value, just when it’s empty:
private def addKeyToStartOfList(key: K): UIO[Unit] =
for {
oldOptionStart <- self.startRef.get
_ <- getExistingCacheItem(key).flatMap { cacheItem =>
self.itemsRef.update(_.updated(key, cacheItem.copy(left = None, right = oldOptionStart)))
}
_ <- oldOptionStart match {
case Some(oldStart) =>
getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
self.itemsRef.update(_.updated(oldStart, oldStartCacheItem.copy(left = Some(key))))
}
case None => ZIO.unit
}
_ <- self.startRef.set(Some(key))
_ <- self.endRef.updateSome { case None => Some(key) }
} yield ()
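As a side note, here is a tiny isolated illustration of the Ref#updateSome semantics used above: the update only happens when the partial function matches the current value (the values are arbitrary):

import zio._

val updateSomeDemo: UIO[Option[Int]] =
  for {
    ref <- Ref.make(Option.empty[Int])
    _   <- ref.updateSome { case None => Some(1) } // matches: value becomes Some(1)
    _   <- ref.updateSome { case None => Some(2) } // doesn't match Some(1), so it's a no-op
    v   <- ref.get                                 // Some(1)
  } yield v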
Testing implementation with a single fiber
It’s time to put our LRUCacheRef under test! Firstly, we are going to test it under a single-fiber scenario. The testing code is the following (by the way, it runs the same sequence of operations we traced in the introduction):
object UseLRUCacheRefWithOneFiber extends ZIOAppDefault {
lazy val run: UIO[Unit] =
(for {
_ <- put(1, 1)
_ <- put(2, 2)
_ <- get(1)
_ <- put(3, 3)
_ <- get(2)
_ <- put(4, 4)
_ <- get(1)
_ <- get(3)
_ <- get(4)
} yield ()).provideLayer(LRUCacheRef.layer(capacity = 2))
private def get(key: Int): URIO[LRUCache[Int, Int], Unit] =
(for {
v <- Console.printLine(s"Getting key: $key") *> LRUCache.get[Int, Int](key)
_ <- Console.printLine(s"Obtained value: $v")
} yield ()).catchAll(ex => Console.printLine(ex.getMessage).orDie)

private def put(key: Int, value: Int): URIO[LRUCache[Int, Int], Unit] =
Console.printLine(s"Putting ($key, $value)").orDie *> LRUCache.put(key, value)
}
So now we are running the application with an LRUCacheRef with a capacity of 2. After executing the above program, the following result is obtained:
Putting (1, 1)
Putting (2, 2)
Getting key: 1
Obtained value: 1
Putting (3, 3)
Getting key: 2
Key does not exist: 2
Putting (4, 4)
Getting key: 1
Key does not exist: 1
Getting key: 3
Obtained value: 3
Getting key: 4
Obtained value: 4
As we can see, the behavior is correct! That means our implementation looks good so far.
Testing implementation with multiple concurrent fibers
Now, let’s test the LRUCacheRef again, but against multiple concurrent fibers this time. The testing code is the following:
object UseLRUCacheWithMultipleFibers extends ZIOAppDefault {
  lazy val run =
    (for {
      fiberReporter  <- reporter.forever.fork
      fiberProducers <- startWorkers(producer)
      fiberConsumers <- startWorkers(consumer)
      _              <- Console.readLine.orDie *> (fiberReporter <*> fiberProducers <*> fiberConsumers).interrupt
    } yield ()).provideLayer(layer)

  lazy val layer = LRUCacheRef.layer[Int, Int](capacity = 3)

  def startWorkers(worker: URIO[LRUCache[Int, Int], Unit]) =
    ZIO.forkAll {
      ZIO.replicate(100) {
        worker.forever.catchAllCause(cause => Console.printLineError(cause.prettyPrint))
      }
    }

  lazy val producer: URIO[LRUCache[Int, Int], Unit] =
    for {
      number <- Random.nextIntBounded(100)
      _      <- Console.printLine(s"Producing ($number, $number)").orDie *> LRUCache.put(number, number)
    } yield ()

  lazy val consumer: URIO[LRUCache[Int, Int], Unit] =
    (for {
      key   <- Random.nextIntBounded(100)
      value <- Console.printLine(s"Consuming key: $key") *> LRUCache.get[Int, Int](key)
      _     <- Console.printLine(s"Consumed value: $value")
    } yield ()).catchAll(ex => Console.printLine(ex.getMessage).orDie)

  lazy val reporter: URIO[LRUCache[Int, Int], Unit] =
    for {
      status                          <- LRUCache.getStatus[Int, Int]
      (items, optionStart, optionEnd)  = status
      _                               <- Console.printLine(s"Items: $items, Start: $optionStart, End: $optionEnd").orDie
    } yield ()
}
We can see that a layer which builds an LRUCacheRef with a capacity of 3 is provided. In addition, 100 producers and 100 consumers of random integers are started in different fibers, and we have a reporter that will just print the current cache status to the console (the stored items, and the start and end keys of the recently-used history). When we execute this, some ugly stuff happens:
- Firstly, more items than the defined capacity (a lot more) are being stored! Also, the stored items have a lot of inconsistencies. For example, you can see below that, at a given moment, the end key (the Least Recently Used key) is 97, but looking at the corresponding CacheItem we see it has other keys to its left and right (58 and 9 respectively). But if 97 is at the end of the list, it shouldn’t have an item to its right! Besides this, there are a lot more discrepancies between CacheItems:
Items: HashMap(5 -> CacheItem(5,Some(45),Some(6)), 84 -> CacheItem(84,Some(51),Some(91)), 69 -> CacheItem(69,Some(83),Some(36)), 0 -> CacheItem(0,None,Some(37)), 88 -> CacheItem(88,Some(82),Some(94)), 10 -> CacheItem(10,Some(37),Some(45)), 56 -> CacheItem(56,Some(54),Some(42)), 42 -> CacheItem(42,Some(6),Some(60)), 24 -> CacheItem(24,Some(30),Some(18)), 37 -> CacheItem(37,Some(0),Some(10)), 52 -> CacheItem(52,Some(70),Some(91)), 14 -> CacheItem(14,Some(72),Some(1)), 20 -> CacheItem(20,None,Some(46)), 46 -> CacheItem(46,Some(28),Some(70)), 93 -> CacheItem(93,Some(40),Some(6)), 57 -> CacheItem(57,Some(12),Some(45)), 78 -> CacheItem(78,None,Some(41)), 61 -> CacheItem(61,None,Some(26)), 1 -> CacheItem(1,Some(14),Some(2)), 74 -> CacheItem(74,None,Some(33)), 6 -> CacheItem(6,Some(5),Some(42)), 60 -> CacheItem(60,Some(42),Some(80)), 85 -> CacheItem(85,None,Some(99)), 70 -> CacheItem(70,Some(46),Some(52)), 21 -> CacheItem(21,None,Some(65)), 33 -> CacheItem(33,Some(77),Some(32)), 28 -> CacheItem(28,None,Some(46)), 38 -> CacheItem(38,Some(98),Some(68)), 92 -> CacheItem(92,Some(63),Some(0)), 65 -> CacheItem(65,Some(21),Some(51)), 97 -> CacheItem(97,Some(58),Some(9)), 9 -> CacheItem(9,Some(97),Some(99)), 53 -> CacheItem(53,None,Some(91)), 77 -> CacheItem(77,Some(27),Some(33)), 96 -> CacheItem(96,Some(3),Some(58)), 13 -> CacheItem(13,Some(14),Some(28)), 41 -> CacheItem(41,Some(78),Some(90)), 73 -> CacheItem(73,None,Some(41)), 2 -> CacheItem(2,Some(1),Some(92)), 32 -> CacheItem(32,Some(33),Some(98)), 45 -> CacheItem(45,Some(10),Some(5)), 64 -> CacheItem(64,None,Some(34)), 17 -> CacheItem(17,None,Some(35)), 22 -> CacheItem(22,None,Some(7)), 44 -> CacheItem(44,Some(79),Some(92)), 59 -> CacheItem(59,Some(15),Some(68)), 27 -> CacheItem(27,Some(4),Some(77)), 71 -> CacheItem(71,Some(46),Some(19)), 12 -> CacheItem(12,Some(75),Some(57)), 54 -> CacheItem(54,None,Some(56)), 49 -> CacheItem(49,None,Some(63)), 86 -> CacheItem(86,None,Some(43)), 81 -> CacheItem(81,Some(98),Some(1)), 76 -> CacheItem(76,None,Some(35)), 7 -> CacheItem(7,Some(22),Some(33)), 39 -> CacheItem(39,None,Some(4)), 98 -> CacheItem(98,Some(32),Some(81)), 91 -> CacheItem(91,Some(52),Some(75)), 66 -> CacheItem(66,None,Some(27)), 3 -> CacheItem(3,Some(94),Some(96)), 80 -> CacheItem(80,Some(60),Some(84)), 48 -> CacheItem(48,None,Some(9)), 63 -> CacheItem(63,Some(49),Some(3)), 18 -> CacheItem(18,Some(24),Some(26)), 95 -> CacheItem(95,None,Some(65)), 50 -> CacheItem(50,Some(68),Some(58)), 67 -> CacheItem(67,None,Some(21)), 16 -> CacheItem(16,None,Some(82)), 11 -> CacheItem(11,Some(5),Some(73)), 72 -> CacheItem(72,Some(99),Some(14)), 43 -> CacheItem(43,Some(86),Some(3)), 99 -> CacheItem(99,Some(9),Some(72)), 87 -> CacheItem(87,Some(36),Some(46)), 40 -> CacheItem(40,Some(11),Some(93)), 26 -> CacheItem(26,Some(18),Some(16)), 8 -> CacheItem(8,Some(3),Some(0)), 75 -> CacheItem(75,Some(91),Some(12)), 58 -> CacheItem(58,Some(96),Some(97)), 82 -> CacheItem(82,Some(16),Some(88)), 36 -> CacheItem(36,Some(69),Some(87)), 30 -> CacheItem(30,Some(11),Some(24)), 51 -> CacheItem(51,Some(65),Some(84)), 19 -> CacheItem(19,None,Some(83)), 4 -> CacheItem(4,Some(62),Some(27)), 79 -> CacheItem(79,None,Some(44)), 94 -> CacheItem(94,Some(88),Some(3)), 47 -> CacheItem(47,Some(35),Some(37)), 15 -> CacheItem(15,Some(68),Some(59)), 68 -> CacheItem(68,Some(38),Some(50)), 62 -> CacheItem(62,None,Some(4)), 90 -> CacheItem(90,Some(41),Some(33)), 83 -> CacheItem(83,Some(19),Some(69))), Start: Some(16), End: Some(97)
- On top of that, because of the issues mentioned above, we see fibers dying because of unexpected errors. For example:
Exception in thread "zio-fiber-102" java.lang.RuntimeException: Key does not exist: 54, but it should!
at com.example.cache.LRUCacheRef.getExistingCacheItem(LRUCacheRef.scala:107)
at com.example.cache.LRUCacheRef.removeKeyFromList(LRUCacheRef.scala:69)
at com.example.cache.LRUCacheRef.replaceEndCacheItem(LRUCacheRef.scala:47)
at com.example.UseLRUCacheRefWithMultipleFibers.producer(Main.scala:46)
at com.example.UseLRUCacheRefWithMultipleFibers.run(Main.scala:36)
at com.example.UseLRUCacheRefWithMultipleFibers.run(Main.scala:34)
And so, it seems our current LRUCacheRef implementation works correctly just in a single-fiber scenario, but not in a concurrent one, which is really bad. So now, let’s reflect on what’s happening.
Why doesn’t our implementation work in a concurrent scenario?
It may seem weird that our current implementation is not working as expected when multiple fibers use it concurrently. We have used immutable values everywhere, pure functions, and purely functional mutable references (Ref[A]) that provide atomic operations on them. But wait a moment: Ref[A] provides atomic operations on SINGLE VALUES, but what happens if we need to keep consistency across MULTIPLE VALUES? Remember that in our LRUCacheRef implementation we have three Refs: itemsRef, startRef and endRef. So it seems Ref[A] is not a powerful enough solution for our use case:
- Refs allow atomic operations on single values only.
- Refs don’t compose! So you can’t compose two Refs to get a resulting Ref.
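To see the problem in isolation, consider this simplified sketch (not taken from the cache code): two Refs that are supposed to always hold equal values, updated without any transaction. Each update is atomic on its own, but another fiber can observe the state between the two updates, where the invariant is broken:

import zio._

// Invariant: ref1 and ref2 should always hold the same value.
def incrementBoth(ref1: Ref[Int], ref2: Ref[Int]): UIO[Unit] =
  ref1.update(_ + 1) *> ref2.update(_ + 1)
// A fiber running between the two updates sees ref1 already incremented
// but ref2 not yet: consistency across BOTH values is lost.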
So what can we do now?
Enter ZIO STM!
The solution to our problem is to use ZIO STM (Software Transactional Memory). For that, ZIO provides two basic data types:
- ZSTM[-R, +E, +A]: Represents an effect that can be performed transactionally, that requires an environment ZEnvironment[R] to run, and that may fail with an error E or succeed with a value A. Also, ZSTM has type aliases very similar to those for ZIO:
  - TaskSTM[+A] = ZSTM[Any, Throwable, A]
  - USTM[+A] = ZSTM[Any, Nothing, A]
  - RSTM[-R, +A] = ZSTM[R, Throwable, A]
  - STM[+E, +A] = ZSTM[Any, E, A]
  - URSTM[-R, +A] = ZSTM[R, Nothing, A]
- TRef[A]: Represents a transactional Ref, meaning a purely functional mutable reference that can be used in the context of a transaction.

Therefore, basically, a ZSTM describes a bunch of operations across several TRefs.
Important things to note:
- ZSTMs are composable (we can use them in for-comprehensions!).
- The methods in TRef are very similar to those in Ref, but they return ZSTM effects instead of ZIO effects.
- To convert a ZSTM effect to a ZIO effect, you need to commit the transaction. When you commit a transaction, all of its operations are performed in an atomic, consistent and isolated fashion, very similar to how relational databases work.
It’s also worth mentioning that we could use other classic concurrency structures from java.util.concurrent, such as Locks and Semaphores, for solving concurrency issues. However, that’s really complicated, low-level and error-prone, and race conditions or deadlocks are very likely to happen. Instead, ZIO STM replaces all of this low-level machinery with a high-level concept: transactions in memory. And we get no race conditions and no deadlocks!
Finally, ZIO STM provides other nice data structures that can participate in transactions (all of them are based on TRef):
- TArray
- TDequeue
- TEnqueue
- THub
- TMap
- TPriorityQueue
- TPromise
- TQueue
- TRandom
- TReentrantLock
- TRef
- TSemaphore
- TSet
Our LRU Cache goes concurrent! Moving from Ref to TRef
The concurrent version of our LRUCache will be very similar to what we had before, but we are going to make some changes to use ZIO STM:
import zio._
import zio.stm._
final case class LRUCacheSTM[K, V] private (
private val capacity: Int,
private val items: TMap[K, CacheItem[K, V]],
private val startRef: TRef[Option[K]],
private val endRef: TRef[Option[K]]
) extends LRUCache[K, V]
As you can see, we are changing Refs to TRefs, and instead of having items: TRef[Map[K, CacheItem[K, V]]], we are using the more convenient and efficient TMap data type that ZIO STM provides.
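Here is a quick isolated sketch of the TMap operations the cache will rely on (put, get and contains), all running inside a single transaction:

import zio._
import zio.stm._

val tmapDemo: UIO[Option[String]] =
  (for {
    tmap   <- TMap.empty[Int, String] // create an empty transactional map
    _      <- tmap.put(1, "one")      // insert a (key, value) pair
    exists <- tmap.contains(1)        // true
    value  <- if (exists) tmap.get(1) else STM.succeed(None)
  } yield value).commit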
The LRUCacheSTM.layer method will also be very similar to LRUCacheRef.layer:
object LRUCacheSTM {
def layer[K: Tag, V: Tag](capacity: Int): ULayer[LRUCache[K, V]] =
ZLayer {
if (capacity > 0) {
(for {
itemsRef <- TMap.empty[K, CacheItem[K, V]]
startRef <- TRef.make(Option.empty[K])
endRef <- TRef.make(Option.empty[K])
} yield LRUCacheSTM[K, V](capacity, itemsRef, startRef, endRef)).commit
} else ZIO.die(new IllegalArgumentException("Capacity must be a positive number!"))
}
}
The biggest difference in this method is that the for-comprehension returns a ZSTM effect, and we need to commit it to get a ZIO effect, which is what we want to return.
Next, we have the get and put methods for the LRUCacheSTM:
def get(key: K): IO[NoSuchElementException, V] =
  (for {
    optionItem <- self.items.get(key)
    item       <- STM.fromOption(optionItem).mapError(_ => new NoSuchElementException(s"Key does not exist: $key"))
    _          <- removeKeyFromList(key) *> addKeyToStartOfList(key)
  } yield item.value).commitEither

def put(key: K, value: V): UIO[Unit] =
  STM.ifSTM(self.items.contains(key))(updateItem(key, value), addNewItem(key, value)).commitEither
As you can see, these methods are very similar to the ones we had before! The only difference is that the for-comprehensions in both methods return values of type ZSTM, so we need to commit the transactions (we are using commitEither in this case, so transactions are always committed despite errors, and failures are handled at the ZIO level).
Now, we can take a look at the same auxiliary functions we’ve seen before, but this time with ZIO STM. Firstly, we have the removeKeyFromList function:
private def removeKeyFromList(key: K): USTM[Unit] =
for {
cacheItem <- getExistingCacheItem(key)
optionLeftKey = cacheItem.left
optionRightKey = cacheItem.right
_ <- (optionLeftKey, optionRightKey) match {
case (Some(l), Some(r)) =>
updateLeftAndRightCacheItems(l, r)
case (Some(l), None) =>
setNewEnd(l)
case (None, Some(r)) =>
setNewStart(r)
case (None, None) =>
clearStartAndEnd
}
} yield ()
As you may realize, the implementation is practically the same! The difference is that the function returns a ZSTM effect instead of a ZIO effect. In this case (and the same happens for all the private methods) we are not committing the transaction yet: that’s because we want to use these private functions in combination with others, to form bigger transactions that are committed in the get and put methods.
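This compositional style can be summarized in a tiny sketch: small USTM steps combine into one transaction, and only the composed whole is committed (stepA, stepB and bump are illustrative, not the cache’s actual helpers):

import zio._
import zio.stm._

def stepA(ref: TRef[Int]): USTM[Unit] = ref.update(_ + 1)
def stepB(ref: TRef[Int]): USTM[Int]  = ref.get

// Both steps run inside ONE atomic transaction; no other fiber can
// observe the state between stepA and stepB.
def bump(ref: TRef[Int]): UIO[Int] =
  (stepA(ref) *> stepB(ref)).commit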
And here is the getExistingCacheItem function implementation. Again, it’s very similar to the one we had before, but now a ZSTM effect is returned, and getting an element from the items Map is easier too, thanks to TMap:
private def getExistingCacheItem(key: K): USTM[CacheItem[K, V]] =
  self.items.get(key).someOrElseSTM(STM.dieMessage(s"Key $key does not exist, but it should!"))
And for updateLeftAndRightCacheItems, putting elements into the items Map is a lot easier now too:
private def updateLeftAndRightCacheItems(l: K, r: K): USTM[Unit] =
for {
leftCacheItem <- getExistingCacheItem(l)
rightCacheItem <- getExistingCacheItem(r)
_ <- self.items.put(l, leftCacheItem.copy(right = Some(r)))
_ <- self.items.put(r, rightCacheItem.copy(left = Some(l)))
} yield ()
We also have addKeyToStartOfList, which again is very similar to the previous version:
private def addKeyToStartOfList(key: K): USTM[Unit] =
for {
oldOptionStart <- self.startRef.get
_ <- getExistingCacheItem(key).flatMap { cacheItem =>
self.items.put(key, cacheItem.copy(left = None, right = oldOptionStart))
}
_ <- oldOptionStart match {
case Some(oldStart) =>
getExistingCacheItem(oldStart).flatMap { oldStartCacheItem =>
self.items.put(oldStart, oldStartCacheItem.copy(left = Some(key)))
}
case None => STM.unit
}
_ <- self.startRef.set(Some(key))
_ <- self.endRef.updateSome { case None => Some(key) }
} yield ()
Testing implementation with multiple fibers
Now that we have our LRUCacheSTM
, let’s put it under test with the testing code we already have. The only difference is that we will provide an LRUCacheSTM.layer
now:
object UseLRUCacheWithMultipleFibers extends ZIOAppDefault {
  lazy val run = …
  lazy val layer = LRUCacheSTM.layer[Int, Int](capacity = 3)
  …
}
When we run this, everything works as it should, and the best part is that we didn’t need to use Locks at all! No more unexpected errors happen, and the reporter shows that our cache keeps its internal consistency. Here is an example of what is printed to the console for two executions of the reporter:
Items: Map(43 -> CacheItem(43,Some(16),None), 16 -> CacheItem(16,Some(32),Some(43)), 32 -> CacheItem(32,None,Some(16))), Start: Some(32), End: Some(43)
Items: Map(30 -> CacheItem(30,None,Some(69)), 53 -> CacheItem(53,Some(69),None), 69 -> CacheItem(69,Some(30),Some(53))), Start: Some(30), End: Some(53)
Writing unit tests for the Concurrent LRU Cache using ZIO Test
Finally, we can write some unit tests for our LRUCacheSTM using zio-test:
import zio._
import zio.test._
object LRUCacheSTMSpec extends ZIOSpecDefault {
def spec =
suite("LRUCacheSTM")(
test("can't be created with non-positive capacity") {
for {
result <- LRUCache
.put(1, 1)
.provideLayer(LRUCacheSTM.layer(-2))
.absorb
.either
.left
.map(_.getMessage)
} yield assertTrue(result == "Capacity must be a positive number!")
},
test("works as expected") {
val expectedOutput = Vector(
"Putting (1, 1)\n",
"Putting (2, 2)\n",
"Getting key: 1\n",
"Obtained value: 1\n",
"Putting (3, 3)\n",
"Getting key: 2\n",
"Key does not exist: 2\n",
"Putting (4, 4)\n",
"Getting key: 1\n",
"Key does not exist: 1\n",
"Getting key: 3\n",
"Obtained value: 3\n",
"Getting key: 4\n",
"Obtained value: 4\n"
)

for {
_ <- put(1, 1)
_ <- put(2, 2)
_ <- get(1)
_ <- put(3, 3)
_ <- get(2)
_ <- put(4, 4)
_ <- get(1)
_ <- get(3)
_ <- get(4)
output <- TestConsole.output
} yield assertTrue(output == expectedOutput)
}.provideLayer(LRUCacheSTM.layer[Int, Int](2))
) @@ TestAspect.silent

private def get(key: Int): URIO[LRUCache[Int, Int], Unit] =
(for {
v <- Console.printLine(s"Getting key: $key") *> LRUCache.get[Int, Int](key)
_ <- Console.printLine(s"Obtained value: $v")
} yield ()).catchAll(ex => Console.printLine(ex.getMessage).orDie)

private def put(key: Int, value: Int): URIO[LRUCache[Int, Int], Unit] =
Console.printLine(s"Putting ($key, $value)").orDie *> LRUCache.put(key, value)
}
The first test asserts that trying to create an LRUCacheSTM with a non-positive capacity results in a failure.

The second test asserts that the cache works as expected. For that, we use the TestConsole module provided by zio-test, to check that the expected messages are printed to the console.
I won’t go into more details about how zio-test works, but you can read about it on the ZIO documentation page.
Summary
In this article, we’ve seen a concrete example of writing concurrent data structures with ZIO: a concurrent LRU cache. Because ZIO is based on functional programming principles, such as using pure functions and immutable values, it was really easy and painless to evolve our initial implementation (which didn’t support concurrency and was based on Ref) into a fully concurrent version based on TRef from ZIO STM, without all the complicated stuff that comes with lower-level concurrency structures such as Locks, and with no deadlocks or race conditions at all.
In addition, this was just a very specific example of what you can do with ZIO STM for writing concurrent data structures, so there’s a lot more you can do with it in your own projects, in a totally async, non-blocking and thread-safe fashion. So make sure to give it a try!
AUTHOR: JORGE VASQUEZ
If you are interested in more, visit the Scalac blog.