Event Sourcing - Step by step in F#

Why Event Sourcing again?

[Roman; Roman] |> Async.Parallel

Let’s start then, shall we?

Event

Command

There is no door, Neo.

Events without Commands

Back to Event Sourcing…

Event Store

State

…and that’s why I don’t like cricket!

F# Example

Commands

open System

// The argument records must be declared before the Command union that refers to them.
module CmdArgs =
    type AddTask = {
        Id : int
        Name : string
        DueDate : DateTime option
    }
    type RemoveTask = {
        Id : int
    }
    type CompleteTask = {
        Id : int
    }
    type ChangeTaskDueDate = {
        Id : int
        DueDate : DateTime option
    }

type Command =
    | AddTask of CmdArgs.AddTask
    | RemoveTask of CmdArgs.RemoveTask
    | ClearAllTasks
    | CompleteTask of CmdArgs.CompleteTask
    | ChangeTaskDueDate of CmdArgs.ChangeTaskDueDate

Events

type Event =
    | TaskAdded of CmdArgs.AddTask
    | TaskRemoved of CmdArgs.RemoveTask
    | AllTasksCleared
    | TaskCompleted of CmdArgs.CompleteTask
    | TaskDueDateChanged of CmdArgs.ChangeTaskDueDate

State

type Task = {
    Id : int
    Name : string
    DueDate : DateTime option
    IsComplete : bool
}

type State = {
    Tasks : Task list
}
with static member Init = { Tasks = [] }

State Init (a.k.a Empty State)

Execute function

// Validation helpers. They must be defined before execute, which uses them.
// Each one throws when its precondition is not met.
let onlyIfTaskDoesNotAlreadyExist (state:State) i =
    match state.Tasks |> List.tryFind (fun x -> x.Id = i) with
    | Some _ -> failwith "Task already exists"
    | None -> state

let onlyIfTaskExists (state:State) i =
    match state.Tasks |> List.tryFind (fun x -> x.Id = i) with
    | Some task -> task
    | None -> failwith "Task does not exist"

let onlyIfNotAlreadyFinished (task:Task) =
    match task.IsComplete with
    | true -> failwith "Task already finished"
    | false -> task

let execute state command =
    let event =
        match command with
        | AddTask args ->
            args.Id
            |> onlyIfTaskDoesNotAlreadyExist state
            |> (fun _ -> TaskAdded args)
        | RemoveTask args ->
            args.Id
            |> onlyIfTaskExists state
            |> (fun _ -> TaskRemoved args)
        | ClearAllTasks -> AllTasksCleared
        | CompleteTask args ->
            args.Id
            |> onlyIfTaskExists state
            |> (fun _ -> TaskCompleted args)
        | ChangeTaskDueDate args ->
            args.Id
            |> (onlyIfTaskExists state >> onlyIfNotAlreadyFinished)
            |> (fun _ -> TaskDueDateChanged args)
    event |> List.singleton // we must return a list of events

Apply function

let apply state event =
    match event with
    | TaskAdded args ->
        let newTask = {
            Id = args.Id
            Name = args.Name
            DueDate = args.DueDate
            IsComplete = false
        }
        { state with Tasks = newTask :: state.Tasks }
    | TaskRemoved args ->
        { state with Tasks = state.Tasks |> List.filter (fun x -> x.Id <> args.Id) }
    | AllTasksCleared -> { state with Tasks = [] }
    | TaskCompleted args ->
        let task =
            state.Tasks
            |> List.find (fun x -> x.Id = args.Id)
            |> (fun t -> { t with IsComplete = true })
        let otherTasks = state.Tasks |> List.filter (fun x -> x.Id <> args.Id)
        { state with Tasks = task :: otherTasks }
    | TaskDueDateChanged args ->
        let task =
            state.Tasks
            |> List.find (fun x -> x.Id = args.Id)
            |> (fun t -> { t with DueDate = args.DueDate })
        let otherTasks =
            state.Tasks
            |> List.filter (fun x -> x.Id <> args.Id)
        { state with Tasks = task :: otherTasks }

type Aggregate<'state, 'command, 'event> = {
    Init : 'state
    Apply : 'state -> 'event -> 'state
    Execute : 'state -> 'command -> 'event list
}

Before we continue…

  • Domain (a list of tasks)
  • Commands (the ways to add, remove, and complete tasks)
  • Events (the results of our Commands)
  • A function for creating Events based on Commands (Execute), with basic built-in validation
  • A function for creating State based on previous Events (Apply)
  • The initial (empty) State we will start with
  • All of it wrapped in the Aggregate type.

let tasksAggregate = {
    Init = State.Init
    Execute = execute
    Apply = apply
}
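
Before any Event Store enters the picture, the three parts of an aggregate can be exercised purely in memory. The sketch below is not the article's full domain: it uses a reduced, hypothetical version of the Tasks types (tuple arguments instead of the CmdArgs records, and only two commands) so that it stays self-contained. The run helper is also an assumption made for illustration.

```fsharp
// Reduced, hypothetical version of the Tasks domain so the snippet is self-contained.
type Command =
    | AddTask of id:int * name:string
    | CompleteTask of id:int

type Event =
    | TaskAdded of id:int * name:string
    | TaskCompleted of id:int

type Task = { Id : int; Name : string; IsComplete : bool }
type State = { Tasks : Task list }

type Aggregate<'state, 'command, 'event> = {
    Init : 'state
    Apply : 'state -> 'event -> 'state
    Execute : 'state -> 'command -> 'event list
}

let exists (state:State) id =
    state.Tasks |> List.exists (fun t -> t.Id = id)

// Execute: validate the command against the current state and produce events.
let execute (state:State) command =
    match command with
    | AddTask (id, name) ->
        if exists state id then failwith "Task already exists"
        [ TaskAdded (id, name) ]
    | CompleteTask id ->
        if not (exists state id) then failwith "Task does not exist"
        [ TaskCompleted id ]

// Apply: no validation here, an event is a fact that already happened.
let apply (state:State) event =
    match event with
    | TaskAdded (id, name) ->
        { state with Tasks = { Id = id; Name = name; IsComplete = false } :: state.Tasks }
    | TaskCompleted id ->
        let tasks =
            state.Tasks
            |> List.map (fun t -> if t.Id = id then { t with IsComplete = true } else t)
        { state with Tasks = tasks }

let aggregate = { Init = { Tasks = [] }; Apply = apply; Execute = execute }

// Hypothetical helper: execute each command against the state produced so far,
// then fold the resulting events into that state.
let run commands =
    commands
    |> List.fold (fun state cmd ->
        aggregate.Execute state cmd |> List.fold aggregate.Apply state) aggregate.Init

let finalState = run [ AddTask (1, "Write blog post"); CompleteTask 1 ]
```

Here finalState holds a single task that is marked as complete. Running CompleteTask for an unknown Id would throw from execute, and no event would ever reach apply.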

Event Store Command Handler

  • Event Store implementation
  • Aggregate type with Init, Execute and Apply functions
  1. Get the Command as a parameter
  2. Load all previous Events from the Event Store Stream, ordered by the time they occurred
  3. Apply them to the Init State one by one with the Apply function. This is just a typical fold.
  4. Pass the latest (current) State, which is the result of folding the Events in the previous step, to the Execute function together with the Command we got as a parameter
  5. If Execute fails, we end here and return or throw the error
  6. If Execute went ok, we now have new Events
  7. Save (append) those new Events back to the Event Store
  8. Return the newly created and stored Events

// create new Event Store
let store = CosmoStore.CosmosDb.EventStore.getEventStore myConfig

// get current state from Tasks stream
// note the use of tasksAggregate record
let getCurrentState () =
    store.GetEvents "Tasks" EventsReadRange.AllEvents
    |> Async.AwaitTask
    |> Async.RunSynchronously
    |> List.map (fun x -> Mapping.toDomainEvent (x.Name, x.Data))
    |> List.fold tasksAggregate.Apply tasksAggregate.Init

// append new events to Tasks stream
let append events =
    events
    |> List.map Mapping.toStoredEvent
    |> List.map (fun (name,data) -> { Id = Guid.NewGuid(); CorrelationId = Guid.NewGuid(); Name = name; Data = data; Metadata = None })
    |> store.AppendEvents "Tasks" ExpectedPosition.Any
    |> Async.AwaitTask
    |> Async.RunSynchronously
    |> ignore

let handleCommand command =
    // get the latest state from store
    let currentState = getCurrentState()
    // execute command to get new events
    let newEvents = command |> tasksAggregate.Execute currentState
    // store events to event store
    newEvents |> append
    // return events
    newEvents
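
The same load, fold, execute, append flow can be tried without Cosmos DB. The following is a minimal sketch under stated assumptions: InMemoryStream and makeHandler are hypothetical names invented for this example and are not part of CosmoStore, and the domain is cut down to task Ids only so the snippet stays self-contained.

```fsharp
// Generic aggregate shape, as used in the article.
type Aggregate<'state, 'command, 'event> = {
    Init : 'state
    Apply : 'state -> 'event -> 'state
    Execute : 'state -> 'command -> 'event list
}

// Hypothetical in-memory stream; a stand-in for a real Event Store, not the CosmoStore API.
type InMemoryStream<'event>() =
    let mutable events : 'event list = []
    member this.Load () = events
    member this.Append (newEvents : 'event list) = events <- events @ newEvents

// Hypothetical generic command handler following the numbered steps.
let makeHandler (aggregate : Aggregate<'state, 'command, 'event>) (stream : InMemoryStream<'event>) =
    fun (command : 'command) ->
        // steps 2-3: load previous events and fold them into the current state
        let currentState = stream.Load () |> List.fold aggregate.Apply aggregate.Init
        // steps 4-6: execute; an exception from Execute stops us before anything is stored
        let newEvents = aggregate.Execute currentState command
        // step 7: append the new events to the stream
        stream.Append newEvents
        // step 8: return them to the caller
        newEvents

// Cut-down, hypothetical Tasks domain: the state is just the list of known task Ids.
type Command = AddTask of int | RemoveTask of int
type Event = TaskAdded of int | TaskRemoved of int

let apply (ids : int list) event =
    match event with
    | TaskAdded id -> id :: ids
    | TaskRemoved id -> ids |> List.filter (fun x -> x <> id)

let execute (ids : int list) command =
    match command with
    | AddTask id when List.contains id ids -> failwith "Task already exists"
    | AddTask id -> [ TaskAdded id ]
    | RemoveTask id when not (List.contains id ids) -> failwith "Task does not exist"
    | RemoveTask id -> [ TaskRemoved id ]

let tasks = { Init = []; Apply = apply; Execute = execute }

let stream = InMemoryStream<Event>()
let handleCommand = makeHandler tasks stream

handleCommand (AddTask 1) |> ignore
handleCommand (AddTask 2) |> ignore
handleCommand (RemoveTask 1) |> ignore

let stored = stream.Load ()
```

After these three commands, stored contains TaskAdded 1, TaskAdded 2 and TaskRemoved 1 in that order. A second AddTask 2 would throw before anything is appended. A handler like this one has no concurrency protection; a real store would typically guard the append with an expected stream position.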

Read (query) side

let handleEventToConsole event =
    match event with
    | TaskAdded args -> printfn "Console handler says: Hurrayyyy, we have a task %s!" args.Name
    | TaskCompleted args -> printfn "Console handler says: Task with ID %A is completed" args.Id
    | AllTasksCleared -> printfn "Console handler says: ...and now they are all gone"
    | _ -> ()

// sqlInsert, sqlUpdate and sqlDeleteAll are not shown here
let handleEventToSql event =
    match event with
    | TaskAdded args -> args |> sqlInsert
    | TaskCompleted args -> args |> sqlUpdate
    | AllTasksCleared -> sqlDeleteAll()
    | _ -> ()

// pass each event to every read-side handler
let handle evn =
    evn |> handleEventToConsole
    evn |> handleEventToSql

Final pipeline

// simple function composing command handler + event handlers
let pipeline cmd =
    cmd
    |> CommandHandler.handle
    |> List.iter ReadSide.handle

Q: Why should you even think about writing apps this way?

  • You never lose any data. You just store all the data you have about Events and decide later (whether 1 ms or 1 year later) what kind of information you are interested in.
  • Absolutely DDD friendly. Design your domain the way you think it should be and define Events around it.
  • Thinking and talking in Events makes communication with customers much easier. They don’t understand models, relations, constraints or foreign keys, but they will tell you that after OrderReceived there should be EmailSent. You are now talking in the same language.
  • You get a fully trustworthy audit log by design.
  • You can replay old Events and get new information from them. For marketing, security or whatever reason (somebody said “bugfixing”?), you can go through all the Events from the very beginning and create new projections.
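
As a rough illustration of the last point, a projection nobody thought of when the Events were stored is just another fold over the same history. The sketch below is hypothetical: the Event type is a simplified copy with tuple arguments, and the Stats projection and sample history are made up for the example.

```fsharp
// Simplified, hypothetical copy of the article's Event type (tuple arguments instead of records).
type Event =
    | TaskAdded of id:int * name:string
    | TaskRemoved of id:int
    | AllTasksCleared
    | TaskCompleted of id:int

// A made-up projection that was not needed when the events were first stored.
type Stats = { Added : int; Completed : int; Cleared : int }

let emptyStats = { Added = 0; Completed = 0; Cleared = 0 }

// One step of the projection: update the counters for a single event.
let project (stats : Stats) event =
    match event with
    | TaskAdded _ -> { stats with Added = stats.Added + 1 }
    | TaskCompleted _ -> { stats with Completed = stats.Completed + 1 }
    | AllTasksCleared -> { stats with Cleared = stats.Cleared + 1 }
    | TaskRemoved _ -> stats

// Sample history standing in for events loaded from the very beginning of a stream.
let history = [
    TaskAdded (1, "Buy milk")
    TaskAdded (2, "Write blog post")
    TaskCompleted 1
    AllTasksCleared
    TaskAdded (3, "Start over")
]

// Replaying is a fold from the empty projection over the whole history.
let stats = history |> List.fold project emptyStats
```

For this history, stats ends up with Added = 3, Completed = 1 and Cleared = 1. The stored Events were not touched; only the way of reading them is new.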

Q: Can we rename Command Handler to IBusinessDomainEventFactoryProxyImplProvider?

Q: Why you shouldn’t do Event Sourcing?

  • Like it or not, the complexity of the application is much higher than with simple CRUD against an SQL table.
  • Restoring State from stored Events can be expensive. With thousands of Events in a single Stream, it can take some time. If your application is performance-first and invalidates State every few milliseconds (gaming apps?), Event Sourcing is not for you.
  • Unless you get all your read-side data directly from the Event Store (which you can, though it may be inefficient), you are in the world of eventual consistency.
  • Event migration and versioning are no piece of cake.
  • Correctly splitting the domain into aggregate roots and storing them in Streams is probably the most difficult part I can think of.

Q: You’ve sold me Event Sourcing, where should I go next?

F# enthusiast, founder of FSharping meetups, cheap joker, senior developer by title, junior developer by hunger for improvement.

Roman Provazník
