Disclaimer: Rule number 1 of the Reactor club: don’t write your own Publisher. Even though the interface is simple, the set of rules about interactions between all these reactive streams interface is not. — Simon Baslé (Pivotal, Project Reactor).
I believe that most readers have already seen the Reactive Streams Specification with its four easily understood interfaces. At first glance, implementing those interfaces seems simple, and perhaps everyone has tried, or wished to try, to create their own implementation, despite the fact that several reactive libraries such as Project Reactor and RxJava already exist. In fact, implementing the interfaces declared in Reactive Streams is not recommended without in-depth knowledge. The main pitfalls are hidden behind the long list of rules that specify strict behavior for the interfaces. Nonetheless, while writing one of the chapters for my book “Reactive Programming with Spring 5”, I decided to create my own implementation of Publisher, Subscriber, Subscription, and Processor and to share the problems I faced during that challenge. Throughout this topic, I will share basic coding steps, pitfalls, and patterns that will be useful for future implementors.
In the first part of the topic, we will implement the Publisher interface. The initial idea that came to my mind was to create a Publisher wrapper for Java 8 Streams. The first naive implementation, with which we will start, looks as follows:
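A minimal sketch of such a skeleton could look like the following. The field name `streamSupplier` is an assumption made for illustration, and `subscribe()` is deliberately left empty at this stage.

```java
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Skeleton only: nothing is emitted yet.
class StreamPublisher<T> implements Flow.Publisher<T> {

    // Hypothetical field name. A Supplier lets every Subscriber obtain a fresh Stream.
    private final Supplier<Stream<T>> streamSupplier;

    StreamPublisher(Supplier<Stream<T>> streamSupplier) {
        this.streamSupplier = streamSupplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Intentionally empty for now; the behavior is added step by step.
    }
}
```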
Here, we just implement the Flow.Publisher<T> interface and override the subscribe() method. As you might notice from the example, the JDK 9 Flow API is used here instead of the Reactive Streams Specification interfaces. Since both sets of interfaces are identical, it does not matter which one is used as a base. The other point worth noticing is the behavior of our Publisher. In our case, there is no dynamic data streaming, so the Publisher is cold by nature. That means that each Subscriber will receive the same data. To satisfy that requirement, the implementation should create a new instance of java.util.stream.Stream<T> for each new Subscriber. A good way to achieve that behavior is to hold a Supplier<Stream<T>> instead of a plain Stream<T> instance. That is why the constructor receives a Supplier<Stream<T>> as a parameter and stores it in a field.
Now that we have an essential understanding of the Publisher's behavior, we can move on to a functional implementation. Let’s try to pass some data to the subscriber naively. The next example shows the implementation of subscribe():
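A naive version along these lines might be sketched as follows. It assumes the hypothetical `streamSupplier` field and ignores demand entirely, so it is a starting point rather than a compliant implementation.

```java
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamPublisher<T> implements Flow.Publisher<T> {

    // Hypothetical field name, as in the skeleton.
    private final Supplier<Stream<T>> streamSupplier;

    StreamPublisher(Supplier<Stream<T>> streamSupplier) {
        this.streamSupplier = streamSupplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        try {
            // Push every element straight to the Subscriber.
            streamSupplier.get().forEach(subscriber::onNext);
            // Reached only if the iteration finished without an exception.
            subscriber.onComplete();
        } catch (Throwable e) {
            // Any failure during iteration is reported as a terminal error.
            subscriber.onError(e);
        }
    }
}
```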
First, the Stream pushes its data to the onNext() method of the Subscriber. Then onComplete() is called after a successful iteration; otherwise, the line that calls onError() is executed.
Now, let’s create a simple verification that our solution works:
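One way to run such a check is sketched below. `RecordingSubscriber` and `NaiveVerification` are hypothetical helper names introduced only for this example, and the naive publisher is repeated so that the snippet is self-contained.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import java.util.stream.Stream;

// Naive publisher from the previous step.
class StreamPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<Stream<T>> streamSupplier;

    StreamPublisher(Supplier<Stream<T>> streamSupplier) {
        this.streamSupplier = streamSupplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        try {
            streamSupplier.get().forEach(subscriber::onNext);
            subscriber.onComplete();
        } catch (Throwable e) {
            subscriber.onError(e);
        }
    }
}

// Hypothetical helper: records every signal it receives, in order of arrival.
class RecordingSubscriber<T> implements Flow.Subscriber<T> {
    final List<String> signals = new ArrayList<>();

    @Override public void onSubscribe(Flow.Subscription s) { signals.add("onSubscribe"); }
    @Override public void onNext(T item) { signals.add("onNext(" + item + ")"); }
    @Override public void onError(Throwable t) { signals.add("onError(" + t + ")"); }
    @Override public void onComplete() { signals.add("onComplete"); }
}

class NaiveVerification {
    // Subscribes once to a three-element Stream and returns the recorded signals.
    static List<String> run() {
        RecordingSubscriber<Integer> subscriber = new RecordingSubscriber<>();
        StreamPublisher<Integer> publisher = new StreamPublisher<>(() -> Stream.of(1, 2, 3));
        publisher.subscribe(subscriber);
        return subscriber.signals;
    }
}
```

Printing the result of `NaiveVerification.run()` gives `[onNext(1), onNext(2), onNext(3), onComplete]` for this sketch.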
And finally, if we run our solution, we will see the following output:
It seems that it works. However, is it a truly working solution from the Specification's perspective, and how can we verify that all specified rules are properly incorporated? It is not an easy job to prove it. Building a proper test suite alone may take much more time than implementing the working code for Reactive Streams. Luckily, a toolkit for that purpose has already been implemented by Konrad Malawski; it is called the Reactive Streams Technology Compatibility Kit, or simply TCK. The TCK guards all Reactive Streams statements and tests an implementation against the specified rules. It includes a few useful test classes that cover the corner cases for all defined interfaces. In our case, the following abstract class will be helpful: org.reactivestreams.tck.PublisherVerification<>. Let's create a test to verify our StreamPublisher:
To get a better understanding of this test, it would be nice to have a brief overview of the conceptual points of the verification before we run it. To execute the test correctly, we are required to override two abstract methods and provide a working Publisher instance from each. In the first case, the Publisher should produce some data and complete successfully. Also, for the successful path, we should take into account the incoming parameter elements, which represents the number of elements to be generated. Along with the successful one, the test asks us to provide a failed Publisher, which should produce an error result. This is the basic, minimum configuration of the test suite required to start.
OK, after a brief intro to the TCK, it will be nice to check the test results:
Yup, it looks terrible. 38 tests were run: 15 of them were skipped, 20 failed, and only three passed, and those only by accident. Now, the implementation looks completely incorrect.
Nevertheless, let’s try to fix them all and implement our publisher appropriately. Let’s start with the simplest test, which is called required_spec109_mustIssueOnSubscribeForNonNullSubscriber. The three digits after 'spec' stand for rule 1.09:
Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber and MUST return normally, except when the provided Subscriber is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).
To satisfy this rule, we are required to call the onSubscribe method before any other calls. Let's implement this:
After running the test, you might find that the implementation is still incorrect even though the method call has been added. The problem still exists because it is illegal to pass a null Subscription to the Subscriber. So, it will be nice to implement a minimal viable Subscription and pass it to the Subscriber:
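As a rough sketch of this step, the publisher below signals `onSubscribe` with a do-nothing Subscription before emitting anything. The explicit null check and the anonymous Subscription are illustrative choices, not necessarily what the original listing used.

```java
import java.util.Objects;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<Stream<T>> streamSupplier;

    StreamPublisher(Supplier<Stream<T>> streamSupplier) {
        this.streamSupplier = streamSupplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Rule 1.09: a null Subscriber must be rejected with a NullPointerException.
        Objects.requireNonNull(subscriber, "subscriber");

        // Rule 1.09: onSubscribe comes first and must receive a non-null Subscription.
        subscriber.onSubscribe(new Flow.Subscription() {
            @Override public void request(long n) { /* demand is still ignored here */ }
            @Override public void cancel() { /* nothing to cancel yet */ }
        });

        try {
            streamSupplier.get().forEach(subscriber::onNext);
            subscriber.onComplete();
        } catch (Throwable e) {
            subscriber.onError(e);
        }
    }
}
```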
Great, that change has fixed our test. However, we still have a lot of failed verifications. Now, let’s move on to the rules with a bit higher complexity. One of the central concepts of Reactive Streams is to provide transparent backpressure control. For that purpose, the specification introduced a new interface, which is called Subscription. The word 'new' is used here to emphasize that this interface was missing in the initial implementation of the Observable-Observer model in RxJava.
Subscription has one important method to inform the Publisher about the Subscriber's demand. This method is called request() and is an important feature of Reactive Streams. Along with the standard push model, it makes it possible to mix the push and pull models. The current implementation of StreamPublisher supports only the push model. To satisfy the hybrid model, we need to provide some mechanism that pushes data only on corresponding demand. To find the correct place for this mechanism, we need to analyze the Specification. An appropriate rule, which hints at where the logic should live, is rule 3.10, which states:
While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s).
According to that statement, the best place to put the logic is in Subscription#request itself. This is justified by the current idea of our Publisher, which aims to merely wrap Stream<T> without additional parallelization. Also, a good technique is to implement the Subscription as an inner class. The next code snippet shows the new iteration of our Publisher implementation:
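The iteration described here could be sketched roughly as below, with the Subscription as an inner class that keeps an Iterator as its state. Names are assumptions, and this version still signals `onComplete` more than once if `request` is called again after the data runs out.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<Stream<T>> streamSupplier;

    StreamPublisher(Supplier<Stream<T>> streamSupplier) {
        this.streamSupplier = streamSupplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        // Every Subscriber gets its own Subscription and therefore its own state.
        subscriber.onSubscribe(new StreamSubscription(subscriber));
    }

    private class StreamSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        // A Stream cannot be pulled element by element, so its Iterator is kept instead.
        private final Iterator<T> iterator;

        StreamSubscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
            this.iterator = streamSupplier.get().iterator();
        }

        @Override
        public void request(long n) {
            // Emit at most n elements, one onNext call per element.
            for (long l = n; l > 0 && iterator.hasNext(); l--) {
                subscriber.onNext(iterator.next());
            }
            // Once the source is exhausted, signal completion.
            if (!iterator.hasNext()) {
                subscriber.onComplete();
            }
        }

        @Override
        public void cancel() {
            // Cancellation is not handled at this stage.
        }
    }
}
```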
Here we created an inner class which is responsible for handling Subscriber demand and pushing data. Since each new Subscriber receives a new Subscription, it is valid to store some progress or state inside the Subscription. Also, we need to push data to the Subscriber somehow. Unfortunately, Stream does not have an explicit API that allows fetching data in a ‘give me next’ fashion. That is exactly why it is unreasonable to store the Stream instance itself. The only logical way is to extract an Iterator and use its API for that purpose. Thus, we will save the Iterator<> as local state.
All right, we have a basic understanding of the state that will be stored inside the Subscription. Now it is time to move on to the request method. Here we create a for-loop that handles the requested demand by calling the onNext method of the Subscriber. Then, when the demand is satisfied, we check whether the iterator has been exhausted and, if so, notify the subscriber about completion. Now, let's run the tests to recheck our code:
Just by analyzing the failed tests, one missing thing may be noticed here: a violation of rules 1.07 and 3.06, which say that a terminal signal must be emitted only once. Thus, we should implement a flag that is responsible for indicating the termination state:
In the mentioned part of the code, we have introduced two additional methods and an AtomicInteger field. The extra field is responsible for preserving the current state of the Subscription and for controlling concurrent access via its atomic behavior. The terminate() method is responsible for terminating the Subscription and returns the previous state. The returned result is useful for checking whether the Subscription has already been terminated, thereby preventing a violation of rule 1.07. Just by adding this flag and modifying the code a bit, we may resolve at least six tests! Also, let's fix minor issues related to rules 3.09 and 1.04. The first rule says that negative or zero demand should result in onError termination, and it is the easiest one. To satisfy it, it is enough to add the next part of the code to the request method:
In turn, rule 1.04 is more complicated. It says that all Publisher errors should be emitted via onError and only after the onSubscribe method has been called. Regarding our implementation of the failed Publisher, the only place where an error may be thrown before onSubscribe is the Subscription constructor:
To postpone throwing and emitting the error, we need to wrap the statement in a try-catch block and store the exception in a local field. Since the error should be emitted immediately after subscription, we should handle it manually, after the onSubscribe call. Moreover, since the iteration may be a potential point of failure, it will be useful to wrap subscriber.onNext(iterator.next()); in a try-catch block as well. With the mentioned improvements, StreamSubscription will look as follows:
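The sketch below combines the steps discussed so far: a termination flag, the non-positive demand check (rule 3.09), and deferred error delivery (rule 1.04). It is an illustration under assumptions: the flag is modeled as an `AtomicBoolean` rather than the `AtomicInteger` mentioned in the text, `doOnSubscribed` is a hypothetical method name, and the try-catch is widened to cover `hasNext()` too, since a lazy Stream may fail there.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<Stream<T>> streamSupplier;

    StreamPublisher(Supplier<Stream<T>> streamSupplier) {
        this.streamSupplier = streamSupplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        StreamSubscription subscription = new StreamSubscription(subscriber);
        subscriber.onSubscribe(subscription);
        subscription.doOnSubscribed(); // rule 1.04: report a deferred error only now
    }

    private class StreamSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private final Iterator<T> iterator; // null if the Stream could not be created
        private final Throwable error;      // failure captured in the constructor, if any
        private final AtomicBoolean terminated = new AtomicBoolean(false);

        StreamSubscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
            Iterator<T> it = null;
            Throwable failure = null;
            try {
                it = streamSupplier.get().iterator();
            } catch (Throwable e) {
                failure = e; // do not throw before onSubscribe has been called
            }
            this.iterator = it;
            this.error = failure;
        }

        void doOnSubscribed() {
            if (error != null && !terminate()) {
                subscriber.onError(error);
            }
        }

        @Override
        public void request(long n) {
            if (error != null) { // covers request() being called from inside onSubscribe
                doOnSubscribed();
                return;
            }
            if (n <= 0) { // rule 3.09
                if (!terminate()) {
                    subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                }
                return;
            }
            try {
                for (long l = n; l > 0 && !isTerminated() && iterator.hasNext(); l--) {
                    subscriber.onNext(iterator.next());
                }
                // terminate() returns the previous state, so onComplete fires at most once.
                if (!isTerminated() && !iterator.hasNext() && !terminate()) {
                    subscriber.onComplete();
                }
            } catch (Throwable e) {
                if (!terminate()) {
                    subscriber.onError(e);
                }
            }
        }

        @Override
        public void cancel() {
            terminate();
        }

        private boolean terminate() {
            return terminated.getAndSet(true);
        }

        private boolean isTerminated() {
            return terminated.get();
        }
    }
}
```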
All right, we have come to the final point, where only two tests fail. Both tests fail because of an identical root cause: recursive calls of the form request()->onNext()->request()->onNext(). To avoid recursion, we are required to check the state of execution and somehow verify whether request is being called from the onNext method. When I tried to solve this problem, the simplest idea that came to mind was to create a separate field that would be responsible for preserving the demand and indicating the recursion state at the same time. Before we get into details, let’s take an overview of the solution:
Here, we added the field demand, in which the requested number of elements will be stored. Also, to prevent recursion, we added an if statement that checks whether demand is greater than zero. This assumption is valid because of rule 3.01, which says that only the Subscriber has access to the Subscription and that the request() method must be called inside the Subscriber context. Also, it is worth emphasizing that the request method may be called concurrently, so it is important to adjust the demand in a thread-safe manner. For that purpose, the type of the demand field is AtomicLong. Finally, the for-loop statement was modified a bit to use the atomic API.
If we run the TCK against the current implementation, we will find that all tests pass, which was our primary goal!
However, this implementation contains some hidden, very dangerous pitfalls that may not be noticed at first glance (thanks to David Karnok, who highlighted those points). To understand the problem, let’s imagine highly concurrent access to the request() method. The first problem that arises in that environment is depicted in the next piece of code:
Here, two separate threads are concurrently working inside the body of the request() method. That situation is allowed by both the Reactive Streams specification (rule 2.7) and the JMM specification. In this example, Thread B is near the end of the loop. At the same time, Thread A has just finished the thread-safe validation inside the if-statement. It may happen that, between the if-statement and the subsequent demand.getAndAdd(n) instruction, instructions from the concurrent Thread B are executed. In that case, we come to the point where both threads exit the request() method, and the subscriber will receive no data until the next request() call.
This issue may be fixed by enhancing the code in the following way:
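A sketch of this fix is shown below: the demand is added and its previous value inspected in a single `getAndAdd` call, so there is no window between the check and the update. The surrounding class is an assumption carried over from the earlier steps, and the deferred-error handling is left out for brevity.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<Stream<T>> streamSupplier;

    StreamPublisher(Supplier<Stream<T>> streamSupplier) {
        this.streamSupplier = streamSupplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new StreamSubscription(subscriber));
    }

    private class StreamSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private final Iterator<T> iterator = streamSupplier.get().iterator();
        private final AtomicBoolean terminated = new AtomicBoolean(false);
        private final AtomicLong demand = new AtomicLong();

        StreamSubscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                if (!terminate()) {
                    subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                }
                return;
            }
            // Add the demand and read the previous value in one atomic step.
            // A positive previous value means an emission loop is already running
            // (for example, we were called from onNext), so it will pick up the new demand.
            if (demand.getAndAdd(n) > 0) {
                return;
            }
            try {
                while (demand.get() > 0 && !isTerminated() && iterator.hasNext()) {
                    subscriber.onNext(iterator.next());
                    demand.decrementAndGet();
                }
                if (!isTerminated() && !iterator.hasNext() && !terminate()) {
                    subscriber.onComplete();
                }
            } catch (Throwable e) {
                if (!terminate()) {
                    subscriber.onError(e);
                }
            }
        }

        @Override
        public void cancel() {
            terminate();
        }

        private boolean terminate() {
            return terminated.getAndSet(true);
        }

        private boolean isTerminated() {
            return terminated.get();
        }
    }
}
```

With this shape, a Subscriber that calls `request(1)` from inside `onNext` no longer re-enters the loop; the nested call only increases `demand` and returns.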
Here, three redundant code lines were eliminated and replaced with one atomic operation. getAndAdd() returns the previous value, which in turn allows correct demand validation and prevents a violation of rule 3.3. Finally, all tests will be green if we run them.
Although all TCK verifications pass, there is another case that is still missing from the verification suite. In comparison to the previous case, which was quite unpredictable and hard to test, the second one is quite obvious. According to rule 3.17, which states:
Subscription MUST support an unbounded number of calls to request and MUST support a demand up to 2^63-1 (java.lang.Long.MAX_VALUE). A demand equal or greater than 2^63-1 (java.lang.Long.MAX_VALUE) MAY be considered by the Publisher as “effectively unbounded”
the following piece of code should be valid:
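For illustration, a Subscriber of roughly this shape exercises the rule. `GreedySubscriber` is a hypothetical name, and the sketch only shows the two `request` calls.

```java
import java.util.concurrent.Flow;

// Hypothetical subscriber that asks for Long.MAX_VALUE twice, which rule 3.17 permits.
class GreedySubscriber<T> implements Flow.Subscriber<T> {

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE);
        // The accumulated demand now exceeds 2^63-1; a plain long addition wraps around.
        subscription.request(Long.MAX_VALUE);
    }

    @Override public void onNext(T item) { /* consume the element */ }
    @Override public void onError(Throwable t) { /* handle the failure */ }
    @Override public void onComplete() { /* done */ }
}
```

If a Subscription simply sums these requests in a `long`, the total wraps around to `-2`, which is the overflow discussed next.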
Although the above example is quite illogical, it is still valid. However, in that case, we will break our publisher because of numeric overflow. To prevent this, before adjusting the demand we should verify that the sum does not overflow. However, this check brings back the previous problem. To solve it, we need to replace if (demand.getAndAdd(n) > 0) with the good old CAS-loop pattern (wiki):
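A sketch of such a CAS loop follows. The use of `getAcquire()` (available since JDK 9) and the variable names `currentDemand` and `adjustedDemand` are assumptions; the numbered comments are meant to line up, in order, with the points analyzed afterwards.

```java
import java.util.Iterator;
import java.util.concurrent.Flow;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;
import java.util.stream.Stream;

class StreamPublisher<T> implements Flow.Publisher<T> {
    private final Supplier<Stream<T>> streamSupplier;

    StreamPublisher(Supplier<Stream<T>> streamSupplier) {
        this.streamSupplier = streamSupplier;
    }

    @Override
    public void subscribe(Flow.Subscriber<? super T> subscriber) {
        subscriber.onSubscribe(new StreamSubscription(subscriber));
    }

    private class StreamSubscription implements Flow.Subscription {
        private final Flow.Subscriber<? super T> subscriber;
        private final Iterator<T> iterator = streamSupplier.get().iterator();
        private final AtomicBoolean terminated = new AtomicBoolean(false);
        private final AtomicLong demand = new AtomicLong();

        StreamSubscription(Flow.Subscriber<? super T> subscriber) {
            this.subscriber = subscriber;
        }

        @Override
        public void request(long n) {
            if (n <= 0) {
                if (!terminated.getAndSet(true)) {
                    subscriber.onError(new IllegalArgumentException("non-positive request: " + n));
                }
                return;
            }
            for (;;) {
                long currentDemand = demand.getAcquire();          // (1) thread-safe read
                if (currentDemand == Long.MAX_VALUE) {
                    return;                                        // (2) already unbounded
                }
                long adjustedDemand = currentDemand + n;
                if (adjustedDemand < 0L) {
                    adjustedDemand = Long.MAX_VALUE;               // (3) cap on overflow
                }
                if (demand.compareAndSet(currentDemand, adjustedDemand)) { // (4) CAS, else retry
                    if (currentDemand > 0) {
                        return;                                    // (5) a loop is already emitting
                    }
                    break;
                }
            }
            try {
                while (demand.get() > 0 && !terminated.get() && iterator.hasNext()) {
                    subscriber.onNext(iterator.next());
                    demand.decrementAndGet();
                }
                if (!terminated.get() && !iterator.hasNext() && !terminated.getAndSet(true)) {
                    subscriber.onComplete();
                }
            } catch (Throwable e) {
                if (!terminated.getAndSet(true)) {
                    subscriber.onError(e);
                }
            }
        }

        @Override
        public void cancel() {
            terminated.set(true);
        }
    }
}
```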
Let’s analyze the code:
- Thread-safe value retrieval, with the guarantee that subsequent loads and stores are not reordered before this access (JDK 9 docs).
- Additional validation to ensure that adjusting the value is necessary.
- Additional validation of the value for the numeric overflow case.
- Thread-safe conditional CAS operation. This code block prevents value collisions and ensures that the local initial value and the one in shared memory are the same; otherwise, the loop repeats.
- Additional value validation that prevents recursion (rule 3.3) if currentDemand is greater than zero.
Now, if we run the new test along with the TCK tests, we will find that everything is fine and all tests pass.
To summarize: throughout this topic, we have walked through a plain implementation of Publisher. We have seen the corner cases, the points we should pay attention to first, and the toolset that is useful during the verification stage. As we have seen, the TCK helps a lot in finding issues in our code. However, the TCK is not a silver bullet and misses some corner cases. Nonetheless, the TCK is very useful as an integration test suite for most of the Reactive Streams rules. Overall, the Reactive Streams specification is not rocket science, and with proper attention, every experienced Java developer may create their own implementation.
The next parts of this topic will cover:
- Implementation of custom Subscriber;
- Implementation of the Async StreamPublisher;
- Implementation of our own RxJava-like library;
The source code of the solution may be found on GitHub.
To learn more:
- David Karnok Blog — http://akarnokd.blogspot.hu/2017/03/java-9-flow-api-asynchronous-integer.html
- Source code of the same operator inside reactive library — https://github.com/akarnokd/Reactive4JavaFlow/blob/master/src/main/java/hu/akarnokd/reactive4javaflow/impl/operators/FolyamStream.java