Reactive Programming with Kotlin for Android

Jose Angel Zamora
Published in BestSecret Tech
9 min read · May 27, 2019

Reactive programming has become almost a standard in Android app development, just as Kotlin has. Combining the reactive paradigm with Kotlin can help developers keep a clean architecture in their apps in an easy way. But what is reactive programming? What are its benefits? What are the basics? Why is it so useful in Android development?

Understanding the problem

When a new development paradigm appears, it is normally because there is a problem we struggle with and would like to fix. The new paradigm then arrives with a good solution.

Imagine the following code:

This code checks whether a number is even or odd. The check runs twice, for the values 4 and 9, but the printed messages will be:

The number is Even

The number is Even

As you can see, the isEven variable is not updated when number is changed to 9.
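The behaviour described above can be reproduced with a minimal sketch like the following. It is an assumption about what such a program looks like, using the names number and isEven from the text; describeNumbers is a hypothetical helper that returns the messages instead of printing them.

```kotlin
// Hypothetical helper: returns the two messages produced for the values 4 and 9.
fun describeNumbers(): List<String> {
    val messages = mutableListOf<String>()

    var number = 4
    // isEven is computed once, from the value number holds at this moment.
    val isEven = number % 2 == 0
    messages += "The number is ${if (isEven) "Even" else "Odd"}"

    // Changing number does not recompute isEven: nothing links the two variables.
    number = 9
    messages += "The number is ${if (isEven) "Even" else "Odd"}"

    return messages
}
```

Calling describeNumbers().forEach(::println) prints "The number is Even" twice, because isEven never reacts to the new value.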

The ‘reactive programming’ definition

In order to solve the previous case, we should be able to distinguish between two types of entities:

  • Emitter (also Subject, Observable): It changes its value without caring who is interested in it. In our example, it would be the number variable.
  • Observer: It should react to the changes of the emitter. In our case, we would love the isEven variable to react to the changes of number, but it doesn’t.

We can set up a one-to-one mechanism so that the observer can react to the emitter’s changes. Normally, a callback is used in these cases. This is called a message-driven mechanism: the emitter sends a message, and that message is addressed to a specific recipient.

As soon as we start thinking in the long term, this solution does not hold up because the system won’t scale. Why? Imagine that we keep adding observers to the same emitter (e.g. a sensor receives data and more and more modules consume that value). In that case, so many callbacks become unmaintainable.

Then, we should move from message-driven systems to event-driven systems. The main difference is that an event carries no information about the recipient: it is available to anyone interested in it. That leads us to a design pattern called the Observer pattern:

The Observer pattern: a one-to-many dependency between objects so that when one object changes state, all of its dependents are notified and updated automatically

Reactive programming takes the concept from the Observer pattern and goes one step further by allowing developers to follow an asynchronous programming paradigm that revolves around data streams and the propagation of change.
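Before bringing in any library, the one-to-many idea can be sketched in plain Kotlin. This is only an illustration: NumberEmitter is a hypothetical class, not part of Rx, and it handles neither unsubscription nor threading.

```kotlin
// Hypothetical emitter: holds a number and notifies every registered observer when it changes.
class NumberEmitter(initial: Int) {
    private val observers = mutableListOf<(Int) -> Unit>()

    var value: Int = initial
        set(newValue) {
            field = newValue
            // The emitter does not know who is listening; it just publishes the event.
            observers.forEach { observer -> observer(newValue) }
        }

    // Registers an observer and immediately hands it the current value.
    fun subscribe(observer: (Int) -> Unit) {
        observers += observer
        observer(value)
    }
}
```

For example, after val number = NumberEmitter(4), any number of modules can call number.subscribe { ... }, and assigning number.value = 9 notifies all of them without the emitter holding a dedicated callback per consumer.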

The Rx library

If we think of applying the reactive paradigm in our development process, the first library we should consider is Rx, which supports sequences of data/events and adds operators that allow you to compose sequences together declaratively while abstracting away concerns about things like low-level threading, synchronization, thread-safety and concurrent data structures.

Rx scheme

The key concept is that everything is a stream of data. In order to produce and consume the stream of data, we have three different entities:

  • Observable: It’s the emitter, responsible for producing the data. An Observable emits the items it holds, and it can also signal that the data channel has completed or failed.
  • Observer: It’s the subscriber. An Observer subscribes to an Observable. It is only responsible for consuming the data it is interested in, so it reacts to whatever item the Observable emits.
  • Disposable: This entity represents the subscription itself. When an Observer subscribes to an Observable, we get this entity back and can use it to cancel the subscription.
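The three entities can be seen together in a short sketch. It assumes RxJava 2 (the io.reactivex packages) is on the classpath; collectGreetings is a hypothetical helper that records what the observer receives.

```kotlin
import io.reactivex.Observable
import io.reactivex.disposables.Disposable

// Hypothetical helper: subscribes to a small Observable and records every signal.
fun collectGreetings(): List<String> {
    val received = mutableListOf<String>()

    // Observable: the emitter. just() emits its items and then completes.
    val source: Observable<String> = Observable.just("Hello", "Rx", "Kotlin")

    // Observer: expressed here as three lambdas (item, error, completion).
    // Disposable: the handle returned by subscribe(), representing the subscription.
    val subscription: Disposable = source.subscribe(
        { item -> received += "onNext: $item" },
        { error -> received += "onError: ${error.message}" },
        { received += "onComplete" }
    )

    // Releasing the subscription once we are no longer interested.
    subscription.dispose()
    return received
}
```

Because just() emits synchronously, all three items and the completion signal are delivered before subscribe() returns.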

Marble diagrams

The best way to understand how the reactive paradigm works is by using a marble diagram. This is a generic one:

A marble diagram: the best way to understand the different reactive operators

Consider that the top line is a timeline where an Observable is emitting values. Before consuming them, we have the chance to modify them as we need (the white box, a flip operator in the example), so that the Observer receives a transformed version of the data. This is very useful in many scenarios. For example, we can map the data before consuming it, filter it…

Observable types

When using Rx, the stream of data is what matters most. For that reason, it is important to understand the different types of Observable we might find.

Based on the number of emissions

We can classify them according to how many items they emit:

  • Completable: It only signals completion or an error, but never emits an item.
  • Single: It emits exactly one item, or an error.
  • Rest: Anything that is not a Completable or a Single falls into this group. All the Observables which can emit more than once are part of it, even if they happen to emit just once (they still have the chance to emit more).
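The three groups map onto distinct types in RxJava 2. The sketch below assumes that library is available; describeEmissionTypes is a hypothetical helper that logs what each type delivers.

```kotlin
import io.reactivex.Completable
import io.reactivex.Observable
import io.reactivex.Single

// Hypothetical helper: subscribes to one source of each kind and logs the signals.
fun describeEmissionTypes(): List<String> {
    val log = mutableListOf<String>()

    // Completable: runs an action and signals completion (or an error), never an item.
    Completable.fromAction { log += "Completable: action ran" }
        .subscribe { log += "Completable: completed" }

    // Single: exactly one item, or an error.
    Single.just(42)
        .subscribe { value -> log += "Single: $value" }

    // Observable: any number of items, followed by completion or an error.
    Observable.just(1, 2, 3)
        .subscribe { value -> log += "Observable: $value" }

    return log
}
```

Choosing the narrowest type that fits documents the contract: a caller receiving a Single knows it never has to handle a second item.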

Based on its behaviour

There is another interesting classification based on the behaviour they follow:

  • Cold: It emits in a passive way. All subscribed observers receive exactly the same set of data, regardless of when they subscribe. These are some examples:
      • Just emits its items and then finishes. It is a cold observable.
      • From creates an observable from a list, and it is also cold.
      • Interval is another example of a cold observable.
  • Hot: It emits in a more dynamic way, so each observer receives different data depending on the moment it subscribes. The best way to implement hot observables is by using Subjects (which act as both Observable and Observer). These are the most important ones:
      • PublishSubject lets an observer receive emissions from the moment it subscribes, ignoring previous ones.
      • BehaviorSubject lets an observer receive emissions from the moment it subscribes, but it also delivers the last emission before the subscription as the first item.
      • AsyncSubject emits the last item, and only the last one, once the source completes.
      • ReplaySubject lets any observer receive a replay of the data already emitted.
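The difference between cold and hot sources shows up as soon as an observer subscribes late. The following sketch assumes RxJava 2; the three function names are hypothetical and only serve the illustration.

```kotlin
import io.reactivex.Observable
import io.reactivex.subjects.BehaviorSubject
import io.reactivex.subjects.PublishSubject

// Cold: every subscriber gets the full sequence, whenever it subscribes.
fun coldReplaysEverything(): Pair<List<Int>, List<Int>> {
    val cold = Observable.just(1, 2, 3)
    val first = mutableListOf<Int>()
    val second = mutableListOf<Int>()
    cold.subscribe { first += it }
    cold.subscribe { second += it } // subscribes later, still receives 1, 2, 3
    return first to second
}

// Hot (PublishSubject): a late subscriber misses what was emitted before it arrived.
fun publishSkipsPast(): List<Int> {
    val subject = PublishSubject.create<Int>()
    val late = mutableListOf<Int>()
    subject.onNext(1) // nobody is listening yet
    subject.subscribe { late += it }
    subject.onNext(2)
    subject.onNext(3)
    return late
}

// Hot (BehaviorSubject): a late subscriber first receives the most recent value.
fun behaviorRepeatsLast(): List<Int> {
    val subject = BehaviorSubject.create<Int>()
    val late = mutableListOf<Int>()
    subject.onNext(1)
    subject.subscribe { late += it } // receives 1 immediately
    subject.onNext(2)
    return late
}
```

In this sketch the late PublishSubject observer ends up with 2 and 3, while the late BehaviorSubject observer ends up with 1 and 2.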

The backpressure

Have you already considered the potential problems with reactive programming? Of course, there is no perfect paradigm. Let’s have a look at this case:

Example of a backpressured case

Our Observable emits so fast that the observer has no chance to process all the data before new emissions arrive. This can happen, for example, when a sensor emits too many values and processing the data requires extra time.

In order to avoid this case we need to establish a feedback channel where the observer (consumer) can notify the producer that it is ready to continue receiving data. Rx provides default strategies, such as:

  • The Buffer strategy keeps the emitted data in a buffer, so that the observer can carry on without losing any of it. A good example is reading data from a file.
  • The Drop strategy discards the emissions the observer wasn’t able to receive. A good example is sensor data management.
  • The Latest strategy is similar to Drop, but the observer still receives the most recent emission to stay up to date. It is a valuable improvement over Drop for the sensor case.
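The three strategies can be compared side by side with a small experiment. Note that in RxJava 2 backpressure is handled by Flowable rather than Observable, so the sketch uses Flowable.create with a BackpressureStrategy. The helper receivedWith and its numbers (five items, an initial demand of two) are assumptions chosen for the illustration.

```kotlin
import io.reactivex.BackpressureStrategy
import io.reactivex.Flowable
import io.reactivex.subscribers.TestSubscriber

// Hypothetical helper: a fast producer pushes 1..5 at once while the consumer
// initially asks for only two items; returns what the consumer finally received.
fun receivedWith(strategy: BackpressureStrategy): List<Int> {
    val source = Flowable.create<Int>({ emitter ->
        for (i in 1..5) emitter.onNext(i) // emits without waiting for demand
        emitter.onComplete()
    }, strategy)

    // Slow consumer: requests 2 items up front.
    val subscriber = TestSubscriber<Int>(2L)
    source.subscribe(subscriber)

    // Later it is ready for more, after the producer has already finished.
    subscriber.request(10L)
    return subscriber.values()
}
```

With BUFFER the consumer eventually receives all five items, with DROP only the two it had asked for in time, and with LATEST those two plus the final value 5.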

Rx Operators

Operators are the operations which allow us to change the emissions before consuming them. This is very useful for tailoring the Observable to our case.

In this link you can find all the operators classified by category. Some of the most important are:

  • Creating Observables: Operators that originate new Observables. Some examples are the ones that were already presented, such as just.
  • Transforming Observables: Operators that transform items that are emitted by an Observable.
      • Map is one of the most interesting operators, as it allows us to convert an item into another type.
      • FlatMap transforms the items emitted by an Observable into Observables, then flattens the emissions from those into a single Observable.
  • Filtering Observables: Operators that selectively emit items from a source Observable.
      • Debounce only emits an item from an Observable if a particular timespan has passed without it emitting another item.
      • Filter emits only those items from an Observable that pass a predicate test.
  • Combining Observables: Operators that work with multiple source Observables to create a single Observable.
      • Merge combines multiple Observables into one by merging their emissions.
      • Zip combines the emissions of multiple Observables together via a specified function and emits a single item for each combination based on the results of this function.

There are many more categories, such as conditional operators, boolean operators, mathematical operators… and many others.
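A few of the operators mentioned above, sketched with RxJava 2 (assumed to be on the classpath). The function names are hypothetical, and blockingGet() is used only to make the results easy to inspect outside a real app.

```kotlin
import io.reactivex.Observable
import io.reactivex.functions.BiFunction

// filter + map: keep the even numbers, then convert each one into a String.
fun filterThenMap(): List<String> =
    Observable.just(1, 2, 3, 4)
        .filter { it % 2 == 0 }
        .map { "item $it" }
        .toList()
        .blockingGet()

// flatMap: each item becomes its own Observable, and the results are flattened.
fun flatMapPairs(): List<String> =
    Observable.just("a", "b")
        .flatMap { letter -> Observable.just("${letter}1", "${letter}2") }
        .toList()
        .blockingGet()

// merge: emissions of both sources end up in one stream.
fun mergeBoth(): List<Int> =
    Observable.merge(Observable.just(1, 2), Observable.just(3, 4))
        .toList()
        .blockingGet()

// zip: pairs items by position and stops when the shorter source completes.
fun zipPairs(): List<String> =
    Observable.zip(
        Observable.just(1, 2, 3),
        Observable.just("a", "b"),
        BiFunction<Int, String, String> { number, letter -> "$number$letter" }
    ).toList().blockingGet()
```

With these synchronous sources the order is deterministic; with asynchronous sources, flatMap and merge may interleave emissions.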

Error handling

Handling errors in reactive code is as simple as treating the error as one more emission, only one that carries an exception. In fact, there are dedicated error operators to take care of them, such as:

Catch recovers from an onError notification by continuing the sequence without error
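In RxJava 2 the Catch operator is available through methods such as onErrorReturn and onErrorResumeNext. A minimal sketch with onErrorReturn, assuming RxJava 2 is available; parseAll and the fallback value -1 are hypothetical choices for the example.

```kotlin
import io.reactivex.Observable

// Hypothetical helper: parses each input as an Int, and replaces a failure
// with the fallback value -1 instead of letting the error reach the observer.
fun parseAll(inputs: List<String>): List<Int> =
    Observable.fromIterable(inputs)
        .map { it.toInt() }   // throws NumberFormatException on invalid input
        .onErrorReturn { -1 } // the error becomes a final, regular item
        .toList()
        .blockingGet()
```

For the input listOf("1", "2", "x", "4") the result is [1, 2, -1]: the error ends the original sequence, so the item after the failure is never processed.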

Rx Scheduling

The next point to consider is how to manage threading. We have to decide where emissions should happen (e.g. data coming from a network interface) but also where they should be consumed (e.g. we might want to update the UI or save data into storage).

We have two methods for taking care of it:

  • subscribeOn → This method sets the thread where the observable is emitting
  • observeOn → This method sets the thread where the observer is consuming data

The best way to understand it is to look at an example:

We can see how threads are set and how different observers can subscribe from different threads considering what they need to do with the data
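As a rough illustration of the two methods, the sketch below records which thread produces a value and which thread consumes it. It assumes RxJava 2 and its Schedulers.io() and Schedulers.computation() schedulers; threadsUsed is a hypothetical helper.

```kotlin
import io.reactivex.Single
import io.reactivex.schedulers.Schedulers

// Hypothetical helper: returns (name of the producing thread, name of the consuming thread).
fun threadsUsed(): Pair<String, String> =
    Single.fromCallable { Thread.currentThread().name } // runs on the subscribeOn scheduler
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.computation())
        // Everything after observeOn runs on the computation scheduler.
        .map { producer -> producer to Thread.currentThread().name }
        .blockingGet()
```

The two names differ from each other and from the calling thread, which shows that the work was moved off the caller in both directions.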

We have a Schedulers class which provides us with different threads:

Possible threads from Schedulers

There is a specific library called RxAndroid for Android which also supplies access to Android threads:

Thanks to this library, we have access to the Android main thread (through AndroidSchedulers.mainThread()), so we can update the UI from an observer:

Why Rx Kotlin instead of RxJava?

If we have a look at the library, we can notice that it is available for Java and Kotlin (and even for other languages, like Swift). But why should we consider Kotlin the best language to use?

Google announcement

Firstly, Google just announced that they are becoming increasingly Kotlin-first for new Android features. In practice, much of the Android community already thinks in Kotlin rather than Java.

Secondly, Kotlin supports functional programming. As soon as we understand the functional style, we will notice how similar reactive code and idiomatic Kotlin code are. The best way to see it is with an example:

As you can see, managing lists and observables in Kotlin is almost exactly the same. And we can even benefit from extension functions, such as toObservable (it converts a list into an observable).
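The similarity can be sketched by writing the same pipeline twice. The example assumes RxJava 2 plus the RxKotlin 2 extension toObservable (package io.reactivex.rxkotlin); both function names are hypothetical.

```kotlin
import io.reactivex.rxkotlin.toObservable

// Plain Kotlin collections: filter and map on a List.
fun evenSquaresFromList(numbers: List<Int>): List<Int> =
    numbers
        .filter { it % 2 == 0 }
        .map { it * it }

// The same pipeline on an Observable created from the list.
fun evenSquaresFromObservable(numbers: List<Int>): List<Int> =
    numbers.toObservable()
        .filter { it % 2 == 0 }
        .map { it * it }
        .toList()
        .blockingGet()
```

Both functions return the same result for the same input. The difference is that the Observable version can also deliver its items over time and on other threads.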

Rx in Android

There are more libraries that let us benefit from reactive in an easy way. One example is Retrofit2-rxjava2-adapter:

This library allows us to create network interfaces like:

Example of Retrofit + Rx
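As a rough idea of what such an interface can look like, here is a sketch assuming Retrofit 2 and RxJava 2 are available. The User model, the UserApi interface and the users/{id} endpoint are all hypothetical and do not come from any real service.

```kotlin
import io.reactivex.Single
import retrofit2.http.GET
import retrofit2.http.Path

// Hypothetical response model.
data class User(val id: String, val name: String)

// Hypothetical API definition: the call returns a Single instead of a Retrofit Call,
// so the response can be composed with Rx operators.
interface UserApi {
    @GET("users/{id}")
    fun getUser(@Path("id") id: String): Single<User>
}
```

For Retrofit to produce Rx types, the adapter has to be registered when building the Retrofit instance, typically with addCallAdapterFactory(RxJava2CallAdapterFactory.create()).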

But that is not the only one. There are others, like LiveData from Google, which allows us to use lifecycle-aware observables, so that the lifecycle can handle the subscriptions automatically:

LiveData is a perfect candidate to connect our ViewModels to the UI

By using LiveData we can avoid the memory leaks that appear when unsubscribing is not handled well. Still, LiveData does not replace Rx, because we don’t want every layer of our architecture to depend on Android.

Summing up

Reactive programming is used by a lot of developers because it allows us to build an architecture which can react at any level in a scalable way. Even Google is betting on the reactive paradigm with LiveData. There are a lot of pros, like:

Pros

  • Error handling is similar to normal events
  • Async operations are managed in an easier way
  • Code is easily testable with TestObservers & TestSubscribers
  • We can avoid memory issues produced by bad callbacks management
  • It is quite similar to functional programming

But we cannot forget that there are always cons:

Cons

  • The learning curve might be steep at the beginning
  • The flows might be more complex and difficult to debug, at least for new developers
  • We can still have memory leaks if we don’t manage unsubscriptions well or if we change variables from different contexts

Finally, do you remember the code at the beginning with the even/odd case? Let’s have a look at an easy fix for the problem with reactive:

Now, the result will be:

The number is Even

The number is Odd
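One possible way to get that result is sketched below, assuming RxJava 2 and a BehaviorSubject as the emitter for number. describeNumbersReactively is a hypothetical helper that collects the messages instead of printing them.

```kotlin
import io.reactivex.subjects.BehaviorSubject

// Hypothetical helper: isEven is now derived from the stream, so it reacts to every change.
fun describeNumbersReactively(): List<String> {
    val messages = mutableListOf<String>()

    // Emitter: starts with 4 and can receive new values later.
    val number = BehaviorSubject.createDefault(4)

    // Observer: recomputes the even/odd check for each emitted number.
    val subscription = number
        .map { it % 2 == 0 }
        .subscribe { isEven -> messages += "The number is ${if (isEven) "Even" else "Odd"}" }

    number.onNext(9) // the observer reacts to the new value

    subscription.dispose()
    return messages
}
```

Calling describeNumbersReactively().forEach(::println) prints one line per emitted value.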

Enjoy developing with reactive!
