ReactiveX: Reactive Programming Principles

Mahdi Chtioui
13 min readAug 21, 2019

--

Everything you need to know to get started with Rx.

Lately we’ve been hearing a lot about Rx or ReactiveX, and most of us still don’t know what Rx is, what it does really, how it works and why is making a buzz in the developer’s community?

I’m going to try answering all of these questions in order to make the picture less blurry. I will also provide some use cases to get a better understanding of the utility of Rx and how it’ll make life easier for us, developers.

First of all what is ReactiveX (Rx)? It’s basically an implementation of Reactive Programming principles, but before we dive deep into Rx, let’s start by having a better understanding of Reactive Programming.

What is Reactive Programming?

Reactive means acting in response to a situation rather than creating or controlling it: reacting to it. Reactive programming is actually similar to this definition, meaning writing code that reacts to changes.

Reactive Programming is one of the new popular programming paradigm. It’s actually a way of coding with asynchronous data streams (I know, I know… what is a data stream? I will get to it shortly, I promise) that will make it easier for us to code apps and interfaces that respond dynamically to changes in data.

Meaning that whenever we have changes in the data, our app will respond reactively to those changes.

Now let’s get back to our data streams like I promised. In programming, data streams are a sequence of data elements made available over time (according to Wikipedia) which can be accessed in sequential order. In other words, a flow of data ordered in time.

Stream & Array of Data

If you want to read further about data streams, check the first answer in this Quora post.

Now let’s wrap it up, Reactive programming is way of coding event-based programs with data streams asynchronously.

Now let’s get back to our beloved ReactiveX.

ReactiveX

ReactiveX or Reactive eXtension or simply Rx is a library for event-based and asynchronous programming by using data streams sequences.

Asynchronous means executing multiple code blocks simultaneously, each code block will run on its own thread.

Event-based means executing code based on the events generated while the program is running (for example: Button clicks events).

Rx exists in almost all the platforms, which makes it so powerful : .Net, JS, Java, Scala, Swift, Kotlin, Android, PHP, Dart, Python, C++, Ruby and others.

Rx abstracts away concerns about low-level threading, synchronization, thread-safety and much more and it’s a non-blocking I/O: as the tasks runs on the background thread, this will allow us to perform smooth and continuous user experience tasks without blocking the main thread.

In Rx, everything is a data stream, which can be observed, and whenever a value is emitted to the stream some actions will take place. Those data streams can be anything: events (for examples click events), http calls, variable changes … and even errors.

We will be creating data streams all the time, which we can observe, iterate over and make some operations on those data using operators like filter, transform, select, combine…

Focus on those highlighted words/terms in the previous sentence. Anything comes to mind? Well those three words refers to:

· Observe: Observer Pattern

· Iterate: Iterator Pattern

· Operations (Filter, Transform …): Functional Programming

Rx is the combination of the best ideas from the observer pattern, the iterator pattern and Functional Programming. If you’re not familiar with these design patterns and programming paradigm, don’t worry, I will give a short introduction to each one of them

Let’s start with the observer pattern.

Observer Pattern

The observer pattern is a behavioral design pattern. It’s the one concerned with communication between Classes/Objects.

When we talk about observer pattern, we talk about Subject and Observer. An Observer watch or observe a subject, and get notified and updated automatically whenever this subject change it state.

Observer Pattern

If you want to read more about the observer pattern here’s an introduction with example that I find helpful to understand the concept.

Now let’s move on to the iterator pattern.

Iterator Pattern

The iterator pattern is also a behavioral design pattern. It’s used to access the objects of a collection in a sequential manner without exposing its internal structure, in order to keep track of the current item and having a way of identifying what elements are next to be iterated.

The iterator pattern have 2 methods:

· next(): returns the next value from the current position.

· hasNext(): returns true if the current item is the last one else re

For further reading about the iterator pattern, check this link.

Functional Programming

Functional programming (FP) is a programing paradigm, which is based on mathematical functions. It uses conditional expressions and recursion instead of execution of statements. FP avoids concepts of shared states and mutable data.

Functional Programming is the process of building software by composing pure functions.

Basic core concepts of Functional Programming:

· Pure functions

· Higher-order functions

· Function Composition

Pure functions

These functions returns the same result for the same argument values, meaning the return value is only determined by its input values. Besides, these functions have no side effects.

Here’s an example to understand it better.

Impure Function

var res=0;

function addition(x,y){
res = x + y;
}

addition(5,3);
console.log(res);

Pure Function

function addition(x,y){
return x+y;
}

console.log(addition(5,3));

Higher-order function

Higher-order function is a function that operates on other functions, they can take a function as an argument or return it as an output. (Example code sample)

Function composition

Composition is actually how we generate a higher-order function and we do that by combining simpler functions or pure functions. It’s basically a set of chained functions.

Example:

let communities = [
{name:'JavaScript', isActive:true, members:700},
{name:'Node', isActive:true, members:500},
{name:'Angular', isActive:false, members:600}
]

let results = communities.filter((m) => {
return m.isActive;
})
.map((m) => {
return m.members - (0.1*m.members)
})
.reduce((acc,m) => {
return acc + m;
},0);

These 3 are the most basics characteristics of functional programming that you need to understand because you’ll be seeing a lot of them when coding with Rx.

Why we need ReactiveX?

Simply to improve the user experience, the application will be become more flexible to changes in the data: react to any change in data with less code, without worrying about low-level threading, just focusing about the business logic.

Since Rx is for event-based programming, it will allow us to manage all the UI events in order to get a flexible and responsive applications. It’s also error prone since it handle errors. Besides, it’s easier to code and easier to test with less time-consuming work.

In Result, we will get a performant application with smooth user experience without getting slow visual performance, and without blocking the main thread.

To wrap up, Rx will enhance the performance and specially the user experience of our applications so that our users won’t be blocked waiting for results and have a nice and smooth running application.

Now that we know what ReactiveX is, and why we need it, let’s dive deeper into its basic concepts.

ReactiveX Core Concepts

Like we said earlier ReactiveX is the combination of both the observer and the iterator patterns and functional programming principles.

ReactiveX solves almost all issues of these design patterns by introducing an Observableand an Observer. You can think of an observable as an object that emits data and an observer as an object that consumes that data.

Observables can store data of any type and Observers can be view components or other objects that subscribe to the observables in order to react to changes in the stored data.

Observables and Observers

Observables are data streams and observers consumes the data streams emitted by these observables.

But how they actually do that?

Observable

Let’s start by the observable which is an object in which an observer subscribe to it, so that this last reacts to whatever item or sequence of items emitted by the observable.

Observer

An observer also called subscriber is connected to the observable through the subscribe method. This method provide three subsets:

· onNext(value): whenever an observable emits a new value.

· onError(error): whenever an error occur or the observable fails to emit the expected value or event. It terminates the sequence with an error.

· onCompleted(): when the observable call the onNext method for the final time and no error has occurred. It terminates the sequence with success.

Observable & Observer

The power of the observable lies in its ability to identify all the events in his time line, even errors without the need of throwing an exception. And that’s exactly how Rx improved the iterator pattern.

The Observable implements 2 main methods:

· Subscribe

· Unsubscribe

We already know what subscribe method does, but one more time the subscribe method allow us to observe the observable by subscribing to it in order to react to the changes made to its values or events.

The unsubscribe method or unsubscribing from an observable means that the observer is no longer interested in the subscription so it is released from memory.

Subscription

Before moving on further let’s see an example using observables: how to create, subscribe to and unsubscribe from observables, in order to figure out how they behaves.

This example will wrap and put to action everything we learned so far. I’m going to comment each line with further explanation so that you don’t get lost in the code. (I will provide the same example in Js (using RxJS) and in java (using RxJava)).

RxJS Example:

import { from } from 'rxjs';

// Create observable of integers
const observable = from([10,20,30]);

// Subscribe to the created observable
const subscription = observable.subscribe({
next(value) { console.log('Current value: ', value); },
error(error) { console.log('Error occured :', error); }
});

// Unsubscribe from the subscription
subscription.unsubscribe();

RxJava Example:

Integer[] numbers = {10,20,30};

// Create observable of integers
Observable<Integer> observable = Observable.from(numbers);

// Subscribe to the created observable
Subscription subscription = observable.subscribe(new Observer<Integer>() {

@Override
public void onNext(Integer value){
System.out.println("Current value: ", value);
}

@Override
public void onError(Throwable e){
System.out.println("Error occured: ", e);
}

@Override
public void onCompleted(){
System.out.println("Completed");
}

};

// Unsubscribing the subscription
subscription.unsubscribe();

There are other types of ReactiveX or Rx, you can think of them as custom observables.

Different types of ReactiveX

Single

A Single is something like an Observable, but it emits only either one value or an error. When subscribing, it only uses 2 methods instead of 3 (onNext, onError, onCompleted). Those 2 methods are:

· onSuccess

· onError

A Single can call only one method either onSuccess or onError.

Observable & Single

If you want to read more about it follow this link.

Subject

A subject is special type of reactive that can be an observable and an observer. It can emit values or events, accept subscriptions and add new elements into the sequence.

There are 4 types of subjects, I’ll provide an example to each type in order to clarify the behavior of those subjects.

Subjects

Hot and Cold Observables

People never really pay attention to whether their observables are Hot or Cold which could lead to a lot of issues when writing code. These 2 concepts will help you figure out how your observables behaves.

Cold Observable

Cold Observables are those whom data are created inside them, and they begin emitting values only when an observer subscribes to those type of observables meaning it emits values only when requested. Some use cases examples: database queries, web services, reading or downloading files…

The data are produced inside of the observables so it’s not shared between subscribers, since each subscriber have its own private source.

Here’s a list of the operators that will create for us Cold Observables:

· Of

· From

· Range

· Interval

· Timer

These operators create for us cold observables, since the data is created from within the observable and does not produce data only after a subscription is made.

Hot Observable

On the other hand, Hot Observables are those who start emitting values whether the observer is ready or not. Meaning that these observables start emitting values before the subscription is made it does not wait for observers to subscribe (No control on emission rate).

The data is produced outside of the observable, for example: mouse and keyboard events, UI events, system events, time…

We can create Hot Observables using the operator fromEvent(), which basically create observables from events like clicks, mouse move….

Hot Observables & Cold Observable

Those type of observables are able to share data between multiple observers and the subscription has no side effects.

How about making a cold observable hot? Can we do that? Actually, we can and here’s how.

Making Cold Observables Hot

In order to make a cold observable hot, we can use an operator called share() which will allow us to share the same stream between multiple subscribers (Cold observables normally does not share data).

ReactiveX Operators

Operators are the tools that makes observables powerful, allowing us to code elegant and declarative solutions to complex asynchronous tasks. They also allow us to chain operations, each operation modifies the observable emitted from the previous operator.

These operators operate on an observable and return an observable. Which will allow us to chain them one after another. These operations can transform, combine or filter the elements emitted by an observable sequence, before the subscriber receives them.

There are many categories of operators, I will not list them all but I will provide a link to all the categories and operators and here are some:

Operators List

Like I promised here’s the link the list of categories and operators.

Here’s an example using operators In RxJava (don’t worry about the technology, it’s basically the same for the different platforms).

In this example, we created an observable of integers with Observable.from() method, then we used the operator filter() to filter the array and get the numbers that array inferior to 4, after that we used the operator map() to add “+1” to the results and finally we subscribe to the final results in order the print it.

Example:

Observable<Integer> observable = Observable.from(new Integer[] {3,6,8,1});

observable
.filter(i -> i<4)
.map(i -> i+1)
.subscribe(System.out::print);
Example using operators

Schedulers

When we talk about Rx, usually, we talk about three key points Observables, Observers and Schedulers. We already, explained the first two, it’s time to shine the light on the schedulers.

We know that Rx is library for asynchronous programming, and by asynchronous we mean executing multiple code blocks simultaneously, each code block will runs on its own thread.

So we need to manage these thread to get a better performance and prevent problems like memory leaks, and that’s where schedulers comes in handy.

Schedulers in Rx are the ones responsible for telling the observables and observers, on which thread they should run. In order to do so Rx introduce 2 main operators to manage these threads:

· SubscribeOn(): allow you to specify on which thread the observable should operate.

· ObserveOn(): Specify on which scheduler an observer should observe the observable.

Note that if you don’t specify which thread to execute the operators on, by default, it keeps running on the same thread in which the subscription is created.

Code Example:

postService.getPosts()
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.subscribe({ result ->
handleResponse(result)
},
{ throwable ->
throwable.printStackTrace()
})

Let’s wrap up what’s been said.

Wrap Up

ReactiveX (Rx) is a library for composing asynchronous and event-based programs by using observable sequences.

ReactiveX will allow us to have fast and performant apps with a smooth user experience. Rx will take care of low-level threading, handle errors and also allow us to easily write interfaces code that will updates itself reactively which will allow us to focus on the business logic.

Everything in ReactiveX is an observable sequence or something that react to an observable sequence.

Observables, Observers and Schedulers are the building blocks of reactive programming.

Observables are the data streams. These data can be of any type (objects, events, Http calls, errors …). They emits data, errors and even completion events.

Observers watch and consumes data emitted by the observable, they also receives errors or completion events.

Schedulers make it easier to manage complex threads. They allow us to add threading to our observables and observers.

Observers are connected to observables through the subscribe method. This method subsets 3 methods:

· onNext

· onError

· onCompleted

There is 2 operators that will allow us to manage the schedulers of our observables and observers:

· subscribeOn: specify the scheduler that our observable will operates on.

· observeOn: specify on which scheduler the observer should be notified.

Hot and Cold observables will allow us to understand the behavior of our observables in order use the full potential of Rx:

· Hot Observables

o Start emitting data from the second it’s created.

o Share the data between multiple observers.

· Cold Observables

o Start emitting data only after a subscription is made.

o The data are not shared between subscribers/observers.

Credits

I started working on ReactiveX as an iOS Developer intern at Vynd Solution. For those who doesn’t know Vynd:

Vynd is a Tunisian startup offering a mobile application (Android & iOS) for finding the perfect restaurant, bar or coffee shop for a night out (For more details on Vynd, checkout the links below).

One of my tasks was to introduce ReactiveX to Vynd’s Developer Team through a presentation i made, plus this article. In order to implement it in Vynd’s different solutions for different platforms (Android, iOS, JS and .NET). I also started working with RxSwift, by implementing its core concepts on some features of the current Vynd iOS application.

I want to thank Vynd Team for the support and the encouragement they gave me. Also for the knowledge I got during my internship from the hands-on experience in development process from development to production, and the different tools and software used in the DevOps process to ensure the continuous integration and the continuous delivery for different platforms.

Vynd Important Link:

Vynd:

Download Vynd Mobile App:

Vynd Android:

Vynd iOS:

For getting social with Vynd:

Vynd LinkedIn: https://www.linkedin.com/company/vynd/

Vynd Facebook: https://www.facebook.com/vyndinc/

Vynd Instagram: https://www.instagram.com/vyndapp/

--

--

Mahdi Chtioui

Mobile & Web Developer 💚 💙 Passionate by innovations, new technologies and design ❇️ GitHub: https://github.com/mahdichtioui