Event is the new data — Reactive Programming

Dharmvir Tiwari
Blue Harvest Tech Blog
Jun 26, 2020
The falling apple was an event and the Theory of Gravity was its reaction

Journey so far

After the synchronized hell-hole, Java provided us with ExecutorService and Future. They offered a lot of abstraction but hid many mines in the implementation, and every so often one of them exploded in production, spoiling your sleep 😵

Both arrived in Java 5 to model asynchronous programming efficiently, but they left myriad loopholes.

Following are a few issues:

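  1. An exception thrown in the worker thread stays confined to it; the caller sees it only when it calls get(), and if the result is never set, the caller blocks forever.
  2. It is difficult to create a step-based pipeline.
  3. The get() method is blocking.
  4. There is no way to react to the completion of another Future without blocking.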
The exception will block the calling thread
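
The first and third issues can be sketched with nothing but the JDK. The class name FutureGetDemo, the method describeFailure() and the error message are illustrative assumptions, not code from the original article. The task fails on a pool thread, and the caller learns about it only once it blocks in get(), where the failure arrives wrapped in an ExecutionException.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

final class FutureGetDemo {

    // Runs a failing task on a worker thread and reports how the failure reaches the caller.
    static String describeFailure() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Hypothetical task: it always fails, and it does so on the worker thread.
            Callable<Integer> failingTask = () -> {
                throw new IllegalStateException("rate service down");
            };
            Future<Integer> rate = pool.submit(failingTask);
            try {
                // The caller is parked here until the task finishes or the timeout expires.
                rate.get(1, TimeUnit.SECONDS);
                return "completed normally";
            } catch (ExecutionException e) {
                // Only now does the worker's exception become visible to the caller.
                return "failed in worker: " + e.getCause().getMessage();
            } catch (TimeoutException e) {
                return "timed out";
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return "interrupted";
            }
        } finally {
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(describeFailure());
    }
}
```

The timeout on get() is a defensive choice for the sketch: the no-argument get() would wait indefinitely if the task never completed.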

Java 8 addressed these issues with an enhanced version of Future: CompletableFuture, which implements both the Future and CompletionStage interfaces.

Creating a pipeline of operations

With CompletableFuture, we can combine multiple calls, compose a pipeline, exit the flow via an exception, and much more (it offers 50+ methods to model the flow efficiently and with minimal clutter). In spite of these improvements, the underlying model remained the same: a pull-based mechanism that inhibits concurrency when the source is a stream of values.
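
A minimal sketch of such a pipeline, assuming a made-up conversion rate of 0.9 EUR per USD. PipelineDemo, toEuro() and quote() are hypothetical names chosen for illustration. It chains a transformation with thenApply(), joins a second call with thenCombine(), and leaves the flow through exceptionally() when a stage fails.

```java
import java.util.Locale;
import java.util.concurrent.CompletableFuture;

final class PipelineDemo {

    // Assumed rate, for illustration only.
    static final double USD_TO_EUR = 0.9;

    static double toEuro(double usd) {
        return usd * USD_TO_EUR;
    }

    // Builds a small pipeline and waits for its result.
    static String quote(double usd) {
        CompletableFuture<Double> amount = CompletableFuture.supplyAsync(() -> {
            if (usd < 0) {
                throw new IllegalArgumentException("negative amount");
            }
            return usd;
        });
        CompletableFuture<String> unit = CompletableFuture.supplyAsync(() -> "EUR");

        return amount
                .thenApply(PipelineDemo::toEuro)                       // transform the value
                .thenCombine(unit, (eur, code) ->
                        String.format(Locale.ROOT, "%.2f %s", eur, code)) // merge two calls
                .exceptionally(ex -> "unavailable")                    // exit via an exception
                .join();
    }

    public static void main(String[] args) {
        System.out.println(quote(100));
        System.out.println(quote(-1));
    }
}
```

The final join() still blocks the caller; it is there only so that the sketch can return a plain value.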

Motivation for reactive programming

The Hollywood Principle says, “Don’t call us, we’ll call you.”

With reactive programming, we model software around events rather than objects. Any application that interacts with the outside world is reactive in nature.
As per the Reactive Manifesto, a reactive system should be responsive, resilient, elastic, and message-driven. This thinking led to the Reactive Streams standard, whose interfaces were included in Java 9 as the Flow API, and which has various implementations: RxJava, Akka Streams, and Project Reactor.
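
To get a feel for the Flow API before moving to RxJava, here is a small JDK-only sketch. It uses SubmissionPublisher, the publisher that ships with the JDK; FlowDemo and CollectingSubscriber are hypothetical names. The subscriber is called back with onNext, onError, or onComplete, and it signals demand through request().

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

final class FlowDemo {

    // Records every signal it receives from the publisher.
    static final class CollectingSubscriber implements Flow.Subscriber<String> {
        final List<String> events = new CopyOnWriteArrayList<>();
        final CountDownLatch done = new CountDownLatch(1);
        private Flow.Subscription subscription;

        @Override
        public void onSubscribe(Flow.Subscription subscription) {
            this.subscription = subscription;
            subscription.request(1); // ask for the first item
        }

        @Override
        public void onNext(String item) {
            events.add("next:" + item);
            subscription.request(1); // ask for the next one
        }

        @Override
        public void onError(Throwable error) {
            events.add("error:" + error.getMessage());
            done.countDown();
        }

        @Override
        public void onComplete() {
            events.add("complete");
            done.countDown();
        }
    }

    // Pushes the items to one subscriber and returns the signals it saw.
    static List<String> publish(String... items) {
        CollectingSubscriber subscriber = new CollectingSubscriber();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>()) {
            publisher.subscribe(subscriber);
            for (String item : items) {
                publisher.submit(item);
            }
        } // close() sends onComplete once the buffered items are delivered
        try {
            subscriber.done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return subscriber.events;
    }

    public static void main(String[] args) {
        System.out.println(publish("USD", "GBP"));
    }
}
```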

Here, we will look at RxJava2 (the version rewritten on top of the Reactive Streams specification) and try to gain some insight into its usage.

Pre-requisite

RxJava2

RxJava2 Maven dependency (groupId io.reactivex.rxjava2, artifactId rxjava)

Observables and Subscribers

Observable is a push-based, composable iterator. A given Observable<T> pushes items of type T through a series of intermediate operators until they arrive at a terminal Observer, which consumes them. This sounds similar to the streams of Java 8, but a stream is a pull-based iterator with just a data channel, whereas an Observable has data, error, and completion channels and employs a push-based mechanism.

The observable emits currency and provides the conversion for it (converting to Euro)
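
A rough sketch of such a currency Observable, assuming RxJava 2.x is on the classpath. The class name, the amounts and the rate of 0.9 EUR per USD are made up for illustration and are not the article's original example. The three lambdas passed to subscribe() correspond to the data, error, and completion channels.

```java
import io.reactivex.Observable;

import java.util.ArrayList;
import java.util.List;
import java.util.Locale;

final class CurrencyObservableDemo {

    // Assumed rate, for illustration only.
    static final double USD_TO_EUR = 0.9;

    // Emits three USD amounts, converts each to EUR, and logs every signal.
    static List<String> convert() {
        List<String> log = new ArrayList<>();
        Observable.just(10.0, 20.0, 30.0)
                .map(usd -> usd * USD_TO_EUR) // intermediate operator
                .subscribe(
                        eur -> log.add(String.format(Locale.ROOT, "next %.2f", eur)), // data
                        error -> log.add("error " + error.getMessage()),              // error
                        () -> log.add("complete"));                                    // completion
        return log;
    }

    public static void main(String[] args) {
        convert().forEach(System.out::println);
    }
}
```

Because just() emits synchronously on the subscribing thread, the log is already filled when subscribe() returns.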

Creating Observable

  • create(): Observable.create() is a factory method that takes an ObservableOnSubscribe<T> source (a lambda that receives an emitter) and returns an Observable.
    The emitter’s onNext() calls pass the items down the chain towards the Observer.
  • just(): Takes up to 10 items that the user wants to emit.
Observable using interval
  • interval(): Observable.interval() emits indefinitely at the specified interval. As it operates on a timer, it needs to run on a separate thread and runs on the computation Scheduler by default.
  • defer(): Observable.defer() creates a fresh Observable, with its own state, for each Observer. With the other factories, a change in the state that drives the Observable after it has been created is not reflected, whereas defer() picks up the change for subsequent Observers.
  • Hot and cold observables: A cold observable is like an automated tele-caller. For anyone who connects to it, the same message is replayed 😫. Most data-driven observables are cold (create, just, fromIterable, and many more return a cold observable).
    A hot observable is like a conference call: the conversation is broadcast to all attendees, and if you join late, you will have missed part of it. ConnectableObservable is a helpful mechanism here: calling publish() on a cold Observable returns a ConnectableObservable that acts as a hot Observable.
Using publish() on cold observables turns them into a broadcaster
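
The factories and the hot/cold distinction above can be sketched as follows, again assuming RxJava 2.x on the classpath. CreationDemo and its method names are hypothetical. The first method shows defer() reading state at subscription time; the second counts how often the source lambda of create() runs for two Observers, with and without publish().

```java
import io.reactivex.Observable;
import io.reactivex.observables.ConnectableObservable;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

final class CreationDemo {

    // defer(): the range is built when an Observer subscribes, so it sees the updated bound.
    static List<Integer> deferredRange() {
        int[] count = {2};
        Observable<Integer> deferred = Observable.defer(() -> Observable.range(1, count[0]));
        count[0] = 3; // changed after the Observable was assembled
        List<Integer> seen = new ArrayList<>();
        deferred.subscribe(seen::add);
        return seen;
    }

    // Subscribes two Observers and reports how often the source ran and what each received.
    static String twoObservers(boolean hot) {
        AtomicInteger runs = new AtomicInteger();
        Observable<String> cold = Observable.create(emitter -> {
            runs.incrementAndGet(); // executed once per subscription to the source
            emitter.onNext("USD");
            emitter.onNext("GBP");
            emitter.onComplete();
        });

        List<String> first = new ArrayList<>();
        List<String> second = new ArrayList<>();
        if (hot) {
            ConnectableObservable<String> shared = cold.publish();
            shared.subscribe(first::add);
            shared.subscribe(second::add);
            shared.connect(); // a single emission, broadcast to both Observers
        } else {
            cold.subscribe(first::add);  // first replay
            cold.subscribe(second::add); // second, independent replay
        }
        return runs.get() + " run(s), first=" + first + ", second=" + second;
    }

    public static void main(String[] args) {
        System.out.println(deferredRange());
        System.out.println(twoObservers(false));
        System.out.println(twoObservers(true));
    }
}
```

Both Observers receive the same items in either mode; the difference is that the cold source does its work once per Observer, while the published one does it once in total.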

Concurrency

Concurrency is a form of multitasking in which several tasks make progress at the same time, typically on multiple threads. If you want to utilize your hardware’s computing resources efficiently, concurrency is the answer. But there is a caveat: how do you build a concurrent system without drowning in complexity? RxJava addresses concurrency in a simple manner that is easy to apply.

RxJava handles concurrency with the subscribeOn() and observeOn() methods: subscribeOn() chooses the Scheduler on which the source emits, and observeOn() switches the Scheduler for the operators and Observer downstream of it.
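
A small sketch of the two methods, assuming RxJava 2.x on the classpath. SchedulerDemo, the amount and the rate are illustrative. It records the name of the thread on which the source runs and the name of the thread on which a downstream map() runs.

```java
import io.reactivex.Observable;
import io.reactivex.schedulers.Schedulers;

import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

final class SchedulerDemo {

    // Returns the thread names seen by the source and by the downstream operator.
    static List<String> threadsUsed() {
        List<String> threads = new CopyOnWriteArrayList<>();
        Observable.fromCallable(() -> {
                    threads.add("source:" + Thread.currentThread().getName());
                    return 100.0; // hypothetical USD amount
                })
                .subscribeOn(Schedulers.io())        // the source runs on an io thread
                .observeOn(Schedulers.computation()) // everything below runs on a computation thread
                .map(usd -> {
                    threads.add("map:" + Thread.currentThread().getName());
                    return usd * 0.9; // assumed rate, for illustration only
                })
                .blockingSubscribe(); // wait for completion so the list is filled on return
        return threads;
    }

    public static void main(String[] args) {
        threadsUsed().forEach(System.out::println);
    }
}
```

blockingSubscribe() is used here only to keep the sketch deterministic; in an application the caller would normally not block.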

Conclusion

The approach towards solving a problem must be reactive. Instead of modeling our solution based on objects and static data, we need to consider events, data, and think of everything as an Observable. The modified approach helps us to leverage everything reactive programming has to offer.
