F# Microservice Patterns @ Jet.com

Written By: James Novino

In this post, I elaborate on how we design, build and scale microservices for Jet’s Order Management System (OMS). It extends the previously published post, Abstracting IO using F#.

Background

A microservice architecture is a method of developing software that focuses on building single-function services (modules) with well-defined operations. This architecture pattern has grown in popularity in recent years because it offers several key benefits, such as:

  • Deployment Flexibility: services can be deployed independently without affecting the rest of the system.
  • Isolation: code is easier to follow because each function (service) is isolated and has fewer dependencies.
  • Modularity: services can be built with different technologies or languages, so updating the technology stack doesn’t require overhauling the entire system.
  • Reusability: services can be shared across systems or business units.

These benefits are among the main reasons companies like Jet, Netflix, Amazon, PayPal and other tech companies have chosen microservice architectures over monoliths. However, microservices aren’t without their challenges:

  • Deployment Complexity: while microservices can be deployed in isolation, achieving this often requires sophisticated deployment infrastructure.
  • Support: while microservices isolate code, they run in a distributed manner, which can complicate the support and maintenance of systems.
  • Monitoring: monitoring asynchronous interactions is challenging and requires a lot of specialized tooling.

These are just a few of the trade-offs of microservices; many more are better described in other posts.

At Jet, we have been using microservices since the very beginning. In the last 4+ years, we have learned lessons about how to build, deploy, manage and support an ever-growing set of requirements and services. In the previous post on IO Abstractions, in the section on “Service Abstractions”, we briefly discussed how we use OMS.Infrastructure to build microservices. This post is intended to elaborate on those details.

Microservices

Most microservices at Jet follow the decode → handle → interpret pipeline discussed in the previous post. This pattern was initially motivated by the idea of defining a DSL (Domain Specific Language) and then building interpreters for it; the decode/handle/interpret flow is essentially a highly specialized version of that approach. Note: this approach falls short of fully decoupling effects from the core logic. The handle function may need to make asynchronous calls to retrieve data before it can apply domain logic. As a result, part of the function is “pure”, in that it simply returns an Output rather than performing effects, while another part may not be.

The decode → handle → interpret pipeline has a few core constructs that form the basis of a microservice.

1. A set of Inputs that a microservice can handle. These inputs are commonly represented as a discriminated union in F#.

2. A set of Outputs that a microservice can interpret. Note: this is not the actual output of the microservice but an internal representation of what those outputs should be. More on this later.

3. A decode : DomainEvent -> Input option function that deserializes incoming messages into an appropriate strongly typed Input.

4. A handle : Input -> Async<Output> phase that takes an Input from the previous step, runs some business logic to calculate which side effects should occur, and generates the Output accordingly.

5. An interpret : Output -> Async<unit> phase that takes an Output and executes the side effects it represents.
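To make the five constructs concrete, here is a minimal, self-contained sketch that wires them together with a few lines of plumbing. It does not use OMS.Infrastructure: the DomainEvent record, the OrderCreated event name, the SendFraudCommand output and the run function are hypothetical stand-ins, and the interpret step appends to an in-memory list instead of calling a real service.

```fsharp
open System

// Hypothetical stand-in for OMS's DomainEvent: an event name plus a payload.
type DomainEvent = { Name : string; Data : string }

// 1. Inputs the service can handle.
type Input =
    | StartFraudCheck of orderId : string

// 2. Outputs: descriptions of side effects, not the effects themselves.
type Output =
    | NoOp
    | SendFraudCommand of orderId : string

// 3. decode : DomainEvent -> Input option (None means "skip this message").
let decode (evt : DomainEvent) : Input option =
    match evt.Name with
    | "OrderCreated" -> Some (StartFraudCheck evt.Data)
    | _ -> None

// 4. handle : Input -> Async<Output> decides which side effects are needed.
let handle (input : Input) : Async<Output> = async {
    match input with
    | StartFraudCheck orderId when String.IsNullOrWhiteSpace orderId -> return NoOp
    | StartFraudCheck orderId -> return SendFraudCommand orderId }

// 5. interpret : Output -> Async<unit> performs them; here the "effect" is
// appending to an in-memory list instead of calling a real service.
let effects = ResizeArray<string>()

let interpret (output : Output) : Async<unit> = async {
    match output with
    | NoOp -> ()
    | SendFraudCommand orderId -> effects.Add (sprintf "fraud-check:%s" orderId) }

// Plumbing: decode, then handle, then interpret; undecodable events are skipped.
let run (evt : DomainEvent) : Async<unit> = async {
    match decode evt with
    | Some input ->
        let! output = handle input
        do! interpret output
    | None -> () }

for evt in [ { Name = "OrderCreated"; Data = "order-1" }; { Name = "Unknown"; Data = "x" } ] do
    run evt |> Async.RunSynchronously
```

Keeping Output as plain data means handle can be tested by inspecting its return value, while all effects stay in interpret.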

With some plumbing code, the above constructs can be chained together to form a single handler which makes up a microservice. Shown below is an example of the constructs and their composition.

The above example demonstrates the core constructs described earlier in the post (i.e., Input, Output, decode, handle, interpret). In addition, we can see some other functions, such as RunWithMetrics.processWithCorrelation, OmsService.start, Parallelism.basedOn, etc. I’ll elaborate on these functions below, but before we get there, let’s talk about the core concepts.

Decode

The above example shows how the different components are chained together to form a microservice. To start, the Input type is created in the decode function:

let decode (evt : DomainEvent) =
    try
        match evt.Name with
        | "Something" ->
            let magic ...
            Some (Input.Trigger(magic))
        | _ -> None // unrecognised event names are skipped
    with e ->
        ...
        None

The decode function accepts a DomainEvent and turns it into an Input that the microservice can handle. Note: this pattern can be used with arbitrary “raw” inputs, such as byte[]; a DomainEvent just happens to be the specialized representation of a “raw” input used by the OMS. Typically, we match on DomainEvent.Name, whose value is derived differently depending on the system the DomainEvent came from. For example, the event type is used for events from EventStore, whereas a custom published field sets DomainEvent.Name for messages published to Azure Service Bus or Kafka.

The decode function does not have to match on DomainEvent.Name; when necessary, we use DomainEvent.data or DomainEvent.metadata to make these determinations. Note that decode returns an Input option (an F# option type), which allows us to skip any invalid messages.
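The option-returning decode can be sketched under stated assumptions: DomainEvent is modelled as a hypothetical record with a Name and a raw byte[] payload, and the "orderId,quantity" payload format is invented for this example. Unknown names and payloads that fail to parse both yield None, so the consumer can skip them.

```fsharp
open System.Text

// Hypothetical stand-in for DomainEvent with a raw byte[] payload.
type DomainEvent = { Name : string; Data : byte[] }

type Input =
    | Trigger of orderId : string * quantity : int

// Returns None for unrecognised event names and for payloads that fail to parse,
// so invalid messages are skipped rather than crashing the consumer.
let decode (evt : DomainEvent) : Input option =
    try
        match evt.Name with
        | "Something" ->
            // Assumed payload format for this sketch: UTF-8 text "orderId,quantity".
            match Encoding.UTF8.GetString(evt.Data).Split(',') with
            | [| orderId; quantity |] -> Some (Trigger (orderId, int quantity))
            | _ -> None
        | _ -> None
    with e ->
        eprintfn "Skipping %s event that failed to decode: %s" evt.Name e.Message
        None

let valid = decode { Name = "Something"; Data = Encoding.UTF8.GetBytes "order-1,2" }
let malformed = decode { Name = "Something"; Data = Encoding.UTF8.GetBytes "order-1,two" }
let unknown = decode { Name = "Other"; Data = [||] }
```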

For Service Bus queues or Kafka topics, we generally have only one type of message per queue/topic/channel, in which case we don’t need to look into data or metadata. Another common pattern is to assign a dedicated decode function to each incoming stream.

let incomingStreams =
    [
        yield
            "sbcmd://jetSb/test-queue"
            |> OmsService.parse HostLookup.getHost
            |> OmsService.consume
            |> OmsService.decode sbDecode
        yield
            "kafka://jetKafka/fake-topic-p8"
            |> OmsService.parse HostLookup.getHost
            |> OmsService.consume
            |> OmsService.decode kafkaDecode
    ]

The incoming streams are then mapped over to create a list of handlers:

incomingStreams
|> List.map (OmsService.handle log Parallelism.Never handler)
|> OmsService.spawn //each of the streams will be processed independently.
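Without access to Service Bus or Kafka, the one-decoder-per-stream pattern can be approximated with in-memory lists. In this sketch, sbDecode, kafkaDecode, the event names and processStream are hypothetical, and Async.Parallel stands in for OmsService.spawn. Each simulated stream only accepts the message type its own decoder understands.

```fsharp
open System.Collections.Concurrent

// Hypothetical types and decoders; each simulated stream carries one message type.
type DomainEvent = { Name : string; Data : string }
type Input = Trigger of string

let sbDecode (evt : DomainEvent) =
    if evt.Name = "sb-msg" then Some (Trigger evt.Data) else None

let kafkaDecode (evt : DomainEvent) =
    if evt.Name = "kafka-msg" then Some (Trigger evt.Data) else None

let handled = ConcurrentBag<string * string>()

// Each stream is paired with its own decoder and processed in its own Async loop.
let processStream (source : string) (decode : DomainEvent -> Input option) (events : DomainEvent list) =
    async {
        for evt in events do
            match decode evt with
            | Some (Trigger data) -> handled.Add((source, data))
            | None -> () // this stream's decoder does not recognise the message
    }

[ processStream "servicebus" sbDecode [ { Name = "sb-msg"; Data = "a" } ]
  processStream "kafka" kafkaDecode [ { Name = "kafka-msg"; Data = "b" }; { Name = "sb-msg"; Data = "c" } ] ]
|> Async.Parallel
|> Async.RunSynchronously
|> ignore
```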

Handle

The handle function accepts an Input and returns an Output. It is essentially a middleware step that encapsulates the business logic. For example, a typical use case is for handle to fetch an aggregate (the state representation of event-sourced data) to determine which Output (side effects) is required. In the example below, the service is responsible for starting a fraud check.

let handle (input: Input) = async {
    match input with
    | Input.StartFraudCheck(orderId, orderItems) ->
        let! aggregate = Aggregate.fetch orderId
        if aggregate.isCancelled || aggregate.fraudCheckFailed then
            // The order has reached a terminal state, so we cannot start a fraud check
            return Output.NoOp
        else
            return Output.StartFraudCheck(aggregate)
}

The example above shows how handle encapsulates domain logic to determine which side effects, if any, the interpret function needs to execute.
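A runnable version of the handle example is sketched below, assuming an in-memory dictionary in place of Aggregate.fetch (which would normally rebuild state from an event store). The record field names mirror the example; the store contents and fetchAggregate are hypothetical.

```fsharp
// Record fields mirror the names used in the example above; the store is hypothetical.
type OrderAggregate = { orderId : string; isCancelled : bool; fraudCheckFailed : bool }

[<RequireQualifiedAccess>]
type Input =
    | StartFraudCheck of orderId : string * orderItems : string list

[<RequireQualifiedAccess>]
type Output =
    | NoOp
    | StartFraudCheck of OrderAggregate

// In-memory stand-in for Aggregate.fetch, which would normally read event-sourced state.
let store =
    dict [ ("order-1", { orderId = "order-1"; isCancelled = false; fraudCheckFailed = false })
           ("order-2", { orderId = "order-2"; isCancelled = true; fraudCheckFailed = false }) ]

let fetchAggregate (orderId : string) : Async<OrderAggregate> = async { return store.[orderId] }

let handle (input : Input) : Async<Output> = async {
    match input with
    | Input.StartFraudCheck (orderId, _orderItems) ->
        let! aggregate = fetchAggregate orderId
        if aggregate.isCancelled || aggregate.fraudCheckFailed then
            // Terminal state: no side effect should be performed.
            return Output.NoOp
        else
            return Output.StartFraudCheck aggregate }

let active = handle (Input.StartFraudCheck ("order-1", [])) |> Async.RunSynchronously
let cancelled = handle (Input.StartFraudCheck ("order-2", [])) |> Async.RunSynchronously
```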

Interpret

The interpret function accepts the Output from the handle function and returns an Async<unit>. Unlike the other functions, interpret is responsible for enacting side effects. Continuing the example from the previous section, the interpret function either handles the NoOp or sends a command to our fraud service to evaluate the order for fraud.

let interpret (output: Output) = async {
    match output with
    | Output.NoOp -> return ()
    | Output.StartFraudCheck (aggregate) ->
        // `data` (the event payload) is built elsewhere and not shown in this excerpt
        let de = DomainEvent.New(Guid.NewGuid(), Some aggregate.orderId, None, data, [||])
        let sd =
            sprintf "https://fraudCheck/fake/fraud/service?orderId=%s" aggregate.orderId
            |> OmsService.parse Lookup.getHost
        // Note: it's not good practice to ignore the write result here; typically we log the result before returning
        return! Outputs.writeAlways sd de |> Async.Ignore
}
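The following sketch runs the interpret step end to end, assuming a hypothetical writeAlways that records each write in memory and returns a Choice. The endpoint format is taken from the example above; the payload format is made up. The NoOp branch returns () so that both branches produce Async<unit>.

```fsharp
open System

type OrderAggregate = { orderId : string }

[<RequireQualifiedAccess>]
type Output =
    | NoOp
    | StartFraudCheck of OrderAggregate

// Hypothetical stand-in for Outputs.writeAlways: records the write and reports success.
let sent = ResizeArray<string * string>()

let writeAlways (endpoint : string) (payload : string) : Async<Choice<unit, exn>> = async {
    sent.Add((endpoint, payload))
    return Choice1Of2 () }

let interpret (output : Output) : Async<unit> = async {
    match output with
    | Output.NoOp -> return () // nothing to do
    | Output.StartFraudCheck aggregate ->
        let endpoint = sprintf "https://fraudCheck/fake/fraud/service?orderId=%s" aggregate.orderId
        let payload = sprintf "%O:StartFraudCheck:%s" (Guid.NewGuid()) aggregate.orderId
        return! writeAlways endpoint payload |> Async.Ignore }

interpret Output.NoOp |> Async.RunSynchronously
interpret (Output.StartFraudCheck { orderId = "order-1" }) |> Async.RunSynchronously
```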

While the example above emits only a single side effect, it’s not uncommon for an interpret function to emit several different side effects for a single Output. Another common practice is to emit different side effects based on the result of the first one. For example, in the case above, instead of ignoring the write result, we could match on the outcome and emit an event to log the response:

...
let! fraudResult = Outputs.writeAlways sd de
match fraudResult with
| Choice1Of2 (result) ->
    let de = ... // Turns the response into an EventStore event
    ...
    // Write Fraud Check Success
    return! Outputs.writeAlways sd de |> Async.Ignore
| Choice2Of2 (error) ->
    let de = ... // Turns the failure into an EventStore event
    ...
    // Write Fraud Check Failure
    return! Outputs.writeAlways sd de |> Async.Ignore
}
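Branching a second side effect on the result of the first can be sketched with stand-ins: callFraudService (which fails for order IDs starting with "bad") and writeEvent (which records event names in memory) are hypothetical replacements for the fraud service call and the EventStore write.

```fsharp
// Hypothetical fraud-service call: fails for order ids starting with "bad".
let callFraudService (orderId : string) : Async<Choice<string, exn>> = async {
    if orderId.StartsWith "bad" then
        return Choice2Of2 (exn "fraud service unavailable")
    else
        return Choice1Of2 "approved" }

// Hypothetical event writer standing in for a write to EventStore.
let events = ResizeArray<string>()
let writeEvent (name : string) : Async<unit> = async { events.Add name }

// The second side effect depends on the outcome of the first.
let interpretFraudCheck (orderId : string) : Async<unit> = async {
    let! fraudResult = callFraudService orderId
    match fraudResult with
    | Choice1Of2 response ->
        do! writeEvent (sprintf "FraudCheckSucceeded:%s:%s" orderId response)
    | Choice2Of2 error ->
        do! writeEvent (sprintf "FraudCheckFailed:%s:%s" orderId error.Message) }

interpretFraudCheck "order-1" |> Async.RunSynchronously
interpretFraudCheck "bad-order" |> Async.RunSynchronously
```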

Handle + Interpret (Handler)

A handler is a function that encapsulates the “domain logic” from the handle and interpret functions and applies it to the incoming message stream produced by the consume → decode chain. A handler is created by passing the handle and interpret functions into one of the functions in the RunWithMetrics module.

The RunWithMetrics module is inside of the OMS.Infrastructure package which we discussed in Abstracting IO using F#. This module contains two functions:

- processInput : log:NLog.Logger -> category:string -> handle:('a -> Async<'b>) -> interpret:('b -> Async<unit>) -> input:'a -> Async<unit>

- processWithCorrelation : log:NLog.Logger -> category:string -> generateCorrelation:('a -> string list) -> handle:('a -> Async<'b>) -> interpret:('b -> Async<unit>) -> input:'a * header:Logging.Header -> Async<unit>

These two functions compose the handle and interpret functions into a handler that can be composed with the decode function to create the service. The implementation of these functions is fairly straightforward:

The main difference between processInput and processWithCorrelation is that processWithCorrelation uses a provided function to generate the correlation IDs, and it requires a tracing header to be present on the message so it can derive information about latency, the producer, etc.

As you can see from the implementations above, both functions evaluate the handle and then pipe its output into the interpret. Utility functions such as Metrics.Latency.microservice correlationIds category "Handle" are used throughout the implementation to wrap async functions, measure their elapsed time and write a metric to the configured metric stores.
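The behaviour described above (evaluate handle, pipe its result into interpret, time each stage) can be sketched as follows. This is not the OMS.Infrastructure code: logging and correlation IDs are omitted, and timed and the latencies list are hypothetical stand-ins for Metrics.Latency.microservice and a metric store.

```fsharp
open System.Diagnostics

// Hypothetical metric sink standing in for Metrics.Latency.microservice.
let latencies = ResizeArray<string * string * int64>()

// Wrap an async computation and record how long it took under (category, stage).
let timed (category : string) (stage : string) (work : Async<'T>) : Async<'T> = async {
    let sw = Stopwatch.StartNew()
    let! result = work
    sw.Stop()
    latencies.Add((category, stage, sw.ElapsedMilliseconds))
    return result }

// In the spirit of RunWithMetrics.processInput (logging omitted):
// evaluate handle, then pipe its output into interpret, timing both stages.
let processInput (category : string)
                 (handle : 'a -> Async<'b>)
                 (interpret : 'b -> Async<unit>)
                 (input : 'a) : Async<unit> = async {
    let! output = timed category "Handle" (handle input)
    do! timed category "Interpret" (interpret output) }

// Usage with toy handle/interpret functions.
let results = ResizeArray<int>()
let handler =
    processInput "FakeService"
        (fun (n : int) -> async { return n * 2 })
        (fun doubled -> async { results.Add doubled })

handler 21 |> Async.RunSynchronously
```

Partially applying processInput leaves a function of type 'a -> Async<unit>, which is the shape a stream-level handle step can map over incoming inputs.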

Consume

We have now discussed most of the parts of the earlier microservice example, and we can build the decode → handle → interpret pipeline:

let pipeline =
    let correlation =
        function
        | Input.Trigger(cid, _) -> [cid]
        | _ -> []
    let handler = RunWithMetrics.processWithCorrelation myLog "FakeService" correlation handle interpret
    OmsService.consume
    >> OmsService.decode (decode)
    >> OmsService.handle log (Parallelism.basedOn (fst >> correlation)) handler
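Because each stage in the pipeline takes a stream and returns a stream, the stages compose with >>. The sketch below mimics that shape using seq in place of AsyncSeq so it needs only FSharp.Core; consume, decodeStage, handleStage and the event names are hypothetical, and Seq.choose plays the role of dropping messages that decode to None.

```fsharp
// Hypothetical types; seq stands in for AsyncSeq so the sketch needs only FSharp.Core.
type DomainEvent = { Name : string; Data : string }
type Input = Trigger of correlationId : string * payload : string

// Stand-in for OmsService.consume: a fixed set of events per stream name.
let consume (streamName : string) : seq<DomainEvent> =
    seq {
        for i in 1 .. 3 do
            yield { Name = (if i = 2 then "Unknown" else "Something"); Data = sprintf "%s-%d" streamName i }
    }

// Stream-level decode: keep only the events the message-level decoder recognises.
let decodeStage (decode : DomainEvent -> Input option) (events : seq<DomainEvent>) : seq<Input> =
    Seq.choose decode events

// Stream-level handle: turn each Input into an Async computation.
let handleStage (handler : Input -> Async<unit>) (inputs : seq<Input>) : seq<Async<unit>> =
    Seq.map handler inputs

let decode (evt : DomainEvent) =
    if evt.Name = "Something" then Some (Trigger (evt.Data, evt.Data)) else None

let handled = ResizeArray<string>()
let handler (Trigger (cid, _)) = async { handled.Add cid }

// Same shape as the pipeline above: consume >> decode >> handle.
let pipeline = consume >> decodeStage decode >> handleStage handler

pipeline "test-queue" |> Seq.iter (fun work -> Async.RunSynchronously work)
```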

One element we haven’t discussed yet is the OmsService.consume function. OmsService.consume is a streaming (subscription) method that we use to connect to and consume messages from systems like Kafka and Azure Service Bus; it is covered in more detail in the previous post. In brief, this function allows us to represent streaming messages as an AsyncSeq<Incoming>, where the Incoming type is a discriminated union (DU) representation of messages:

type Incoming =
    | Message of DomainEvent * CloseAction * CancelAction
    | Batch of DomainEvent[] * CloseAction * CancelAction
    | Bunch of DomainEvent[] * CloseAction * CancelAction

The type is intended to represent three common message representations:

  • Message : Single Message
  • Batch : A collection of messages which are in a sequence and the commit order matters
  • Bunch : A collection of messages where the commit order does not matter
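How a consumer might act on the three Incoming shapes is sketched below. The semantics are assumptions for illustration: DomainEvent is simplified to a string, CloseAction and CancelAction are treated as commit and abandon callbacks, Batch events are handled sequentially to preserve order, and Bunch events are handled concurrently.

```fsharp
// Simplifying assumptions: DomainEvent is just a string, and CloseAction/CancelAction
// are plain callbacks (commit vs. abandon); the real OMS types differ.
type DomainEvent = string
type CloseAction = unit -> unit
type CancelAction = unit -> unit

type Incoming =
    | Message of DomainEvent * CloseAction * CancelAction
    | Batch of DomainEvent[] * CloseAction * CancelAction
    | Bunch of DomainEvent[] * CloseAction * CancelAction

// Batch events are handled one at a time in order (commit order matters);
// Bunch events are handled concurrently (commit order does not matter).
// Close is called after successful processing, Cancel if anything throws.
let processIncoming (handler : DomainEvent -> Async<unit>) (incoming : Incoming) : Async<unit> = async {
    match incoming with
    | Message (evt, close, cancel) ->
        try
            do! handler evt
            close ()
        with _ -> cancel ()
    | Batch (evts, close, cancel) ->
        try
            for evt in evts do
                do! handler evt
            close ()
        with _ -> cancel ()
    | Bunch (evts, close, cancel) ->
        try
            let! _ = evts |> Array.map handler |> Async.Parallel
            close ()
        with _ -> cancel () }
```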

All of this is covered in more detail in the previous post. By composing consume >> decode >> handle, we now have a pipeline with the signature:

pipeline : StreamDefinition
    -> AsyncSeq<(Threading.Tasks.Task<Choice<unit, exn>> * (Input * Header)) array>

While we have defined all the logic needed to handle our inputs and outputs, we still need an entry point to create an executable for the service: something to kick off our microservice. This is where the spawn and start functions come into play.

// Start the service
incomingStreams
|> List.map pipeline
|> OmsService.spawn

The example maps the pipeline over the incoming streams (an arbitrarily sized List) and passes the resulting list of AsyncSeqs into the OmsService.spawn function, which consumes all of them in parallel. The implementation of spawn is below:

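let spawn fs =
    fs
    // Drain each stream; the per-message results are discarded
    |> List.map (AsyncSeq.iter ignore)
    // Run every stream concurrently: the throttle equals the number of streams
    |> Async.ParallelThrottled (fs |> List.length)
    |> Async.RunSynchronously
    |> ignore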
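A dependency-free approximation of spawn is shown below. It models each stream as an Async<unit> rather than an AsyncSeq and swaps Async.ParallelThrottled for FSharp.Core's Async.Parallel overload with maxDegreeOfParallelism (available since FSharp.Core 4.7); the consumer function is hypothetical. As in the original, the throttle equals the number of streams, so every stream runs concurrently.

```fsharp
open System.Collections.Concurrent

// Run every stream concurrently and block until all of them complete.
// Each stream is modelled as an Async<unit> that drains its own messages,
// standing in for AsyncSeq.iter ignore over a real subscription.
let spawn (streams : Async<unit> list) : unit =
    Async.Parallel(streams, maxDegreeOfParallelism = max 1 (List.length streams))
    |> Async.RunSynchronously
    |> ignore

// Hypothetical consumer loops used to exercise spawn.
let processed = ConcurrentQueue<string>()

let consumer (name : string) (messages : string list) : Async<unit> = async {
    for m in messages do
        processed.Enqueue(sprintf "%s:%s" name m) }

spawn [ consumer "servicebus" [ "a"; "b" ]; consumer "kafka" [ "c" ] ]
```

Because RunSynchronously blocks, spawn only returns once every stream has finished; for real subscriptions that never end, it keeps the process alive.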

This function is the final piece of the puzzle: we now have an entry point for our program. Typically, this pipeline would be called from a Program file, which is responsible for setting up configuration and initializing logging before starting the pipeline.

Conclusion

This post went into more detail about how we build microservices, namely the decode → handle → interpret pipeline. Many aspects of the boilerplate were not discussed here, such as how we deal with idempotency, the implementation of the handler functions, and the configuration module in OMS.Infrastructure. Another important topic not covered is the parallelism and scaling of these microservices, which will be the topic of a later post.