Functional Reactive Programming with RxJava (Part 2)

In the previous part of this article, we mentioned some concepts like declarative style, side effects or functional principles.

Now it is time to talk about the Reactive style, and how we can mix Functional & Reactive programming together.

How to become Reactive?

Let’s imagine we are working on the following User Story:

As a Customer,
     I want to filter a set of videos by a Genre
...

Also, we are already using a Repository which provides us with a list of unfiltered videos. Let’s say the server hasn’t implemented that functionality yet, but we need it and we don’t want to be blocked because of that.

Once we receive a full list of the videos we could filter them easily by doing:

repository.getVideos()
    .filter(viewModel -> viewModel.getGenre().equals(genre))
    .subscribe(...);

However, users tend to change filters quickly, and those videos are also shown on different pages of our application. For these reasons, we decided to create a background service which, in some way, caches the request and always delivers the same results, so that we can easily apply several filters to the same data structures anytime and anywhere.

Why this? Let’s imagine that, because of a copyright restriction in the customer requirements, we are required not to store any data on the device, but we also don’t want to call the server repeatedly to get the same data again.

Then, how can we do this? Simply by making use of an interesting Operator from RxJava.

Our background service would return a Shared Observable:

return repository.getVideos().share();

Remember that the share() function of RxJava is a shorthand for publish().refCount().
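To make the publish().refCount() idea more concrete, here is a minimal plain-Java sketch of reference-counted sharing. It is not RxJava code: RefCountedSource, subscribe, emit and connections are hypothetical names invented for illustration, and everything runs synchronously. What it tries to mimic is that the upstream is connected when the first subscriber arrives, shared among the subscribers that are active at the same time, and released when the last one leaves, so a subscriber arriving later triggers a fresh connection.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for the idea behind publish().refCount(); not the RxJava API.
class RefCountedSource<T> {
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private int connections = 0; // how many times the upstream was (re)connected

    // Registers a subscriber and returns a handle that unsubscribes it.
    Runnable subscribe(Consumer<T> subscriber) {
        if (subscribers.isEmpty()) {
            connections++; // first subscriber: connect to the upstream
        }
        subscribers.add(subscriber);
        return () -> subscribers.remove(subscriber);
    }

    // Delivers one item to every current subscriber.
    void emit(T item) {
        for (Consumer<T> subscriber : new ArrayList<>(subscribers)) {
            subscriber.accept(item);
        }
    }

    int connections() {
        return connections;
    }
}

class RefCountDemo {
    public static void main(String[] args) {
        RefCountedSource<String> videos = new RefCountedSource<>();
        List<String> adventure = new ArrayList<>();
        List<String> all = new ArrayList<>();

        Runnable first = videos.subscribe(v -> {
            if (v.startsWith("ADVENTURE")) adventure.add(v);
        });
        Runnable second = videos.subscribe(all::add);
        videos.emit("ADVENTURE:2");
        videos.emit("DRAMA:6");
        System.out.println(videos.connections()); // prints 1: one shared connection

        first.run();
        second.run();
        videos.subscribe(all::add); // arrives after everyone has left
        System.out.println(videos.connections()); // prints 2: the upstream was reconnected
    }
}
```

If the data must survive the gap between two subscriptions, an operator that replays earlier items may be a better fit than plain sharing; that choice depends on the requirements and is outside this sketch.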

Then,

videoService.filterBy(Genre.ADVENTURE).subscribe(...);
// 5 minutes later
videoService.filterBy(Genre.DRAMA).subscribe(...);

Thread: RxIoScheduler-2 - Subscribed to retrieve Videos
Thread: main - Emit VideoViewModel{ id=2, ... genre=ADVENTURE }
Thread: main - Emit VideoViewModel{ id=3, ... genre=ADVENTURE }
Thread: main - Emit VideoViewModel{ id=9, ... genre=ADVENTURE }
Thread: main - - Fetching videos by ADVENTURE done!
Elapsed time: 5 minutes
Thread: RxIoScheduler-2 - Subscribed to retrieve Videos
Thread: main - Emit VideoViewModel{ id=6, ... genre=DRAMA }
Thread: main - - Fetching videos by DRAMA done!

So, by implementing a “filterBy” method on the service we’ve already created and by sharing its Observable, we can subscribe multiple times and filter the same retrieved data, as long as the service and the Observable itself are still alive.


Now, in the next Sprint we are required to deliver a new feature:

As a UI Designer,
     I want to show which videos were pinned as favourite
...

This feature can be achieved easily by following the reactive approach we’ve been developing. This means we don’t need to make big changes or modify our data flow in order to provide a different data structure for our purpose.

videoService.filterBy(Genre.ADVENTURE)
    // Our data flow can react on changes
    .map(DataBase::checkAndUpdateForFavourite)

This is one of the advantages that a Reactive style of programming offers you in day-to-day Agile development.

I would like to recommend a good article about Reactive Programming, written a few years ago by Kevin Webber. You can find it at the following link:

This article describes very well the principles covered by the Reactive Manifesto (www.reactivemanifesto.org).

Both Functional and Reactive: first-order and higher-order functions

Do you remember this piece of code?

repository.getVideos()
    .filter(viewModel -> viewModel.getGenre().equals(genre))
    .subscribe(...);

We applied a filter expression inline, but what happens if we want to reuse the same code in different places of the project?

As we know, this filter applies to any object that exposes a Genre, so let’s create a specific contract for that.

public interface IGenre {
    Genre getGenre();
}

By isolating a particular functionality behind a contract, we can create a new first-order function which works with anything that satisfies our contract and lets us reuse it wherever we want. The following function is known as a Predicate, and can be applied to an RxJava filter.

public static <T extends IGenre> boolean byGenre(T object, Genre genre) {
    return object.getGenre().equals(genre);
}

repository.getVideos()
    .filter(viewModel -> byGenre(viewModel, Genre.ADVENTURE));

If you follow this approach, you will end up with a set of functional operations that can be reused anywhere. At some point in your development, you will probably realise that you no longer need to write new code for the same purpose, even if the code structure changes, because that logic is already there, isolated and ready to be used the way you want.
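One way to make such a predicate even easier to reuse is to have the helper return the predicate instead of evaluating it, so it can be handed straight to a filter. The sketch below uses only the JDK (java.util.function.Predicate and streams) rather than RxJava. The Video class and the GenrePredicates holder are hypothetical names invented for the example, and the Genre enum is reduced to two values; IGenre mirrors the contract above.

```java
import java.util.Arrays;
import java.util.List;
import java.util.function.Predicate;
import java.util.stream.Collectors;

enum Genre { ADVENTURE, DRAMA }

interface IGenre {
    Genre getGenre();
}

// Hypothetical model class, only here to have something to filter.
class Video implements IGenre {
    final int id;
    final Genre genre;

    Video(int id, Genre genre) {
        this.id = id;
        this.genre = genre;
    }

    @Override
    public Genre getGenre() {
        return genre;
    }
}

class GenrePredicates {
    // Returns a reusable predicate for anything that satisfies the IGenre contract.
    static <T extends IGenre> Predicate<T> byGenre(Genre genre) {
        return object -> object.getGenre() == genre;
    }

    public static void main(String[] args) {
        List<Video> videos = Arrays.asList(
                new Video(2, Genre.ADVENTURE),
                new Video(6, Genre.DRAMA),
                new Video(9, Genre.ADVENTURE));

        List<Integer> adventureIds = videos.stream()
                .filter(byGenre(Genre.ADVENTURE))
                .map(video -> video.id)
                .collect(Collectors.toList());

        System.out.println(adventureIds); // prints [2, 9]
    }
}
```

Because the predicate is a value, it can also be combined with others through the and, or and negate methods that Predicate provides.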

This set of operations can also be shared between different teams or platforms, as they are simple pieces of logic that can easily be translated into any other language, increasing development productivity by providing a common answer for a specific problem domain.

As in real life, before the end of our Sprint we have been asked to include a new user experience feature ASAP:

As a UX Designer,
     I want to show and hide a Loading Spinner whilst the
application executes a long running operation
...

This is a common requirement in any front-end development: something we always want to provide in order to make the user’s experience friendlier. But we have probably all faced problems with this functionality, such as a spinner that keeps spinning forever somewhere for some reason, or the need to handle such spinners in every user interaction or for a particular state of the program.

Therefore we need a way to let our code handle it by itself. So, by following a Functional style which matches our Reactive paradigm, we will create a higher-order function

public static <T> Observable.Transformer<T, T> attach() {
    // For this to compile, showLoading() and hideLoading() must each return an Action0.
    return tObservable ->
        tObservable.doOnSubscribe(LoadingSpinner.showLoading())
            .doAfterTerminate(LoadingSpinner.hideLoading());
}

… which lets us automatically apply the desired loading spinner operation whenever it is needed.

videoService.filterBy(Genre.ADVENTURE)
    .compose(LoadingSpinner.attach());

Thread: main - Show Loading...
Thread: RxIoScheduler-2 - Subscribed to retrieve Videos
...
...
Thread: main - Hide Loading.

By definition, a higher-order function is a function that does at least one of the following:

  • takes one or more functions as arguments
  • and/or returns a function as its result
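
Both points can be illustrated with a small JDK-only sketch, independent of RxJava. The Spinner and Decorators classes and the withSpinner method are hypothetical names invented here. withSpinner takes a function (a Supplier) as an argument and returns a new function that shows the spinner before the work starts and hides it once the work ends, even if the work fails.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Hypothetical spinner that simply records what it was asked to do.
class Spinner {
    final List<String> events = new ArrayList<>();

    void show() { events.add("show"); }

    void hide() { events.add("hide"); }
}

class Decorators {
    // Higher-order: takes a function (work) and returns a function (the decorated work).
    static <T> Supplier<T> withSpinner(Spinner spinner, Supplier<T> work) {
        return () -> {
            spinner.show();
            try {
                return work.get();
            } finally {
                spinner.hide(); // runs on success and on failure
            }
        };
    }

    public static void main(String[] args) {
        Spinner spinner = new Spinner();
        Supplier<String> loadVideos = withSpinner(spinner, () -> "videos loaded");

        System.out.println(loadVideos.get()); // prints videos loaded
        System.out.println(spinner.events);   // prints [show, hide]
    }
}
```

Nothing happens until the returned Supplier is invoked, which is loosely similar to how the Transformer above only takes effect once the composed Observable is subscribed to.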

Resources

A Java project is available where you can review the code snippets I used in this article:

What’s next?

In the third part of this article, we will see real examples of dealing with Concurrency and Parallelism with RxJava, not only to make network requests, but also to minimise the stress and the time consumption of our computational operations.

If you have any feedback, please tweet me.