DIY Reactive Model Store using RxJava

In the last few years on Android, we’ve seen an explosion of architectures based on the idea of a unidirectional data flow.

I was first exposed to this idea while working on an MVI-based app using RxJava. One of the core ideas in MVI is to manage changes to your application state cleanly, and the Model Store pattern is key to achieving this.

Working with Immutable State

So, what does a Model Store do, exactly? Basically, it stores immutable state.

That’s a pretty abstract concept. To make this idea more concrete, let’s imagine we’re building an upvoting application. This app will keep track of upvotes from our users.

In Kotlin, our app state could look something like this:

data class UpvoteModel(val hearts: Int, val thumbsUp: Int)

Each time a vote is recorded, we’ll want to update the stored state. Because every property of our data class is a val, an instance can't be modified in place. Instead, we use the copy() function to build a new instance with the updated count:

val initialState = UpvoteModel(hearts = 0, thumbsUp = 0)
val newState = initialState.copy(thumbsUp = initialState.thumbsUp + 1)

So the big idea here is to build newState from oldState. Let's formalize this with the Reducer interface:

// A Reducer takes an old (immutable) state and builds a new state from it.
interface Reducer<S> {
    fun reduce(oldState: S): S
}

Each reducer instance represents one “action” we received from the user. Our Android app has two buttons, so we’ll need actions to increment ❤️ or 👍 by 1:

// Adds one ❤️ to the previous state.
class AddHeart : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(hearts = oldState.hearts + 1)
}

// Adds one 👍 to the previous state.
class AddThumbsUp : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(thumbsUp = oldState.thumbsUp + 1)
}
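Before bringing in RxJava, it can help to see that a series of reducers is just a fold over the state. The sketch below repeats the definitions above so it runs on its own; the applyAll helper is a hypothetical name introduced only for this illustration.

```kotlin
data class UpvoteModel(val hearts: Int, val thumbsUp: Int)

interface Reducer<S> {
    fun reduce(oldState: S): S
}

class AddHeart : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(hearts = oldState.hearts + 1)
}

class AddThumbsUp : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(thumbsUp = oldState.thumbsUp + 1)
}

// Hypothetical helper: applies each reducer in order, starting from `start`.
fun <S> applyAll(start: S, reducers: List<Reducer<S>>): S =
    reducers.fold(start) { state, reducer -> reducer.reduce(state) }

fun main() {
    val initialState = UpvoteModel(hearts = 0, thumbsUp = 0)
    val taps: List<Reducer<UpvoteModel>> = listOf(AddHeart(), AddThumbsUp(), AddHeart())

    val finalState = applyAll(initialState, taps)

    println(initialState) // UpvoteModel(hearts=0, thumbsUp=0), the original is untouched
    println(finalState)   // UpvoteModel(hearts=2, thumbsUp=1)
}
```

Each step produces a fresh UpvoteModel, so no state that has already been handed out ever changes.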

Model Store

Let's put the focus back on the Model Store. We’ll want our store to process Reducer instances on the input side, and we'll need a reactive stream of states on the output side. Using RxJava, the following interface works nicely:

interface ModelStore<S> {
    // Input side: queue a state change.
    fun process(reducer: Reducer<S>)
    // Output side: observe the resulting states.
    fun modelState(): Observable<S>
}

Let’s combine all these ideas together, and visualize them as a marble diagram:

How do we implement this? Let’s start with the process(r) function. The first thing we need to do is take that function call and turn it into an Observable<Reducer<S>>. The RxRelay library is ideal here.

val reducers = PublishRelay.create<Reducer<S>>()
fun process(reducer: Reducer<S>) = reducers.accept(reducer)

Once an Observer subscribes to a PublishRelay, the relay emits every item it subsequently receives to that subscriber; items accepted before the subscription are not replayed. That said, we'll only be using this reducers relay internally:

private val store = reducers
    .scan(startingState) { oldState, reducer -> reducer.reduce(oldState) }
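To see what scan does with the relay's items, here is a small standalone sketch. It assumes RxJava 2 and RxRelay 2 are on the classpath, and it swaps the article's model for a plain Int counter with plain functions as reducers; the names runScanDemo, addOne and timesTen are hypothetical.

```kotlin
import com.jakewharton.rxrelay2.PublishRelay

// Returns what an early subscriber and a late subscriber each observed.
fun runScanDemo(): Pair<List<Int>, List<Int>> {
    val addOne: (Int) -> Int = { it + 1 }
    val timesTen: (Int) -> Int = { it * 10 }

    val reducers = PublishRelay.create<(Int) -> Int>()

    // scan emits the seed first, then one new state per reducer received.
    val states = reducers.scan(0) { oldState, reducer -> reducer(oldState) }

    val early = mutableListOf<Int>()
    val earlySubscription = states.subscribe { early.add(it) }

    reducers.accept(addOne)
    reducers.accept(addOne)
    reducers.accept(timesTen)

    // A second subscription starts its own scan from the seed,
    // so it does not see the state the first subscriber has reached.
    val late = mutableListOf<Int>()
    val lateSubscription = states.subscribe { late.add(it) }

    earlySubscription.dispose()
    lateSubscription.dispose()
    return early to late
}

fun main() {
    val (early, late) = runScanDemo()
    println(early) // [0, 1, 2, 20]
    println(late)  // [0]
}
```

The late subscriber starting over from the seed is the gap that still has to be closed before the store can act as a shared source of truth.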

Single Source of Truth

We are missing one important piece of the puzzle. We need our model store to be a shared, single source of truth. To get the behaviour we’re looking for, we need to turn our store into a ConnectableObservable.

The replay(1) operator gives us this ConnectableObservable and caches the most recent state. This ensures that subscribers get the current state of our ModelStore as soon as they subscribe. Then, we call connect() on initialization, priming our ModelStore immediately.

Here’s a generic RxModelStore<S> implementation:
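As a minimal sketch, the pieces described so far could be assembled along these lines. It assumes RxJava 2 and RxRelay 2 on the classpath, adds an S : Any bound because RxJava streams cannot carry null, and leaves out any thread hopping; the collectUpvoteStates function is a hypothetical name used only to exercise the store. It should be read as an illustration of the pattern, not as the sample app's exact code.

```kotlin
import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.Observable

interface Reducer<S> {
    fun reduce(oldState: S): S
}

interface ModelStore<S> {
    fun process(reducer: Reducer<S>)
    fun modelState(): Observable<S>
}

// Sketch assembled from the fragments in this article.
open class RxModelStore<S : Any>(startingState: S) : ModelStore<S> {

    private val reducers = PublishRelay.create<Reducer<S>>()

    private val store = reducers
        .scan(startingState) { oldState, reducer -> reducer.reduce(oldState) }
        .replay(1)           // cache the latest state for new subscribers
        .apply { connect() } // start processing right away

    override fun process(reducer: Reducer<S>) = reducers.accept(reducer)

    override fun modelState(): Observable<S> = store
}

data class UpvoteModel(val hearts: Int, val thumbsUp: Int)

class AddHeart : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(hearts = oldState.hearts + 1)
}

class AddThumbsUp : Reducer<UpvoteModel> {
    override fun reduce(oldState: UpvoteModel) =
        oldState.copy(thumbsUp = oldState.thumbsUp + 1)
}

// Hypothetical demo: two votes arrive before anyone subscribes, one after.
fun collectUpvoteStates(): List<UpvoteModel> {
    val store = RxModelStore(UpvoteModel(hearts = 0, thumbsUp = 0))
    store.process(AddHeart())
    store.process(AddThumbsUp())

    val seen = mutableListOf<UpvoteModel>()
    val subscription = store.modelState().subscribe { seen.add(it) }

    store.process(AddHeart())
    subscription.dispose()
    return seen
}

fun main() {
    // The late subscriber first receives the current state, then the update.
    println(collectUpvoteStates())
    // [UpvoteModel(hearts=1, thumbsUp=1), UpvoteModel(hearts=2, thumbsUp=1)]
}
```

Because connect() runs in the initializer, the store keeps reducing even while nothing is subscribed, and replay(1) hands the latest result to whoever subscribes next.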


Subscribing to a ModelStore

The last step here is to subscribe to a model store. If we go back to our upvoting app example, your subscriptions will look something like this:

store.modelState().subscribe { model ->
textView.text =

Key Benefits

By keeping your state immutable, and processing changes from within a single thread, you can make sure race conditions won’t become a concern.
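One way to get that single processing thread is sketched below, under some assumptions: RxJava 2 and RxRelay 2 on the classpath, Schedulers.single() standing in for whichever thread your app reduces on, and a plain Int counter in place of the model. The relay is also wrapped with toSerialized() so that accept() can be called from several threads at once. The function name countFromManyThreads is hypothetical.

```kotlin
import com.jakewharton.rxrelay2.PublishRelay
import io.reactivex.schedulers.Schedulers
import kotlin.concurrent.thread

fun countFromManyThreads(threadCount: Int, perThread: Int): Int {
    // Serialized relay: concurrent accept() calls are safe.
    val increments = PublishRelay.create<Int>().toSerialized()

    val totals = increments
        .observeOn(Schedulers.single())    // every reduction runs on one thread
        .scan(0) { total, n -> total + n }
        .replay(1)
        .apply { connect() }

    // Several producers push changes at the same time.
    val workers = (1..threadCount).map {
        thread { repeat(perThread) { increments.accept(1) } }
    }
    workers.forEach { it.join() }

    // Block until the reducing thread has caught up with every increment.
    val expected = threadCount * perThread
    return totals.filter { it == expected }.blockingFirst()
}

fun main() {
    println(countFromManyThreads(threadCount = 4, perThread = 1000)) // 4000
}
```

No increment is lost even though nothing here takes a lock around the running total: only the scan step ever reads or writes it, and always from the same thread.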

You can apply this idea anywhere. Be it MVVM, MVI, or MVP, if you deal with shared model state, the Model Store pattern can come in handy.

Interested in learning more? Check out this full Upvote MVI Sample App on GitHub, from my Simple MVI Architecture talk at Droidcon Boston 2019.

Originally published on May 5, 2019.