Wire Signals

Maciek Gorywoda
Published in The Startup
15 min read · Jan 5, 2021

Yet Another Event Streams Library

Introduction

I’m from Poland, but now I live in Berlin and work for Wire, an end-to-end encrypted messenger. I work there in the Android team, even though I write my code in Scala. About two-thirds of the Wire Android code is written in Scala, which makes it unique among Android apps, most of which are implemented in Java and/or Kotlin. Wire is a messenger, and as such it must be very responsive: it has to react quickly to any events coming from the backend, from the user, and from Android itself. So, during the last four years, the Android team developed its own implementation of event streams and of so-called “signals”, a build-up on top of event streams.

This text is about them: about the theory, the practice, and some corner cases you may encounter if you want to code your own implementation. But it’s also a bit about wire-signals, the open-source library we developed, which can do it for you. There are of course other, bigger solutions to the same problem, like Akka, Monix, RxJava, or, in the case of Android, LiveData or Kotlin Flow. Personally, I like Akka very much. But I also think that it’s good to start small and then, if needed, switch to something bigger, or stay with the smaller solution if it’s enough for you.

Theory

Let’s start with a bit of theory. Basically, we are talking here about the Observer pattern. You can read all about it in this very old book, Design Patterns: Elements of Reusable Object-Oriented Software, by the so-called Gang of Four (Erich Gamma, Richard Helm, Ralph Johnson, and John Vlissides). The idea behind it is to solve two problems:

  1. How can a one-to-many dependency between objects be defined without making the objects tightly coupled?
  2. How can an object notify an open-ended number of other objects? [link]

“Open-ended” means that we don’t know how many of them there are, and new ones can pop up at any moment.

The main object here is called a producer or a subject. It’s the source of events. The other objects are observers, also known as listeners or subscribers. The terminology is a bit liberal. Some people may even argue that there are differences between those terms, but I think the main reason behind it is that the pattern has been with us for a long time and has been implemented in various ways. Here I will simply talk about an event stream and its subscribers. We will talk about producers too, briefly, in a minute.

The event stream can produce, or better to say “broadcast”, an event at any moment. If there are no subscribers present, the event will be wasted. We might say that the event stream does not have an internal state: it does not store the event for future use, it just broadcasts it and forgets about it. But that wouldn’t be entirely true. The event stream needs to store the set of references to its subscribers. The set is empty at first, but then any object in the app which fulfills the criteria of being a subscriber can subscribe to it, the criteria being that it can receive an event of the given type. The subscriber is added to the set, and from now on, every time the event stream broadcasts a new event, the subscriber will receive it.

There can of course be more than one subscriber. The event will then be sent to all of them. To stop receiving events, the subscriber needs to unsubscribe. The fact that unsubscribing has to be explicit is a bit of a drawback. In a complex application, the programmer may forget to do it when the subscriber is destroyed or simply stops being needed. On the JVM this may lead to a memory leak: the garbage collector will not be able to collect the subscriber because the event stream will still hold a now-defunct reference to it. We can solve this problem with some sort of automatic unsubscription. We can, for example, model subscriptions as case classes holding references to both the subscriber and the event stream, and use them to unsubscribe in some sort of onStop methods. Or we can use event contexts: implicit objects which oversee subscriptions. You can look at the wire-signals documentation for more details.

But where do the events come from? In the original Observer pattern, we assume the event stream is also the producer of events. The book even describes the problem we solve as a one-to-many dependency: we have one producer that somehow creates the data and sends it away. But producing data and sending it are two operations, and we can decouple them. So the producer becomes whatever part of the app can produce the data and give it to the event stream, and the event stream becomes the object responsible for handling subscribers and broadcasting the events to them. And since the producer is now separate from the event stream, nothing prevents us from having more than one producer. The connection becomes many-to-many, with the event stream sitting in the middle. This way producers and subscribers don’t have to know about each other. They only have to know about the event stream that operates on data of the given type. And, in contrast to subscribers, producers don’t even have to register in the event stream.
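To make the “subscription as a case class” idea a bit more concrete, here is a minimal sketch. It is not the wire-signals code: MiniStream, Subscription, cancel and subscriberCount are names I made up for this illustration, and the sketch is not thread-safe.

```scala
object SubscriptionSketch {
  trait Subscriber[E] { def onEvent(event: E): Unit }

  // Hypothetical minimal stream, single-threaded; not the wire-signals implementation.
  final class MiniStream[E] {
    private var subscribers = Set.empty[Subscriber[E]]

    // Subscribing hands back an object that can undo the subscription later.
    def subscribe(subscriber: Subscriber[E]): Subscription[E] = {
      subscribers += subscriber
      Subscription(this, subscriber)
    }

    def unsubscribe(subscriber: Subscriber[E]): Unit = subscribers -= subscriber
    def publish(event: E): Unit = subscribers.foreach(_.onEvent(event))
    def subscriberCount: Int = subscribers.size
  }

  // Holds both ends of the link, so whoever owns it can cut the link.
  final case class Subscription[E](stream: MiniStream[E], subscriber: Subscriber[E]) {
    def cancel(): Unit = stream.unsubscribe(subscriber)
  }

  // Returns the events the subscriber saw: one before cancel(), none after.
  def demo(): List[String] = {
    val stream = new MiniStream[String]
    var received = List.empty[String]
    val subscription = stream.subscribe(new Subscriber[String] {
      def onEvent(event: String): Unit = received :+= event
    })
    stream.publish("first")
    subscription.cancel()    // e.g. called from some onStop method
    stream.publish("second") // nobody is listening anymore
    received
  }
}
```

After cancel() the stream no longer references the subscriber, so the garbage collector is free to collect it. An event context could go one step further and keep a list of such subscriptions to cancel them all at once.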

Practice

Let’s say our producer here is an OkHttpClient. OkHttp is an HTTP client library very often used on Android. Let’s say we have an instance of it and we open a web socket (that is, a connection to the backend) and wait for some data from it. So we don’t actually produce anything; it’s more like another link in a chain, but from the point of view of our Android app this is how our events are produced.

val stream = EventStream[String]()
okHttpClient.newWebSocket(request, new WebSocketListener {
  override def onMessage(webSocket: OkWebSocket, text: String): Unit = {
    debug(l"WebSocket received a text message.")
    stream ! text // hand the message over to the event stream
  }
})

The subscriber can then be modelled as a trait, and the event stream as a class that keeps a set of such subscribers.

trait Subscriber[E] {
  def onEvent(event: E): Unit
}

class EventStream[E] {
  private var subscribers: Set[Subscriber[E]] = Set.empty
  def subscribe(subscriber: Subscriber[E]): Unit = subscribers += subscriber
  def publish(event: E): Unit = subscribers.foreach(_.onEvent(event))
}

But forcing every class to extend Subscriber to enable it to receive events is tedious. It’s verbose. We’re too lazy for that. Fortunately, we can turn the subscribe method above into a foreach that takes a function.

class EventStream[E] {
  private class EventSubscriber(f: E => Unit) extends Subscriber[E] {
    override def onEvent(event: E): Unit = f(event)
  }
  ...
  def foreach(f: E => Unit): Unit = subscribers += new EventSubscriber(f)
  ...
  @inline def !(event: E): Unit = publish(event)
}

stream.foreach { msg => … }

Oh, and I took this opportunity to also introduce the exclamation mark operator as a shorthand for publish.

For example, we can now use foreach to subscribe to the stream and display the text of each message in the message view.

val messageView: TextView = …
stream.foreach { str => messageView.setText(str) }

Okay. At this point you might have thought that this Subscriber trait has only one method, so why don’t we just turn it into a function? Well, yes, I’m usually all for it. If our trait turns out to have only one method, I think it means we should really consider turning it into a function. But there’s a reason why I didn’t do it. And we’ll talk about it in a moment.

Transformations

But first, transformations. If broadcasting events was all an event stream could do, it wouldn’t really be that interesting. Often subscribers either want to transform the event somehow before using it (and many subscribers may want to transform the event in the same way), or they are interested only in a certain subset of events coming from the event stream. Or both. Or the logic can be even more complicated. For example, the event can be interesting to the subscriber only if a certain event from another event source was received before it.

To address those needs, event stream implementations come with a long list of methods such as map, flatMap, filter, collect, zip, and so on. With them, you can move the logic which would otherwise have to be implemented in the subscriber to the event stream. In fact, each of them creates a new event stream that has the original one as its producer. map creates a new event stream that publishes the transformed events, and filter creates an event stream that publishes an event from the parent only if a certain condition is fulfilled.

class EventStream[E] {
  def map[V](f: E => V): EventStream[V]
  def flatMap[V](f: E => EventStream[V]): EventStream[V]
  def filter(f: E => Boolean): EventStream[E]
  def collect[V](pf: PartialFunction[E, V]): EventStream[V]
  def zip(stream: EventStream[E]): EventStream[E]
  ...
}
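To show how “a new event stream that has the original one as its producer” can work, here is a minimal single-threaded sketch of map, filter and collect. MiniStream is a hypothetical name and this is not how wire-signals implements them; in particular, a real library also has to take care of unsubscribing the child from its parent.

```scala
object TransformSketch {
  // Hypothetical minimal stream, single-threaded; not the wire-signals implementation.
  final class MiniStream[E] {
    private var subscribers = Vector.empty[E => Unit]

    def foreach(f: E => Unit): Unit = subscribers :+= f
    def publish(event: E): Unit = subscribers.foreach(_(event))

    // The child subscribes to this stream and republishes the transformed events.
    def map[V](f: E => V): MiniStream[V] = {
      val child = new MiniStream[V]
      foreach(event => child.publish(f(event)))
      child
    }

    // The child republishes only the events that fulfill the condition.
    def filter(p: E => Boolean): MiniStream[E] = {
      val child = new MiniStream[E]
      foreach(event => if (p(event)) child.publish(event))
      child
    }

    // Filter and map in one step, driven by a partial function.
    def collect[V](pf: PartialFunction[E, V]): MiniStream[V] = {
      val child = new MiniStream[V]
      foreach(event => if (pf.isDefinedAt(event)) child.publish(pf(event)))
      child
    }
  }

  // Publishes 1 to 4 and returns what a subscriber of the transformed stream saw.
  def demo(): Vector[String] = {
    val numbers = new MiniStream[Int]
    var seen = Vector.empty[String]
    numbers.filter(_ % 2 == 0).map(n => s"even: $n").foreach(s => seen :+= s)
    (1 to 4).foreach(numbers.publish)
    seen // Vector("even: 2", "even: 4")
  }
}
```

The subscriber at the end of the chain contains no filtering or formatting logic at all; it only consumes the final strings.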

You may notice that all those methods are basically the same as in the case of standard collections. We can treat an event stream as a collection: a special kind of collection, but still. You can compare it to the relationship between an Option and a Future. Both are collections that have only one or zero elements, but in the case of a Future we get access to the element only after some time. The same relationship exists between a standard sequence and an event stream: in the case of an event stream we have an undefined number of elements, and we don’t have access to them immediately but only after some time.

And since we can treat an event stream as a collection, and especially since we can use flatMap on it, we can also use for-comprehensions:

val stream1 = EventStream[String]()
val stream2 = EventStream[Int]()
val resultStream = for {
  str <- stream1
  i   <- stream2
} yield s"$str: $i" // resultStream is of the type EventStream[String]

which is equivalent to:

val resultStream = stream1.flatMap { str => stream2.map(i => s"$str: $i") }

It might not look like it, because the example is simple, but the ability to use for-comprehensions for event streams is a major boost for the readability of our code. Imagine a whole long list of consecutive transformations of event streams, each based on some of the ones executed above, but also on other data. And you can just read it here, just like this: one line, one transformation after another.
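If you wonder what flatMap can mean for event streams, one possible reading is “follow the stream created from the most recent event of the parent”. The sketch below implements that reading under my own assumptions: MiniStream and its subscribe method (which returns a function that cancels the subscription) are hypothetical, the code is single-threaded, and it ignores some cleanup a real library has to do.

```scala
object FlatMapSketch {
  // Hypothetical minimal stream, single-threaded; not the wire-signals implementation.
  final class MiniStream[E] {
    private var subscribers = Vector.empty[E => Unit]

    // Returns a function that removes the subscription again.
    def subscribe(f: E => Unit): () => Unit = {
      subscribers :+= f
      () => { subscribers = subscribers.filterNot(_ eq f) }
    }

    def publish(event: E): Unit = subscribers.foreach(_(event))

    def map[V](f: E => V): MiniStream[V] = {
      val child = new MiniStream[V]
      subscribe(event => child.publish(f(event)))
      child
    }

    def flatMap[V](f: E => MiniStream[V]): MiniStream[V] = {
      val child = new MiniStream[V]
      var cancelInner: () => Unit = () => ()
      subscribe { event =>
        cancelInner()                                   // stop following the previous inner stream
        cancelInner = f(event).subscribe(child.publish) // follow the newest one
      }
      child
    }
  }

  def demo(): Vector[String] = {
    val stream1 = new MiniStream[String]
    val stream2 = new MiniStream[Int]
    val result = for { str <- stream1; i <- stream2 } yield s"$str: $i"
    var seen = Vector.empty[String]
    result.subscribe(s => seen :+= s)
    stream2.publish(1)   // ignored: stream1 has not produced anything yet
    stream1.publish("a")
    stream2.publish(2)   // "a: 2"
    stream1.publish("b")
    stream2.publish(3)   // "b: 3"
    seen
  }
}
```

Note one limitation of this sketch: the intermediate stream created by map for "a" stays subscribed to stream2 after "b" arrives. Its events simply go nowhere, but it is exactly the kind of leftover reference that a real implementation needs to clean up.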

Threads

A moment ago I skipped over one important issue. I said that a subscriber waits for an event coming from the event stream. But waiting means something different depending on whether we work with only one thread, or with two or more. Consider this:

def main(): Unit = {
  val stream = EventStream[String]() // 1
  stream.foreach(println)            // 2
  stream ! "Hello, world!"           // 3
}

If this code works on only one thread, we will create a stream in the first line, subscribe to it in the second, and then just after publishing “Hello, world!” in the third line the control will come back to the event stream which will call the println function, the println function will print out the string, and only then the main method will finish. But… but that’s not how modern programs work.

In Android, for example, it’s customary to work with at least two threads belonging to separate execution contexts: UI, and Background. The UI thread should be used only to display and refresh things on the screen. If our code does not touch the UI directly, it should work on the Background thread. So if we have a list of items we want to display we do it on the UI thread, but to retrieve those items from the storage we should use the Background thread…

val storage: MyStorage[Item] = …
val adapter = new MyItemsAdapter[Item](
  this,
  android.R.layout.simple_list_item_1,
  storage.allItems
)
val listView: ListView = findViewById(R.id.listview)
listView.setAdapter(adapter)
storage.onChanged.foreach { newItems =>
  adapter.updateItems(newItems)
}

storage.updateItems(...)

You may already see the problem. If we don’t have a way to differentiate between UI and Background, then when an update happens to the storage, the function passed to foreach on the onChanged event stream, and consequently updateItems on the adapter, will be called on the same thread as the update itself. Fortunately, event streams are exactly the tool we need to jump from one thread to another with little ceremony. We use the foreach method to subscribe to the event stream. That works because the foreach method is a bit special. It’s different from map and flatMap in one important detail.

def foreach(f: Event => Unit)(implicit executionContext: ExecutionContext): Unit

In standard collections foreach is called immediately, so no execution context is needed. But in event streams, we can differentiate between the execution context of the source and the execution context of the subscriber. We can implement foreach so that it takes not only the function to execute but also a reference to the execution context in which the function should be executed. When a new event comes, the event stream goes through the collection of its subscribers, and for each of them calls the function f… but not immediately. Instead, it wraps the call to f in a Future and runs it in the execution context of the subscriber.

import scala.concurrent.{ExecutionContext, Future}

trait Subscriber[E] {
  def onEvent(event: E): Unit
}

class EventSubscriber[E](f: E => Unit, ec: ExecutionContext) extends Subscriber[E] {
  // Schedule f in the subscriber's execution context instead of calling it directly.
  override def onEvent(event: E): Unit = Future { f(event) }(ec)
}

class EventStream[E] {
  private var subscribers: Set[Subscriber[E]] = Set.empty

  def foreach(f: E => Unit)(implicit ec: ExecutionContext): Unit =
    subscribers += new EventSubscriber(f, ec)
}

(again, not actual wire-signals code, but close enough)

Okay, let’s go through it step by step:
1. storage.updateItems makes some changes to the items in the storage on the Background thread,
2. then we have foreach on the storage.onChanged on the UI thread, which subscribes to the onChanged with the implicit execution context of the UI,
3. and then, when the change happens on the Background thread, instead of executing adapter.updateItems on that same thread, we wrap the call to adapter.updateItems in a Future and run it in the execution context of the UI as soon as possible.
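The three steps can be tried out without Android. In the sketch below two single-thread executors stand in for the UI and Background execution contexts; the thread names "ui" and "background", as well as MiniStream and namedExecutor, are my own assumptions made for this illustration and not part of wire-signals.

```scala
import java.util.concurrent.{ExecutorService, Executors, ThreadFactory}
import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContext, Future, Promise}

object ThreadHopSketch {
  // Hypothetical minimal stream; not the wire-signals implementation.
  final class MiniStream[E] {
    @volatile private var subscribers = Vector.empty[E => Unit]

    // The subscriber's execution context is captured at subscription time.
    def foreach(f: E => Unit)(implicit ec: ExecutionContext): Unit = synchronized {
      subscribers = subscribers :+ ((event: E) => { Future(f(event))(ec); () })
    }

    def publish(event: E): Unit = subscribers.foreach(_(event))
  }

  // A single daemon thread with a recognizable name.
  private def namedExecutor(name: String): ExecutorService =
    Executors.newSingleThreadExecutor(new ThreadFactory {
      def newThread(r: Runnable): Thread = {
        val t = new Thread(r, name)
        t.setDaemon(true)
        t
      }
    })

  // Returns (thread that published the event, thread that ran the subscriber function).
  def demo(): (String, String) = {
    val uiExecutor = namedExecutor("ui")
    val bgExecutor = namedExecutor("background")
    val ui = ExecutionContext.fromExecutor(uiExecutor)
    val background = ExecutionContext.fromExecutor(bgExecutor)
    try {
      val stream = new MiniStream[String]
      val handledOn = Promise[String]()
      // Step 2: subscribe with the UI execution context.
      stream.foreach { _ => handledOn.success(Thread.currentThread().getName); () }(ui)
      // Steps 1 and 3: the change is published from the background thread.
      val publishedOn = Future {
        stream.publish("changed")
        Thread.currentThread().getName
      }(background)
      (Await.result(publishedOn, 5.seconds), Await.result(handledOn.future, 5.seconds))
    } finally {
      uiExecutor.shutdown()
      bgExecutor.shutdown()
    }
  }
}
```

The event is published on the "background" thread, but the function passed to foreach runs on the "ui" thread, and the publisher never has to know which execution context its subscribers use.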

Of course, there is a special case when the execution context of the publisher is the same as the subscriber’s. That possibility can be implemented and used as well, but I’d suggest that the behaviour above should be the default one: we wrap the call in a Future and execute it at some point in the future. We shouldn’t care about when exactly, just as we don’t care about it when the execution contexts are different. So, in a way, this gives us better consistency in how the code behaves, in that in both cases we don’t know the same details about how it behaves.

I’m aware that it does not really answer the question why the Subscriber trait cannot be a function instead. The trait is a bit more complicated than the trivial version I presented before, but anyway, I think it could be a function. If there’s one way I want wire-signals to stand out among other libraries like it, it’s that I want it to be minimalistic. And at this moment this part of the code seems to me to be too complicated. I want to work on it. And if you think you can help me in any way, please reach out.

Signals

And finally… signals! “Signal” is not a commonly agreed name the way “event stream” is. It’s also not an implementation of a popular pattern, the way event streams are an implementation of the Observer pattern. But it is a pattern nonetheless, and it’s one that came up out of necessity at my company and developed quite naturally. We implemented it, tested it, used it in several distinct places in our code (even though arguably all of them within the same Android application), and finally we documented it and moved the code to a separate open-source library. At more or less the same time Google came up with Android LiveData, which in many ways is a very similar concept. But it’s more tied to Android, while signals are a platform-independent implementation… as long as that platform understands Scala.

So, what is a signal? In short, a signal is an event stream with a cache. It’s a very simple, small distinction, but it’s also a very powerful one.

Whereas an event stream holds no internal state except for the collection of subscribers and just passes on events it receives, a signal keeps the last value it received. A new subscriber function registered in an event stream will be called only when a new event is published. A new subscriber function registered in a signal will be called immediately (or as soon as possible in the given execution context); and it will be called with the current value of the signal (unless the signal is not initialized yet), and then again it will be called when the value changes. A signal is also able to compare a new value published in it with the old one. The new value will be passed on only if it is different. Thus, a signal can help us with optimizing performance on both ends:

  1. as a cache for values which otherwise would require expensive computations to produce them every time we need them,
  2. and also as a way to ensure that subscriber functions are called only when the value actually changes, but not when the result of the intermediate computation is the same as before.
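Both properties, the cache and the comparison with the previous value, fit in a few lines. This is only a single-threaded sketch under my own naming (MiniSignal, publish) and not the wire-signals implementation, which also has to deal with execution contexts and unsubscribing.

```scala
object SignalSketch {
  // Hypothetical minimal signal, single-threaded; not the wire-signals implementation.
  final class MiniSignal[V] {
    private var current: Option[V] = None // None means "not initialized yet"
    private var subscribers = Vector.empty[V => Unit]

    def foreach(f: V => Unit): Unit = {
      subscribers :+= f
      current.foreach(f) // a new subscriber gets the cached value right away
    }

    def publish(value: V): Unit =
      if (!current.contains(value)) { // a value equal to the current one is dropped
        current = Some(value)
        subscribers.foreach(_(value))
      }

    def currentValue: Option[V] = current
  }

  // Returns the values the subscriber function was called with.
  def demo(): Vector[Int] = {
    val signal = new MiniSignal[Int]
    signal.publish(1)               // cached, even though nobody is listening yet
    var seen = Vector.empty[Int]
    signal.foreach(n => seen :+= n) // called at once with the current value 1
    signal.publish(1)               // same value as before: subscribers are not called
    signal.publish(2)
    seen // Vector(1, 2)
  }
}
```

Compare this with an event stream: there the first 1 would have been lost, because it was published before anyone subscribed, and the second 1 would have been delivered even though nothing changed.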

You can think of it as traffic lights. You’re the driver. You come to the crossroads and check the traffic lights. That means you subscribe to them, but you also immediately get the current value and can act on it. If it’s green, you go. If it’s red or yellow, you stop and wait for a change. So, one advantage is that in some cases you don’t have to wait. If it was an event stream, you would have to stop each time and wait until a new event told you that it’s safe to go. In the case of a signal, if you see a green light as the current value, you don’t have to stop. But there’s also another advantage: if for any reason the lights compute their new value and that value is the same as before, no new event comes. In that case, the fact that anything was computed is completely transparent to you. You wait for a different value, not the same value computed again.

This second example is quite unrealistic if we talk about traffic lights, but consider this:

val signal = Signal[Int]()
signal.foreach {
  case n if n % 2 == 0 => complexComputations()
  case _ =>
}
signal ! 1
signal ! 2 // complexComputations executed
signal ! 2
signal ! 4 // complexComputations executed
signal ! 6 // complexComputations executed
signal ! 7

This is a signal of ints, and let’s say we want to perform some complex computations only if the value of the signal is even. And let’s assume, for the sake of argument, that we actually don’t have to perform complex computations every single time, but only if the value of the signal becomes even initially, or if it changes from odd to even later on. As we can see here, now the computations are also performed when the value changes from one even number to another even number. That’s not optimal. But let’s say it’s not invalid. Let’s say the result is the same as before and we only waste some CPU time this way.

But we can do better.

signal.map(_ % 2 == 0).foreach {
  case true => complexComputations()
  case _ =>
}
signal ! 1 // false
signal ! 2 // true, complexComputations executed
signal ! 2 // true, nothing changes
signal ! 4 // true, nothing changes
signal ! 6 // true, nothing changes
signal ! 7 // false

We can change the signal. We can map it and call foreach only after the mapping. That new mapped signal holds a boolean value. Its foreach will be called only if the boolean changes, and complexComputations will run only when it becomes true, that is, if the original number becomes even after being odd previously, or after the signal was empty and uninitialized. But if the number changes from one even number to another even number, the value of the mapped signal just stays the same, and so, because no change happened, the foreach part is not executed at all. And the complex computations are not executed.
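To check the difference between the two variants, we can count how many times the expensive branch runs for the same sequence of values. The sketch uses a hypothetical single-threaded MiniSignal with a map method, not the wire-signals Signal, and a counter stands in for complexComputations.

```scala
object MappedSignalSketch {
  // Hypothetical minimal signal, single-threaded; not the wire-signals implementation.
  final class MiniSignal[V] {
    private var current: Option[V] = None
    private var subscribers = Vector.empty[V => Unit]

    def foreach(f: V => Unit): Unit = {
      subscribers :+= f
      current.foreach(f)
    }

    def publish(value: V): Unit =
      if (!current.contains(value)) {
        current = Some(value)
        subscribers.foreach(_(value))
      }

    // The mapped signal caches and compares its own (transformed) value.
    def map[Z](f: V => Z): MiniSignal[Z] = {
      val child = new MiniSignal[Z]
      foreach(value => child.publish(f(value)))
      child
    }
  }

  // Returns (runs when subscribing to the numbers, runs when subscribing to the mapped booleans).
  def demo(): (Int, Int) = {
    val signal = new MiniSignal[Int]
    var rawRuns = 0
    var mappedRuns = 0
    signal.foreach(n => if (n % 2 == 0) rawRuns += 1)
    signal.map(_ % 2 == 0).foreach(isEven => if (isEven) mappedRuns += 1)
    Seq(1, 2, 2, 4, 6, 7).foreach(signal.publish)
    (rawRuns, mappedRuns) // (3, 1)
  }
}
```

The first subscriber runs its expensive branch three times (for 2, 4 and 6), the second one only once, when the mapped value changes from false to true.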

Thank you

I think with this I will finish this text. It’s already quite long. If you want to know more:
