Why you should learn Reactive Programming

Ovidiu Latcu
Corebuild Software
Published in
6 min readDec 29, 2017

So you’ve heard about reactive programming, reactive extensions, RXJava and all the hype around them and you can’t get your head around them. You don’t know if they are a good fit for your project, if you should start using them and where to start learning. I’ll try to make this easy and simple for you.

Note : in this article I’ll be mostly talking about and using examples from RxJava which is the Java VM implementation of the Reactive Extensions specification. The code will be written in Kotlin.

Quick intro

So what is reactive programming ?

Reactive programming is an asynchronous programming paradigm concerned with data streams and the propagation of change. This means that it becomes possible to express static (e.g. arrays) or dynamic (e.g. event emitters) data streams with ease via the employed programming language(s).

https://en.wikipedia.org/wiki/Reactive_programming

An API for asynchronous programming with observable streams

http://reactivex.io/

Well I am pretty sure that is not the quick intro you where looking for. I find a simpler, better definition the following :

In reactive programming, everything can be a stream of data.

https://gist.github.com/staltz/868e7e9bc2a7b8c1f754

The device location, the system time, a list of entries from the DB/API, user clicks, everything can be a stream of data. Data from different streams can easily be combined and transformed, and in the end processed/observed by the subscribers.

In reactive programming, Observables emit data, and send it to the subscribers. This can be seen as data being PUSHed in reactive programming, as opposed to data being PULLed in imperative programming, where you explicitly request data (iterating over collection, requesting data from the DB, etc).

RX also comes with a strong mechanism through which subscribers can notify the observables at what rate they should emit the items, called backpressure. This is needed, when an observable emits items at a faster rate than the subscriber can process. In this case, if the observable it is not notified that it should reduce the rate at which it emits items, the internal buffers will grow until an OutOfMemory will occur. Although this seems complex to implement, using RX this can easily be achieved using one of the onBackPressureXXX operators ( onBackPressureBuffer, onBackPressureDrop, onBackPressureLatest , etc )

The building blocks of Reactive Programming

Observables (& Flowables in RxJava2)

  • Observables are the data source /stream
  • can emit multiple values, just one, or none
  • can also emit errors
  • can be infinite or finite, in which case they emit their completion event

Subscribers

  • Subscribers subscribe to Observables
  • they consume/observe the data
  • they also receive the errors and completion events from the Observable

Operators

  • used to create Observables (timers, ranges, from other data sources)
  • used to transform Observables (map, buffer, group, scan, etc)
  • used to filter Observables (filter, distinct, skip, debounce, etc)
  • used to combine Observables (zip, merge, combine latest, etc)

Schedulers

  • mechanism that allows us to easily add threading to our Observables and Subscribers
  • subscribeOn() specifies on which Scheduler to Observables should operate
  • observeOn() specifies on which Scheduler/Thread the Subscribers should be notified

Advantages of Reactive Programming

Still not sure why you should start learning RX ? Here’s a short list of advantages :

  • avoid “callback hell”
  • a lot simpler to do async / threaded work
  • a lot of operators that simplify work
  • very simple to compose streams of data
  • complex threading becomes very easy
  • you end up with a more cleaner, readable code base
  • easy to implement back-pressure

Concrete example

I know RX is hard to understand from the first try, but I think taking a concrete example will make this easier.

Let’s assume you have the following requests for a component, in an Android application :

  • it should track the user Location
  • just take into consideration location when accuracy is higher than 20 meters
  • it should also track device acceleration and magnetic sensor from which it should compute the azimuth(orientation angle relative to North)
  • combine the latest values from each data ( location , azimuth )
  • at every 5 seconds submit the data to the API
  • do all this work on a background Thread
  • once a response is received from the API, post it on the UI Thread so we can display it to the user

Please take 2 minutes and think how you can easily accomplish this using imperative programming, and how clean the code will look.

Whatever solution you came up with, I don’t believe it will be simpler and cleaner as the below code written in Kotlin that uses RxJava2 :

For now just let’s assume we already have an API/component that will give us Observables for the location, magnetic field and acceleration sensors (in the next article I’ll dwell deeper and explain how we can create our custom Observables using Observable.create() ). For the HTTP part, I think you are familiar with Retrofit which has adapters that can return RxJava2 types. So our APIs providing Observables will look like this :

To create our Azimuth Observable we must combine magnetic field and acceleration values, and each time one changes compute the new azimuth value :

Finally, our UserService implementation will look like this :

So what’s happening inside our startTrackingUserData() method ?

First, we must create our azimuth stream, by combining the accelerometer and magnetic sensor values. We connect to them, and combineLatest values and pass them to the computeAzimuth() function that will return the azimuth value.

Next, we take the azimuthStream, combine it with our locationStream after we filter() locations that have accuracy better than 20 meters, take again the latest values using combineLatest for each stream and bundle them in a UserData object. At every 5 seconds, we submit the most recent value, using throttleLast(5,TimeUnit.SECONDS) , while discarding older objects. We submit our data to the API using flatMap{ restApi.postUserData()} that will return another Observable<ResponseBody> . We add subscribeOn(Schedulers.io() to instruct RX to execute everything on a background thread, and observeOn(AndroidSchedulers.mainThread() so that our final result is posted back to our main thread. In the end, we call subscribe() which will trigger all the involved observables to start emitting items.

I hope you now have a better understanding of why RX is so great, and how much it can simplify asynchronous work. Writing the above code without RX would mean nesting a lot of callbacks, spawning threads, adding timers or Thread.sleep() maybe having to synchronize code paths and so on. Not only it would be hard and complex to write, it would also leave a lot of room for errors and bugs, and having to modify it would be a pain. Just imagine having to add a method that will cancel() the updates. You’ll probably have to call Thread.interrupt() , unregister listeners and so on. With RX all you have to do is call disposable.cancel() and this will unsubscribe from all the streams and stop all the sensor listening and API calls.

Reactive Programming is not easy, and it definitely comes with a steep learning curve, as you will have to un-wrap your head from imperative programming and start thinking in a “reactive way”, but once you get to learn it it will simplify your life a lot.

Some of the code for my article is also available on GitHub.

References & good learning sources :

--

--