Let's build an Eventbus using Kotlin

Vignesh Ramakrishnan
MindOrks
Published in
3 min readJul 21, 2019

EventBus, as we all know, simplifies communication between Activities, Fragments and other different parts of your application, achieved with less, clear code. We have seen Eventbus used by a lot of applications. Here we will see how Kotlin and RxKotlin give us slightly a better edge to build a clean EventBus helping us achieve the exact same scenario. We will also learn some kotlin along the way :).

Let us first create a contract here, i.e An interface to define our idea. We have 3 functions here.

  • postEvent()

Self-explanatory. Use it whenever we wish to emit an event.

  • observeEvents()

This guy is the one at the center. We return Flowable here and it will emit for every post event call. One thing to note is that we have not assigned any scheduler to this for now. We will discuss this in the next point.

  • observeEventsOnUi()

Emit the event on Ui thread. Based on this idea, all this function would do is call the observeEvents() and make it emit on the Ui thread.

Clients can make use of observeEvents() function and specify their own scheduler based on their needs.

This contract also states that the subscriber is responsible for backpressure handling.

Before jumping into the implementation, lets bridge the subscriber and the contract. We will define two extension functions for EventBus.

  • EventBus.observeEvent()
  • EventBus.observeEventOnUi()

These functions are observers of a single event. The edge that I was talking about comes into the picture now,

  • ofType() method of a Flowable helps us filter out events emitted by a flowable based on the type that we specify while subscribing to an event. This removes any check that we might have to do on our end and makes our code readable and clean at our subscriber end. RxKotlin has the generic type reified in the ofType method. In RxJava this method expects a Class parameter. We will discuss reified next.

The extension functions are “reified”. Reify means make something real. When working with generics in kotlin, if a function needs to use the generic type(T), it has to be marked as reified so that its made “real”. Real here means that the generic type T has to be available at runtime for the function to use it. In our example, the ofType() method of RxKotlin directly uses the type. Hence T is marked as reified in the extension functions.

Any function with a reified type parameter should be marked as inline. Inline as you might probably know expands the call site with the function in order to reduce the call overhead. This is required as reified generics use this trick to inline the function call and replace the generic parameter with the class you specify in the inlined code.

Now let's implement the Eventbus.

The implementation is similar to how it is done using a PublishSubject. Following are the differences,

  • We use a PublishProcessor here. PublishProcessor extends a flowable, because of which it can be backpressure aware. It is required as events are going to be emitted anywhere, at any frequency in our app. We also assign the responsibility of assigning backpressure handling to the subscriber as already mentioned in the contract.
  • The implementation of observeEvents() function has publishProcessor.serialize(). It is possible that the onNext function of the publishProcessor might be called in different places from different threads concurrently. This function helps us execute the subscribers sequentially, in other words, makes the publisher “well-behaved”.
  • We assign the Ui scheduler to the flowable emitted by observeEvents() in the observeEventsOnUi function.

The events can be observed as follows.

Thanks Arun for the help!

--

--