RxJava().filter{ it.basic }

A “reactive” way to explain some key points and concepts regarding ReactiveX

Paolo Brandi
AndroidPub
8 min read · Jul 8, 2019

This article is not meant to explain how ReactiveX works; there are already plenty of articles on that (some links at the end for those who might be interested). Rather, it is meant as a reference for some basic concepts and terminology that may be more or less familiar.
It might be of interest to anyone who already knows the ReactiveX world but sometimes gets confused, or to anyone who is new to it and wants to understand the meaning of some terms before jumping in.

Just one assumption before starting…

This article refers to the Java implementation of the ReactiveX API, more specifically to RxJava 2.x.
If you are interested in the differences between this version and its predecessor RxJava 1.x, please read this.

Ready? Let’s start and subscribe to this article, by filtering only the basics of RxJava :)

We will invoke readNext() every time the article emits a new Rx basic concept.

Below is the first emission :)

readNext() — An intro to ReactiveX

From the official ReactiveX page:

ReactiveX is sometimes called “functional reactive programming” but this is a misnomer. ReactiveX may be functional, and it may be reactive, but “functional reactive programming” is a different animal. One main point of difference is that functional reactive programming operates on values that change continuously over time, while ReactiveX operates on discrete values that are emitted over time.

…now…second emission…

readNext() — Reactive

ReactiveX may be Reactive.

Definition:
Reactive programming is a declarative programming paradigm concerned with data flows and the propagation of change over time.
With this paradigm it is possible to express data flows declaratively, and any change of state is automatically propagated through them by the underlying execution model.
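To make "propagation of change" a bit more concrete, here is a minimal plain-Java sketch. It does not use RxJava; ReactiveInt and PropagationDemo are hypothetical names invented for this illustration. A total is wired to two values once and then follows every change by itself.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.IntConsumer;

// Hypothetical minimal "reactive value": changing it notifies everything that depends on it.
class ReactiveInt {
    private int value;
    private final List<IntConsumer> listeners = new ArrayList<>();

    ReactiveInt(int initial) { value = initial; }

    int get() { return value; }

    void onChange(IntConsumer listener) { listeners.add(listener); }

    void set(int newValue) {
        value = newValue;
        for (IntConsumer listener : listeners) {
            listener.accept(newValue);   // propagate the change to dependents
        }
    }
}

class PropagationDemo {
    // total is wired to a and b once; afterwards it is kept up to date by the listeners.
    static int[] run() {
        ReactiveInt a = new ReactiveInt(1);
        ReactiveInt b = new ReactiveInt(2);
        int[] total = { a.get() + b.get() };      // starts at 3
        a.onChange(v -> total[0] = v + b.get());
        b.onChange(v -> total[0] = a.get() + v);
        a.set(10);   // total becomes 12
        b.set(5);    // total becomes 15
        return total;
    }

    public static void main(String[] args) {
        System.out.println(run()[0]); // 15
    }
}
```

Nobody recomputes the total by hand after a.set(10) or b.set(5); the change flows to it through the registered listeners.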

…and….the third emission….you got it now, no?

readNext() — Functional

ReactiveX may be Functional.

Definition:
Functional programming is a declarative programming paradigm that treats computation as the evaluation of mathematical functions. It relies on “pure” functions, which produce no side effects and avoid changing state and mutable data, so the same input always produces the same output.

This is in contrast to imperative programming, which is built from statements and sequences of commands that change the state, and where data is mutable.
In functional programming data is immutable, and new states are created from the previous ones.
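The contrast can be sketched in a few lines of plain Java. The class and method names below are made up for the illustration: one method changes the state it is given, the other derives a new state from the old one.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical names, used only to contrast the two styles.
class PureVsImpure {

    // Imperative style: mutates the list it receives (a side effect).
    static void doubleInPlace(List<Integer> values) {
        for (int i = 0; i < values.size(); i++) {
            values.set(i, values.get(i) * 2);
        }
    }

    // Functional style: leaves the input untouched and returns a new, unmodifiable list.
    static List<Integer> doubled(List<Integer> values) {
        List<Integer> result = new ArrayList<>(values.size());
        for (int value : values) {
            result.add(value * 2);
        }
        return Collections.unmodifiableList(result);
    }

    public static void main(String[] args) {
        List<Integer> input = new ArrayList<>(Arrays.asList(1, 2, 3));
        System.out.println(doubled(input)); // [2, 4, 6], a new state derived from the old one
        System.out.println(input);          // [1, 2, 3], the input is unchanged
        doubleInPlace(input);
        System.out.println(input);          // [2, 4, 6], the input itself has been modified
    }
}
```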

readNext() — Reactive eXtensions

ReactiveX is a library built on the Reactive programming paradigm for composing asynchronous and event-based programs by using observables while abstracting away concerns about things like low-level threading, synchronization, thread-safety, concurrent data structures, and non-blocking I/O.

It is called ReactiveX (a.k.a. Reactive Extensions) because:

  • it extends the observer pattern to support sequences of data and/or events
  • it extends and combines Reactive Programming with Functional Programming by adding operators that allow you to compose sequences together declaratively.

readNext() — RxOperators

ReactiveX offers a large set of Operators for different types of operations.
In general, operators can be chained together, applied to data flows, used to transform one type of data into another, composed to produce more complex operators, and extended to create custom operators.

Operators can be grouped by category:

  • Creating Observable such as create(), just(), empty(), defer()…
  • Transforming Observable such as map(), flatMap()…
  • Filtering Observable such as filter(), skip(), take()…
  • Combining Observable such as merge(), zip(), switchOnNext()…
  • Utility Operators such as observeOn(), subscribeOn(), doOnNext()…
  • Conditional Operators such as contains(), takeWhile()…
  • Mathematical/Aggregate Operators such as max(), min(), concat()…
  • and others…

Here is a full list of Operators with their documentation, which explains how each of them works and is meant to be used, with the help of marble diagrams.

Example of a marble diagram for the operator map().
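The idea of chaining operators can be imitated in plain Java. The sketch below is not RxJava: MiniStream is a hypothetical, heavily simplified stand-in for an observable, with no errors, completion or threading. It only shows how map() and filter() each wrap the previous step and pass items along.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Predicate;

// Hypothetical simplified stream: it only knows how to push items
// into a consumer once someone subscribes.
interface MiniStream<T> {

    void subscribe(Consumer<? super T> consumer);

    // Creating operator: emits the given items in order.
    static <T> MiniStream<T> fromIterable(Iterable<T> items) {
        return consumer -> items.forEach(consumer);
    }

    // Transforming operator: applies the function to every item passing through.
    default <R> MiniStream<R> map(Function<? super T, ? extends R> mapper) {
        return consumer -> subscribe(item -> consumer.accept(mapper.apply(item)));
    }

    // Filtering operator: lets through only the items matching the predicate.
    default MiniStream<T> filter(Predicate<? super T> predicate) {
        return consumer -> subscribe(item -> {
            if (predicate.test(item)) {
                consumer.accept(item);
            }
        });
    }
}

class OperatorChainDemo {
    static List<Integer> run() {
        List<Integer> received = new ArrayList<>();
        MiniStream.fromIterable(Arrays.asList(1, 2, 3, 4))
                .map(x -> x * 10)        // 10, 20, 30, 40
                .filter(x -> x > 15)     // 20, 30, 40
                .subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [20, 30, 40]
    }
}
```

Nothing is emitted until subscribe() is called at the end of the chain; each operator simply returns a new stream that subscribes to the one before it.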

readNext() — Push Pattern

ReactiveX follows the “push” pattern, where everything is a stream of data that is emitted by a Source and consumed by other entities called Consumers. Data streams can be manipulated and combined before being consumed.

readNext() — Source of data

Based on the nature of the source there might be different types of streams and consumers.

readNext() — ObservableSource

In RxJava 2.x, ObservableSource is an interface for a type of source that may produce zero or more items, or an error, and that allows a consumer such as an Observer to subscribe to it.

readNext() — Observable

Observable is an implementation of the ObservableSource, which wraps up the data flow and provides a full set of RxOperators. It should be seen as a channel to connect sources and observers.

readNext() — Observer

An observer is the consumer of the data. In RxJava it is an interface that provides the mechanism to listen to each emission, error, and termination of the stream.

  • onSubscribe(): called once, before any other method, with a Disposable that the observer can use to cancel the subscription.
  • onNext(): an Observable calls this method whenever it emits an item. The method takes the emitted item as its parameter.
  • onError(): an Observable calls this method to indicate that it has failed to generate the expected data or has encountered some other error. It will not make further calls to onNext() or onComplete(). The onError() method takes as its parameter an indication of what caused the error.
  • onComplete(): an Observable calls this method after it has called onNext() for the final time, if it has not encountered any errors.
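The order of these calls can be sketched without RxJava. SimpleObserver below is a hypothetical, cut-down version of the interface (it leaves out the Disposable handling), and emitAll() is an invented helper. The helper pushes items and then sends exactly one terminal event.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Supplier;

// Simplified, hypothetical observer: the subscription handling of the
// real RxJava interface is omitted to keep the sketch short.
interface SimpleObserver<T> {
    void onNext(T item);
    void onError(Throwable error);
    void onComplete();
}

class ObserverContractDemo {

    // Pushes every item to the observer, then exactly one terminal event:
    // onError if producing an item failed, onComplete otherwise.
    static <T> void emitAll(Iterable<Supplier<T>> producers, SimpleObserver<T> observer) {
        for (Supplier<T> producer : producers) {
            T item;
            try {
                item = producer.get();
            } catch (RuntimeException e) {
                observer.onError(e);   // terminal: no more onNext, no onComplete
                return;
            }
            observer.onNext(item);
        }
        observer.onComplete();         // terminal: reached only without errors
    }

    // Records every callback as text so the order of events can be inspected.
    static List<String> record(List<Supplier<Integer>> producers) {
        List<String> events = new ArrayList<>();
        emitAll(producers, new SimpleObserver<Integer>() {
            @Override public void onNext(Integer item) { events.add("onNext(" + item + ")"); }
            @Override public void onError(Throwable error) { events.add("onError(" + error.getMessage() + ")"); }
            @Override public void onComplete() { events.add("onComplete()"); }
        });
        return events;
    }
}
```

With two healthy producers returning 1 and 2, record() yields onNext(1), onNext(2), onComplete(). If the second of three producers throws an exception with the message "boom", it yields onNext(1), onError(boom) and nothing after that.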

readNext() — MaybeSource, Maybe and MaybeObserver

In RxJava 2.x, MaybeSource is an interface for a type of source that may produce zero or one item, or an error, and that allows a consumer such as a MaybeObserver to subscribe to it. Maybe is the abstract class that implements it and provides a set of operators.

readNext() — SingleSource, Single and SingleObserver

In RxJava 2.x, SingleSource is an interface for a type of source that produces either exactly one item or an error, and that allows a consumer such as a SingleObserver to subscribe to it. Single is the abstract class that implements it and provides a set of operators.

readNext() — CompletableSource, Completable and CompletableObserver

In RxJava 2.x, CompletableSource is an interface for a type of source that produces no items, only a completion or an error, and that allows a consumer such as a CompletableObserver to subscribe to it. Completable is the abstract class that implements it and provides a set of operators.

readNext() — Publisher, Flowable and Subscriber

The relationship between Publisher, Flowable and Subscriber is the same as between ObservableSource, Observable and Observer, with the only difference that a Flowable supports Backpressure.

readNext() — Backpressure

Backpressure is the ability to handle a source of data whose emission rate is higher than the rate at which a consumer can actually consume the items. This is the case, for example, for real-time data such as GPS coordinates or motion events. ReactiveX provides different policies to handle these kinds of scenarios. Flowable is the only stream type in RxJava 2.x that supports Backpressure. Given the nature of the problem, it is easy to see why Maybe, Single and Completable do not need this capability: they emit at most one item.
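The mechanism underneath is that the consumer tells the source how many items it is ready for. The sketch below shows this with the JDK's own java.util.concurrent.Flow interfaces (Java 9+), which have the same shape as the Reactive Streams Publisher, Subscriber and Subscription. It is not RxJava code, and range() and ManualSubscriber are hypothetical helpers. The publisher is deliberately simplified: it is synchronous and does not validate request sizes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;

class BackpressureSketch {

    // A publisher of the numbers 1..count that emits only what has been requested.
    static Flow.Publisher<Integer> range(int count) {
        return subscriber -> subscriber.onSubscribe(new Flow.Subscription() {
            private int next = 1;
            private boolean done;

            @Override
            public void request(long n) {
                // Emit at most n items; the rest wait for the next request.
                for (long i = 0; i < n && next <= count && !done; i++) {
                    subscriber.onNext(next++);
                }
                if (next > count && !done) {
                    done = true;
                    subscriber.onComplete();
                }
            }

            @Override
            public void cancel() {
                done = true;
            }
        });
    }

    // A slow consumer: it records items and asks for more only when told to.
    static class ManualSubscriber implements Flow.Subscriber<Integer> {
        final List<Integer> received = new ArrayList<>();
        boolean completed;
        private Flow.Subscription subscription;

        @Override public void onSubscribe(Flow.Subscription s) { subscription = s; }
        @Override public void onNext(Integer item) { received.add(item); }
        @Override public void onError(Throwable t) { throw new IllegalStateException(t); }
        @Override public void onComplete() { completed = true; }

        void requestMore(long n) { subscription.request(n); }
    }

    public static void main(String[] args) {
        ManualSubscriber consumer = new ManualSubscriber();
        range(5).subscribe(consumer);
        System.out.println(consumer.received);  // [] because nothing was requested yet
        consumer.requestMore(2);
        System.out.println(consumer.received);  // [1, 2]
        consumer.requestMore(10);
        System.out.println(consumer.received + " completed=" + consumer.completed);
    }
}
```

A source like this one can simply wait for the next request. Sources that cannot slow down, such as sensors, are where policies like buffering or dropping items come in.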

readNext() — Hot vs Cold Observable

  • Cold observables can be subscribed to multiple times, and each subscription is triggered on its own: the observable starts emitting items only once an observer subscribes to it, and each observer gets its own run of the sequence.
  • Hot observables are like a “stream” of ongoing events. Observers can come and go, but the stream is created once and just keeps going.
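The practical difference shows up for an observer that arrives late. In the plain-Java sketch below, ColdSource and HotSource are hypothetical classes invented for the illustration, not RxJava types. The late subscriber gets the whole sequence from the cold source but only the remaining items from the hot one.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class HotVsColdSketch {

    // Cold: nothing happens until subscribe(), and every subscriber
    // triggers its own, independent run of the sequence 1, 2, 3.
    static class ColdSource {
        void subscribe(Consumer<Integer> observer) {
            for (int i = 1; i <= 3; i++) {
                observer.accept(i);
            }
        }
    }

    // Hot: items are emitted whether or not anybody listens, and each
    // item goes only to the observers subscribed at that moment.
    static class HotSource {
        private final List<Consumer<Integer>> observers = new ArrayList<>();

        void subscribe(Consumer<Integer> observer) {
            observers.add(observer);
        }

        void emit(int item) {
            for (Consumer<Integer> observer : observers) {
                observer.accept(item);
            }
        }
    }

    // What a second, late subscriber receives from a cold source.
    static List<Integer> lateSubscriberOnCold() {
        ColdSource cold = new ColdSource();
        cold.subscribe(item -> { });      // first subscriber
        List<Integer> late = new ArrayList<>();
        cold.subscribe(late::add);        // gets its own full run
        return late;
    }

    // What a second, late subscriber receives from a hot source.
    static List<Integer> lateSubscriberOnHot() {
        HotSource hot = new HotSource();
        hot.subscribe(item -> { });       // first subscriber
        hot.emit(1);
        hot.emit(2);
        List<Integer> late = new ArrayList<>();
        hot.subscribe(late::add);         // joins after 1 and 2 are gone
        hot.emit(3);
        return late;
    }

    public static void main(String[] args) {
        System.out.println(lateSubscriberOnCold()); // [1, 2, 3]
        System.out.println(lateSubscriberOnHot());  // [3]
    }
}
```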

readNext() — Schedulers

Schedulers are the ReactiveX way to handle asynchronous jobs: they abstract away concurrency, synchronization and thread management, and make it easy to switch from one thread to another.

RxJava features several standard schedulers:

  • Schedulers.computation(): Run computation intensive work on a fixed number of dedicated threads in the background. Most asynchronous operators use this as their default Scheduler.
  • Schedulers.io(): Run I/O-like or blocking operations on a dynamically changing set of threads.
  • Schedulers.single(): Run work on a single thread in a sequential and FIFO manner.
  • Schedulers.trampoline(): Run work in a sequential and FIFO manner in one of the participating threads, usually for testing purposes.

readNext() — subscribeOn() vs observeOn()

  • subscribeOn() defines the Scheduler on which the source work will be performed. Since there is only one initial source for an Observable chain, it makes sense to have only one subscribeOn() operator. Subsequent calls to subscribeOn() will not affect or override the previous configuration.
  • observeOn() defines the Scheduler on which all downstream operations will be performed. In other words, it changes the Scheduler for all operators applied after it. Since it can be called many times, having multiple observeOn() calls in a single chain makes sense, and it is actually good practice when different kinds of jobs need to be performed.
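As a rough analogy only, the same split between "where the source runs" and "where the following steps run" can be shown with the JDK's CompletableFuture and two executors. This is not how RxJava implements its operators, and the thread names and the ThreadSwitchSketch class are invented for the sketch.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class ThreadSwitchSketch {

    // Creates a single-threaded executor whose thread carries the given name.
    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(runnable -> {
            Thread thread = new Thread(runnable, name);
            thread.setDaemon(true);
            return thread;
        });
    }

    // Returns "<thread that produced the value> -> <thread that handled it next>".
    static String run() {
        ExecutorService sourceThread = named("source-thread");
        ExecutorService downstreamThread = named("downstream-thread");
        try {
            return CompletableFuture
                    // Plays the role of subscribeOn(): decides where the source work runs.
                    .supplyAsync(() -> Thread.currentThread().getName(), sourceThread)
                    // Plays the role of observeOn(): decides where the next step runs.
                    .thenApplyAsync(producedOn ->
                            producedOn + " -> " + Thread.currentThread().getName(), downstreamThread)
                    .join();
        } finally {
            sourceThread.shutdown();
            downstreamThread.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(run()); // source-thread -> downstream-thread
    }
}
```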

readNext() — Upstream vs Downstream

Imagine a chain source → operator1 → operator2 → operator3 → subscribe(consumer), and place ourselves on the step defined by operator2:

  • upstream is anything towards the source
  • downstream is anything towards the subscribe(consumer)

onComplete()

That stream is ended :) That is it for now.
If you are more interested in Rx and how it works you can find some references that can help you out.

References:
