Streaming with Spring Cloud

Bert Sanders
Walmart Global Tech Blog
Jul 24, 2020

Writing asynchronous messaging applications and eventually-consistent systems provides many advantages, including:

  • Maximization of throughput — especially in cases where immediate consistency is not required
  • Offloading of workflows to provide an immediate response to a user
  • Ability to scale various portions of the workflow differently as needed
  • Support of backpressure around system bottlenecks

Modern platforms and frameworks have made writing asynchronous messaging systems much easier. I recently spent some time exploring Spring Cloud Stream and found that it has many advantages over other frameworks.

“Spring Cloud Stream is a framework for building highly scalable event-driven microservices connected with shared messaging systems.

“The framework provides a flexible programming model built on already established and familiar Spring idioms and best practices, including support for persistent pub/sub semantics, consumer groups, and stateful partitions.” (reference)

Recently, Spring Cloud Stream introduced several new features that have made it an easy option for processing asynchronous messaging streams. There are two in particular that I want to focus on.

  • Functional paradigm for message processing through spring-cloud-function
  • Simplified testing with an improved test binder

Background

Spring Cloud Stream evolved out of the Spring Integration project as it embraced Spring Boot and moved into the modern cloud era. Spring Cloud Stream relies on the concept of a binder to handle the integration with a messaging or event streaming framework. At the time of publication, there were binders for Apache Kafka, Kafka Streams, RabbitMQ, Azure Event Hubs, Google Cloud Pub/Sub, and many others (see the list).

The binders support the core features of modern messaging/eventing systems:

  • Publish/Subscribe (multiple consumers of the same data)
  • Consumer Groups (horizontal scaling of a single consumer)
  • Partitioning
  • Message-driven consumers
  • Polling-based consumers

Binder

A binder handles the integration with a single framework. This abstraction allows your code to be middleware-neutral, focusing only on core business logic. The application core only needs to interact with a canonical message.

Binders can be used on both the input and output side to interact with producers or consumers of data. An application can use more than one binder if it has multiple message destinations. (Note that in Spring Cloud Stream, the term destination refers to either the source of incoming messages or the target of outgoing ones. This can be confusing at first.)

Spring’s documentation does a good job of explaining the concept of a binder and how to configure one.

Each binder implementation contains configuration options for setting up features specific to it, such as concurrency, partitioning, and error handling. You should check the specific documentation for the binder(s) if you need this information.

Functional Paradigm

One of the key concepts of Spring Cloud Stream is its functional paradigm. Consumers, producers, and streamers of data can be defined with Java's standard functional interfaces (the java.util.function package). Define any number of Functions, Consumers, or Suppliers as Spring beans, and these can be configured as streamers, consumers, or producers of messages respectively. This can be done completely with configuration (no boilerplate code!). The functions can even be composed together to create complex workflows, and since it is done with configuration, Spring Cloud allows these to be changed on the fly!
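The three roles can be sketched with nothing but the JDK. This is a minimal illustration, not code from the sample project: the class name, the method names, and the in-memory SINK list are all hypothetical, and in a real Spring Cloud Stream application each method would be a @Bean rather than a static factory.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

// Plain-JDK sketch; every name here is hypothetical.
final class FunctionalStyles {
    // Stand-in for an output destination so the Consumer has somewhere to write.
    static final List<String> SINK = new ArrayList<>();

    // Supplier: a source of messages (a producer).
    static Supplier<Integer> source() {
        return () -> 21;
    }

    // Function: receives a message and returns a new one (a streamer).
    static Function<Integer, Integer> doubleIt() {
        return n -> n * 2;
    }

    // A second Function, used to show composition.
    static Function<Integer, String> describe() {
        return n -> "value=" + n;
    }

    // Consumer: receives a message and returns nothing.
    static Consumer<String> sink() {
        return SINK::add;
    }

    public static void main(String[] args) {
        // Compose the two functions by hand with andThen.
        Function<Integer, String> pipeline = doubleIt().andThen(describe());
        sink().accept(pipeline.apply(source().get()));
        System.out.println(SINK); // prints [value=42]
    }
}
```

Here the composition is written by hand with andThen. Spring Cloud Function can express a similar composition purely in configuration by joining function names with a pipe in spring.cloud.function.definition (for example, doubleIt|describe for the hypothetical beans above).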

Spring Cloud Stream has a great Quick Start guide that I recommend for getting introduced to the project.

Basic Configuration

Spring Cloud Stream makes it very easy to configure common binding properties regardless of the broker. With a few lines of configuration, an application can support:

  • Consumer groups across multiple nodes
  • Retry logic based on specific errors with various options for back-offs
  • Concurrent consumers
  • Data transformation

Testing

Historically, testing asynchronous applications such as message processors has been extremely difficult. Tests should follow the 3A Pattern (Arrange-Act-Assert), also frequently called given-when-then. How do we arrange an automated test that consists of an asynchronous workflow whose completion will happen in parallel to test execution?

Figure: This test will fail because the workflow is incomplete when the Assert block runs.

Trying to test asynchronous messaging systems inevitably leads to one of two compromises. The first compromise is to mock the framework, which means the actual integration with your chosen framework or platform goes untested. This may look like a unit test that synchronously calls code which normally runs asynchronously. Such tests are certainly an essential part of the test suite, and on their own they may be fine for simple workflows. However, most organizations eventually find a need to test a scenario that requires the framework to participate in the test, such as complicated error-handling and retry scenarios, serialization and versioning complexities, and performance testing and tuning. Rather than just hoping these work in production or resorting to manual testing for these scenarios, we should find a way to automate tests for the entire application.

This eventually leads to the second compromise, littering tests with code that forces execution to yield and wait, hoping that the asynchronous workflow completes in the time allotted. Often, this will include a Thread.sleep with possibly some loops thrown in for good measure. This is a trial-and-error process, finding the right balance of how long to sleep while completing the test in a reasonable time-frame. But what happens as a workflow gets longer or when the test runs on a slow machine? The tests suddenly break, and you re-tune.

The goal is to write integration tests that exercise the application's integration with its frameworks and infrastructure in a non-brittle, deterministic way. In practice, this means a suite of integration tests that fully leverages the application's framework and uses test fixtures to simulate the dependent infrastructure. Those fixtures signal to the test when the asynchronous Act block is complete so that the Assert block can commence.

Figure: This test succeeds because the Assert block waits for the completion of the workflow.
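The idea of an Assert block that waits for a completion signal, rather than sleeping and hoping, can be sketched with a CompletableFuture. This is only an illustration of the principle using the JDK; squareAsync is a hypothetical stand-in for an asynchronous workflow and has nothing to do with Spring Cloud Stream itself.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

final class SignalledCompletion {
    // Hypothetical asynchronous workflow: squares its input on another thread.
    static CompletableFuture<Integer> squareAsync(int n) {
        return CompletableFuture.supplyAsync(() -> n * n);
    }

    public static void main(String[] args) throws Exception {
        // Act: start the workflow; it runs in parallel with this thread.
        CompletableFuture<Integer> result = squareAsync(12);

        // Assert: get() returns as soon as the workflow signals completion.
        // The timeout is only a guard against a hung workflow, not a tuned sleep.
        int value = result.get(5, TimeUnit.SECONDS);
        if (value != 144) {
            throw new AssertionError("unexpected result: " + value);
        }
        System.out.println("workflow completed with " + value);
    }
}
```

Because the test proceeds the moment the result is available, it takes no longer than the workflow itself and does not need re-tuning on a slower machine.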

Spring Cloud Stream provides a Test Binder that makes this effortless. The test binder acts as an in-JVM message broker, allowing testing with a real binder without the networking. With the test-jar on your testing classpath, Spring Boot will auto-configure both an InputDestination and an OutputDestination, allowing the test to produce and consume messages, as appropriate for the flow being tested.

Another advantage to this test binder is that the producer is implemented as a BlockingQueue, allowing the workflow to execute synchronously. This allows the asserts to be executed without any extra code to wait on the result, which satisfies the desires described above.
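To see why a queue-backed output removes the need for explicit waiting, consider this deliberately simplified, JDK-only sketch. It is not the test binder's implementation: InMemoryDestinations and its send and receive methods are hypothetical names, and the assumption that the handler runs on the caller's thread is a simplification made for the example.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

// Hypothetical in-JVM stand-in for an input and an output destination.
final class InMemoryDestinations {
    private final BlockingQueue<String> output = new LinkedBlockingQueue<>();
    private final Function<String, String> handler;

    InMemoryDestinations(Function<String, String> handler) {
        this.handler = handler;
    }

    // Runs the handler on the caller's thread and queues its result.
    void send(String payload) {
        output.add(handler.apply(payload));
    }

    // Waits up to the timeout for a result; returns null if none arrives.
    String receive(long timeoutMillis) {
        try {
            return output.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for output", e);
        }
    }

    public static void main(String[] args) {
        InMemoryDestinations broker = new InMemoryDestinations(String::toUpperCase);
        broker.send("hello");                     // Act
        System.out.println(broker.receive(1000)); // Assert side; prints HELLO
    }
}
```

The receiving side either finds the message already queued or blocks until one arrives, so the test contains no sleeps or polling loops.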

Limitations

The only limitation to this approach is that it prevents testing any logic specific to the binder implementation. This is because the specific binder or binders are replaced with the TestBinder. This would include features such as dead-letter routing, batch consumption, message expiry, and quorum.

Sample Implementation

Consider the linked sample project. It contains a simple streaming application that reads from an input topic and produces to an output topic. Take note of the features:

  • A single Spring bean called calculateNthPrime that is a java.util.function.Function that accepts an Integer and returns a BigInteger.
  • The function used for this bean calls another Spring bean, primeNumberService. This bean exposes a method to calculate the Nth prime number. This was simply chosen as an example of a method that might take some time to complete, demonstrating the blocking nature of the TestBinder.
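A function of this shape might look like the following sketch. It is not the code from the sample project: the class name, the trial-division approach, and the static factory are assumptions made for illustration, and in the real application calculateNthPrime would be a Spring bean delegating to the primeNumberService bean.

```java
import java.math.BigInteger;
import java.util.function.Function;

final class NthPrimeSketch {
    // Hypothetical stand-in for the service method: the Nth prime, 1-based.
    static BigInteger nthPrime(int n) {
        if (n < 1) {
            throw new IllegalArgumentException("n must be at least 1");
        }
        long candidate = 1;
        int found = 0;
        while (found < n) {
            candidate++;
            if (isPrime(candidate)) {
                found++;
            }
        }
        return BigInteger.valueOf(candidate);
    }

    // Simple trial division; slow on purpose for large n.
    static boolean isPrime(long value) {
        if (value < 2) {
            return false;
        }
        for (long d = 2; d * d <= value; d++) {
            if (value % d == 0) {
                return false;
            }
        }
        return true;
    }

    // The Function<Integer, BigInteger> that would be exposed as a bean.
    static Function<Integer, BigInteger> calculateNthPrime() {
        return NthPrimeSketch::nthPrime;
    }

    public static void main(String[] args) {
        System.out.println(calculateNthPrime().apply(10)); // prints 29
    }
}
```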

The application.yml is used to set up the application to read from and write to the topics.

  • spring.cloud.function.definition defines the function bean that will be used within the application.
  • spring.cloud.stream.bindings defines the specific binding(s). The binding name prefix matches the function name.
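  • A binding is defined as either an input or output by virtue of having -in or -out following the function name within the binding name.
  • A binding name also ends with an index (for example, -in-0). The index distinguishes the inputs or outputs of a function that has more than one; for a typical function with a single input and a single output it is always 0.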
  • A binding defines a destination. The destination is the topic the message is read from or written to. (Remember, even a consumer calls its topic a destination.)
  • Each binding also demonstrates some basic configuration, such as consumer groups, concurrency, retry, and backoff. The retry configuration also defines particular exceptions which should not be retried.
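The naming convention is easier to see when the property keys are generated mechanically. The helper below is a hypothetical illustration, not part of Spring or of the sample project; the topic names, the group name, and the max-attempts value are made up. The property keys themselves follow the convention described above.

```java
import java.util.LinkedHashMap;
import java.util.Map;

final class BindingNames {
    // Builds a binding name: <functionName>-in-<index> or <functionName>-out-<index>.
    static String bindingName(String function, boolean input, int index) {
        return function + (input ? "-in-" : "-out-") + index;
    }

    // Flattened property keys for one function with a single input and output.
    static Map<String, String> properties(String function, String inTopic,
                                          String outTopic, String group) {
        String prefix = "spring.cloud.stream.bindings.";
        String in = prefix + bindingName(function, true, 0);
        String out = prefix + bindingName(function, false, 0);

        Map<String, String> props = new LinkedHashMap<>();
        props.put("spring.cloud.function.definition", function);
        props.put(in + ".destination", inTopic);
        props.put(in + ".group", group);
        props.put(in + ".consumer.max-attempts", "3"); // illustrative value
        props.put(out + ".destination", outTopic);
        return props;
    }

    public static void main(String[] args) {
        // "numbers", "primes", and "prime-calculators" are hypothetical names.
        properties("calculateNthPrime", "numbers", "primes", "prime-calculators")
                .forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Running it prints the keys in flattened form; in application.yml the same keys would appear as nested YAML.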

Note that errors are routed to an error channel where they can be handled by the specific binder.

Testing

The project also has two Spring Boot tests.

  • The primary test checks the basic flow of the application. It produces a set of random numbers to the topic, reads in the results, and asserts that each number returned is prime. If any number is not prime or the service fails, the test will fail. There is no need for waiting between the send and receive. The TestBinder takes care of that.
  • The retry test checks what happens on error cases. It uses Spring Boot's @MockBean to simulate both a retryable and a non-retryable exception, verifying that the expected behavior occurs. Each test verifies that the primeNumberService is invoked the appropriate number of times to indicate whether the message was retried or immediately put into an error channel.
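The invocation-count check can be illustrated with a hand-rolled retry loop. This is a simplified sketch of the behavior being verified, not Spring's retry mechanism and not the sample project's test: callWithRetry, attemptsFor, and the choice of IllegalArgumentException as the non-retryable type are all assumptions.

```java
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;

final class RetrySketch {
    // Hypothetical configuration: this exception type is never retried.
    static final Set<Class<? extends RuntimeException>> NON_RETRYABLE =
            Set.of(IllegalArgumentException.class);

    // Calls the handler up to maxAttempts times. An exception whose exact
    // class is listed as non-retryable is rethrown immediately.
    static <T, R> R callWithRetry(Function<T, R> handler, T input, int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be at least 1");
        }
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return handler.apply(input);
            } catch (RuntimeException e) {
                if (NON_RETRYABLE.contains(e.getClass())) {
                    throw e;
                }
                last = e;
            }
        }
        throw last;
    }

    // Counts how often a handler that always fails is invoked.
    static int attemptsFor(RuntimeException failure, int maxAttempts) {
        AtomicInteger calls = new AtomicInteger();
        Function<Integer, Integer> failing = n -> {
            calls.incrementAndGet();
            throw failure;
        };
        try {
            callWithRetry(failing, 1, maxAttempts);
        } catch (RuntimeException expected) {
            // In the application, this is where the error channel takes over.
        }
        return calls.get();
    }

    public static void main(String[] args) {
        System.out.println(attemptsFor(new IllegalStateException("transient"), 3));  // prints 3
        System.out.println(attemptsFor(new IllegalArgumentException("bad input"), 3)); // prints 1
    }
}
```

A retryable failure exhausts all attempts, while a non-retryable one is seen exactly once, which is the distinction the retry test asserts on through the mock's invocation count.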

View the full sample implementation: https://github.com/bertsanders/spring-cloud-stream-demo
