Abstracting IO using F#

Written By: James Novino

In this post, I discuss approaches to unifying IO access patterns in F#. Unification allows us to abstract the underlying infrastructure implementations while giving us consistent interfaces for all our IO patterns. Having shared access methods enables consistent wrappers for things like retries, metrics, and logging.

Overview

The Order Management System was originally a collection of microservices built over a few libraries. OMS.Base was the library that provided supporting modules to handle concerns such as Error Handling & Retries, Stream Subscriptions, etc. when working with external systems such as EventStore, ServiceBus, and Kafka. As the system grew, we recognized that the access patterns do not change significantly across different systems, and thus OMS.Infrastructure was born. This library centralized the code to access various external systems while addressing common concerns. We took our dozen or so infrastructure libraries and abstracted them into a single library, which represented all the best practices and knowledge that we had accumulated in 4+ years of operations. The OMS.Infrastructure library’s most significant improvement over OMS.Base was the abstraction of IO operations to common interfaces: you no longer needed domain-specific information about reading and writing to Event Store, Kafka, etc.

Design

OMS.Infrastructure abstracts the access patterns we use across our systems both in OMS and across Jet. The common access patterns used are:

  • Read One or More Documents/Entities
  • Write One or More Documents/Entities
  • Subscribe to a Stream
  • Query

These were distilled into 4 generic modules:

  • Inputs
  • Outputs
  • Query
  • Service Abstractions

Each of these four modules provided generic interfaces to perform all common access patterns. The library solves some of the common concerns:

  • Logging — Recording events/information to either a file or STDOUT/STDERR for use in monitoring, troubleshooting and debugging.
  • Retries — Retrying failure scenarios. A nice article on some common retry patterns.
  • Error Handling — Also known as Exception Handling, this is the process of dealing with anomalous or exceptional behavior that requires special handling.
  • Code Distribution — The ability to distribute the source code to access and utilize infrastructure between both projects in the same solution and multiple solutions.

Centralizing these concerns reduces code duplication and provides consistency across the consumers. It also allowed us to distribute a single NuGet package with pre-configured dependencies.

Abstractions

In order for the OMS.Infrastructure library to deal with access patterns in a generic way, some abstractions were needed:

  • Stream Definitions: String or Uri representation of external systems
  • Domain Events: Common type representation for all events
  • Access Modules: Inputs, Outputs, Queries, Consume

I discuss each of these in more detail below. Each of these abstractions provides some value, but together we get a single unified access pattern that is abstracting away all of our IO.

Stream Definition

So, what is a stream definition? A stream definition is simply a string representation of an external system. Below are a few examples:

kafka://jetKafka/foo-topic-p8
sbcmd://jetSb/fake-sb-topic
https://jetapi/api/FakeEndpoint?uniqueId=FakeId&service-auth=fake-auth&service-method=post

To turn the strings above into Stream Definition type we need to do the following:

let sd =
    "kafka://jetKafka/foo-topic-p8"
    |> OmsService.parse lookup

The parse function turns the string representation into the F# discriminated union type:

type StreamDefinition = 
| ServiceBus of ServiceBusMetadata
| EventStore of EventStoreMetadata
| Kafka of KafkaMetadata
...
| AzureBlob of AzureBlobMetadata

The StreamDefinition type (above) allows us to write abstracted IO interfaces inside of OMS.Infrastructure. Each StreamDefinition has three essential parts:

  • Schema (kafka): Which external system we are trying to access
  • Host (jetKafka): A text representation of the connection string or host, which is resolved through the host lookup (more on that below)
  • Topic (foo-topic-p8): This part is specific to the backing store. It represents a topic name for Kafka, but for EventStore it would be a stream name

There is a single parse interface for all the different possible stream definitions. The parse interface has two functions:

  • parse : lookup:(string → string) → string → StreamDefinition
  • parseUri : lookup:(string → string) → Uri → StreamDefinition

The parse function is an abstraction of the parseUri function. We tend to prefer the parse function over the parseUri function, as it is easier to pass strings through our code base than Uris. The URI representation is used so that the schema can be determined and the appropriate underlying parse function can be called.

The underlying parse function takes two parameters: a lookup function of type string -> string, and the Uri, which is the string representation from the above examples. The lookup function is used to convert the hostname (jetKafka) into a connection string based on which environment the calling assembly is running in. An example of the underlying parse function can be seen below:
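As a rough sketch of how a scheme-based parse could work, the snippet below splits the URI into the three parts described earlier and dispatches on the scheme. The metadata records, their field names, and the sample lookup table are assumptions made for illustration, not the actual OMS.Infrastructure definitions. Note that System.Uri may normalize the host to lower case, so the hypothetical lookup compares case-insensitively.

```fsharp
open System

// Hypothetical metadata records; the real OMS.Infrastructure types are not shown in this post.
type KafkaMetadata = { Brokers : string; Topic : string }
type EventStoreMetadata = { Connection : string; Stream : string }

type StreamDefinition =
    | Kafka of KafkaMetadata
    | EventStore of EventStoreMetadata

// Dispatch on the URI scheme, resolve the host through the lookup function,
// and treat the path as the backing-store-specific name (topic, stream, ...).
let parseUri (lookup : string -> string) (uri : Uri) : StreamDefinition =
    let name = uri.AbsolutePath.TrimStart('/')
    let connection = lookup uri.Host
    match uri.Scheme with
    | "kafka" -> Kafka { Brokers = connection; Topic = name }
    | "eventstore" -> EventStore { Connection = connection; Stream = name }
    | other -> failwithf "Unsupported scheme: %s" other

// The string overload simply builds a Uri and delegates to parseUri.
let parse (lookup : string -> string) (definition : string) : StreamDefinition =
    parseUri lookup (Uri definition)

// Hypothetical environment-specific lookup table.
let lookup (host : string) : string =
    match host.ToLowerInvariant() with
    | "jetkafka" -> "broker-1:9092"
    | "omses" -> "tcp://eventstore.local:1113"
    | unknown -> failwithf "Unknown host: %s" unknown

let kafkaDefinition = parse lookup "kafka://jetKafka/foo-topic-p8"
```

Because the environment-specific part lives entirely in the lookup function, the same stream definition string can be used unchanged across environments.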

Domain Event

OMS.Infrastructure interacts with all the supported external systems through a common type which is the Domain Event. The Domain Event contains a set of fields that are generic and shared amongst all different types of events. The available fields in the Domain Event are:

The DomainEvent provides us with a uniform type to deal with when interacting with external IO. Unfortunately, not all the IO interaction models fit perfectly into this type of representation, so it is a leaky abstraction. For instance, the name field stores the EventName when interacting with EventStore but is not used at all when interacting with DocumentDB. Note: The structure of the domain event is not ideal, and it carries a lot of tech debt. For example, number is obsolete, and version must be used instead. correlation_id is poorly named and is a string, whereas id is a Guid. The DomainEvent could be better designed. Some of the current proposals include removing the number, correlation_id and id fields and instead introducing a new CorrelationType and a new id field that would handle our use cases better. This requires a lot of internal refactoring and is something we hope to tackle in the near future once a better design can be finalized.
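Based only on the fields named above (name, number, version, correlation_id and id), a DomainEvent record could be sketched roughly as follows. The payload field and every field type other than correlation_id (string) and id (Guid) are assumptions for illustration; the real type likely differs.

```fsharp
open System

// Hypothetical shape: only the field names mentioned in the text are taken from the post.
type DomainEvent =
    { id : Guid               // stated in the post to be a Guid
      correlation_id : string // stated in the post to be a string
      name : string           // the EventName for EventStore; unused for DocumentDB
      number : int64          // described as obsolete (type assumed)
      version : int64         // to be used instead of number (type assumed)
      payload : byte[] }      // assumed field carrying the serialized body

let sampleEvent =
    { id = Guid.NewGuid()
      correlation_id = "order-123456789"
      name = "OrderCreated"
      number = 0L
      version = 1L
      payload = Text.Encoding.UTF8.GetBytes("{}") }
```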

Access Modules

Inputs

The inputs module provides 4 read interfaces:

  • read : (sd : StreamDefinition) → Async<ReadResult>
  • readWithRetries : (sd : StreamDefinition) → Async<Choice<ReadResult,exn>>
  • readWithSetRetries : (retryCount : int) → (sd : StreamDefinition) → Async<Choice<ReadResult,exn>>
  • readLast : (sd : StreamDefinition) → Async<DomainEvent option>

Rather than using something like this, where we need to have domain-specific knowledge about how we are interacting with the external system:

We can now use the abstraction above and get the same functionality with the Inputs.readLast function:

let sd =
    "eventstore://omsEs/Foo-123456789"
    |> OmsService.parse Lookup.getHost
let result = Inputs.readLast sd

The main benefit of the abstracted IO compared to the first implementation above is that the code is backing store agnostic. As such, I can easily change it to use table storage by changing the stream definition:

let sd =
    sprintf "azuretable://omsTable/FunTable?partitionKey=%s&rowKey=%s" partitionKey rowKey
    |> OmsService.parse Lookup.getHost
let result = Inputs.readLast sd

As you can see, the code is the same to read from Table Storage or Event Store. In the above example the Inputs.readLast operation still needs to call the EventStore code above. However, by abstracting the domain event and the type of the read operation, callers don’t need to be aware of this, and a different implementation can be provided (e.g. for testing, chaining access dependencies, etc.). This allows us to control our backing implementation through configuration by updating the stream definition, for example through feature flags in the stream definition. The stream definition below selects an implementation that uses the Confluent.Kafka library:

let sd =
    "kafka://jetKafka/fake-topic"
    |> OmsService.parse Lookup.getHost
let result = Inputs.read sd

However, we can have multiple different implementations for accessing Kafka which we could control through a feature flag in the Kafka stream definition. The example below uses a different underlying implementation that uses the Kafunk library:

let sd =
    "kafka://jetKafka/fake-topic?useKafunk=true"
    |> OmsService.parse Lookup.getHost
let result = Inputs.read sd

The code above is identical except for the stream definition, which we would store in Consul. This ensures that the implementations in our services are completely backing store and implementation agnostic, i.e.:

let sd =
    OMS.Infrastructure.Consule.getKey "OMS/Fake_Stream_Definition_X"
    |> OmsService.parse Lookup.getHost
let result = Inputs.read sd
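To illustrate why callers stay backing store agnostic, here is a minimal sketch of how a single read entry point could dispatch on the stream definition with pattern matching. Everything in it is a stand-in invented for this example (the simplified DomainEvent, the reader modules, the union cases and the InMemory case); it is not the actual Inputs module.

```fsharp
// Simplified stand-ins; not the OMS.Infrastructure types.
type DomainEvent = { name : string; body : string }

type StreamDefinition =
    | EventStore of stream : string
    | AzureTable of table : string * partitionKey : string * rowKey : string
    | InMemory of events : DomainEvent list

// Stand-ins for backing-store-specific readers.
module EventStoreReader =
    let readLast (stream : string) = async {
        return Some { name = "FooCreated"; body = sprintf "last event of %s" stream } }

module TableReader =
    let readRow table partitionKey rowKey = async {
        return Some { name = ""; body = sprintf "%s/%s/%s" table partitionKey rowKey } }

module Inputs =
    // One entry point for callers; the match selects the implementation.
    let readLast (sd : StreamDefinition) : Async<DomainEvent option> =
        match sd with
        | EventStore stream -> EventStoreReader.readLast stream
        | AzureTable (table, partitionKey, rowKey) -> TableReader.readRow table partitionKey rowKey
        | InMemory events -> async { return List.tryLast events }

// A test can swap in the in-memory case without touching the calling code.
let lastInMemory =
    InMemory [ { name = "A"; body = "1" }; { name = "B"; body = "2" } ]
    |> Inputs.readLast
    |> Async.RunSynchronously
```

Adding a new backing store in this style means adding a union case and one branch in the match, while every caller of Inputs.readLast stays unchanged.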

Outputs

Like the inputs module the outputs module provides unified access through 3 write interfaces:

  • writeAlways : (sd : StreamDefinition) → (de : DomainEvent) → Async<Choice<WriteResult,exn>>
  • writeWithSetRetries : (retryCount : int) → (sd : StreamDefinition) → (de : DomainEvent) → Async<Choice<WriteResult,exn>>
  • writeWithRetries : (sd : StreamDefinition) → (de : DomainEvent) → Async<Choice<WriteResult,exn>>

This saves us from having to deal with exception handling and retry logic at the domain level as we can just wrap that logic at the infrastructure/platform level:

Just like the Inputs module, this allows us to simplify our IO interactions by dealing with everything in terms of stream definitions and Domain Events. The writeFunc is an interceptor wrapper that allows for unit testing by intercepting the function call before calling the external systems. The implementation and usage of this is outside of the scope of this post. Some of the other functions used, such as:

  • Helpers.logException
  • Metrics.Latency.external

are examples of shared logging or metrics functions that are provided by using a common infrastructure library. The logException function looks at the Choice<'a,exn>; if an exception was thrown, it logs it, and in either case it returns the value unmodified. Metrics.Latency.external is a latency logger that wraps the execution of the pipeline, evaluates it to get the total latency, and writes metrics to any or all of the configured metrics sources.
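The retry and logging behaviour described above can be sketched as a small wrapper around any async operation. This is a minimal illustration assuming only the Async<Choice<'a, exn>> result shape from the signatures above; the names logException and withSetRetries here are local to the example, and the real helpers presumably add backoff, metrics and richer logging.

```fsharp
// Log a failure and pass the result through unmodified.
let logException (result : Choice<'a, exn>) : Choice<'a, exn> =
    match result with
    | Choice2Of2 ex -> eprintfn "IO failure: %s" ex.Message
    | Choice1Of2 _ -> ()
    result

// Run the operation, capture exceptions as Choice2Of2, and retry up to retryCount more times.
let rec withSetRetries (retryCount : int) (operation : unit -> Async<'a>) : Async<Choice<'a, exn>> =
    async {
        let! attempt = operation () |> Async.Catch
        match logException attempt with
        | Choice2Of2 _ when retryCount > 0 ->
            return! withSetRetries (retryCount - 1) operation
        | result -> return result
    }

// Example: an operation that fails twice before succeeding.
let calls = ref 0
let flakyWrite () = async {
    calls.Value <- calls.Value + 1
    if calls.Value < 3 then failwith "transient failure"
    return "written" }

let writeResult = withSetRetries 3 flakyWrite |> Async.RunSynchronously
```

Keeping this wrapper at the infrastructure level means the domain code only ever sees a Choice value and never has to write its own try/catch or retry loop.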

Query

Querying for data is a common access pattern for us, as we may need to query something like SQL to find information to emit in an event. As such, the Query module provides query support for infrastructure that supports querying, like Table Storage, SQL, Cosmos DB, etc. It contains only a single interface:

  • run : (sd : StreamDefinition) → (query : string) → (parameters : seq<string * 'a>) → Async<Choice<QueryResult, exn>>

The Query.run functionality allows us to abstract things like dealing with Dapper or Cosmos DB parameters into a unified pattern to simplify querying. The implementation of the Query module is very similar to the Input/Output modules:

The main distinctions from the other modules are the underlying backing store functions that are called and the Result Type.

Service Abstractions

The boilerplate we use to build microservices and our consume functionality are contained within the OmsService module. We have already talked about one of those functions, OmsService.parse, but there are a few others. The most notable are:

  • decode → handle → interpret pipeline
  • consume

Both of these concepts are explained in detail in the sections below.

Decode → Handle → Interpret Pipeline

Presentation on the Decode Handle Interpret Pipeline

The decode → handle → interpret pipeline is explained in the above video. The pattern was originally motivated by the notion of a DSL and interpreter, wherein you define a DSL and then any number of interpreters for it. The decode/handle/interpret flow is essentially a highly specialized version of this pattern. It is the core construct behind all of our microservices and contains 5 core concepts.

  1. A set of Input that a microservice can understand
  2. A set of Output that a microservice can understand
  3. A decode phase that basically deserializes the incoming message on a pipe to an appropriate strongly typed input.
  4. A handle phase that takes an input, runs some business logic to calculate what side-effects should occur and generates the Output accordingly.
  5. An interpret phase that takes an Output and executes the side-effects that the output usually represents.

These constructs allow the DSL to be chained together to form a functional microservice. An abbreviated example of how to create a microservice can be found below:

As you can see from the example above the consumption or microservice DSL is consume(event) decode handle interpret. I may write a more detailed post about some of the constructs used above and how we deal with things like fault tolerance, parallelism, etc. at a later date.
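To make the five concepts concrete, here is a toy sketch of the decode, handle and interpret phases for a single event. Every type and function name in it is invented for illustration (including the simplified DomainEvent), and it leaves out consume, fault tolerance and parallelism entirely.

```fsharp
// Simplified stand-in for the shared event type.
type DomainEvent = { name : string; body : string }

// 1. The inputs this toy service understands.
type Input =
    | OrderPlaced of orderId : string
    | Unknown

// 2. The outputs (descriptions of side effects) it can produce.
type Output =
    | SendConfirmation of orderId : string
    | NoOp

// 3. decode: turn a raw event into a strongly typed input.
let decode (de : DomainEvent) : Input =
    match de.name with
    | "OrderPlaced" -> OrderPlaced de.body
    | _ -> Unknown

// 4. handle: business logic that decides which side effect should occur.
let handle (input : Input) : Async<Output> = async {
    match input with
    | OrderPlaced orderId -> return SendConfirmation orderId
    | Unknown -> return NoOp }

// 5. interpret: the only place where side effects are executed.
let interpret (output : Output) : Async<unit> = async {
    match output with
    | SendConfirmation orderId -> printfn "confirmation sent for order %s" orderId
    | NoOp -> () }

// Chain the three phases for one event.
let processEvent (de : DomainEvent) : Async<unit> = async {
    let! output = de |> decode |> handle
    do! interpret output }

[ { name = "OrderPlaced"; body = "42" }; { name = "SomethingElse"; body = "" } ]
|> List.iter (processEvent >> Async.RunSynchronously)
```

Because handle only returns a description of the side effect, it can be unit tested without touching any external system, and a different interpret function can be substituted per environment.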

Consume

The consume function is a subscription (also commonly called “streaming”) access module which we use for messaging systems such as Kafka and Azure Service Bus. This function, as shown above, is the beginning of the decode/handle/interpret flow. The consume function has a signature of consume : (sd : StreamDefinition) → AsyncSeq.T<Incoming>. The Incoming type is a DU type that represents the different possible consumption use cases.

When subscribing to a backing store there are 3 common patterns.

  1. Message : An Individual Message
  2. Batch : Regular batch where messages are in a sequence and commit order matters
  3. Bunch : A bunch of messages read together but where commit order does not matter

Each of these patterns has a data portion (DomainEvent or DomainEvent[]) and an action portion which contains a CloseAction and a CancelAction. The CloseAction and CancelAction are type aliases for functions that either complete the messages in the batch/bunch or cancel the processing of the messages. An example of a CancelAction would be to abandon the message so that the message will not be processed again or to move the message into a special review so that it could be reviewed. An example of a complete action would be to commit an offset saying that the message had been consumed or to pop a message from a stack. As an example, we will look at the implementation of the toIncoming function for service bus:

let toIncoming (sbm : ServiceBusMetadata) (messages : BrokeredMessage array) =
    let complete : CloseAction =
        fun () -> messages |> completeBatch sbm
    let cancel : CancelAction =
        fun () -> messages |> cancelBatch sbm
    Incoming.Bunch(messages |> Array.map makeDomainEvent, complete, cancel)

The complete action is calling a completeBatch function that uses the CompleteBatchAsync function to complete the messages. The cancelBatch function uses the AbandonAsync function from the Microsoft.ServiceBus.Messaging.MessagingFactory.

The Cancel/Complete actions are specific to the backing store, as is the decision regarding which type of Incoming should be used (i.e. Incoming.Bunch/Batch). The three common patterns discussed above cover all the use cases we currently have and allow us to subscribe to streams. This, in combination with the decode/handle/interpret pipeline, supports almost all our microservice requirements.
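The shape of the Incoming type and how a consumer might use the two actions can be sketched as follows. The aliases are assumed here to be unit -> Async<unit>, the DomainEvent is simplified, and processIncoming is a hypothetical helper; the real definitions may differ.

```fsharp
// Simplified stand-in for the shared event type.
type DomainEvent = { name : string; body : string }

// Assumed alias shapes; the post only says they are function type aliases.
type CloseAction = unit -> Async<unit>
type CancelAction = unit -> Async<unit>

type Incoming =
    | Message of DomainEvent * CloseAction * CancelAction
    | Batch of DomainEvent[] * CloseAction * CancelAction
    | Bunch of DomainEvent[] * CloseAction * CancelAction

// Run a handler over whatever was consumed, then complete on success or cancel on failure.
let processIncoming (handler : DomainEvent -> Async<unit>) (incoming : Incoming) : Async<unit> =
    let events, close, cancel =
        match incoming with
        | Message (de, close, cancel) -> [| de |], close, cancel
        | Batch (des, close, cancel)
        | Bunch (des, close, cancel) -> des, close, cancel
    async {
        let work = async { for de in events do do! handler de }
        let! result = Async.Catch work
        match result with
        | Choice1Of2 () -> do! close ()
        | Choice2Of2 _ -> do! cancel ()
    }

// Example with actions that just record which one was invoked.
let outcome = ref "none"
let sampleBunch =
    Bunch ([| { name = "A"; body = "1" }; { name = "B"; body = "2" } |],
           (fun () -> async { outcome.Value <- "closed" }),
           (fun () -> async { outcome.Value <- "cancelled" }))

processIncoming (fun _ -> async { return () }) sampleBunch |> Async.RunSynchronously
```

This sketch handles Batch and Bunch identically; a real consumer would additionally respect commit order for Batch.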

Conclusion

The sections above walk through, at a very high level, the abstractions we are using to simplify our interaction with external systems. You might be asking yourself at this point: “that’s well and good but what real value does that provide?” We talked about the fact that it provides the benefits of having consistent logging, error handling, etc., but you could have gotten the same functionality and consistency through careful peer review and common handling libraries. However, I haven’t yet talked about one of the most significant benefits of this type of abstraction: having a unified access pattern makes configuration extremely easy and powerful.

The abstraction discussed above allowed us to build a generic Workflow system that was agnostic to the storage/messaging layer used. For example, in early May we started seeing issues with size overflows on one of our tables in Azure Table Storage, which we use as a state store. Within 10 minutes we were able to completely reconfigure our entire backing store for that workflow by changing a single stream definition. Without abstracted IO, this would have been more involved, as it would have required changes in lots of different services and a lot of configuration and testing. Having abstracted IO interfaces has been a huge time saver and is made easy through DUs and pattern matching in F#. Other benefits that I did not discuss are common metrics interfaces, testing, parallelism, extensibility and shared boilerplate. I may write a separate post on these topics in particular.

If you like the challenges of building distributed systems and are interested in solving complex problems, check out our job openings.


Acknowledgments

Thanks to Krishna Vangapandu, Leo Gorodinski and Louie Bacaj and all of the other amazing Jet engineers that worked on our system over the years and for the comments, edits and suggestions.