Mastering own Reactive-Streams implementation. Part 1 - Publisher.

Disclaimer: Rule number 1 of the Reactor club: don’t write your own Publisher. Even though the interface is simple, the set of rules about interactions between all these reactive streams interface is not. — Simon Baslé (Pivotal, Project Reactor).

I believe that most of the readers have already seen the Reactive Streams Specification with an understandable four interfaces. From the first look it seems that the implementation those interfaces should be simple and maybe every one tried or wished to try to create own implementation of those interfaces, despite the fact that there already exists a few reactive implementation such as Project Reactor or RxJava. In fact, it is not recommended without in-depth knowledge to implement interfaces, declared in Reactive Streams. The main pitfalls are hidden behind the huge list of rules, that specify strict behavior for the interfaces. Nonetheless, during the writing of one of the chapters for my book Reactive Programming with Spring 5, I decided to create own implementation of Publisher, Subscriber, Subscription, and Processor and share the steps with which I faced during that challenge. Through this topic, I will share basics coding steps, pitfalls, and patterns that will be useful for future implementors.


In the first part of the topic, we will implement the Publisher interface. The initial idea that came to my mind was creating the Publisher-wrapper for Java 8 Streams. The first naive implementation with which we will start looks like next:

Here, we just implement Flow.Publisher<T> interface and override subscribe() method. As you might notice from the example, instead of Reactive Streams Specification here is using JDK9 Flow API. Since both interfaces are identical, it does not matter which will be used as a base. The other point that should be noticed is the behavior of our Publisher. In our case, there is no dynamic data streaming. Thus, the original behavior of Publisher is cold. That is meant that each Subscriber will receive the same data. To satisfy that requirement the implementation should create a new instance of java.util.stream.Stream<T> for each new Subscriber. The good candidate to achieve that behavior is to have Suplier<T> of Stream instead of plain Streaminstance. That is why the constructor receives Suplier<T> as a parameter and store in the final field.

Now, we have the essential understanding of Publisher behavior, and we can move forward to functional implementation. Let’s try to pass some data to subscriber naively. The next example shows the implementation of subscribe() method:

In line 12, the Stream is pushing data to the onNext() method of Subscriber. The next line calls onComplete() after successful iteration, otherwise, will be executed the line which calls onError() method.

Now, let’s create some simple verification, that our solution works:

And finally, if we run our solution we will see the next output:

1
2
3
4
5
6
Completed

It seems that it works. However, is it a truly working solution from the Specification perspective and how to verify that all specified rules are properly incorporated? It is not an easy job to prove it. Just to correctly build corresponding test suite may take much more time than an implementation of the working code for Reactive Streams. However, to our luck, the toolkit for that purpose has already been implemented by Konrad Malawski and got the name Reactive Streams Technology Compatibility Kit or simply TCK. TCK is defending all Reactive Streams statements and testing corresponding implementation against specified rules. TCK includes a few useful test-classes which cover all corner cases for all defined interfaces. In our case, will be helpful the next abstract class org.reactivestreams.tck.PublisherVerification<>. Let's create a test to verify our StreamPublisher<> implementation:

To get a better understanding of this test, would be nice to have a brief overview of conceptual points of the verification before we have run it. To correctly execute the test, we mandated to override two abstract methods and correspondingly provide instances of working Publisher. In the first case, Publisher should produce some data and completed successfully. Also, for the successful path, we should consider the incoming parameter elements, which represents the amount of generated data. Along with the successful one, the test asks to provide failed Publisher, which should produce error result. It is some basics, minimum configuration of test suite required to start.

Ok, after a brief intro to TCK it will be nice to check the tests results:

Yup, it looks terrible. 38 tests were run, 15 tests of those were skipped, 20 tests were failed, and only three were accidentally passed. Now, the implementation looks completely incorrect.

Nevertheless, let’s try to fix them all and appropriately implement our publisher. Let’s start with the simplest test, which is called required_spec109_mustIssueOnSubscribeForNonNullSubscriber. Three digits after 'spec' stand for the 1.09 rule:

Publisher.subscribe MUST call onSubscribe on the provided Subscriber prior to any other signals to that Subscriber and MUST return normally, except when the provided Subscriber is null in which case it MUST throw a java.lang.NullPointerException to the caller, for all other situations the only legal way to signal failure (or reject the Subscriber) is by calling onError (after calling onSubscribe).

To satisfy this rule, we required in calling onSubscribe method, before any other calls. Let's implement this:

After the running of the test, you might found that the implementation is still incorrect even the method call is added. The problem still exists because it is illegal to pass null Subscription to Subscriber. So, it will be nice to implement minimum valuable Subscription and pass it to the Subscriber:

Great, that change has fixed our test. However, we still have a lot of failed verifications. Now, let’s move to the rules with a bit higher complexity. One of the central concepts of Reactive Streams is to provide transparent backpressure control. For that purpose, the specification introduced the new interface, which is called Subscription.

The notion 'new' is using here to emphasize that this interface was missed in the initial implementation of Observable-Observer model in RxJava.

Subscription has one important method to inform Publisher about its demand. This method is called request() and is an important feature of Reactive Streams. Along with standard Push model, it allows to mix Push and Pull model. The current implementation of StreamPublisher supports only Push model. To satisfy hybrid model we need to provide some mechanism that may push data only on corresponding demand. To find the correct place where this mechanism will be fitting well we need to analyze Specification. Appropriate rule, which may hint us where the logic should occur is the rule 3.10 which states as:

While the Subscription is not cancelled, Subscription.request(long n) MAY synchronously call onNext on this (or other) subscriber(s).

Regarding that statement, the best place to put the logic is in Subscription#request itself. It may be reasoned by the current idea of our Publisher which aimed to merely wrap Stream<T> without additional parallelization. Also, the good technique is putting Subscription implementation as an internal class. Next code snippet shows the new iteration in our Publisher implementation:

Here we created an internal class which is responsible for handling Subscriber demand and pushing data. Since each new Subscriber receives new Subscription, it is valid to store some progress or state inside Subscription. Also, we need somehow push data to Subscriber. Unfortunately, Stream does not have explicit API, which allows fetching data in ‘give me next’ fashion. Exactly that is why it is unreasonable to store Stream instance explicitly. The only one logical way is extracting Iterator and using its API for that purpose. Thus, we will save Iterator<> as the local state.

All right, we got a basic understanding of the state, which will be stored inside Subscription. Now it is time to move to request method. Here we create for-loop that allows handling onNext method of Subscriber. Then, when demand is satisfied we are checking if the iterator has not finished yet and then notifying subscriber about completion if the statement is true. Now, let's run the tests to recheck our code:

Just by analyzing failed tests the one missing thing may be noticed here. That thing is violation or rules 1.07 and 3.06 which says that terminal signal must be emitted only once. Thus, we should implement a flag, which will be responsible for termination state indication:

In the mentioned part of the code, we have introduced two additional methods and AtomicInteger field. In that case, an extra field is responsible for preserving the current state of Subscription, and control concurrent access via atomic behavior. terminate() method is reliable for termination of Subscription and returns the previous state. The returned result may be useful to check if Subscription has not already been terminated and thereby prevent violation of rule 1.07. Just by adding this additional flag and modifying the code a bit we may resolve at least six tests! Also, let's fix minor issues related to rules 3.09 and 1.04. The first rule says that negative or zero demand should complete with onError termination and is the easiest one. To satisfy it enough just add the next part of the code in request method:

In turn, rule 1.04 is more complicated. It says that all Publisher errors should be emitted via onError and should be emitted only after onSubscribe method has been called. Regarding our implementation of failed Publisher, the only place when an error may be thrown before onSubscribe is in Subscription constructor:

To postpone error throwing and emitting it is required to wrap the statement in try-catch block and store exception in the local field. Since an error should be thrown immediately after subscription, we should handle it manually, after onSubscribe call:

Moreover, since the iteration may be a potential point of failure it will be useful to wrap subscriber.onNext(iterator.next()); in try-catch block as well. With mentioned improvements StreamSubscription will look like next:

All right, we come to the final point, where there are only two tests are failed. Both tests are field because of the identical root cause which is recursive calling between request()->onNext()->request()->onNext(). To avoid recursion, we mandated to check the state of execution and somehow verify if the request is called from the onNext method. When I tried to solve this problem the simplest idea that came in mind was creating a separate field, which would be responsible for preserving demand and indicating recursion state at the same time. Before we have got into details let’s take an overview of the solution:

Here, we added field demand, in which the requested number of elements will be stored. Also, to prevent recursion, we added if statement and check if demand is greater then zero. This assumption is valid because of the rule 3.01, which says that only Subscriber has access to Subscription, and request() method must be called inside Subscriber context. Also, it is good to emphasize that the request method may be called concurrently, and it is important to adjust the demand in a thread-safe manner. For that purpose the type of demand field is AtomicLong. Finally, the for-loop statement was a bit modified for atomic API usage.

If we run TCK against the current implementation, we will find that all tests have been passed, which was our primary goal!

However, this implementation contains some hidden, very dangerous pitfalls, that may not be noticed from the first glance (thanks to David Karnok, who highlighted those points). To understand the problem let’s imagine high-concurrent access to request() method. The first problems that come in that environment depicted in the next piece of code:

Here, two separate threads are concurrently working inside the body of request() method. That situation is allowed from both Reactive Streams specification (rule 2.7) and JMM specification. In this example, the Thread B is near the finish of the looping. At the same time, the Thread A has just finished thread-safe validation inside if-statement. May happen that between if-statement and the subsequent demand.getAndAdd(n) instruction may be executed instructions from the concurrent Thread B. In that case, we come to the point when both threads exit the request() method, and a subscriber will have received no data until the next request() call.

This issue may be fixed by enhancing the code in a next way:

Here, three redundant code lines were eliminated and replaced with one atomic CAS operation. getAndAdd() returns previous value which in turn allows correct demand validation and prevent violation of rule 3.3. Finally, all test will be green if we run it.

Despite the fact that all TCK verifications are passed, there is another case that is still missed in the verification suite. In comparison to the previous case, which was quite unpredictable and hard to test, the second one is quite obvious. According to rule 3.17, which states:

Subscription MUST support an unbounded number of calls to request and MUST support a demand up to 2^63-1 ( java.lang.Long.MAX_VALUE). A demand equal or greater than 2^63-1 ( java.lang.Long.MAX_VALUE) MAY be considered by the Publisher as “effectively unbounded”

the next piece of code should be valid:

Despite the fact that above example is quite illogical, it is still valid. However, in that case, we will break our publisher because of number overflow. To prevent the mentioned case, before adjusting the demand we should verify if the sum is not overflowed. However, this action leads to the previous problem again. To solve this problem we required to replace if (demand.getAndAdd(n) > 0) with old-good CAS-Loop pattern (wiki):

Let’s analyze the code:

  1. Thread-Safe value retrieving with guarantees that subsequent loads and stores are not reordered before this access. (JDK 9 docs)
  2. Additional validation to ensure that value adjusting is necessary.
  3. Additional value validation on number overflow case.
  4. Thread-Safe conditional CAS operation. This code block prevents value collision and ensures that local initial value and in-shared memory one are the same, otherwise — repeat.
  5. Additional value validation that prevents recursion (rule 3.3) if currentDemand is greater then zero.

Now, if we run new test along with TCK tests we will found that everything is fine and all tests are passed.

To summarize: Throughout this topic, we have taken an overview of the plain implementation of Publisher. We have seen the corner cases, which points we should pay attention to first, and which toolset will be useful during the verification stage. As we have seen, TCK helps us a lot to solve issues in our code. However, TCK is not a silver bullet and missing some corner cases. Nonetheless, TCK will be a very useful as integration test-suite for the most of Reactive-Streams rules. Overall, the Reactive Streams specification is not rocket science, and with proper attention, every experienced Java developer may create their own implementation.

In the next parts of this topic will be covered:

  • Implementation of custom Subscriber;
  • Implementation of the Async StreamPublisher;
  • Implementation of own RxJava like library;

The source code of the solution may be found in GitHub.