Functional Reactive Programming with RxJava (Part 1)

Francisco Riádigos
6 min readAug 17, 2016

--

This is the part 1 of 4 articles, where I will try to describe, without the use of abstract theories, what means and makes important the use of a Functional and Reactive style. I will share in the first two parts of this article, some examples of main concepts and approaches that can make you to consider to upgrade your legacy code by using these paradigms.

This introduction does not aims to focus in the use of RxJava as a WebServices request tool, but providing a mechanism that let us to apply some functional techniques, as well as a way that makes our project to be more reactive.

Imperative vs declarative

Look at the following piece of code and try to figure out what it does.

each( items )
.filter( notEmpty )
.filter( noSpecialChars )
.map( toISOCode )
.findAll();

We can quickly guess the following definition:

Given a number of items, it returns a number of their corresponding ISO codes

This is called a declarative style of programming, because this code expresses what it does, and what is the expected result you get.

Now let see the same code but written in Java:

Set<Integer> isoCodes = new HashSet<>();
int i, len;
for (i = 0, len = items.length; i < len; i++) {
String item = items[i].trim();
if(item.isEmpty()) {
continue;
}
Pattern p = Pattern.compile("[^A-Za-z0-9]");
Matcher m = p.matcher(item);
if(m.find()) {
continue;
}
isoCodes.add( findISOCodeByString(item) );
}

This is called an imperative style of programming, which describes how the code operates.

We understand the code above because we are familiar with this style of programming, but if the code gets huge we will probably waste many time analysing how it works instead of understanding what it just does.

Imagine a scenario with multiple conditional statement, with nested for loops and so on, nowadays nothing apparently strange for us, right?

Then normally, once this code has been written and tested that is working, a developer usually refactor it by this way:

Set<Integer> isoCodes = new HashSet<>();
for (String item : items) {
if(!item.isEmpty() && hasNoSpecialChars(item)) {
isoCodes.add( findISOCodeByString(item) );
}
}

Which looks more understandable as well as simple, but what we really observe here is that we are trying to make our code more readable and expressive, not only for our colleagues, but also for us, because 3 months later we would probably like to understand what the hell we wrote.

What means a side effect?

A function or expression is said to have a side effect if it modifies some state or has an observable interaction with calling functions or the outside world.

You can find some examples of a side effect,

  • when you modify a global or a static variable outside the scope of a class,
  • when the result or the own execution of a function is different due to the use of an external variable, for instance a state,
  • when an exception is thrown and changes the expected behaviour of a program…

Take a look at the following example:

int[] foo = new int[]{ 2, 4, 3, 1, 5 }; // unordered
Bar bar = new Bar();
bar.execute(foo);
System.out.println("foo ->" + Arrays.toString(foo));
foo -> [1, 2, 3, 4, 5] // the outside variable becomes ordered...

This code makes the “foo” array to be also reordered outside the execute function of the Bar class. A way to prevent this side effect would be by making a copy of the array inside the execute function of the Bar class in order to avoid unexpected behaviours on the next logic execution.

Then why are we considering RxJava that avoids side effects?

int a = 0, b = 0;int c = Observable.just(a)
.map(integer -> ++integer) // no side effect
.map(...) // "a" haven't changed
.toBlocking().first();

int d = ++b; // there is a side effect, it changes the value of b
- Before execution
a = 0
b = 0 <--
- After execution
a = 0
b = 1 <--
c = 1
d = 1

The variable “b” changed because of the increment, whilst the variable “a” still remain the same. This is because a downstream operation of an Observable does not cause any side effects from an upstream execution or the outside world.

Take a look at this talk Dan Lew gave at Droidcon SF.

What difference a functional approach makes?

There are several good practices and principles that we can also consider as rules in a pure functional programming style.

Let’s see some restrictions we might have if we were developing, for instance, in Haskell which is supposed to be a pure functional language:

  • You cannot use loop statements.
  • All functions must have only one parameter.
  • Each function contains only one single expression.
  • There must not be any side effect.
  • All variables must remain immutable.
  • Operations on a data structure cannot be destructive.
  • The execution order doesn’t matter.
  • You must use static typing.

It seems like the way we are required to develop is actually all the opposite as how we do it right now, isn’t it?

Dealing with RxJava and Third Party Libraries

A good practice in general is wrapping external libraries, as this give us the following advantages:

  • Loose coupling classes from the third party library
  • Independent from an external contract or an interface
  • Ability to switch to another library easily without any side effect
  • The wrapper is the only one which implements the changes
  • Easy to test by creating our own mocks

Let’s imagine we have a third party Player library which notifies every second the current seek bar position through a Listener.

listener.onTimeUpdated(position.getAndIncrement());

But the problem with this implementation is obvious, we are calling the client each second, which can be really expensive in terms of a computational execution, whilst our need could be just to get notified when a specific position is reached.

In this scenario, the best path would be wrap our Player library and, perhaps, follow a Decorator Pattern to add a new functionality which let us store only those positions we are interested on…

public void notifyOnTime(Integer... positions) {
this.positions = ...;
}

… so we can handle internally when a position must be emitted to the Observer.

@Override
public void onTimeUpdated(int sec) {
if(positions.length > 0) {
if(Arrays.binarySearch(positions, sec) >= 0) {
observer.onNext(sec);
}
} else {
// Otherwise notify each position
observer.onNext(sec);
}
}

By this way, our custom PlayerDecorator could be used as follow:

playerDecorator.notifyOnTime(2, 7);playerDecorator.play()
.subscribe(sec -> log("Emitting on " + sec));

But now, imagine we got another concern. We would like to fire multiple events from the Player, like the followings:

...
void onInit();
void onPlay();
void onPause();
void onStop();

Then, how we could handle each event by using RxJava?

A straightforward way to do it can be by implementing a kind of a Reactive Event Bus.

public class RxPlayer {   private Subject<Object, Object> bus;   ...   private RxPlayer () {      playerDecorator.attachListener(new PlayerListener() {         @Override
public void onPlay() {
bus.onNext(new PlayEvent());
}

@Override
public void onPause() {
bus.onNext(new PauseEvent());
}
... });
}
... public <R> Observable<R> ofEvent(Class<R> clazz) {
return bus.ofType(clazz);
}
}

Now we can subscribe to any event of the Player wherever we want and even without the need to implement the whole Player interface, which means decoupling our client classes from any artificial contract.

RxPlayer playerBus = RxPlayer.getInstance();

playerBus.ofEvent(PlayEvent.class).subscribe(playAction);
playerBus.ofEvent(PauseEvent.class).subscribe(pauseAction);playerBus.ofEvent(StopEvent.class).subscribe(stopAction);

playerBus.getPlayer().initialise();

In case we were only keen to handle the Pause event of the Player in a specific class, we won’t have any more the need to implement all the empty remaining methods from the Interface.

Let’s say I have wrapped a couple of classes with RxJava, then in the following example, I could delay the execution of a specific operation before to fire another:

playerBus.getPlayer().initialise()
.delaySubscription( drm )
.subscribe();

The above instruction subscribes first to the Reactive DRM wrapper, which executes some built-in operations before to subscribe and initialise the Player, providing that the DRM doesn’t throw any error.

Resources

A java project is available for you, where you can review the snippet codes I used in this article:

What’s next?

In the second part of this article, we will see how to become Reactive and how to mix both approaches, Functional & Reactive styles together.

Any feedback, please tweet me.

Click here to see the second part of this article.

--

--

Francisco Riádigos

Former @LloydsBank, @BritishAirways, @SkyUK, @Wefox, @Indra... Co-founder @Chatty