Asynchronous Programming In Java

Rohan Aggarwal
Jun 19, 2020 · 7 min read

Asynchronous programming is a paradigm that keeps applications and user interfaces fast and responsive by not blocking a thread while it waits for a result. Java has several libraries that offer a consistent model for writing such asynchronous programs.

Why Asynchronous programming

Asynchronous programming provides a non-blocking, event-driven programming model. Because threads are not left idle waiting for I/O, a small number of threads can keep the available CPU cores busy executing tasks, which increases the application's throughput.

In Java, there are many ways to achieve asynchronous or reactive programming, such as RxJava, Project Reactor, and Vert.x.

Here we will discuss Project Reactor as used by the Spring WebFlux module.

Spring WebFlux module

Spring Framework 5 includes a new spring-webflux module, which we will use to build asynchronous services. WebFlux is built on Project Reactor, which exposes 2 major API types: Flux and Mono. These APIs work on the concept of subscriber and publisher. Flux and Mono are both publishers.

Difference between Flux and Mono

A Mono can publish at most 1 result, i.e. either 0 or 1, whereas a Flux can publish anywhere from 0 to an unbounded number of results.

Let's discuss them one by one.

Dependencies
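A minimal sketch of the Maven dependencies, assuming a Spring Boot project whose parent POM manages the versions (which is why none are pinned here). The reactor-test artifact is optional and only needed for tests.

```xml
<!-- Assumes Spring Boot dependency management supplies the versions -->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
</dependency>
<dependency>
    <groupId>io.projectreactor</groupId>
    <artifactId>reactor-test</artifactId>
    <scope>test</scope>
</dependency>
```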

Flux

A Flux can publish 0 or more values. The values can be of any type.

Flux creation methods

1. Flux.just()

Use this method when the values are known up front and can be passed directly as comma-separated arguments.

2. Flux.fromIterable()

This method is used when we have an instance of an Iterable, such as a List or a Set.

3. Flux.range()

This method creates a publisher that emits a range of consecutive integers, given a start value and a count.

4. Flux.empty()

This method creates an empty Flux, which completes without emitting any value.

5. Flux.error()

This method is used to publish some error response.

6. Flux.interval()

This method emits long values starting from 0, one at each tick of the given time interval.

These are the most commonly used Flux creation methods. There are a few more, such as fromArray, fromStream, and create.
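The creation methods above can be sketched as follows. This is a minimal illustration, assuming reactor-core is on the classpath. The class name FluxCreationExamples and the sample values are made up for the example, and the timed case uses Flux.interval(), the Reactor operator that emits increasing long values at a fixed period. collectList().block() is used only to turn each Flux into a plain list that can be printed.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name; each method shows one creation method.
class FluxCreationExamples {

    // Flux.just(): a fixed set of values passed as arguments
    static List<String> just() {
        return Flux.just("a", "b", "c").collectList().block();
    }

    // Flux.fromIterable(): values taken from any Iterable (List, Set, ...)
    static List<Integer> fromIterable() {
        return Flux.fromIterable(List.of(10, 20, 30)).collectList().block();
    }

    // Flux.range(start, count): `count` consecutive integers beginning at `start`
    static List<Integer> range() {
        return Flux.range(5, 3).collectList().block();
    }

    // Flux.empty(): completes without emitting anything
    static List<String> empty() {
        return Flux.<String>empty().collectList().block();
    }

    // Flux.error(): terminates with the given exception; onErrorReturn is
    // added here only to turn the error into a visible value
    static String error() {
        return Flux.<String>error(new IllegalStateException("boom"))
                .onErrorReturn("fallback")
                .blockFirst();
    }

    // Flux.interval(): emits 0, 1, 2, ... one per period; take(3) stops it
    static List<Long> interval() {
        return Flux.interval(Duration.ofMillis(100)).take(3).collectList().block();
    }

    public static void main(String[] args) {
        System.out.println(just());         // [a, b, c]
        System.out.println(fromIterable()); // [10, 20, 30]
        System.out.println(range());        // [5, 6, 7]
        System.out.println(empty());        // []
        System.out.println(error());        // fallback
        System.out.println(interval());     // [0, 1, 2]
    }
}
```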

Mono creation methods

Mono.just()

Mono has a just() method just like Flux. The only difference is that we can pass only 1 value to it.

Mono.empty()

As with Flux, this method creates an empty Mono. It is typically used where we would otherwise return null: we can return an empty Mono instead.

Mono.error()

This method creates a Mono that terminates with the given exception.

Mono.from()

This method creates a Mono from an existing publisher. If the source publishes more than one value, only the first is kept.

Mono.justOrEmpty()

This method accepts either an Optional or a direct value. With an Optional, it emits the value if the Optional is not empty, otherwise it only emits onComplete. With a direct value, it emits the value, or only emits onComplete if null is passed.

There are many more methods to create a Mono, such as Mono.fromCallable(), Mono.fromRunnable(), and Mono.fromFuture().
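A small sketch of these Mono creation methods, assuming reactor-core is on the classpath. The class name MonoCreationExamples and the sample values are illustrative only. block() is used to pull the result out so it can be printed, and defaultIfEmpty() makes the "empty" cases visible.

```java
import java.util.Optional;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class name; each method shows one creation method.
class MonoCreationExamples {

    // Mono.just(): exactly one value
    static String just() {
        return Mono.just("hello").block();
    }

    // Mono.empty(): completes without a value, so block() returns null
    static String empty() {
        return Mono.<String>empty().block();
    }

    // Mono.error(): terminates with the exception; onErrorReturn makes it visible
    static String error() {
        return Mono.<String>error(new IllegalStateException("boom"))
                .onErrorReturn("fallback")
                .block();
    }

    // Mono.from(): wraps another publisher and keeps only its first value
    static Integer from() {
        return Mono.from(Flux.just(1, 2, 3)).block();
    }

    // Mono.justOrEmpty(Optional): an empty Optional yields an empty Mono
    static String justOrEmptyOptional() {
        return Mono.justOrEmpty(Optional.<String>empty())
                .defaultIfEmpty("none")
                .block();
    }

    // Mono.justOrEmpty(value): a null value yields an empty Mono
    static String justOrEmptyNull() {
        String value = null;
        return Mono.justOrEmpty(value).defaultIfEmpty("none").block();
    }

    // Mono.fromCallable(): the value is computed lazily, on subscription
    static Integer fromCallable() {
        return Mono.fromCallable(() -> 6 * 7).block();
    }

    public static void main(String[] args) {
        System.out.println(just());                // hello
        System.out.println(empty());               // null
        System.out.println(error());               // fallback
        System.out.println(from());                // 1
        System.out.println(justOrEmptyOptional()); // none
        System.out.println(justOrEmptyNull());     // none
        System.out.println(fromCallable());        // 42
    }
}
```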

Subscribing publishers

A publisher only publishes values when someone subscribes to it. When we subscribe to a publisher, we can get one of three results:

i) response

ii) error

iii) completion indication

And a subscriber has 3 methods to handle these results

  • onNext()
  • onError()
  • onComplete()

The onNext() method is called each time the publisher emits a value. The onError() method is called when the publisher emits an error. The onComplete() method is called at the end, when there are no more values left to emit. It is an indicator that tells the subscriber the subscription is complete.

onError() and onComplete() are terminal methods, which means that once either of them is called, the subscription ends.
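A minimal sketch of a subscription that completes normally, assuming reactor-core is on the classpath. The class name and the four sample values are illustrative. The three lambdas passed to subscribe() play the roles of onNext(), onError(), and onComplete().

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name; records which subscriber callbacks fire.
class SubscribeSuccessExample {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just(1, 2, 3, 4).subscribe(
                value -> events.add("onNext(" + value + ")"),               // one call per value
                error -> events.add("onError(" + error.getMessage() + ")"), // not reached here
                () -> events.add("onComplete()"));                          // called once, at the end
        // Flux.just emits synchronously, so all events are recorded by now
        return events;
    }

    public static void main(String[] args) {
        // prints onNext(1) .. onNext(4), then onComplete()
        run().forEach(System.out::println);
    }
}
```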

In the above example, onNext() is called 4 times and onComplete() once. No error occurs, so onError() is never called.
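A sketch of a subscription that ends with an error, under the same assumptions (reactor-core on the classpath, illustrative class name and values). The error is appended with concatWith(), so it arrives after the four values have been emitted.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name; the source emits 4 values and then fails.
class SubscribeErrorExample {

    static List<String> run() {
        List<String> events = new ArrayList<>();
        Flux.just(1, 2, 3, 4)
                .concatWith(Flux.error(new RuntimeException("something went wrong")))
                .subscribe(
                        value -> events.add("onNext(" + value + ")"),
                        error -> events.add("onError(" + error.getMessage() + ")"),
                        () -> events.add("onComplete()")); // never called: the error ends the sequence
        return events;
    }

    public static void main(String[] args) {
        // prints onNext(1) .. onNext(4), then onError(something went wrong)
        run().forEach(System.out::println);
    }
}
```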

In this example, onNext() is called 4 times and onError() once. The subscription ends after onError(), so onComplete() is never called.

So from the above examples, we can conclude that at most one of onError() and onComplete() is called in any subscription.

Intermediate functions

These are the functions that take the values from the publisher and apply some operation to them before passing them on to a subscriber.

Let's discuss a few of them.

1. map

This method transforms each value from a publisher (Flux or Mono) before it is passed to a subscriber.
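A short sketch of map() on both publisher types, assuming reactor-core is on the classpath. The class name and values are illustrative.

```java
import java.util.List;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;

// Hypothetical class name.
class MapExample {

    // Each value is multiplied by 10 before it reaches the subscriber
    static List<Integer> fluxMap() {
        return Flux.just(1, 2, 3).map(i -> i * 10).collectList().block();
    }

    // map works the same way on a Mono
    static String monoMap() {
        return Mono.just("reactor").map(String::toUpperCase).block();
    }

    public static void main(String[] args) {
        System.out.println(fluxMap()); // [10, 20, 30]
        System.out.println(monoMap()); // REACTOR
    }
}
```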

1. buffer

We can use it in 2 ways: with a buffer size or with a time duration.

With a buffer size, we set the number of values that we want to emit together at one time.
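A minimal sketch of size-based buffering, assuming reactor-core is on the classpath. The class name and the five input values are chosen for illustration. The buffer size is 2.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name.
class BufferBySizeExample {

    // Values are grouped into lists of 2; the final list may be smaller
    static List<List<Integer>> run() {
        return Flux.just(1, 2, 3, 4, 5)
                .buffer(2)
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 2], [3, 4], [5]]
    }
}
```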

Here we defined the buffer size as 2. So it keeps adding values to the buffer and only emits once the buffer reaches that size. Any values left over when the source completes are emitted as a final, smaller buffer.

With a buffer duration, the publisher collects values and emits them as a batch each time the specified duration elapses.
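A sketch of time-based buffering with a 5000 ms window, assuming reactor-core is on the classpath. The class name and values are illustrative. Because Flux.just() emits everything immediately and then completes, the example returns right away instead of waiting for the window to close.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name.
class BufferByDurationExample {

    // All 4 values arrive well inside the first 5000 ms window,
    // and the pending buffer is flushed when the source completes
    static List<List<Integer>> run() {
        return Flux.just(1, 2, 3, 4)
                .buffer(Duration.ofMillis(5000))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 2, 3, 4]]
    }
}
```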

It will emit all the values in one go, because all of them were added to the buffer before the 5000 ms elapsed. If the source instead delivered 2 values every 5000 ms, each emitted buffer would hold about 2 values.

2. doOnEach()

This method lets us inspect every signal from the publisher (each value, plus the error or completion signal) before it is passed to the subscriber.
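A minimal sketch of doOnEach(), assuming reactor-core is on the classpath. The class name and the recorded strings are made up for the example. The callback receives a Signal, which carries the signal type and, for onNext, the value.

```java
import java.util.ArrayList;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name; records every signal that passes through.
class DoOnEachExample {

    static List<String> run() {
        List<String> seen = new ArrayList<>();
        Flux.just(1, 2, 3)
                .doOnEach(signal -> {
                    if (signal.isOnNext()) {
                        seen.add("ON_NEXT " + signal.get()); // value signal
                    } else {
                        seen.add(signal.getType().name());   // terminal signal
                    }
                })
                .subscribe();
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run()); // [ON_NEXT 1, ON_NEXT 2, ON_NEXT 3, ON_COMPLETE]
    }
}
```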

With this method, we can check which subscriber methods get called and how many times.

3. log()

This method is used for logging. It logs all the methods called on the subscriber, in the order in which they are called.
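A short sketch of log(), assuming reactor-core is on the classpath. The class name is illustrative. The exact log format depends on the logging setup, so no console output is shown here. With default settings one would expect entries for signals such as onSubscribe, request, onNext, and onComplete.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name.
class LogExample {

    // log() does not change the values; it only traces the signals
    static List<Integer> run() {
        return Flux.just(1, 2, 3)
                .log()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [1, 2, 3]
    }
}
```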

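4. delayElements()

This method delays the emission of each value from the publisher by the given time. On a Flux it is called delayElements(); the Mono equivalent is delayElement().

With a Flux of 4 values and a delay of 5 seconds, it prints one value on the console every 5 seconds, 4 times in total.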
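A sketch of delayed emission on a Flux, assuming reactor-core is on the classpath. The class name is illustrative, and the delay is shortened to 200 ms (instead of 5 seconds) so the example finishes quickly.

```java
import java.time.Duration;
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name.
class DelayElementsExample {

    // Shortened for the example; the text above uses 5 seconds
    static final Duration DELAY = Duration.ofMillis(200);

    // Each of the 4 values is held back by DELAY before being emitted
    static List<Integer> run() {
        return Flux.just(1, 2, 3, 4)
                .delayElements(DELAY)
                .doOnNext(value -> System.out.println("emitted " + value))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        System.out.println(run()); // [1, 2, 3, 4]
        System.out.println("took about " + (System.currentTimeMillis() - start) + " ms");
    }
}
```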

5. subscribeOn()

By default, the whole subscription process runs on the thread that subscribes to the publisher, typically the main thread. We can change that with the subscribeOn() method, which takes the Scheduler to run on.
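A minimal sketch that compares the executing thread with and without subscribeOn(), assuming reactor-core is on the classpath. The class name is illustrative, and Schedulers.parallel() is just one possible choice of scheduler.

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

// Hypothetical class name.
class SubscribeOnExample {

    // Without subscribeOn, the work runs on the subscribing (calling) thread
    static String callerThread() {
        return Mono.fromCallable(() -> Thread.currentThread().getName()).block();
    }

    // With subscribeOn, the work runs on a thread owned by the given scheduler
    static String schedulerThread() {
        return Mono.fromCallable(() -> Thread.currentThread().getName())
                .subscribeOn(Schedulers.parallel())
                .block();
    }

    public static void main(String[] args) {
        System.out.println(callerThread());    // name of the calling thread, e.g. main
        System.out.println(schedulerThread()); // a scheduler thread, e.g. parallel-1
    }
}
```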

6. blockFirst()

This method subscribes to the Flux, blocks the calling thread, and returns the first element, or null if no value is present. It is mostly used in tests.

7. blockLast()

This method subscribes to the Flux, blocks the calling thread, and returns the last element, or null if no value is present. It is also mostly used in tests.
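A short sketch of both blocking methods, assuming reactor-core is on the classpath. The class name and values are illustrative.

```java
import reactor.core.publisher.Flux;

// Hypothetical class name.
class BlockExample {

    static Integer first() {
        return Flux.just(1, 2, 3, 4).blockFirst();
    }

    static Integer last() {
        return Flux.just(1, 2, 3, 4).blockLast();
    }

    // An empty Flux has no first element, so null is returned
    static Integer firstOfEmpty() {
        return Flux.<Integer>empty().blockFirst();
    }

    public static void main(String[] args) {
        System.out.println(first());        // 1
        System.out.println(last());         // 4
        System.out.println(firstOfEmpty()); // null
    }
}
```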

8. timeout()

Propagate a TimeoutException as soon as no item is emitted within the given Duration from the previous emission (or the subscription for the first item).
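A minimal sketch of timeout(), assuming reactor-core is on the classpath. The class name, the durations, and the fallback string are chosen for illustration. The source needs 2 seconds to produce its value, but the timeout is 200 ms, so a TimeoutException is raised and then converted into a fallback value.

```java
import java.time.Duration;
import java.util.concurrent.TimeoutException;
import reactor.core.publisher.Mono;

// Hypothetical class name.
class TimeoutExample {

    static String run() {
        return Mono.delay(Duration.ofSeconds(2))    // value arrives after 2 s
                .map(tick -> "value")
                .timeout(Duration.ofMillis(200))    // but we only wait 200 ms
                .onErrorResume(TimeoutException.class, e -> Mono.just("timed out"))
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // timed out
    }
}
```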

9. bufferUntilChanged()

This method keeps adding values to the buffer as long as consecutive values are the same, and emits the buffer when it receives a different value.
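A short sketch of bufferUntilChanged(), assuming reactor-core is on the classpath. The class name and input values are illustrative. Note that a value that reappears later starts a new buffer, because only consecutive values are compared.

```java
import java.util.List;
import reactor.core.publisher.Flux;

// Hypothetical class name.
class BufferUntilChangedExample {

    // Runs of equal consecutive values are grouped into one list each
    static List<List<Integer>> run() {
        return Flux.just(1, 1, 2, 2, 2, 3, 1)
                .bufferUntilChanged()
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(run()); // [[1, 1], [2, 2, 2], [3], [1]]
    }
}
```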

10. cache

This method caches the values emitted by the publisher. We can pass a count as a parameter to say how many of the most recently emitted values we want to cache. When another subscriber subscribes later, the cached values are replayed to it, so there is no need to go back to the original publisher.

This method is useful when we have a publisher with infinite values and we want to keep track of at least a few previously emitted values as well.
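A sketch of cache() with a history of 2, assuming reactor-core is on the classpath. The class name and the subscription counter are made up for the example. The first subscriber triggers the source and sees all 5 values. A subscriber that arrives after completion only gets the last 2 values replayed from the cache, and the source is not subscribed to again.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import reactor.core.publisher.Flux;

// Hypothetical class name.
class CacheExample {

    // `subscriptions` counts how often the underlying source is subscribed to
    static List<Integer> lateSubscriber(AtomicInteger subscriptions) {
        Flux<Integer> cached = Flux.range(1, 5)
                .doOnSubscribe(s -> subscriptions.incrementAndGet())
                .cache(2); // keep only the last 2 emitted values

        cached.blockLast();                  // first subscriber: runs the source, sees 1..5
        return cached.collectList().block(); // late subscriber: served from the cache
    }

    public static void main(String[] args) {
        AtomicInteger subscriptions = new AtomicInteger();
        System.out.println(lateSubscriber(subscriptions)); // [4, 5]
        System.out.println(subscriptions.get());           // 1
    }
}
```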

There are many more methods in Flux and Mono. For the whole list, you can refer to the API documentation:

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Flux.html

https://projectreactor.io/docs/core/release/api/reactor/core/publisher/Mono.html
