Part II. Building Next-Gen Event-Driven application powered by Stateful Functions

Tymur Yarosh
DevOops World … and the Universe
7 min readJun 26, 2021

The engineering discipline has a significant impact on our society. Right now, you’re reading this text delivered to your screen in the blink of an eye. Similarly, in the blink of an eye, an invisible disease reached all corners of the world. Did you ask yourself whether an engineer can stop it? This read is the second part of a saga where we fight infection spreading with Stateful Functions. In the last part, we considered the architecture, so it’s time to get our laptops hot, hearts brave, and eyes red. Let’s implement it!

Disclaimer

This article is not a typical “how-to” with very detailed text and an optional project repository. It’s rather a project on GitHub with the optional text. You can read it to better understand the concepts, though feel free to skip it and delve right into the source code. For simplicity, I consciously ignore some corner cases to focus on the technology rather than the domain. Kudos to Pavlo Kurachenko for his invaluable contribution.

Project structure

I will focus your attention on two significant parts of the project: remote module and statefun cluster. Let’s overview the module first:

Project structure

Person and sector are cohesive packages containing functions, types they store, commands, and events. Spring package is responsible for the integration of Stateful Functions with the Spring context.

Types

The application serializes all types into JSON using Jackson. It enables application clients to communicate using a convenient, human-readable messages. Though it’s a benefit for application contract, it means nothing to internal types that functions use to communicate with each other and persist. In real-world applications, you may want to switch internal types to Protobuf to reduce the size and increase performance, still JSON is an excellent way to go.

Take a look at the LocatePersonCommand

TYPE constant is used widely as a reference to this type. It contains namespace, type name, and SerDe (Serialization / Deserialization) functions. Find the example usage in the PersonFn function:

Besides message.is(TYPE) and message.as(TYPE) that checks whether the message contains a given type and then extracts the command, there is one more thing I would like to mention.

Functions

There is a typical pattern to handle messages in Java SDK of Stateful Functions 3.0:

if (message.is(TYPE)) onType(context, message.as(TYPE)).

It’s the most straightforward approach to handle few types of incoming messages. For more complex domains, you may want to either implement a more advanced strategy on handling different message types or divide a large function into smaller ones.

So, we have two functions: PersonFn and SectorFn. I’ve found suffix *Fn helpful, but it’s up to your team to decide on the naming convention. PersonFn keeps the person’s current sector and all people whom this person met in recent two weeks. In case if this person becomes infected, people in the state receive a notification.

The SectorFn function keeps people who are currently in the sector. If a new person enters the sector, everyone receives a notification and updates a list of people met recently. Here is a sequence diagram to better understand the communication:

Commands & Events

Before we delve into implemented messages, let’s clarify two important concepts: commands and events. A command is a request sent to a function. It contains data required to act and has a known recipient. An event is a notification about something that was already happened. Events don’t have a specific recipient.

LocatePersonCommand — when PersonFn receives this command, it compares the new sector to the current one. If sectors differ, the function sends QuitSectorCommand to the recent SectorFn and EnterSectorCommand to the new one. Besides this communication, it saves a new sector to the state.

SavePersonsCommand — when PersonFn receives this command, it upserts given people into the log of people it’s met in recent weeks.

AlertInfectionCommand — when PersonFn receives this command, it sends AlertInfectionRiskCommand to all people it’s met in recent weeks.

AlertInfectionRiskCommand — when PersonFn receives this command, it sends PersonMightBeInfectedEvent to the egress.

PersonMightBeInfectedEvent — this event is used to notify external systems about the person that might be infected. The probable usage is to ask this person to test and self-isolate before it infects more people.

PruneMeetingLogCommandPersonFn sends this command to itself with a delay on the first LocatePerson command and repeatedly sends this command itself when log pruned. It’s used to remove people that weren’t in contact in the recent two weeks from the state.

EnterSectorCommand — when SectorFn receives this command, it adds a new person to its current visitors.

QuitSectorCommand — when SectorFn receives this command, it removes a person from its current visitors.

Messaging & State

All the time, I mention commands, events and state. Let’s overview messaging and storage API.

To send a message, you need to know the type of recipient, its id, and the message you’re going to send:

Or for a message to egress:

Easy, right? State storage has a clean API as well:

Storage.get(..), Storage.set(..) — yes, it’s that easy!

Spring Integration

The remote module receives RPC requests from Stateful Functions job. Java SDK provides RequestReplyHandler to parse those requests, forward them to a particular function, and generate a response. There are numerous ways to handle HTTP requests in Spring. Let’s use the well-known RestController for the sake of simplicity.

The handle method accepts POST requests on the “/{functionName}” and delegates processing to RequestReplyHandler.

RequestReplyHandler is registered as a bean that knows about our functions:

It’s:

  1. Create StatefulFunctions.
  2. Find beans annotated with @Statefun
  3. Create StatefulFunctionSpec for each function
  4. Register spec in StatefulFunctions

Our functions have to be annotated with @Statefun and implement StatefulFunctionSpecFactory:

Statefun Cluster

Statefun cluster consists of Flink masters and workers. The easiest way to start it for local development is to attach module configuration to recommended base Docker image and launch everything using Docker compose.

Attaching module configuration:

The module configuration itself:

There are a few things that require your attention here:

module.spec.endpoints[0].endpoint.spec.functions — Functions’ namespace here.

module.spec.endpoints[0].endpoint.spec.urlPathTemplate — URL where sits our remote module.

module.spec.ingresses[0].ingress.spec.address — Kafka host.

module.spec.ingresses[0].ingress.spec.topics[0].topic — Topic to consume messages from.

module.spec.ingresses[0].ingress.spec.topics[0].valueType — Type of the messages in the topic.

module.spec.ingresses[0].ingress.spec.topics[0].targets — What functions to invoke when a message arrives.

module.spec.egresses[0].egress.meta.id — Functions use this id to specify this egress as a message recipient.

Please refer to the official documentation for the explanation of other properties.

You may wonder where Stateful Functions job gets the id of the target function as there is only a type of function configured for the ingress. The answer is in a Kafka message. The key of a message is the id of the function that will be invoked.

And the docker-compose.yml:

Launch & Test

We need both Statefun Cluster and the Spring Boot application up and running:

To stop the cluster and the application, use the following commands:

The whole app in action:

Egress consistency

Both ingress and egress are configured via module.yaml file. While the ingress sets up entirely in one place, the module config is lacking some properties for the egress. Remember this code?

Key and the topic are set via the messaging API. While it enables us to switch topics without downtime, each function can set its own topic and can have different key detection strategies. A clear tradeoff. I see a few ways to ensure consistency across egress messages.

Implement a specific gateway function to forward messages to the egress

Pros:

  1. Ensures consistency across separate namespaces implemented as different applications

Cons:

  1. The function neither has its own state nor implement a business model. Does not fit a modeling approach Stateful Functions are good at.
  2. Concurrency is limited to a single invocation at a time for a single function id. Some awkward mess is required to scale.
  3. Inefficient since introduces an additional step that is including some I/O

Implement a code abstraction

It might be implemented as an egress interface:

or as a message factory:

Pros:

  1. Extremely efficient since it doesn’t require a mediator function
  2. Easy to implement and use across a single namespace

Cons:

  1. Special care required to achieve consistency across multiple namespaces in separate applications

Design models the way that particular egress can be accessed via a single function type only

Pros:

  1. The most natural approach for Stateful Functions
  2. Unlimited concurrency since a function does not wait until other functions send their messages
  3. Does not require any artificial utility components in the codebase
  4. Ensures consistency across multiple namespaces

Cons:

  1. Cannot fit all possible business requirements

So, what to choose? The last one is the way to go if your domain can be modeled using this approach. Otherwise, I would suggest implementing a code abstraction. A gateway function seems to me as an antipattern. Please let me know your opinion in the comments.

Cliffhanger

You’ve just read a brief explanation of the core concepts of Java SDK for the remote Stateful Functions module. I highly encourage you to fork the repository and implement some new commands and events to learn Stateful Functions in action. Don’t hesitate to leave a comment if you have any questions. You are not alone in this exciting journey.

In the last part of this series, I’ll show how to implement Spring Boot Starter to enable seamless integration of Stateful Functions with Spring Boot application.

--

--