Microservices to Workflows: Expressing Business Flows using an F# DSL

Over two years, the Order Management Team built and migrated to a new workflow system, for reasons I explain in an earlier post.

TL;DR: Expressing business domains as workflows simplifies development and improves overall maintainability. Read the previous post for more details. :)

This post details how we use F# to model our workflows as a DSL.

Workflows

We have used F# at Jet since the beginning, so when we evaluated options for building a DSL (Domain-Specific Language), F# was the front-runner. Once we decided to build a DSL, we needed to determine what it had to support:

  • Compile-Time Validation: Since developers are the ones mainly building steps and defining workflows, we wanted workflows to benefit from the type safety and tooling that F# provides.
  • Readability: Workflows are representations of business logic and should be easily read by both developers and business users.
  • Extensibility: A good DSL allows extensions and improvements without breaking or affecting existing implementations. We wanted to be able to easily add new types of steps and compositions in the future.

The workflow DSL we settled on is centered around the ability to chain and compose steps. Each step is simply a function that, given an Input and a State, produces an Output or a Side Effect (Input → State → Output/SideEffect). Steps can be composed or chained together to represent complex business flows. A visual representation of a workflow is shown below:

Figure 1: Visual Representation of a Workflow
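The step shape described above (Input → State → Output/SideEffect) can be sketched as plain F# functions. This is a minimal illustration, not the production step interface: the Input, State and StepResult types and the two example steps are hypothetical.

```fsharp
// Hypothetical Input and State shapes, for illustration only.
type Input = { OrderId : string; Sku : string; Quantity : int }
type State = { ReservedSkus : Set<string> }

// A step either yields an output (here: the next state) or requests a side effect.
type StepResult =
    | Output of State
    | SideEffect of name : string * payload : string

// A step is just a function: Input -> State -> Output/SideEffect.
type Step = Input -> State -> StepResult

// Hypothetical "CreateOrder" step: asks the side-effect executor to persist the order.
let createOrder : Step =
    fun input _state -> SideEffect("PersistOrder", input.OrderId)

// Hypothetical "ReserveInventory" step: returns a new state with the SKU reserved.
let reserveInventory : Step =
    fun input state -> Output { state with ReservedSkus = state.ReservedSkus.Add(input.Sku) }

let order = { OrderId = "o-1"; Sku = "sku-42"; Quantity = 2 }
let initial = { ReservedSkus = Set.empty }
let r1 = createOrder order initial       // SideEffect ("PersistOrder", "o-1")
let r2 = reserveInventory order initial  // Output { ReservedSkus = set ["sku-42"] }
```

Because steps are ordinary functions, each one can be unit-tested in isolation before it is composed into a workflow.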

The workflow in Figure 1 is an example of a simplified business flow that creates an order and its corresponding side effects, reserves inventory, ships the order, and finally charges the customer. Engineers and business users can design and understand this type of visual representation together. Once the visual representation of the flow has been determined, a developer can use the DSL to define the workflow, expressed using a function:

workflow : name:string ->
           triggers:Trigger list ->
           metadata:WorkflowMetadata ->
           step:WorkflowStep ->
           Workflow

The sample workflow in Figure 1 can be written using this workflow function.
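As a rough sketch of what such a definition could look like, the block below stubs out just enough of the DSL to compile. Everything beyond the workflow signature is an assumption: the TaskIdType cases, the WorkflowMetadata fields, the stream definition and channel names, and the ReserveInventory/ShipOrder/ChargeCustomer step names are hypothetical, and only the chain operator (=>) is modelled.

```fsharp
// Hypothetical stand-ins for the DSL types; names and fields are illustrative only.
type TaskIdType =
    | RandomId                         // assumption: fresh ID for every message
    | FromPayload of jsonPath : string // assumption: ID read from the message

type PrimaryKeyPath = string

type Trigger =
    | Simple of string * TaskIdType
    | Stream of string * TaskIdType * PrimaryKeyPath

type WorkflowMetadata =
    { ExecutorChannel : string
      SideEffectChannel : string
      DeadLetterChannel : string
      Concurrency : int }

// Only chaining is modelled here; the other composition operators are omitted.
type WorkflowStep =
    | Step of displayName : string * implementation : string
    | Chain of WorkflowStep * WorkflowStep

type Workflow =
    { Name : string
      Triggers : Trigger list
      Metadata : WorkflowMetadata
      Root : WorkflowStep }

let step (displayName : string, implementation : string) = Step(displayName, implementation)
let (=>) first second = Chain(first, second)

let workflow name triggers metadata rootStep : Workflow =
    { Name = name; Triggers = triggers; Metadata = metadata; Root = rootStep }

// Display names in execution order (handy for tests and tooling).
let rec stepNames composed =
    match composed with
    | Step (displayName, _) -> [ displayName ]
    | Chain (first, second) -> stepNames first @ stepNames second

let sample =
    workflow
        "Sample Workflow"
        [ Stream("OrderEvents", FromPayload "$.eventId", "$.orderId") ]
        { ExecutorChannel = "workflow-executor"
          SideEffectChannel = "side-effect-executor"
          DeadLetterChannel = "workflow-deadletter"
          Concurrency = 4 }
        (step("CreateOrder", "CreateOrder")
         => step("ReserveInventory", "ReserveInventory")
         => step("ShipOrder", "ShipOrder")
         => step("ChargeCustomer", "ChargeCustomer"))
```

Since => is a left-associative F# operator, the chain nests as ((CreateOrder => ReserveInventory) => ShipOrder) => ChargeCustomer, and flattening it still yields the steps in execution order.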

I discuss several elements of this DSL in more detail below, but first I want to highlight the main arguments of the workflow function:

1. Workflow Name (workflow "Sample Workflow"): Every workflow requires a name. The name is used for identification as well as for control elements such as execution channel control and enable/disable key names.

2. Triggers: Triggers are specified using a discriminated union, where each case corresponds to a particular input type or source. A common trigger is Stream, which has three parameters, string * TaskIdType * PrimaryKeyPath: the string is the stream definition, TaskIdType determines how the task ID is obtained (for example, from a JSON path in the trigger message), and PrimaryKeyPath is a JSON path into the trigger message. Note: A workflow can have more than one trigger, and its triggers can be of different types.

3. Workflow Metadata: Workflow metadata defines the channels for communicating with the workflow executor and the side-effects executor, where failed workflows and side effects should be dead-lettered, and configuration such as the workflow’s concurrency settings. (See my previous post, Microservice to Workflows, for more architectural details.)

4. Steps: Steps represent actions or side effects that need to be performed. Each step is a tuple of a display name and the name of its implementation module. Note: In this example the display name and the implementation name are the same, but if you reuse the same step implementation multiple times in a workflow, each occurrence would have a different display name.

5. Step Composition: Steps are composed into workflows using a set of composition functions, described later.

Triggers

A workflow can have one or more triggers. Triggers configure where the WorkflowTriggers service consumes trigger messages from, using the same consumption mechanism as our microservices, which is detailed in a previous post, F# Microservice Patterns @ Jet.com. The Trigger type is a discriminated union (an algebraic data type):

type Trigger =
| Simple of string * TaskIdType
| Stream of string * TaskIdType * PrimaryKeyPath
...

Note: I won’t cover every trigger type in this post, only Simple and Stream. We currently support several others, which are useful when a trigger stream contains multiple different message types.

The Stream trigger used in the example workflow above has three parameters:

  • string: The stream definition, which represents an external system (e.g. Kafka, EventStore, an API). A previous post, Abstracting IO in F#, goes into far more detail about this concept and how we model and use stream definitions across the system.
  • TaskIdType: Each execution of a workflow has a unique task ID, and TaskIdType specifies how it is obtained. We currently support several types of task IDs. Randomly generated task IDs can be used when you don’t want any de-duplication checking and always want to run the workflow. Alternatively, the task ID can be extracted from the input payload, which ensures the workflow executes only once per message.
  • PrimaryKeyPath: A JSON path to the value used as the journal ID. The journal is the source of truth for every workflow: an event-sourced log of every action the workflow has taken.

The Simple trigger consumes the specified stream definition and uses a new key for each input: the workflow instance ID is set to a new GUID. This trigger is used when the incoming data doesn’t have a unique identifier associated with it.
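The ID rules above can be sketched as two small resolver functions. This is a minimal sketch under stated assumptions: the TaskIdType cases (RandomId, FromPayload) are hypothetical, the message is pre-flattened into a JSON-path-to-value map instead of being parsed as JSON, and treating the Simple trigger's fresh GUID as the journal ID is an assumption.

```fsharp
open System

// Hypothetical TaskIdType cases; the post only says IDs can be random or read from the payload.
type TaskIdType =
    | RandomId                          // new ID per message: no de-duplication
    | FromPayload of jsonPath : string  // ID read from the message: run once per message

type PrimaryKeyPath = string

type Trigger =
    | Simple of string * TaskIdType
    | Stream of string * TaskIdType * PrimaryKeyPath

// Assumption: the trigger message is pre-flattened into JSON path -> value pairs.
type Message = Map<string, string>

let newGuid () = Guid.NewGuid().ToString()

// Task ID for one workflow execution.
let resolveTaskId (taskIdType : TaskIdType) (msg : Message) : string option =
    match taskIdType with
    | RandomId -> Some (newGuid ())
    | FromPayload path -> Map.tryFind path msg

// Journal (workflow instance) ID: a fresh GUID for Simple triggers,
// the value at PrimaryKeyPath for Stream triggers.
let resolveJournalId (trigger : Trigger) (msg : Message) : string option =
    match trigger with
    | Simple _ -> Some (newGuid ())
    | Stream (_, _, keyPath) -> Map.tryFind keyPath msg

// Hypothetical de-duplication check against task IDs that have already run.
let shouldRun (alreadyRun : Set<string>) (taskId : string) = not (alreadyRun.Contains(taskId))

let msg : Message = Map.ofList [ "$.orderId", "o-1"; "$.eventId", "e-7" ]
let trigger = Stream("OrderEvents", FromPayload "$.eventId", "$.orderId")
```

With FromPayload, redelivering the same message yields the same task ID, so a check like shouldRun can skip it; with RandomId every delivery looks new.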

Steps

A step represents an action the workflow needs to take. Steps are the basis of every workflow and define its functionality. There are currently three types of steps:

  • Simple Steps (step): Represent a single action that is always executed; all of the business logic is self-contained within the step. Simple steps are created with the step function.
  • Conditional Steps (cond): Steps that contain a Condition. Conditions encode behavioral information into the workflow DSL, deciding which steps to execute based on certain logic.
  • Optional Steps (option): Steps that contain a condition but do not need a path for the negative case. By contrast, conditional steps always need both a positive and a negative path.
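The difference between the three step kinds can be sketched as a small discriminated union. This is a hypothetical model: StepDef, plan and the example steps are illustrative names, and a plain predicate over a path/value map stands in for the Condition type described in the next section.

```fsharp
// Hypothetical data seen by a condition (JSON path -> value); a plain predicate
// stands in for the real Condition type.
type Data = Map<string, string>

type StepDef =
    | Simple of name : string                                          // step: always runs
    | Cond of (Data -> bool) * whenTrue : StepDef * whenFalse : StepDef  // cond: both paths required
    | Optional of (Data -> bool) * whenTrue : StepDef                  // option: no negative path

// Names of the simple steps that would run for the given data.
let rec plan (data : Data) (stepDef : StepDef) : string list =
    match stepDef with
    | Simple name -> [ name ]
    | Cond (predicate, whenTrue, whenFalse) ->
        if predicate data then plan data whenTrue else plan data whenFalse
    | Optional (predicate, whenTrue) -> if predicate data then plan data whenTrue else []

let isExpress (data : Data) = data.TryFind("$.shipping") = Some "express"
let flow = Cond(isExpress, Simple "ShipExpress", Simple "ShipStandard")
let withGift = Optional((fun (data : Data) -> data.ContainsKey("$.giftMessage")), Simple "PrintGiftCard")
```

Making the negative path a required field of Cond, and absent from Optional, lets the compiler enforce the rule that conditional steps always have both paths.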

Predicate Logic

Predicate logic, i.e. conditions, can be used in cond or option steps, or in a Stream trigger for conditional filtering. A condition extracts data from the specified Qualifier using a JSON path, then compares the extracted data with the expected value according to the Comparison and OperandsType. The supported conditions are:

type Condition =
| True
| False
| Simple of Qualifier * path:string * expected:string
| Match of Qualifier * path:string * expectedRegx:string
| Compare of qualifier:Qualifier * typ:OperandsType * path:string * comparison:Comparison * expected:string
| Count of qualifier:Qualifier * path:string * comparison:Comparison * expected:string
| Exists of qualifier:Qualifier * path:string
| Not of Condition
| And of Condition List
| OR of Condition List

The base conditions:

  • True | False: Always met (True) or never met (False).
  • Simple: Extracts data from the specified Qualifier at the path and compares it with the expected value.
  • Match: Extracts data from the specified Qualifier at the path and matches it against the expected regular expression.
  • Compare: Extracts data from the specified Qualifier at the path and compares it with the expected value, interpreted as the given OperandsType, according to the Comparison. The Comparison type currently supports the following comparisons:
type Comparison =
| GreaterThan
| GreaterThanOrEqual
| LesserThan
| LesserThanOrEqual
| NotEqual
| Equal

OperandsType covers common data types; the currently supported types are:

type OperandsType =
| Int
| Long
| Double
| Decimal
| Boolean
| DateTime
  • Count: Extracts the data (usually an array) from the specified Qualifier at the path, counts its elements, and compares the count with the expected value (a stringified int) according to the Comparison.
  • Exists: Checks whether data exists at the path in the specified Qualifier.
  • Not | And | OR: Boolean operators used to compose conditions, e.g.:
let cond = Condition.Count(Qualifier.Input, "$.a", Comparison.Equal, "10")
let cond2 = Condition.Simple (Qualifier.Aggregate, "$.bs", "sting")
let cond3 = Condition.Simple (Qualifier.State, "$.a", "20")
// Not (cond OR (cond2 AND cond3))
let condOrAnd = Condition.OR [ cond; Condition.And [ cond2; cond3 ] ] |> Condition.Not

Note: The currently supported Qualifiers are State, Input, and Aggregate.
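A minimal evaluator for the Condition type might look like the following. It is a sketch under several assumptions: a tiny hand-rolled Json type replaces a real JSON library, a hypothetical Context record holds one document per Qualifier, JSON paths are limited to dotted property access ($.a.b), and Simple is treated as string equality. The unions mirror the ones above, with [<RequireQualifiedAccess>] added so that cases such as OperandsType.DateTime do not shadow System.DateTime.

```fsharp
open System
open System.Globalization
open System.Text.RegularExpressions

// Assumption: a tiny JSON model stands in for a real JSON library.
type Json =
    | JNumber of decimal
    | JString of string
    | JArray of Json list
    | JObject of Map<string, Json>

[<RequireQualifiedAccess>]
type Qualifier = State | Input | Aggregate
[<RequireQualifiedAccess>]
type Comparison = GreaterThan | GreaterThanOrEqual | LesserThan | LesserThanOrEqual | NotEqual | Equal
[<RequireQualifiedAccess>]
type OperandsType = Int | Long | Double | Decimal | Boolean | DateTime
[<RequireQualifiedAccess>]
type Condition =
    | True
    | False
    | Simple of Qualifier * path : string * expected : string
    | Match of Qualifier * path : string * expectedRegx : string
    | Compare of qualifier : Qualifier * typ : OperandsType * path : string * comparison : Comparison * expected : string
    | Count of qualifier : Qualifier * path : string * comparison : Comparison * expected : string
    | Exists of qualifier : Qualifier * path : string
    | Not of Condition
    | And of Condition list
    | OR of Condition list

type Context = { Input : Json; State : Json; Aggregate : Json } // one document per Qualifier

// Supports dotted paths such as "$.order.items" only (no filters, wildcards or indexes).
let tryResolve (path : string) (root : Json) : Json option =
    let next acc (segment : string) =
        match acc with
        | Some (JObject fields) -> Map.tryFind segment fields
        | _ -> None
    match List.ofArray (path.Split([| '.' |])) with
    | "$" :: segments -> List.fold next (Some root) segments
    | _ -> None
let scalarText json =
    match json with
    | JString s -> Some s
    | JNumber n -> Some (n.ToString(CultureInfo.InvariantCulture))
    | _ -> None

// Parses both sides as the operand type, then compares them.
let compareAs (typ : OperandsType) (a : string) (b : string) : int =
    let inv = CultureInfo.InvariantCulture
    match typ with
    | OperandsType.Int -> compare (Int32.Parse(a, inv)) (Int32.Parse(b, inv))
    | OperandsType.Long -> compare (Int64.Parse(a, inv)) (Int64.Parse(b, inv))
    | OperandsType.Double -> compare (Double.Parse(a, inv)) (Double.Parse(b, inv))
    | OperandsType.Decimal -> compare (Decimal.Parse(a, inv)) (Decimal.Parse(b, inv))
    | OperandsType.Boolean -> compare (Boolean.Parse(a)) (Boolean.Parse(b))
    | OperandsType.DateTime -> compare (DateTime.Parse(a, inv)) (DateTime.Parse(b, inv))
let holds comparison c =
    match comparison with
    | Comparison.GreaterThan -> c > 0
    | Comparison.GreaterThanOrEqual -> c >= 0
    | Comparison.LesserThan -> c < 0
    | Comparison.LesserThanOrEqual -> c <= 0
    | Comparison.NotEqual -> c <> 0
    | Comparison.Equal -> c = 0

let rec evaluate (ctx : Context) (condition : Condition) : bool =
    let select q =
        match q with
        | Qualifier.Input -> ctx.Input
        | Qualifier.State -> ctx.State
        | Qualifier.Aggregate -> ctx.Aggregate
    let text q path = tryResolve path (select q) |> Option.bind scalarText
    match condition with
    | Condition.True -> true
    | Condition.False -> false
    | Condition.Simple (q, path, expected) -> text q path = Some expected
    | Condition.Match (q, path, pattern) -> text q path |> Option.exists (fun s -> Regex.IsMatch(s, pattern))
    | Condition.Compare (q, typ, path, cmp, expected) ->
        text q path |> Option.exists (fun s -> holds cmp (compareAs typ s expected))
    | Condition.Count (q, path, cmp, expected) ->
        tryResolve path (select q)
        |> Option.bind (function JArray items -> Some items.Length | _ -> None)
        |> Option.exists (fun n -> holds cmp (compare n (int expected)))
    | Condition.Exists (q, path) -> (tryResolve path (select q)).IsSome
    | Condition.Not c -> not (evaluate ctx c)
    | Condition.And cs -> List.forall (evaluate ctx) cs
    | Condition.OR cs -> List.exists (evaluate ctx) cs

// The example from the post, evaluated against hypothetical data.
let cond = Condition.Count(Qualifier.Input, "$.a", Comparison.Equal, "10")
let cond2 = Condition.Simple(Qualifier.Aggregate, "$.bs", "sting")
let cond3 = Condition.Simple(Qualifier.State, "$.a", "20")
let condOrAnd = Condition.OR [ cond; Condition.And [ cond2; cond3 ] ] |> Condition.Not
let ctx =
    { Input = JObject (Map.ofList [ "a", JArray [ JNumber 1m; JNumber 2m; JNumber 3m ] ])
      State = JObject (Map.ofList [ "a", JNumber 20m ])
      Aggregate = JObject (Map.ofList [ "bs", JString "other" ]) }
```

Here $.a in Input has three elements rather than ten and $.bs is not "sting", so both branches of the OR fail and the negated condition holds.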

Step Composition

step("CreateOrder", "CreateOrder")
=> step("PreDealOrder", "PreDealOrder")

We compose steps together using composition operators:

  • chain (=>): Steps can be chained together. a => b means step a is executed first, and then step b.
  • any (=?>): A step can be composed with a group of conditional steps, of which only the first whose condition is met is executed. a =?> [b;c;d] means step a is executed first; then, if step b does not meet its condition but steps c and d do, only c is executed, because it comes first.
  • every (=??>): A step can be composed with a group of conditional steps, of which every step whose condition is met is executed. a =??> [b;c;d] means step a is executed first; then, if step b does not meet its condition but steps c and d do, both c and d are executed.
  • all (=>>): A step can be composed with a group of steps that are all executed. a =>> [b;c;d] means step a is executed first, and then steps b, c and d are all executed.
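The semantics of the four operators can be checked with a small interpreter. Flow, guarded and run are hypothetical names (the real DSL builds conditional steps with cond/option), conditions are plain predicates, and run only returns the names of the steps that would execute rather than executing anything.

```fsharp
// Hypothetical data passed to conditions.
type Data = Map<string, string>

// Hypothetical composition tree mirroring the four operators.
type Flow =
    | Step of name : string
    | When of (Data -> bool) * Flow   // a step guarded by a condition
    | Chain of Flow * Flow            // a => b
    | Any of Flow * Flow list         // a =?> [ ... ]
    | Every of Flow * Flow list       // a =??> [ ... ]
    | All of Flow * Flow list         // a =>> [ ... ]

let step name = Step name
let guarded predicate flow = When(predicate, flow)
let (=>) a b = Chain(a, b)
let (=?>) a branches = Any(a, branches)
let (=??>) a branches = Every(a, branches)
let (=>>) a branches = All(a, branches)

// Unguarded branches always count as met.
let isMet (data : Data) flow =
    match flow with
    | When (predicate, _) -> predicate data
    | _ -> true

// Names of the executed steps; steps under =>> are listed in declaration order.
let rec run (data : Data) (flow : Flow) : string list =
    match flow with
    | Step name -> [ name ]
    | When (predicate, inner) -> if predicate data then run data inner else []
    | Chain (a, b) -> run data a @ run data b
    | Any (a, branches) ->
        let chosen = branches |> List.tryFind (isMet data)
        run data a @ (chosen |> Option.map (run data) |> Option.defaultValue [])
    | Every (a, branches) -> run data a @ (branches |> List.filter (isMet data) |> List.collect (run data))
    | All (a, branches) -> run data a @ List.collect (run data) branches

// The examples from the text: b's condition is not met, c's and d's are.
let b = guarded (fun _ -> false) (step "b")
let c = guarded (fun _ -> true) (step "c")
let d = guarded (fun _ -> true) (step "d")
let noData : Data = Map.empty
```

With these definitions, step "a" =?> [ b; c; d ] runs a then c, while step "a" =??> [ b; c; d ] runs a, c and d, matching the descriptions above. All the operators start with =, so F# gives them the same precedence and left associativity.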

Runtime

The F# DSL makes it easy to design and implement workflows, but to execute them across services we created a separate internal workflow representation for our backend services. We convert the DSL into a DAG (directed acyclic graph) to decouple the specification DSL from the runtime environment, and our services use this graph to actually execute the workflow. The graph is easily represented as an F# type.

Our runtime converts the F# DSL into a graph using an evaluate function, evaluate : workflow:Workflow -> WorkflowEvaluation. WorkflowEvaluation is simply a wrapper around the graph that lets us easily look up which graph to use for which workflow:

type WorkflowEvaluation =
    { name : string
      model : Graph<WorkflowStep> }
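Since the Graph type itself is not shown, here is one hypothetical way a step tree could be compiled into a DAG. It assumes a Graph<'T> made of numbered nodes plus (from, to) edges, models only => and =>>, and takes a name and step tree directly instead of a full Workflow value; EmailCustomer is an invented step name.

```fsharp
// Hypothetical node payload and graph shape.
type WorkflowStep = { DisplayName : string; Implementation : string }
type Graph<'T> = { Nodes : Map<int, 'T>; Edges : (int * int) list }   // edge = (from, to)

// Simplified DSL tree: only chaining (=>) and run-all (=>>) are modelled here.
type Composition =
    | Single of WorkflowStep
    | Chain of Composition * Composition
    | All of Composition * Composition list

let step (displayName, implementation) = Single { DisplayName = displayName; Implementation = implementation }
let (=>) a b = Chain(a, b)
let (=>>) a branches = All(a, branches)

let link (froms : int list) (tos : int list) = [ for x in froms do for y in tos -> (x, y) ]

// Adds a composition to the graph, returning the graph plus its entry and exit node ids.
let rec build (g : Graph<WorkflowStep>) (c : Composition) : Graph<WorkflowStep> * int list * int list =
    match c with
    | Single s ->
        let nodeId = g.Nodes.Count // nodes are only ever added, so Count is a fresh id
        ({ g with Nodes = Map.add nodeId s g.Nodes }, [ nodeId ], [ nodeId ])
    | Chain (first, second) ->
        let g1, entries, exits1 = build g first
        let g2, entries2, exits2 = build g1 second
        ({ g2 with Edges = g2.Edges @ link exits1 entries2 }, entries, exits2)
    | All (first, branches) ->
        let g1, entries, exits1 = build g first
        let addBranch (acc : Graph<WorkflowStep>, exits) branch =
            let acc2, branchEntries, branchExits = build acc branch
            ({ acc2 with Edges = acc2.Edges @ link exits1 branchEntries }, exits @ branchExits)
        let g2, exits = List.fold addBranch (g1, []) branches
        (g2, entries, (if List.isEmpty exits then exits1 else exits))

type WorkflowEvaluation = { name : string; model : Graph<WorkflowStep> }

// Stand-in for evaluate : Workflow -> WorkflowEvaluation.
let evaluate (name : string) (root : Composition) : WorkflowEvaluation =
    let graph, _, _ = build { Nodes = Map.empty; Edges = [] } root
    { name = name; model = graph }

let sample =
    evaluate
        "Sample Workflow"
        (step ("CreateOrder", "CreateOrder")
         => step ("ReserveInventory", "ReserveInventory")
         =>> [ step ("ShipOrder", "ShipOrder"); step ("EmailCustomer", "EmailCustomer") ])
```

The result has four nodes and the edges 0→1, 1→2 and 1→3: ReserveInventory fans out to both branches, which is exactly the kind of path a service can traverse without knowing anything about the DSL.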

The graph lets our services easily traverse and understand the paths through any given workflow. It also decouples our service runtime from the DSL representation, so multiple DSLs could potentially compile to the same runtime: we can use the F# DSL today and later adopt another DSL or language without changing our backend. It would also let us develop a design language that maps onto the DAG, specified through a graphical UI so that business users can easily build workflows from predefined blocks.

Besides giving our backend services their own runtime representation, the graph lets us easily visualize workflows in a UI by converting it to DOT, the graph description language used by Graphviz and widely supported by graph visualization tools.
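Rendering such a graph as DOT takes only a few lines. This sketch assumes the same hypothetical Graph<'T> shape of numbered nodes plus (from, to) edges; the node naming scheme (n0, n1 and so on) is an arbitrary choice.

```fsharp
// Hypothetical graph shape (the real Graph<'T> type is not shown in this post).
type Graph<'T> = { Nodes : Map<int, 'T>; Edges : (int * int) list }

// Renders a graph as Graphviz DOT text; `label` turns a node payload into its display text.
let toDot (name : string) (label : 'T -> string) (g : Graph<'T>) : string =
    let quote (s : string) = "\"" + s.Replace("\"", "\\\"") + "\""
    let nodes = [ for KeyValue (nodeId, node) in g.Nodes -> sprintf "  n%d [label=%s];" nodeId (quote (label node)) ]
    let edges = [ for (src, dst) in g.Edges -> sprintf "  n%d -> n%d;" src dst ]
    String.concat "\n" ([ sprintf "digraph %s {" (quote name) ] @ nodes @ edges @ [ "}" ])

let graph =
    { Nodes = Map.ofList [ 0, "CreateOrder"; 1, "ReserveInventory"; 2, "ShipOrder" ]
      Edges = [ (0, 1); (1, 2) ] }

// Prints:
// digraph "Sample Workflow" {
//   n0 [label="CreateOrder"];
//   n1 [label="ReserveInventory"];
//   n2 [label="ShipOrder"];
//   n0 -> n1;
//   n1 -> n2;
// }
printfn "%s" (toDot "Sample Workflow" id graph)
```

The output can be fed to Graphviz's dot tool or any DOT-aware viewer.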

Conclusion

Together, these elements form the basis of our workflow system. While workflow systems such as Netflix Conductor and Apache Airflow allow workflows to be defined at runtime, we have found that developer practice improves when workflows are defined at compile time and go through source control. The F# workflow model described above allows workflows to be validated by F#’s type checking, and lets us run and test the workflow steps and DSL before committing, to ensure correctness before deployment. By decoupling our workflow DSL from our backend execution layer, we have also been able to demonstrate workflow definitions expressed in other languages (JavaScript) as well as in raw text.

In future posts, my team will discuss the implementation of the workflow engine in more detail, including how we execute side effects, trigger workflows, and communicate between services to execute the provided workflows.

If you like the challenges of building distributed systems and are interested in solving complex problems, check out our job openings.