Long running orchestrations using event sourcing in F#

Tackling the complexity of managing long running processes with functional programming

Frederick Swadling
Creating TotallyMoney
15 min read · Aug 17, 2022


TL;DR

Write long-running state machines in F# in a manner that combines the flexibility of Rx-style stream operations with the simplicity of procedural-looking async / await code.

Orchestration vs Choreography

A common problem encountered when building microservice-based systems is how to orchestrate workflows among services. Business logic can find itself chopped up and distributed across multiple services, which makes it difficult to maintain and change; instead of having to understand just one service, the developer must now have intimate knowledge of several. This pattern is known as choreography: no service is necessarily aware of what is happening in the others; each only knows what to do when it receives instructions, and the full business logic emerges from all the services working together.

An alternative approach that tackles this inherent complexity is orchestration; instead of being distributed, the business logic is kept together in one service which acts as a controller, orchestrating and delegating work to child services. This typically takes the form of long-running workflows. It is important to understand that long-running here means potentially running over very long time periods: weeks, months, sometimes even years.

It should be noted before continuing that there is nothing particularly wrong with choreography; it’s fine for simple cases, and often the extra layer of infrastructure needed to support orchestration simply isn’t worth it. However, when business logic gets particularly complicated, switching to orchestration is a good idea.

It should also be noted that the problem of long running processes is not limited to the domain of microservice orchestration. Lots of domains struggle with the problem of defining workflows that run over a long period. User workflows in the frontend present a similar problem, albeit on a much shorter timescale.

Typically, developing a long-running workflow means implementing some sort of state machine. This approach is fine, but on its own can get hard to read. Generally, I prefer workflows to read in code as I would define them in a document: left to right and top to bottom. Large state machines can end up reading like one of those “choose your own adventure” books, where you must constantly navigate back and forth to understand what’s going on.

Existing technologies

Before getting into the solution proposed in this article, I will quickly round up the existing technologies and explain my issues with each of them. Despite my reservations, I would recommend checking whether an existing solution suits your needs before embarking on my somewhat experimental approach.

AWS Step Functions

At TotallyMoney we develop on top of AWS. The service provided by AWS specifically for this problem is AWS Step Functions. This allows a developer to define an orchestration using a visual tool. Personally, I don’t particularly like this approach. Companies have been trying to push visual workflow designers onto developers for a very long time now, and they always run into the same problem: visual workflow designers simply do not scale well with complexity. This is why WF (Windows Workflow Foundation) never really took off in .NET, and I remain unconvinced now. Another problem is that in order to work within the Serverless Framework, the workflows must be defined in YAML. In this format the workflows are both fragile (YAML bugs are probably the most significant source of errors I encounter these days) and hard to read.

Azure durable functions

A slightly more developer-friendly approach is offered by Microsoft on Azure. Durable functions essentially work by overriding the behaviour of async / await. Normally async and await are syntactic sugar around a builder pattern for a Task, which is itself an abstraction of a kind of state machine. The developer can write code that appears procedural, reading easily from left to right and top to bottom, but without blocking to wait for asynchronous processes to finish, and without the “pyramid of doom” typically associated with nested callbacks. Durable functions change the behaviour of the Task Parallel Library: instead of adding an abstraction layer over asynchronous behaviour, they abstract over queues and storage in Azure. When the user awaits a task, the whole process stops, and is restarted when that asynchronous operation has returned something.

A key aspect of durable functions is that although they appear similar to normal async / await, they are a fundamentally different beast. One way this manifests is that when an awaited task returns in durable functions, the whole orchestration runs again from the beginning, with previously retrieved results memoized in Azure storage. Indeed, it has to work this way; unlike normal async functions, the process running the orchestration shuts down between waiting for responses. A solution where the process is kept running over long periods would be very wasteful, and very fragile. Because of this, durable functions must be deterministic; you can’t have an orchestration function whose code goes down a different path on different invocations within the same workflow instance due to some outside side effect; for example, an if/else statement based on the current date.

I like durable functions, and they would probably be my first port of call; unfortunately, they are exclusive to Azure, and as already mentioned, we work in AWS, so this solution is out.

Third party services: Temporal / Cadence

There are also third-party services, such as Temporal and Cadence, that provide very similar functionality to Azure durable functions; but while durable functions are built into the Azure cloud infrastructure, these services must be hosted on a Kubernetes cluster of your own. For my purposes, the extra infrastructure cost of running Kubernetes and managing that cluster is simply too much effort. There is also the matter of price; these services are not free, and on top of that, they restrict you to the small set of languages they support.

The problems of an async / await model, introducing Rx

There is another problem associated with an async / await model that I haven’t mentioned so far. This is not a problem exclusive to long-running workflows, but a problem inherent in async / await models more generally: what happens when an asynchronous event returns more than once? What if, instead of simply dealing with one-time events, we are dealing with event streams?

This already has a very good solution: the Rx library. RxJS is typically used in front-end development, in particular with Angular. The front end must deal with event streams all the time; for example, implementing a lazy load triggered by a scroll bar hitting the bottom of the page. The scroll bar hitting the bottom is a sequence of events; to implement the load you have to transform the entire stream, ignoring events that arrive while a load is already in progress.

As it turns out, the business of orchestrating long-running workflows is quite similar: we don’t just respond to singular events, we respond to event streams. An async / await model is fundamentally limited to very linear workflows; you await one event, and then move on to the next step. A system like Rx, which lets us define workflows with streams as our basic unit, is much more flexible and powerful.

Aim

With that all understood, I can now outline what I hope to achieve in this article. I want to develop a system to manage long running workflows in a way that allows them to be defined in a simple, easy to maintain manner that reads left to right, top to bottom. I aim for a solution that is completely generalised and platform agnostic; it will not be responsible for managing the underlying infrastructure required, the developer will have to bring that themselves. Furthermore, I aim to provide a more flexible and powerful approach to a simple async / await model, taking inspiration from the Rx library. This will be developed in F#, a functional .NET programming language that I have the pleasure of working in at TotallyMoney, and whose unique advantages I hope to showcase.

This solution will be based on the concept of event sourcing; the state of a running workflow will be maintained as a sequence of saved events, with the state of the workflow rebuilt from the stored events whenever the workflow is invoked.

Before I begin, I would like to clarify that this is somewhat of an experimental idea and it hasn’t been battle-tested. I encourage approaching this with some degree of scepticism.

The basic coordination type

I have called the basic type a Coordination. This is defined as below:
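My sketch of a minimal encoding (the accompanying source may differ in detail) is a single-case union wrapping a step function:

```fsharp
// A coordination: apply one event, get back any results produced by
// that event, plus (optionally) the next step of the state machine.
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

// Helper to feed a single event to a coordination.
let apply (Coordination step) event = step event
```

Wrapping the function in a union case keeps type inference and pattern matching tidy, since the type has to refer to itself in its own definition.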

A coordination is essentially a function that takes a generic event and produces some results, as well as the next step in the coordination. The result is a list; in other words, when a coordination has an event applied to it, it can produce several results, one, or none. The next step is itself an option of a coordination: if it contains a coordination, then another event can be applied; if not, the coordination is considered finished.

This type then clearly encapsulates the idea that a coordination can return results when given an event, and that the next step in a coordination depends on the events that have arrived so far.

Some utilities

Next, I will define a map function. This is a function that takes a coordination, and a function that transforms the result of our coordination to a different value, and potentially a different type:
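A sketch of map, with the Coordination type redeclared so the snippet stands alone:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

module Coordination =
    // Transform every result produced by a coordination, leaving the
    // event handling untouched.
    let rec map (f: 'a -> 'b) (Coordination step) : Coordination<'e, 'b> =
        Coordination (fun event ->
            let results, next = step event
            List.map f results, Option.map (map f) next)
```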

I will also define empty and retn functions; these are simple utilities that allow us to create a coordination that does nothing, and a coordination that just returns a single given value respectively:
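Sketches of both, again self-contained. One design note: with this encoding a result can only be produced in response to an event, so this retn emits its value on the next event it sees:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

module Coordination =
    // Produces nothing and finishes on the first event it sees.
    let empty<'e, 'r> : Coordination<'e, 'r> =
        Coordination (fun _ -> [], None)

    // Yields the given value in response to the next event, then finishes.
    let retn (x: 'r) : Coordination<'e, 'r> =
        Coordination (fun (_: 'e) -> [x], None)
```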

Now I will add some slightly more advanced utilities. These are the filter, choose and take functions:
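Possible implementations, assuming the Coordination type from earlier (my reconstructions):

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

module Coordination =
    // Keep only results satisfying the predicate.
    let rec filter (pred: 'r -> bool) (Coordination step) : Coordination<'e, 'r> =
        Coordination (fun event ->
            let results, next = step event
            List.filter pred results, Option.map (filter pred) next)

    // Filter and map in one pass: keep results where the chooser returns Some.
    let rec choose (f: 'a -> 'b option) (Coordination step) : Coordination<'e, 'b> =
        Coordination (fun event ->
            let results, next = step event
            List.choose f results, Option.map (choose f) next)

    // Return at most n results, then finish.
    let rec take (n: int) (Coordination step) : Coordination<'e, 'r> =
        Coordination (fun event ->
            let results, next = step event
            let taken = List.truncate n results
            let remaining = n - List.length taken
            if remaining = 0 then taken, None
            else taken, Option.map (take remaining) next)
```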

Filter takes a coordination and a predicate function and produces a new coordination that returns only the results returned by the original coordination that satisfy the predicate. Choose is a slightly more advanced version of filter, that essentially combines a filter and a map. Take takes a coordination and a number and creates a new coordination that only returns up to the number of elements specified as an argument.

For the sake of demonstration, here are a couple more operators that have equivalents in the Rx library:
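Hedged sketches of these two operators over the same type:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

module Coordination =
    // Drop results equal to the result emitted immediately before them.
    let distinctUntilChanged (c: Coordination<'e, 'r>) : Coordination<'e, 'r> =
        let rec go last (Coordination step) =
            Coordination (fun event ->
                let results, next = step event
                let emitted, last' =
                    results
                    |> List.fold (fun (acc, prev) r ->
                        if prev = Some r then acc, prev
                        else r :: acc, Some r) ([], last)
                List.rev emitted, Option.map (go last') next)
        go None c

    // Emit results while the condition holds; terminate at the first failure.
    let takeWhile (pred: 'r -> bool) (c: Coordination<'e, 'r>) : Coordination<'e, 'r> =
        let rec go (Coordination step) =
            Coordination (fun event ->
                let results, next = step event
                let kept = List.takeWhile pred results
                if List.length kept < List.length results then kept, None
                else kept, Option.map go next)
        go c
```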

DistinctUntilChanged will cause a coordination to return only distinct results following one another (i.e. consecutive equal results are filtered out, but equal results separated by a change will still be shown). TakeWhile will create a coordination that returns results while a condition is satisfied, and terminates at the point where the condition first fails.

Lastly, I will define an event function:
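Here event simply re-emits every event it is given, so the operators above can carve streams out of it. I write it as a function to sidestep a recursive value definition; the accompanying source may define it differently:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

// A coordination that emits every event it receives, forever.
let rec event () : Coordination<'e, 'e> =
    Coordination (fun e -> [e], Some (event ()))
```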

Note: The business of serialising and deserialising events is done outside of our coordination. I recommend using an off the shelf solution such as FSharp.SystemTextJson.

Our first coordination

Now we have the basic tools we need to define simple coordinations. For example, say we define our system to have events:

We want to have a coordination that takes the first event received of type EventB, returning its content. We could define a coordination for that such as below:
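With a hypothetical event type, and with the event / choose / take 1 pipeline folded into a single self-contained helper, that coordination might look like:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

// A hypothetical event type for the examples.
type Event =
    | EventA of string
    | EventB of string

// Wait for the first event the chooser accepts, emit it, then finish.
let rec firstOf (chooser: 'e -> 'r option) : Coordination<'e, 'r> =
    Coordination (fun e ->
        match chooser e with
        | Some r -> [r], None
        | None -> [], Some (firstOf chooser))

// Take the first EventB received, returning its content.
let firstB : Coordination<Event, string> =
    firstOf (function EventB s -> Some s | _ -> None)
```

firstOf chooser is just a compact, standalone equivalent of event |> choose chooser |> take 1.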

Now given a list of events, as we apply the events one after the other, the first event of type EventB will emit a result, and all subsequent events will emit nothing.

Introducing higher order coordinations

What we have so far isn’t particularly useful; after all, the systems we build usually need to coordinate over multiple events of different types. What happens if, upon receiving the result of one event, we want to use it to define the conditions for awaiting the next event? If we used map, we would end up with something like this:
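For instance (a sketch; firstOf awaits the first event its chooser accepts, and the string events and prefix rule are invented for illustration):

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

let rec map (f: 'a -> 'b) (Coordination step) : Coordination<'e, 'b> =
    Coordination (fun event ->
        let results, next = step event
        List.map f results, Option.map (map f) next)

let rec firstOf (chooser: 'e -> 'r option) : Coordination<'e, 'r> =
    Coordination (fun e ->
        match chooser e with
        | Some r -> [r], None
        | None -> [], Some (firstOf chooser))

// Mapping the first result to a *new* coordination leaves us with a
// nested type: Coordination<string, Coordination<string, int>>.
let nested : Coordination<string, Coordination<string, int>> =
    firstOf Some
    |> map (fun (prefix: string) ->
        // the inner coordination waits for the first event starting
        // with that prefix, and returns its length
        firstOf (fun (s: string) ->
            if s.StartsWith prefix then Some s.Length else None))
```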

This returns a Coordination<string, Coordination<string, int>>. This is called a higher order coordination; when passed events, it returns coordinations as results.

In order to continue, we need to somehow flatten this into a regular coordination. In functional programming, such a function is called a bind. A familiar example is LINQ’s SelectMany function for dealing with collections of collections. Rx actually defines four binds for dealing with event streams: SwitchMap, MergeMap, ConcatMap and ExhaustMap. All are useful, although I would encourage deferring to one of the many great Rx blog posts to better explain the differences between them. A good explanation can be found here. For the purposes of this example, I will focus on SwitchMap.

The Switch and SwitchMap functions

In Rx, a switch works by essentially “switching tracks” of the current observable. This is illustrated in the following marble diagram:

Borrowed from https://rxjs.dev/api/operators/switchMap, license: https://creativecommons.org/licenses/by/4.0/

For our purposes, we want to accomplish the same thing. Whenever an event is applied that causes the higher order coordination to emit a new coordination as a result, we want to switch over to this new coordination, forgetting the previous coordinations. Such a function can be written using recursion like so:
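A sketch of switch, plus the derived switchMap. One judgement call in this version: when the outer coordination emits a new inner coordination, the current event is also fed to that fresh inner, which keeps continuations prompt; the accompanying source may treat that edge differently:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

let rec map (f: 'a -> 'b) (Coordination step) : Coordination<'e, 'b> =
    Coordination (fun event ->
        let results, next = step event
        List.map f results, Option.map (map f) next)

let rec firstOf (chooser: 'e -> 'r option) : Coordination<'e, 'r> =
    Coordination (fun e ->
        match chooser e with
        | Some r -> [r], None
        | None -> [], Some (firstOf chooser))

// Flatten a higher-order coordination by always following the most
// recently emitted inner coordination, discarding the previous one.
let switch (outer: Coordination<'e, Coordination<'e, 'r>>) : Coordination<'e, 'r> =
    let rec go outerOpt innerOpt =
        Coordination (fun event ->
            // run the outer coordination first, if it is still live
            let emitted, nextOuter =
                match outerOpt with
                | Some (Coordination step) -> step event
                | None -> [], None
            // "switch tracks" if a new inner coordination was emitted
            let current =
                match List.tryLast emitted with
                | Some newInner -> Some newInner
                | None -> innerOpt
            // feed the event to whichever inner is now current
            let results, nextInner =
                match current with
                | Some (Coordination step) -> step event
                | None -> [], None
            let finished = Option.isNone nextOuter && Option.isNone nextInner
            results, (if finished then None else Some (go nextOuter nextInner)))
    go (Some outer) None

// The Rx-style switchMap: map each result to a coordination, then switch.
let switchMap (f: 'a -> Coordination<'e, 'b>) (c: Coordination<'e, 'a>) =
    switch (map f c)

// Example: use the result of the first "A:" event to define the wait
// for the matching "B:" event (string events invented for illustration).
let twoStep =
    firstOf (fun (s: string) -> if s.StartsWith "A:" then Some (s.Substring(2)) else None)
    |> switchMap (fun a ->
        firstOf (fun (s: string) -> if s.StartsWith "B:" then Some (a + s.Substring(2)) else None))
```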

Great! Now we can write our coordination that uses the result of one event to define the conditions for awaiting the next:

The coordination computational expression

Now if we step back a second, we can think back to the async / await model used in durable functions that I discussed earlier. Sure, it’s limited, but it’s still very useful. Quite a lot of coordinations just linearly wait for one event and then move on to the next. With our current tools we could replicate this: just take 1, and then SwitchMap:

This does the job, but I must admit it’s pretty ugly in comparison to a nice simple async / await model. Things also get awkward if I want to refer back to intermediate variables in the chain. In the world of curly-braced languages this is just a trade-off I’d have to accept. But this is F#, and F# has a unique feature called computational expressions that allows me to clean this up a bit. If I define a computational expression as below:

I can now write my coordination like so:
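A self-contained sketch of the builder together with the coordination it lets us write (the helper definitions repeat my earlier sketches; the real builder likely has more members):

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

let rec map f (Coordination step) =
    Coordination (fun event ->
        let results, next = step event
        List.map f results, Option.map (map f) next)

let retn x = Coordination (fun _ -> [x], None)

let rec firstOf chooser =
    Coordination (fun e ->
        match chooser e with
        | Some r -> [r], None
        | None -> [], Some (firstOf chooser))

let switch outer =
    let rec go outerOpt innerOpt =
        Coordination (fun event ->
            let emitted, nextOuter =
                match outerOpt with
                | Some (Coordination step) -> step event
                | None -> [], None
            let current =
                match List.tryLast emitted with
                | Some newInner -> Some newInner
                | None -> innerOpt
            let results, nextInner =
                match current with
                | Some (Coordination step) -> step event
                | None -> [], None
            let finished = Option.isNone nextOuter && Option.isNone nextInner
            results, (if finished then None else Some (go nextOuter nextInner)))
    go (Some outer) None

let switchMap f c = switch (map f c)

type CoordinationBuilder() =
    member _.Bind(c, f) = switchMap f c
    member _.Return(x) = retn x
    member _.ReturnFrom(c) = c

let coordination = CoordinationBuilder()

// Hypothetical events for the example.
type Event =
    | EventA of string
    | EventB of string * int

// Wait for the first EventA, then for the first EventB whose key
// matches EventA's content, and return that EventB's number.
let example = coordination {
    let! a = firstOf (function EventA s -> Some s | _ -> None)
    let! b = firstOf (function EventB (k, v) when k = a -> Some v | _ -> None)
    return b
}
```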

Much nicer!

Appending coordinations, yield and yield! keywords

So far we have used coordinations exclusively to think about workflows where we await a set of events, and then return a single result at the end. This is a useful model, as often what we get at the end is all we really care about. But coordinations can return results at every step, whenever a new event is applied; this can be useful in many scenarios as well.

For example, imagine a sequence of events arriving that each contain a boolean. We want to trigger a workflow on each of the first ten false events, then on the next true event, then on one more false event, and then stop. Triggering workflows means returning a result at those events and only those events.

Essentially here we want to append two different coordinations together. First complete one coordination, and then move onto the next. We can do this with this function:

That’s great, but can we fit it into our computational expression? Yes we can! Just add Yield, YieldFrom, Combine and Delay members to the builder type:

Now we can write the above example like so:
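Putting the append function, the extended builder, and the boolean example together in one self-contained sketch (takeMatching is my standalone stand-in for event |> choose … |> take n):

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

// Emit the first n events accepted by the chooser, then finish.
let takeMatching (n: int) (chooser: 'e -> 'r option) : Coordination<'e, 'r> =
    let rec go n =
        Coordination (fun e ->
            match chooser e with
            | Some r when n <= 1 -> [r], None
            | Some r -> [r], Some (go (n - 1))
            | None -> [], Some (go n))
    go n

// Run the first coordination to completion, then continue with the second.
let rec append (Coordination first) (second: Coordination<'e, 'r>) =
    Coordination (fun event ->
        let results, next = first event
        match next with
        | Some next' -> results, Some (append next' second)
        | None -> results, Some second)

type CoordinationBuilder() =
    member _.Yield(x) = Coordination (fun _ -> [x], None)
    member _.YieldFrom(c: Coordination<'e, 'r>) = c
    member _.Combine(c1, c2) = append c1 c2
    member _.Delay(f) = f ()

let coordination = CoordinationBuilder()

// Trigger on the first ten false events, then the next true event,
// then one more false event, and stop.
let boolWorkflow = coordination {
    yield! takeMatching 10 (fun b -> if not b then Some b else None)
    yield! takeMatching 1 (fun b -> if b then Some b else None)
    yield! takeMatching 1 (fun b -> if not b then Some b else None)
}
```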

Parallel awaits, the zip function and applicative computational expressions

This seems like a nice readable format, but let’s imagine I run this in production, awaiting real events. The events would trigger lambdas that would save the event objects in my data storage (whatever that is), and in turn would trigger the coordination to run. But what I find is that sometimes nothing is returning, the coordination is just stopping.

After chatting with the people in charge of the event sources, the reason becomes clear: eventB does not necessarily come before eventA; usually it does, but there’s no guarantee. My current computational expression cannot support this, as it only deals with events arriving in sequence. Fortunately, F# recently added a feature just for problems like these: applicative computational expressions. To make use of it, we first need a function to combine two coordinations running in parallel; this is called the Zip function:

As you can see, it returns the results of the two running coordinations together as a tuple. This can be used in the computational expression like so:

Now we can rewrite our coordination to be able to handle the case as described above:
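A self-contained sketch of zip, the applicative builder members (MergeSources and BindReturn are what enable and!), and the rewritten coordination:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

let rec map f (Coordination step) =
    Coordination (fun event ->
        let results, next = step event
        List.map f results, Option.map (map f) next)

let rec firstOf chooser =
    Coordination (fun e ->
        match chooser e with
        | Some r -> [r], None
        | None -> [], Some (firstOf chooser))

// Combine two coordinations running over the same event stream in
// parallel, pairing their results in order. The combined coordination
// finishes when either side can no longer contribute to a pair.
let zip (c1: Coordination<'e, 'a>) (c2: Coordination<'e, 'b>) : Coordination<'e, 'a * 'b> =
    let rec go buf1 buf2 s1 s2 =
        Coordination (fun event ->
            let r1, n1 =
                match s1 with
                | Some (Coordination step) -> step event
                | None -> [], None
            let r2, n2 =
                match s2 with
                | Some (Coordination step) -> step event
                | None -> [], None
            let all1, all2 = buf1 @ r1, buf2 @ r2
            let n = min (List.length all1) (List.length all2)
            let pairs = List.zip (List.truncate n all1) (List.truncate n all2)
            let rest1, rest2 = List.skip n all1, List.skip n all2
            let dead1 = Option.isNone n1 && List.isEmpty rest1
            let dead2 = Option.isNone n2 && List.isEmpty rest2
            pairs, (if dead1 || dead2 then None else Some (go rest1 rest2 n1 n2)))
    go [] [] (Some c1) (Some c2)

type CoordinationBuilder() =
    member _.BindReturn(c, f) = map f c
    member _.MergeSources(c1, c2) = zip c1 c2

let coordination = CoordinationBuilder()

type Event =
    | EventA of string
    | EventB of string

// eventA and eventB may now arrive in either order.
let bothEvents = coordination {
    let! a = firstOf (function EventA s -> Some s | _ -> None)
    and! b = firstOf (function EventB s -> Some s | _ -> None)
    return a + b
}
```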

The and! keyword following a let! indicates that these events can arrive in any order. A single let! can be followed by any number of and! expressions to indicate a block of events that can arrive in any order. A subsequent let! means that the whole block must finish, with each branch having received its event, before the next event is listened for.

Let’s try a slightly more complicated example to demonstrate what we can do so far:
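One self-contained way such an example might look, combining the sequential (Bind) and applicative (MergeSources) machinery; the event names and the "urgent" condition are invented for illustration:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

let rec map f (Coordination step) =
    Coordination (fun event ->
        let results, next = step event
        List.map f results, Option.map (map f) next)

let retn x = Coordination (fun _ -> [x], None)

let rec firstOf chooser =
    Coordination (fun e ->
        match chooser e with
        | Some r -> [r], None
        | None -> [], Some (firstOf chooser))

let switch outer =
    let rec go outerOpt innerOpt =
        Coordination (fun event ->
            let emitted, nextOuter =
                match outerOpt with
                | Some (Coordination step) -> step event
                | None -> [], None
            let current =
                match List.tryLast emitted with
                | Some newInner -> Some newInner
                | None -> innerOpt
            let results, nextInner =
                match current with
                | Some (Coordination step) -> step event
                | None -> [], None
            let finished = Option.isNone nextOuter && Option.isNone nextInner
            results, (if finished then None else Some (go nextOuter nextInner)))
    go (Some outer) None

let switchMap f c = switch (map f c)

let zip (c1: Coordination<'e, 'a>) (c2: Coordination<'e, 'b>) : Coordination<'e, 'a * 'b> =
    let rec go buf1 buf2 s1 s2 =
        Coordination (fun event ->
            let r1, n1 =
                match s1 with
                | Some (Coordination step) -> step event
                | None -> [], None
            let r2, n2 =
                match s2 with
                | Some (Coordination step) -> step event
                | None -> [], None
            let all1, all2 = buf1 @ r1, buf2 @ r2
            let n = min (List.length all1) (List.length all2)
            let pairs = List.zip (List.truncate n all1) (List.truncate n all2)
            let rest1, rest2 = List.skip n all1, List.skip n all2
            let dead1 = Option.isNone n1 && List.isEmpty rest1
            let dead2 = Option.isNone n2 && List.isEmpty rest2
            pairs, (if dead1 || dead2 then None else Some (go rest1 rest2 n1 n2)))
    go [] [] (Some c1) (Some c2)

type CoordinationBuilder() =
    member _.Bind(c, f) = switchMap f c
    member _.Return(x) = retn x
    member _.BindReturn(c, f) = map f c
    member _.MergeSources(c1, c2) = zip c1 c2

let coordination = CoordinationBuilder()

type Event =
    | EventA of string
    | EventB of string
    | EventC of string
    | EventD of string

// The first three events can arrive in any order; the fourth is only
// awaited if EventA's content asks for it.
let complexExample = coordination {
    let! a = firstOf (function EventA s -> Some s | _ -> None)
    and! b = firstOf (function EventB s -> Some s | _ -> None)
    and! c = firstOf (function EventC s -> Some s | _ -> None)
    let! d =
        if a = "urgent"
        then firstOf (function EventD s -> Some s | _ -> None)
        else retn ""
    return a + b + c + d
}
```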

The first three events can arrive in any order. After that, we can conditionally await one more event based on the results of the first three. So already we can set up quite sophisticated coordinations.

From coordinations to orchestrations

So far, I have avoided one critical feature for long running workflows; the ability to actually do things. The coordinations I have defined so far are purely reactive; they can respond to events and produce results based on those events, but we need more than that. We need to be able to actually kick off steps in the workflow that will trigger further events down the line.

It is important to remember that performing an action usually involves IO, or communication with some external service; in other words, it can fail. As mentioned before, the orchestrations we write must be deterministic based on the events they are passed, so that they always produce the same results for a given input. It’s perfectly fine for actions in our workflow to fail, but those failures should be recorded in the events and handled as part of the state machine.

To achieve this, actions are not run in the orchestration themselves. Instead, we simply aim for the orchestration to return a representation of what action should be run, and let an external system take care of actually running them and recording the events in the data storage. This has the added benefit of meaning the whole business logic of the workflow can be tested in isolation. In F# the actions (including any relevant data they need) can be represented as a discriminated union:
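For example, with some hypothetical actions (the names and payloads are invented for illustration):

```fsharp
// Actions are only *described* by the orchestration; an external
// runner executes them and records the resulting events in storage.
type Action =
    | SendWelcomeEmail of userId: string
    | RunCreditCheck of userId: string
    | StartTimer of days: int
```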

Now we must ask ourselves how to integrate the need for actions into our coordination model. A coordination simply reads off a sequence of events to produce a result. We can say that if the coordination reaches the end of the stored events and is not yet finished, then an action is probably required to continue the workflow. We can pass this information to the coordination as an argument to get an action back. In other words, a lack of events can itself be treated as an event! We can represent this by using an option type as our event type.

We also need to consider how we will return an action as a result. We can do this by representing the result of a coordination as a discriminated union. I shall call this a CircuitBreaker; the reason is that if we reach a step that requests an action, we want to stop the workflow and return the action immediately:
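A sketch of the type, with the action side carried as a list:

```fsharp
// The result of an orchestration step: either an ordinary result, or a
// break carrying the action(s) that must run before we can continue.
type CircuitBreaker<'T1, 'T2> =
    | Result of 'T1
    | Break of 'T2 list
```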

Note: the action type ‘T2 here is a list because an orchestration can potentially return multiple actions. We shall see why later.

Next, we shall add some utilities that adapt the tools we already have to orchestrations taking optional events:

And finally let’s try writing an example orchestration:

This code is not ideal. It’s verbose and already we can see a pyramid of doom emerging. In F#, a pyramid of doom is usually a problem that can be fixed with a new computational expression.

To do this properly, I will first define a new Orchestration type:
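One way to express this is as a plain alias over the existing types: the event becomes optional (None meaning “no stored events left”), and every result is a CircuitBreaker (my reconstruction; the real definition may differ):

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

type CircuitBreaker<'T1, 'T2> =
    | Result of 'T1
    | Break of 'T2 list

// An orchestration is a coordination over optional events, where None
// signals "no stored event: tell me what action to run", producing
// either ordinary results or requested actions.
type Orchestration<'e, 'a, 'r> = Coordination<'e option, CircuitBreaker<'r, 'a>>
```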

Then I’ll add another set of functions in the Orchestration module:

And finally, we can create a new computation expression for orchestrations:

Now let’s try creating our workflow again:
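A heavily simplified, self-contained sketch of what the orchestration builder and workflow could look like. The awaitOrAct helper, the bind that lets Break values pass through, and the action names are all my own inventions for illustration:

```fsharp
type Coordination<'e, 'r> =
    | Coordination of ('e -> 'r list * Coordination<'e, 'r> option)

type CircuitBreaker<'T1, 'T2> =
    | Result of 'T1
    | Break of 'T2 list

type Orchestration<'e, 'a, 'r> = Coordination<'e option, CircuitBreaker<'r, 'a>>

// Await the first matching event; when called with None (no stored
// events left), request the given action instead.
let rec awaitOrAct (action: 'a) (chooser: 'e -> 'r option) : Orchestration<'e, 'a, 'r> =
    Coordination (fun ev ->
        match ev with
        | None -> [Break [action]], Some (awaitOrAct action chooser)
        | Some e ->
            match chooser e with
            | Some r -> [Result r], None
            | None -> [], Some (awaitOrAct action chooser))

// Bind for orchestrations: Break values pass straight through; the
// first Result is fed to the continuation, which we then switch to
// (feeding it the current event as well).
let rec bind (f: 'r1 -> Orchestration<'e, 'a, 'r2>) (Coordination step) : Orchestration<'e, 'a, 'r2> =
    Coordination (fun ev ->
        let results, next = step ev
        let breaks = results |> List.choose (function Break a -> Some (Break a) | _ -> None)
        match results |> List.tryPick (function Result r -> Some r | _ -> None) with
        | Some r ->
            let (Coordination contStep) = f r
            let contResults, contNext = contStep ev
            breaks @ contResults, contNext
        | None ->
            breaks, Option.map (bind f) next)

type OrchestrationBuilder() =
    member _.Bind(o, f) = bind f o
    member _.Return(x) : Orchestration<'e, 'a, 'r> =
        Coordination (fun _ -> [Result x], None)

let orchestration = OrchestrationBuilder()

// Hypothetical actions and events for the example.
type Action =
    | CallServiceA
    | CallServiceB of string

type Event =
    | EventA of string
    | EventB of string

// Await eventA, then eventB; with no stored events, return the next
// action to run instead.
let workflow = orchestration {
    let! a = awaitOrAct CallServiceA (function EventA s -> Some s | _ -> None)
    let! b = awaitOrAct (CallServiceB a) (function EventB s -> Some s | _ -> None)
    return b
}
```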

Much better. This will create an orchestration that awaits eventA and eventB in sequence. If there is no eventA or eventB in storage, then the caller can pass None instead to get the next action to call, which will be returned in a list as a Break type.

Thanks to our new zip function, we also can use and! as before:

If you call this orchestration with None, it will return a list of both actions in the Break. If you apply one event to the orchestration, then only the corresponding action for the other event will be returned.

Conclusion

Here I have outlined a way to use computational expressions to write long running orchestrations in a way that is very flexible, and completely platform agnostic, but also relatively easy to read and therefore maintain.

As users of Rx will know, I have only scratched the surface of what can be done with Rx operators; however, I feel it’s probably best to leave it here with the basics wrapped up. Some further enhancements could be the introduction of functions for the other binds: MergeMap, ConcatMap and ExhaustMap. On top of that, there are several other utility functions that wouldn’t go amiss, such as TakeUntil, Debounce, etc.

Another problem that will emerge (no matter what system you use), and that I have not mentioned, is managing changes to workflows that are already running. I recommend using a system of cohorts, where changes to a workflow are introduced as a new orchestration: new workflows start with the latest orchestration, while existing ones continue on the path they were already on.

I have also not mentioned the infrastructure required for running actions or storing events. Currently we use DynamoDB as our storage database for events, but I can also imagine a wide-column database with rows indexed on individual workflows might be a good choice.

I have deliberately avoided getting into the details of questions like “How do you actually wait X number of days?” as the answer to these will be platform specific. In our case we store an event with a unique id to signal starting a wait, set an entry in DynamoDB with an expiry time corresponding to when the timeout should end, trigger a lambda on expiry that saves an event with the same id corresponding to a wait finishing, and then trigger the orchestration to run through the events again.

I hope you have enjoyed reading; the source code can be found here:
