Understanding the Power of the Async Observer Pattern

Saurabh Kishore Tiwari
The Thought Mill
7 min read · May 15, 2023


In the realm of modern software development, writing scalable and responsive code is of paramount importance. As our applications grow in complexity, it becomes crucial to handle asynchronous events and data streams efficiently. This is where the Async Observer pattern comes into play.

The Async Observer pattern, which is closely related to the reactive programming model, provides a powerful way to handle asynchronous data streams, event-driven systems, and reactive user interfaces. By embracing this pattern, developers can build highly responsive and efficient applications in Scala.

We have already covered the Observer pattern in Scala, so in this article we'll focus strictly on how to implement the Observer pattern asynchronously.

To make it asynchronous, we will use the Actor Model.

What is the Actor Model?

The Actor Model is a mathematical and conceptual framework for designing and implementing concurrent and distributed systems. It provides a high-level abstraction for organizing and coordinating concurrent computations through the use of actors.

In the Actor Model, an actor represents a fundamental unit of computation. Actors are independent entities that encapsulate state, behavior, and communication. They interact with each other exclusively through asynchronous message passing, allowing for concurrency and distribution without the need for locks or shared memory.

Some features of the Actor Model

  1. Encapsulation: Each actor maintains its internal state, which can only be accessed and modified by the actor itself. This isolation ensures that actors can operate independently and avoid data races or conflicts.
  2. Asynchronous Message Passing: Actors communicate by sending and receiving messages asynchronously. Messages are the primary means of interaction between actors and can include requests, commands, or notifications. Actors process incoming messages one at a time, preserving message order within each actor.
  3. Addressability: Actors have unique addresses or references, which enable other actors to send messages to them. The address provides a way to identify and interact with a specific actor, allowing for loose coupling and location transparency.
  4. Concurrency and Scalability: Actors can execute concurrently, and multiple actors can operate simultaneously without interfering with each other. This inherent concurrency allows for the efficient utilization of resources and enables scalable systems.

The Actor Model provides a natural way to reason about and manage concurrency, as each actor operates independently and communicates with others through message passing. It promotes a system design that is fault-tolerant, scalable, and easy to reason about, making it particularly well-suited for building distributed and parallel systems.
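To make these features concrete, here is a minimal sketch of an actor built only from the standard library. It is not Akka and not how Akka is implemented: `CounterActor`, `CounterMsg`, `Increment`, and `Get` are hypothetical names chosen for illustration, and a single-threaded executor stands in for the mailbox. The state is private, the only way to reach it is by sending a message, and messages from one sender are handled in the order they were sent.

```scala
import java.util.concurrent.{ExecutorService, Executors}
import scala.concurrent.{Await, Promise}
import scala.concurrent.duration.*

// Messages the hypothetical actor understands.
sealed trait CounterMsg
case object Increment extends CounterMsg
final case class Get(replyTo: Promise[Int]) extends CounterMsg

// A toy actor: private state plus a mailbox drained by a single thread.
final class CounterActor {
  private val mailbox: ExecutorService = Executors.newSingleThreadExecutor()
  private var count = 0 // only ever touched by the mailbox thread

  // Asynchronous, fire-and-forget send (loosely modelled on Akka's `!`).
  def !(msg: CounterMsg): Unit = mailbox.execute(() => receive(msg))

  // Handles one message at a time.
  private def receive(msg: CounterMsg): Unit = msg match {
    case Increment    => count += 1
    case Get(replyTo) => replyTo.success(count)
  }

  // Stops accepting new messages; already queued ones are still handled.
  def stop(): Unit = mailbox.shutdown()
}

object MiniActorDemo {
  def run(): Int = {
    val actor = new CounterActor
    (1 to 3).foreach(_ => actor ! Increment)
    val reply = Promise[Int]()
    actor ! Get(reply) // queued behind the three increments
    val result = Await.result(reply.future, 5.seconds)
    actor.stop()
    result
  }

  def main(args: Array[String]): Unit = println(run()) // prints 3
}
```

The caller never reads `count` directly. It asks for the value with a `Get` message and receives the answer through the `Promise`, which is the same request-and-reply idea that actors use when they do need feedback.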

Languages like Erlang, Scala with Akka, and Elixir have incorporated the Actor Model as a fundamental paradigm, allowing developers to embrace its principles and build highly concurrent and resilient applications. By leveraging the Actor Model, developers can tackle complex concurrent and distributed challenges with greater ease and confidence.

Let's have a look at the code

The Observer pattern has two main components: the Subject and the Observer.
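For contrast with the actor-based version, here is a rough sketch of the plain, synchronous form of the pattern. The names `PriceFeed`, `PriceObserver`, and `RecordingObserver` are made up for this example and do not come from the earlier article. The point to notice is that `publish` calls every observer directly, so the subject is blocked until all of them return.

```scala
import scala.collection.mutable

// Hypothetical observer interface for this sketch.
trait PriceObserver {
  def update(symbol: String, price: Double): Unit
}

// Hypothetical subject: keeps its observers and notifies them in turn.
final class PriceFeed {
  private val observers = mutable.LinkedHashSet.empty[PriceObserver]

  def register(o: PriceObserver): Unit = observers += o
  def unregister(o: PriceObserver): Unit = observers -= o

  // Synchronous notification: the caller waits for every observer.
  def publish(symbol: String, price: Double): Unit =
    observers.foreach(_.update(symbol, price))
}

// An observer that simply remembers what it was told.
final class RecordingObserver extends PriceObserver {
  val seen = mutable.ListBuffer.empty[(String, Double)]
  def update(symbol: String, price: Double): Unit = seen += (symbol -> price)
}

object SyncObserverDemo {
  def run(): List[(String, Double)] = {
    val feed = new PriceFeed
    val obs = new RecordingObserver
    feed.register(obs)
    feed.publish("ABC", 23.45)
    feed.unregister(obs)
    feed.publish("ABC", 32.54) // not delivered: obs is no longer registered
    obs.seen.toList
  }

  def main(args: Array[String]): Unit = println(run()) // prints List((ABC,23.45))
}
```

In the asynchronous version the direct method call is replaced by a message send, so a slow observer no longer holds up the subject.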

Actors communicate with each other by sending messages, which we model as case classes and case objects. In our case, a stock company is the subject. The messages it handles are RegisterUser, Unregister, RegisterStock, UpdateStockPrice and NotifyUser, and each one is processed atomically.

The stock company keeps a Set of every actor that registers with it and a Map of every registered stock, keyed by stock id.

Subject.scala

import akka.actor.{Actor, ActorLogging, ActorRef}
import scala.collection.mutable

case class Stock(id: Int, name: String, price: Double)

sealed trait Subject:
  def registerNewUser(user: ActorRef): Unit
  def notifyUsers(): Unit
  def deleteUser(user: ActorRef): Unit
  def registerStocks(stock: Stock): Unit
  def updateStockPrices(stock: Stock, amount: Double): Unit


object StockCompany extends Actor, Subject, ActorLogging:
  // Messages this actor understands.
  case class RegisterUser(user: ActorRef)

  case class Unregister(user: ActorRef)

  case class RegisterStock(stock: Stock)

  case class UpdateStockPrice(stock: Stock, amount: Double)

  // Internal message: the actor sends it to itself after every change.
  private case object NotifyUser

  // Actor-private state, only touched while a message is being handled.
  private val subs: mutable.HashSet[ActorRef] = mutable.HashSet.empty[ActorRef]
  private val stocks: mutable.Map[Int, Stock] = mutable.Map.empty[Int, Stock]

  override def receive: Receive =
    case RegisterUser(user) => registerNewUser(user)
    case Unregister(user) => deleteUser(user)
    case RegisterStock(stock) => registerStocks(stock)
    case UpdateStockPrice(stock, amount) => updateStockPrices(stock, amount)
    case NotifyUser => notifyUsers()

  override def registerNewUser(user: ActorRef): Unit =
    subs += user
    // Send a copy so the new user never shares this actor's mutable map.
    user ! stocks.clone()
    log.info(s"Registered User $user")

  override def notifyUsers(): Unit =
    log.info("Notifying all users.")
    // One snapshot per broadcast; observers only read it.
    val snapshot = stocks.clone()
    subs.foreach(_ ! snapshot)
    log.info("All users notified of the stocks")

  override def deleteUser(user: ActorRef): Unit =
    subs -= user
    log.info(s"Deleted User $user")

  override def registerStocks(stock: Stock): Unit =
    stocks += (stock.id -> stock)
    self ! NotifyUser
    log.info(s"Registered a new $stock")

  override def updateStockPrices(stock: Stock, amount: Double): Unit =
    // Replace the stored entry with the same stock at the new price.
    stocks += (stock.id -> stock.copy(price = amount))
    self ! NotifyUser
    log.info(s"Updated stock value of $stock to $amount")

Observer.scala

As observers, we have users who register with the stock company. Akka delivers messages at most once by default, so delivery is not guaranteed. In event-driven systems we generally don't worry about acknowledgements unless the message is something like a financial transaction or must be delivered at least once. We have no such requirement here, so the user actor simply processes the data it receives and sends nothing back.

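import akka.actor.{Actor, ActorLogging}
import scala.collection.mutable

sealed trait Observer {
  def publishUpdate(stocks: mutable.Map[Int, Stock]): Unit
}

class User(name: String) extends Actor, Observer, ActorLogging {
  override def receive: Receive = {
    // Type arguments are erased at runtime, so this matches any mutable.Map.
    case stocks: mutable.Map[Int, Stock] => publishUpdate(stocks)
  }

  // "Publishing" here just means logging the latest name and price of each stock.
  override def publishUpdate(stocks: mutable.Map[Int, Stock]): Unit = {
    log.info(name + " " + stocks.values.map(x => x.name + " " + x.price))
  }
}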

MainRunner.scala

Akka's ActorSystem is the entry point for creating actors. Its actorOf method works much like a factory method: we describe the actor with Props and get back an ActorRef, without worrying about how the actor is instantiated or scheduled in the background.

Using the actor system, we can create multiple actors. In the example below, we create one actor for the stock company and three for the users.

I've added Thread.sleep calls to space the messages out so that the output is easier to follow. Remove them if you want to see the messages being processed concurrently.

import akka.actor.{ActorSystem, Props}
import StockCompany.*

object MainRunner extends App {
  val system: ActorSystem = ActorSystem("MainRunner")
  private val stockCompany = system.actorOf(Props(StockCompany), "StockCompany")
  val user1 = system.actorOf(Props(new User("user 1")), "User1")
  val user2 = system.actorOf(Props(new User("user 2")), "User2")
  val user3 = system.actorOf(Props(new User("user 3")), "User3")

  val stock1 = Stock(123, "ABC", 23.45)
  val stock2 = Stock(456, "CDE", 67.89)

  // Every `!` is fire-and-forget: it returns as soon as the message is queued.
  stockCompany ! RegisterStock(stock1)
  stockCompany ! RegisterStock(stock2)
  Thread.sleep(1000)
  stockCompany ! RegisterUser(user1)
  Thread.sleep(1000)
  stockCompany ! RegisterUser(user2)
  stockCompany ! UpdateStockPrice(stock2, 89.56)
  Thread.sleep(1000)
  stockCompany ! UpdateStockPrice(stock1, 32.54)
  stockCompany ! RegisterUser(user3)
  Thread.sleep(1000)
  stockCompany ! UpdateStockPrice(stock2, 101.45)
  Thread.sleep(1000)
  stockCompany ! Unregister(user3)
  Thread.sleep(1000)
  stockCompany ! UpdateStockPrice(stock1, 35.45)

  // Shutdown starts right after the last send; messages still queued at that
  // point are not guaranteed to be processed.
  system.terminate()
}

Output

[INFO] [05/15/2023 22:38:32.154] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] Registered a new Stock(123,ABC,23.45)
[INFO] [05/15/2023 22:38:32.154] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] Registered a new Stock(456,CDE,67.89)
[INFO] [05/15/2023 22:38:32.155] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] Notifying all users.
[INFO] [05/15/2023 22:38:32.159] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] All users notified of the stocks
[INFO] [05/15/2023 22:38:32.159] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] Notifying all users.
[INFO] [05/15/2023 22:38:32.159] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] All users notified of the stocks
[INFO] [05/15/2023 22:38:33.153] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] Registered User Actor[akka://MainRunner/user/User1#-176782719]
[INFO] [05/15/2023 22:38:33.154] [MainRunner-akka.actor.default-dispatcher-7] [akka://MainRunner/user/User1] user 1 List(CDE 67.89, ABC 23.45)
[INFO] [05/15/2023 22:38:34.156] [MainRunner-akka.actor.default-dispatcher-7] [akka://MainRunner/user/StockCompany] Registered User Actor[akka://MainRunner/user/User2#-1638680048]
[INFO] [05/15/2023 22:38:34.157] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/User2] user 2 List(CDE 67.89, ABC 23.45)
[INFO] [05/15/2023 22:38:34.157] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] Updated stock value of Stock(456,CDE,67.89) to 89.56
[INFO] [05/15/2023 22:38:34.158] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] Notifying all users.
[INFO] [05/15/2023 22:38:34.158] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] All users notified of the stocks
[INFO] [05/15/2023 22:38:34.158] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/User1] user 1 List(CDE 89.56, ABC 23.45)
[INFO] [05/15/2023 22:38:34.158] [MainRunner-akka.actor.default-dispatcher-6] [akka://MainRunner/user/User2] user 2 List(CDE 89.56, ABC 23.45)
[INFO] [05/15/2023 22:38:35.160] [MainRunner-akka.actor.default-dispatcher-7] [akka://MainRunner/user/StockCompany] Updated stock value of Stock(123,ABC,23.45) to 32.54
[INFO] [05/15/2023 22:38:35.161] [MainRunner-akka.actor.default-dispatcher-7] [akka://MainRunner/user/StockCompany] Registered User Actor[akka://MainRunner/user/User3#-455573140]
[INFO] [05/15/2023 22:38:35.161] [MainRunner-akka.actor.default-dispatcher-7] [akka://MainRunner/user/StockCompany] Notifying all users.
[INFO] [05/15/2023 22:38:35.161] [MainRunner-akka.actor.default-dispatcher-7] [akka://MainRunner/user/StockCompany] All users notified of the stocks
[INFO] [05/15/2023 22:38:35.161] [MainRunner-akka.actor.default-dispatcher-9] [akka://MainRunner/user/User3] user 3 List(CDE 89.56, ABC 32.54)
[INFO] [05/15/2023 22:38:35.161] [MainRunner-akka.actor.default-dispatcher-8] [akka://MainRunner/user/User1] user 1 List(CDE 89.56, ABC 32.54)
[INFO] [05/15/2023 22:38:35.161] [MainRunner-akka.actor.default-dispatcher-9] [akka://MainRunner/user/User3] user 3 List(CDE 89.56, ABC 32.54)
[INFO] [05/15/2023 22:38:35.161] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/User2] user 2 List(CDE 89.56, ABC 32.54)
[INFO] [05/15/2023 22:38:36.165] [MainRunner-akka.actor.default-dispatcher-6] [akka://MainRunner/user/StockCompany] Updated stock value of Stock(456,CDE,67.89) to 101.45
[INFO] [05/15/2023 22:38:36.165] [MainRunner-akka.actor.default-dispatcher-6] [akka://MainRunner/user/StockCompany] Notifying all users.
[INFO] [05/15/2023 22:38:36.165] [MainRunner-akka.actor.default-dispatcher-6] [akka://MainRunner/user/StockCompany] All users notified of the stocks
[INFO] [05/15/2023 22:38:36.165] [MainRunner-akka.actor.default-dispatcher-9] [akka://MainRunner/user/User3] user 3 List(CDE 101.45, ABC 32.54)
[INFO] [05/15/2023 22:38:36.165] [MainRunner-akka.actor.default-dispatcher-10] [akka://MainRunner/user/User2] user 2 List(CDE 101.45, ABC 32.54)
[INFO] [05/15/2023 22:38:36.165] [MainRunner-akka.actor.default-dispatcher-8] [akka://MainRunner/user/User1] user 1 List(CDE 101.45, ABC 32.54)
[INFO] [05/15/2023 22:38:37.166] [MainRunner-akka.actor.default-dispatcher-5] [akka://MainRunner/user/StockCompany] Deleted User Actor[akka://MainRunner/user/User3#-455573140]
[INFO] [05/15/2023 22:38:38.168] [MainRunner-akka.actor.default-dispatcher-8] [akka://MainRunner/user/StockCompany] Updated stock value of Stock(123,ABC,23.45) to 35.45
[INFO] [05/15/2023 22:38:38.168] [MainRunner-akka.actor.default-dispatcher-8] [akka://MainRunner/user/StockCompany] Notifying all users.
[INFO] [05/15/2023 22:38:38.168] [MainRunner-akka.actor.default-dispatcher-8] [akka://MainRunner/user/StockCompany] All users notified of the stocks
[INFO] [05/15/2023 22:38:38.169] [MainRunner-akka.actor.default-dispatcher-10] [akka://MainRunner/user/User2] user 2 List(CDE 101.45, ABC 35.45)
[INFO] [05/15/2023 22:38:38.169] [MainRunner-akka.actor.default-dispatcher-9] [akka://MainRunner/user/User1] user 1 List(CDE 101.45, ABC 35.45)
[INFO] [05/15/2023 22:38:38.191] [main] [CoordinatedShutdown(akka://MainRunner)] Running CoordinatedShutdown with reason [ActorSystemTerminateReason]

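Every actor has a message queue, called a mailbox. An actor takes messages from its mailbox one at a time, so within a single actor, messages are always processed sequentially. This is why StockCompany can update its set of subscribers and its map of stocks without any locks. It also explains why user 3 is logged twice at 22:38:35: once for the stocks sent on registration, and once for the NotifyUser broadcast that the preceding price update had queued.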
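The effect of a mailbox can be sketched without Akka. In the example below, `TallyActor` is a hypothetical stand-in in which a single-threaded executor plays the role of the mailbox. Four threads send messages at the same time, yet the state is a plain `var` with no lock, because only the mailbox thread ever touches it while handling a message. This illustrates the idea only and is not how Akka implements mailboxes.

```scala
import java.util.concurrent.{CountDownLatch, Executors}

// Toy mailbox: a single-threaded executor handles one message at a time.
final class TallyActor(expected: Int) {
  private val mailbox = Executors.newSingleThreadExecutor()
  private val done = new CountDownLatch(expected)
  private var total = 0 // plain var, no lock: only the mailbox thread writes it

  // Fire-and-forget send; the message is queued and handled later.
  def !(amount: Int): Unit = mailbox.execute { () =>
    total += amount
    done.countDown()
  }

  // Waits until all expected messages were handled, then reads the state.
  def awaitTotal(): Int = {
    done.await()
    mailbox.shutdown()
    total
  }
}

object MailboxDemo {
  def run(): Int = {
    val senders = 4
    val perSender = 1000
    val actor = new TallyActor(senders * perSender)
    // Several threads send concurrently to the same actor.
    val threads = (1 to senders).map { _ =>
      new Thread(() => (1 to perSender).foreach(_ => actor ! 1))
    }
    threads.foreach(_.start())
    threads.foreach(_.join())
    actor.awaitTotal()
  }

  def main(args: Array[String]): Unit = println(run()) // prints 4000
}
```

If the four threads incremented a shared `var` directly, updates could be lost. Routing every change through one mailbox serialises them, which is the guarantee the StockCompany actor relies on.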
