Among today's many programming paradigms, this post focuses on reactive programming, which deals with the asynchronous management of finite and infinite data flows. Reactor is a reactive programming library for Java that provides the foundation for developing non-blocking applications, representing a shift in how we think about an application's execution model. Reactor was developed by Pivotal, a software and services company active in many areas of software engineering worldwide.
We would like to share some highlights and new trends in Java and the wider programming world that we have learned through our project experience and research. Our aim has been to achieve better results with more complex applications, but in a simplified way.
In this post, we aim to bridge the gap between the world of Reactor and the powerful trends on the programming horizon. We'll discuss and demonstrate concepts, advantages, drawbacks, and an approach for building an API using Project Reactor. Using the development of a Twitter event manager as an example, we will explain how to consume and save responses to various requests in a non-blocking manner.
What is Reactor?
To talk about Project Reactor we must first define what reactive programming is. It's a paradigm built around the routing and consumption of streams (data emitted over time). By analyzing data flows and the propagation of changes through the application, a reactive system can stay responsive (fast, consistent response times), resilient (responsive even in error situations), and elastic (responsive under increasing workload), all based on the exchange of asynchronous messages (message-driven). Reactive programming follows the Observer design pattern: when an object's state changes, the other objects are notified and updated, reducing the inefficient use of resources. More can be learned about this topic in the reactive programming documentation.
Within this conceptual frame of reactive programming, we can begin to examine Project Reactor. As mentioned above, this is a library that exhibits the following characteristics:
- Manages demand efficiently from a fully non-blocking foundation.
- Has an architecture well suited to reactive systems.
- Works very well for microservices.
- Is powerful enough for any system, responding asynchronously to massive numbers of HTTP, UDP, and TCP requests while applying backpressure.
Backpressure grants the consumer of an asynchronous stream the ability to tell the producer how much data to send, preventing events from being emitted at a rate faster than the consumer's processing capability. Reactor provides several strategies for reducing the amount of data sent, including buffering and the windowing technique, which lets a program analyze the data from the last n seconds every m seconds.
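As a minimal sketch of those strategies (the element counts and values here are illustrative, not from the original project), `buffer(..)` batches elements for a slower consumer, while `window(..)` splits the stream into sub-Fluxes that can be analyzed independently:

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class BackpressureExample {

    // buffer(..) collects elements into lists so a slow consumer
    // receives batches instead of individual elements.
    static List<List<Integer>> batches() {
        return Flux.range(1, 10)
                .buffer(4)
                .collectList()
                .block();
    }

    // window(..) splits the stream into sub-Fluxes of 3 elements each;
    // here each window is reduced to the sum of its elements.
    static List<Integer> windowSums() {
        return Flux.range(1, 9)
                .window(3)
                .flatMap(w -> w.reduce(Integer::sum))
                .collectList()
                .block();
    }

    public static void main(String[] args) {
        System.out.println(batches());    // [[1, 2, 3, 4], [5, 6, 7, 8], [9, 10]]
        System.out.println(windowSums()); // [6, 15, 24]
    }
}
```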
Summing up, Project Reactor can sustain a high message rate while keeping a very low memory footprint. Thanks to these features it is well suited to creating efficient, event-driven applications that can cope with many requests concurrently and asynchronously, which is ideal for high-latency applications.
The main artifact Project Reactor provides is reactor-core, a reactive library focused on the Reactive Streams specification and Java 8 APIs. Reactor has two reactive types that implement the Publisher interface while also providing a broad set of operators: Flux<T> and Mono<T>. These types allow applications to serve more requests concurrently, and both support non-blocking backpressure.
A Flux object represents a reactive sequence of 0 to N elements, and also allows the generation of sources from arbitrary callback types. The following code fragment shows one of the most common and basic examples for creating a Flux.
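The original snippet is not reproduced here; a minimal example of the kind of creation it showed, using reactor-core (the values are illustrative), might look like this:

```java
import java.util.Arrays;
import java.util.stream.Stream;

import reactor.core.publisher.Flux;

public class FluxExamples {

    // Flux.just(..) captures its elements at instance-creation time.
    static Flux<Integer> fromJust() {
        return Flux.just(1, 2, 3);
    }

    // Flux.fromIterable(..) creates a new iterator for each Subscriber.
    static Flux<String> fromIterable() {
        return Flux.fromIterable(Arrays.asList("foo", "bar", "baz"));
    }

    // Flux.fromStream(..) adapts a Java 8 Stream (consumable only once).
    static Flux<String> fromStream() {
        return Flux.fromStream(Stream.of("a", "b"));
    }

    public static void main(String[] args) {
        fromJust().subscribe(System.out::println);     // 1 2 3
        fromIterable().subscribe(System.out::println); // foo bar baz
        fromStream().subscribe(System.out::println);   // a b
    }
}
```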
In the snippet, we see the creation of Fluxes from integers, Strings, and even a Java stream. The Flux.just(..) method creates a Flux that emits the specified elements, which are captured at instance-creation time. The Flux.fromIterable(..) method creates a Flux that emits the elements contained in the provided Iterable, creating a new iterator for each Subscriber. Later, we'll see more elaborate Flux implementations that subscribe to and consume data from an external API.
A Mono object represents a single-valued or empty result (0..1) and allows deterministic generation from scratch or from arbitrary callback types. Below, some of the most common ways to create a Mono are shown. The Mono.empty() method creates a Mono that completes without emitting any element.
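A minimal sketch of those creations (again with illustrative values):

```java
import reactor.core.publisher.Mono;

public class MonoExamples {

    // Mono.just(..) emits exactly one value and then completes.
    static Mono<String> one() {
        return Mono.just("hello");
    }

    // Mono.empty() completes without emitting any element.
    static Mono<String> none() {
        return Mono.empty();
    }

    public static void main(String[] args) {
        System.out.println(one().block());          // hello
        System.out.println(none().blockOptional()); // Optional.empty
    }
}
```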
Reactor uses a Scheduler to determine the context in which arbitrary tasks execute, providing the guarantees required by a reactive stream. We can also use or create efficient Schedulers for subscribeOn and publishOn, and multiple Reactor instances can be instantiated with different Schedulers.
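A sketch of the two variants described below, with a hypothetical insertPerson standing in for the article's DAO call. Schedulers.boundedElastic() is used here because recent reactor-core versions have removed the elastic() Scheduler the article discusses; the behavior for this example is equivalent:

```java
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

public class SchedulerExample {

    // Hypothetical blocking DAO call standing in for the article's insertPerson.
    static String insertPerson(String name) {
        return "inserted " + name;
    }

    // fromCallable wraps the blocking call and implicitly maps thrown
    // errors to Mono.error(..); subscribeOn moves the work onto a shared,
    // cached pool of workers suited to blocking tasks.
    static Mono<String> viaFromCallable(String name) {
        return Mono.fromCallable(() -> insertPerson(name))
                .subscribeOn(Schedulers.boundedElastic());
    }

    // defer expects a supplier that returns a Mono<T>; it does not capture
    // errors for us, so we must handle them ourselves.
    static Mono<String> viaDefer(String name) {
        return Mono.defer(() -> Mono.just(insertPerson(name)))
                .subscribeOn(Schedulers.boundedElastic());
    }

    public static void main(String[] args) {
        System.out.println(viaFromCallable("Alice").block()); // inserted Alice
        System.out.println(viaDefer("Bob").block());          // inserted Bob
    }
}
```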
The snippet shows two examples that use Schedulers. The first defines a variable of type Mono that calls the DAO to insert a Person object. The Mono.fromCallable method expects its supplier (in this case insertPerson) to return a value of type T and creates a non-blocking Mono. This method even captures errors implicitly and maps them to a Mono.error(..).
It should be clarified that Reactor implements its own error handling, and as just mentioned, it does so through the error() method. The Mono.defer(..) method works similarly to fromCallable, with one difference: its supplier must return a Mono<T>. Since it doesn't capture errors, we need to do so ourselves. The resulting Mono is returned through subscribeOn, passing Schedulers.elastic() as the parameter, which returns a shared Scheduler instance; multiple calls to this function return the same Scheduler.
In other words, it dynamically creates ExecutorService-based workers as needed and reuses idle ones, caching the worker pools; pools that remain idle for 60 seconds are disposed. The elastic() method is a useful way to give a blocking process its own threads so that other resources aren't compromised, which is why it's a common default choice for offloading blocking work. Note that in the other example, although the method returns a Mono<Person>, it follows the same behavior as the previous case in order to abstract its data.
Reactor and Twitter API
Now that we have a clear concept of Reactor, we are going to demonstrate how it works by consuming an external API, in this case the Twitter API. We will observe how it responds asynchronously to all the tweets captured by the application and show how it processes the information in a non-blocking way.
We must first configure the build.gradle file with the dependencies needed for the example. In the following snippet we see the Reactor and Twitter dependencies (we'll use Twitter4J), Spring Boot, among others. IntelliJ IDEA is used as the IDE.
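A minimal build.gradle along these lines would pull in the pieces mentioned; the coordinates are the usual ones for these libraries, but the versions shown are illustrative rather than the ones used in the original project:

```groovy
plugins {
    id 'org.springframework.boot' version '2.1.4.RELEASE'
    id 'java'
}

repositories {
    mavenCentral()
}

dependencies {
    // Spring Boot's reactive web starter brings in reactor-core transitively.
    implementation 'org.springframework.boot:spring-boot-starter-webflux'
    implementation 'io.projectreactor:reactor-core'
    // Twitter4J streaming client for consuming the Twitter API.
    implementation 'org.twitter4j:twitter4j-stream:4.0.7'
    testImplementation 'io.projectreactor:reactor-test'
}
```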
A service called TwitterService is created, which ensures our stream always exists, in this case as subscribers to Twitter. It does so through the ConnectableFlux abstract class, which allows subscribers to accumulate before connecting to the data source: calling subscribe() doesn't start broadcasting immediately, so we can add several subscriptions in advance. The service then validates the Twitter stream by calling a method that configures and builds it, adding credentials and the methods to implement, and finally attaching the stream to a listener. All of this happens only if the stream has no connection and hasn't started accumulating subscriptions.
There are several ways to configure Twitter4J: either manually, by creating an instance of the ConfigurationBuilder class, or through a twitter4j.properties file. The following example uses the first method in the service. In this configuration, it's necessary to set the consumerKey, consumerSecret, accessToken, and accessTokenSecret, each with its respective credentials.
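A sketch of that manual configuration (the credential strings below are placeholders; real values come from the Twitter developer console):

```java
import twitter4j.conf.Configuration;
import twitter4j.conf.ConfigurationBuilder;

public class TwitterConfig {

    // Builds the Twitter4J configuration manually via ConfigurationBuilder.
    // Replace the placeholder strings with real credentials.
    public static Configuration build() {
        return new ConfigurationBuilder()
                .setOAuthConsumerKey("consumerKey")
                .setOAuthConsumerSecret("consumerSecret")
                .setOAuthAccessToken("accessToken")
                .setOAuthAccessTokenSecret("accessTokenSecret")
                .build();
    }
}
```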
Following this, an instance of the TwitterStream interface is created, passing the configuration from the previous step as a parameter. A StatusListener is then added to act as the stream's reader. With the sample() method we start listening to a random sample of all public statuses. Finally, the stream's publish() method lets several subscribers register against the resulting Flux, while the connect() method sends a connection request to the API to open the data stream and receive the tweets. This transmission model opens a pipeline through which data is sent as it occurs, with an indefinite lifetime.
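A sketch of such a service, assuming a hypothetical configuration supplied through a default ConfigurationBuilder (real code would pass the credentials shown earlier). Nothing connects to Twitter until a caller invokes connect() on the returned ConnectableFlux:

```java
import reactor.core.publisher.ConnectableFlux;
import reactor.core.publisher.Flux;
import twitter4j.Status;
import twitter4j.StatusAdapter;
import twitter4j.TwitterStream;
import twitter4j.TwitterStreamFactory;
import twitter4j.conf.ConfigurationBuilder;

public class TwitterService {

    public ConnectableFlux<Status> tweets() {
        Flux<Status> flux = Flux.create(sink -> {
            TwitterStream twitterStream = new TwitterStreamFactory(
                    new ConfigurationBuilder().build()).getInstance();
            // StatusAdapter lets us override only the callbacks we need.
            twitterStream.addListener(new StatusAdapter() {
                @Override
                public void onStatus(Status status) {
                    sink.next(status);
                }
                @Override
                public void onException(Exception ex) {
                    sink.error(ex);
                }
            });
            // sample() starts listening to a random sample of all public statuses.
            twitterStream.sample();
        });
        // publish() turns the Flux into a ConnectableFlux: nothing is emitted
        // until connect() is called, so subscribers can register in advance.
        return flux.publish();
    }
}
```

Callers subscribe first (for example, `service.tweets().subscribe(...)`) and then call `connect()` to open the pipeline.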
With the service configured, a controller called TwitterController is created to connect the external service to the application and obtain different results from the captured tweets. Four endpoints will be created, each with different behavior and data, to show the different actions that can be achieved with Reactor and the Twitter API.
The first endpoint, filtered(), is a GET method that returns a Flux<String>; in this function we connect to the external API to gather all tweets, filter to keep only those statuses containing the word “the”, and finally map the result and return the obtained tweets. In the second endpoint, feed(), the Twitter service is called again, and we get all the news or published statuses through a map.
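The filter-then-map pipeline of filtered() can be sketched with plain reactor-core, using a hypothetical Flux of tweet texts in place of the live Twitter stream (the map step here just uppercases each tweet to stand in for the response formatting):

```java
import java.util.List;

import reactor.core.publisher.Flux;

public class FilteredSketch {

    // Hypothetical stand-in for the live Twitter stream: a Flux of tweet texts.
    static Flux<String> tweets() {
        return Flux.just("the quick brown fox", "hello world", "over the moon");
    }

    // filtered(): keep only statuses containing the word "the",
    // then map them into the response format.
    static Flux<String> filtered() {
        return tweets()
                .filter(text -> text.contains("the"))
                .map(String::toUpperCase);
    }

    public static void main(String[] args) {
        List<String> result = filtered().collectList().block();
        System.out.println(result); // [THE QUICK BROWN FOX, OVER THE MOON]
    }
}
```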
In the third endpoint, onePerSecond(), the same process of obtaining all tweets through the service is followed: a Flux is created and filtered by the tweet's place, if present. Place is an interface provided by the Twitter API that extends TwitterResponse, among other interfaces; the snippet shows the methods that can be used. The endpoint validates that a Place exists and returns only tweets from states within the United States. Finally, using the map function, parsed tweets are returned in a cleaner format.
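The validate-place-and-filter step can be sketched as follows, with a hypothetical Tweet class standing in for Twitter4J's Status/Place pair (a country code that may or may not be present):

```java
import java.util.List;
import java.util.Optional;

import reactor.core.publisher.Flux;

public class PlaceFilterSketch {

    // Hypothetical stand-in for a Status whose Place may be absent.
    static final class Tweet {
        final String text;
        final Optional<String> countryCode;

        Tweet(String text, Optional<String> countryCode) {
            this.text = text;
            this.countryCode = countryCode;
        }
    }

    // Validate that a place exists, keep only US tweets, and map to a
    // cleaner parsed format (here, just the text).
    static Flux<String> usTweets(Flux<Tweet> tweets) {
        return tweets
                .filter(t -> t.countryCode.map("US"::equals).orElse(false))
                .map(t -> t.text);
    }

    public static void main(String[] args) {
        Flux<Tweet> tweets = Flux.just(
                new Tweet("hi from Texas", Optional.of("US")),
                new Tweet("bonjour", Optional.of("FR")),
                new Tweet("no place", Optional.empty()));
        List<String> result = usTweets(tweets).collectList().block();
        System.out.println(result); // [hi from Texas]
    }
}
```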
In the fourth and final endpoint, grouped(), tweets are grouped every second, again filtered by tweets originating in the United States, but this time the Flux.interval(..) method is used to assign a grouping duration, together with the zipWith(..) method, which merges the previously obtained filter before returning its publications.
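The interval-plus-zipWith pacing can be sketched like this; the tweet values are illustrative, buffer(..) stands in for the grouping step, and a 10 ms period replaces the article's one second to keep the example fast:

```java
import java.time.Duration;
import java.util.List;

import reactor.core.publisher.Flux;

public class GroupedSketch {

    // buffer(..) forms the groups; zipWith pairs each group with a tick from
    // Flux.interval, so at most one group is emitted per interval period.
    static Flux<List<String>> grouped(Flux<String> tweets, Duration period) {
        return tweets
                .buffer(2)                                      // group tweets
                .zipWith(Flux.interval(period),                 // pace the groups
                         (batch, tick) -> batch);
    }

    public static void main(String[] args) {
        Flux<String> tweets = Flux.just("a", "b", "c", "d", "e", "f");
        List<List<String>> result =
                grouped(tweets, Duration.ofMillis(10)).collectList().block();
        System.out.println(result); // [[a, b], [c, d], [e, f]]
    }
}
```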
As these examples illustrate, Reactor lets us go deeper and build fascinating things in today's applications, especially microservices, a growing trend in digital transformation. I invite readers to continue engaging with this topic. In summary, I'd like to highlight the advantages and drawbacks of reactive programming with Reactor.
Advantages and Drawbacks
Advantages:

- We can answer many requests or message calls by generating one or several threads.
- Callbacks can be made asynchronously, potentially saving calling resources.
- It achieves loosely coupled programming and tends to isolate faults, so it's easily scalable, and you can anticipate the number of events it can receive.
- With efficient use of resources, we do much more with less; specifically, we can process higher workloads with fewer threads.

Drawbacks:

- More intensive memory usage is needed to store large data flows, as they are kept alive for a long time.
- It differs from conventional programming and can be hard to understand at first.
- Most of the complexity must be dealt with when declaring the service.
- It doesn't work well for applications with very little data flow, as it can make simple programs unnecessarily complex, or even hurt performance.
Although Reactor has only been around for a short time, it has had a great impact on applications that suffer from high latency, enabling better processing and response performance. This makes it ideal for the programming world's new trends and brings reactive programming firmly into Java.
On the other hand, it's proving to be a strong resource for handling all the devices and applications connected to the Internet 24/7. We need to show information almost instantaneously to millions of users, which generates very intense loads. That's why I see great potential in Reactor's ability to respond optimally and correctly to these massive data demands, meaning the application responds as the user expects.
Finally, I’d like to thank Matias de Santi for being one of the contributors to the code proposed in this post.