Reactivity in F#

Dag Brattli
9 min read · Dec 16, 2018


Reactivity in F# is part 2 and the “continuation” of Asynchronicity in F#, and builds upon what we discussed there about callbacks, continuations (CPS) and promises.

This time around we will be discussing reactive programming, asynchronous sequences and streams.

Interactive Programming

Interactive programming is programming where the application is asking for data, getting some response and then doing stuff based on the result. In programming terms this means that the program will be pulling data from functions and sequences and deciding what to do next.

for x in someSequence do
    doStuff x

Interactive programming is programming with sequences, and the most common pattern is the iterator pattern. In the world of .NET and F# they are called Enumerables. In other languages they are known as iterables.

Enumerables and Iterables

With enumerators we pull values from the underlying collection one at a time, possibly an infinite series of them.

This is what we call “synchronous pull”

Sequences in F#, such as seq<'a>, implement the IEnumerable<'a> interface from .NET for pulling data from a collection. Here is the Enumerable family of interfaces translated to F#.
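As a rough sketch, the two interfaces could be written in F# like this. The real ones live in System.Collections.Generic and also inherit non-generic counterparts, which are left out here. The pullAll helper is a hypothetical name, only there to show what pulling by hand looks like.

```fsharp
open System

// Sketch of the .NET interfaces in F# syntax (non-generic base
// interfaces are omitted for brevity).
type IEnumerator<'a> =
    inherit IDisposable
    abstract member Current : 'a
    abstract member MoveNext : unit -> bool
    abstract member Reset : unit -> unit

type IEnumerable<'a> =
    abstract member GetEnumerator : unit -> IEnumerator<'a>

// Hypothetical helper: pull every value by hand from a built-in seq<'a>,
// which is what a for-loop does behind the scenes.
let pullAll (xs : seq<'a>) : 'a list =
    use e = xs.GetEnumerator ()
    let mutable acc = []
    while e.MoveNext () do
        acc <- e.Current :: acc
    List.rev acc

let pulled = pullAll (seq { 1 .. 3 })
```

Here pulled ends up as [1; 2; 3]. The program decides when each MoveNext call happens, which is what makes this a synchronous pull.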

The IEnumerator<'a> might look a bit strange if you are used to, for example, the much simpler Python next function. It has a property Current of type 'a, and a method MoveNext for moving to the next value, which returns false to signal the completion of the sequence. It may even throw an exception to signal an error. The Reset method is there for legacy reasons and should be ignored.

In F# you could think about this interface using Result and Option types instead:

Next () : unit -> Result<'a, exn> option

Thus the returned item can either be some value of type 'a, an error of type exn, or None to signal the end and completion of the sequence. We could make this a bit cleaner by creating a custom type ResultOption. The IEnumerable interface is probably more complicated than this for reasons of speed, variance and reduced memory allocations.
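To make the idea concrete, here is a minimal sketch that adapts a regular .NET enumerator to that single-function shape. The names Next and toNext are made up for this illustration and are not part of any library.

```fsharp
open System.Collections.Generic

// Hypothetical alias: one function that answers "value", "error" or "done".
type Next<'a> = unit -> Result<'a, exn> option

// Adapt a regular .NET enumerator to the single-function shape.
let toNext (e : IEnumerator<'a>) : Next<'a> =
    fun () ->
        try
            if e.MoveNext () then Some (Ok e.Current) else None
        with ex ->
            // An exception thrown while moving becomes an Error value.
            Some (Error ex)

let enumerator = (seq { 1 .. 2 }).GetEnumerator ()
let next = toNext enumerator

// Pull three times: two values, then the end of the sequence.
let results = [ next (); next (); next () ]
```

After running this, results holds Some (Ok 1), Some (Ok 2) and None, in that order.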

An enumerable sequence could be seen as some kind of infinite data structure, a stream of values. Remember how we called constant values pull values, since the pulling was driven by the program. We also called promises push values, since the push was initiated by the environment around the program.

A sequence in F# is enumerated by pulling values so you can think of it as a “pull” stream:

  • Pull stream, values are pulled from the collection. The pulling is done by the program itself. But this also means that we can have push streams.

Reactive programming on the other hand is about programming with “push streams”:

  • Push stream, values are pushed to the collection. The pushing is initiated by the environment around the program.

Reactive Programming

Reactive programming is programming with (push based) event streams. Reactive programming is about registering your interest in some data and then being told (called back) when it’s available. A push based event stream is an abstraction for a series of related events that are separated by ordering or time.

Examples are:

  • Mouse moves and button clicks
  • Keyboard input
  • Geo positions from a GPS device
  • Websocket events
  • Data sent from IoT devices

But in addition to being asynchronous and event-based, we say that reactive programming is also about composition and having event streams as first-class objects. First-class means that we should be able to juggle them and use event streams the same way as any other value in the programming language:

  • Assign an event stream to a variable or object property
  • Pass an event stream as an argument to functions or methods
  • Return event streams as the result from functions or methods
  • Create event streams dynamically at runtime

Observables

With Observables values are pushed from the underlying collection one at a time, possibly an infinite series of them.

This is what we call “asynchronous push”

Observables in .NET implement the IObservable<'a> interface. There are currently two different implementations for F#: Control.Observable, which is F# native, and FSharp.Control.Reactive, which wraps the .NET System.Reactive (C# implementation). Here are the IObservable<'a> interfaces translated to F#:
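A sketch of how these interfaces could look in F# syntax follows. IObservable and IObserver are redeclared here only for illustration (the real ones live in System), and ofList is a hypothetical helper that pushes a list synchronously.

```fsharp
open System

// Sketch of the .NET interfaces in F# syntax. IDisposable, the third
// interface, comes from System and has a single Dispose : unit -> unit.
type IObserver<'a> =
    abstract member OnNext : 'a -> unit
    abstract member OnError : exn -> unit
    abstract member OnCompleted : unit -> unit

type IObservable<'a> =
    abstract member Subscribe : IObserver<'a> -> IDisposable

// Hypothetical helper: an observable that pushes a list, then completes.
let ofList (items : 'a list) : IObservable<'a> =
    { new IObservable<'a> with
        member this.Subscribe (observer : IObserver<'a>) =
            for x in items do
                observer.OnNext x
            observer.OnCompleted ()
            // Nothing to clean up in this simple case.
            { new IDisposable with
                member this.Dispose () = () } }

let received = ResizeArray<string> ()

let observer =
    { new IObserver<int> with
        member this.OnNext x = received.Add (string x)
        member this.OnError err = received.Add "error"
        member this.OnCompleted () = received.Add "done" }

let disposable = (ofList [1; 2]).Subscribe observer
disposable.Dispose ()
```

The observer is called back with "1", "2" and finally "done". Nothing is pulled: the observable decides when each notification arrives.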

The relationship between the three interfaces can be seen in this single line of code:

let disposable = observable.Subscribe observer

An observer is an implementation of IObserver<'a> such as:

{ new IObserver<'a> with
    member this.OnNext x = ...
    member this.OnError err = ...
    member this.OnCompleted () = ...
}

But there are also extensions for using plain functions, e.g. a handler of type ('a -> unit) for OnNext events. Thus if we have a for-loop in the enumerable world such as:

for x in xs do
    printfn "%A" x

In the Observable world this is equivalent to:

xs.Subscribe (fun x ->
    printfn "%A" x
)

Subscribing to an observable returns a disposable (IDisposable) that can later be used to unsubscribe:

let disposable = observable.Subscribe observer
disposable.Dispose () // Unsubscribe

The most commonly known implementation of Observables is the Reactive Extensions (Rx).

A brief history of Rx

The Reactive Extensions (Rx) is a library for composing asynchronous and event-based programs using observable sequences and LINQ-style query operators.

The Reactive Extensions (Rx) was developed at Microsoft by the Cloud Programmability Group with the projects Tesla and Volta in 2007. The Rx logo represents an electric eel and was originally made for the Volta project, but is now the official logo for Rx.

The Rx Electric Eel

The project was staffed with members such as Erik Meijer, Brian Beckman, Wes Dyer, Bart De Smet, Jeffrey van Gogh and Matthew Podwysocki.

Rx.NET became available in 2009, and RxJS for JavaScript in 2010. In 2012 Rx was made open source by Microsoft Open Technologies and this triggered the development of Rx for other languages such as Java (RxJava), Scala (RxScala), C++ (RxCpp), Ruby (Rx.rb), Clojure (RxClojure), Groovy (RxGroovy), Kotlin (RxKotlin), Swift (RxSwift) and Python (RxPY).

An Observable is a multi-value abstraction, and the Rx grammar tells you that the stream may contain any number (0 or more) of OnNext notifications, optionally followed by a single OnCompleted or OnError.

OnNext* (OnCompleted | OnError)?

Rx represents asynchronous data streams using Observables, queries asynchronous data streams using LINQ operators, and parameterizes the concurrency in the asynchronous data streams using Schedulers.

Rx = Observables + LINQ + Schedulers

Schedulers include ImmediateScheduler, CurrentThreadScheduler, NewThreadScheduler, ProcessScheduler and AsyncScheduler. With them you can decide where and when the subscription logic and observer invocations are run.

Duality

Duality is about symmetry, and one way to explain duality is that one part can be seen as the opposite of the other. Any structure or action in one of the parts has a directly analogous, or opposite, structure or action in the other.

In physics and logic there are many famous dualities, such as:

  • Wave-particle duality
  • Space-time duality
  • Electro-magnetic duality (capacitors vs inductors)
  • Logic, De Morgan's laws (not (a && b) is the same as not a || not b)

In mathematics and category theory duality is about reversing arrows. This means that input becomes output, and output becomes input:

'a -> 'b is the dual of 'b -> 'a

In 2010 Erik Meijer discovered the duality between the Subject/Observer pattern (IObservable/IObserver) and the Iterator pattern (IEnumerable/IEnumerator).

If we write the Enumerable and Observable artifacts as simple functions ignoring disposables and the details of the notification (OnNext, OnError, OnCompleted) then it’s much easier to see the perhaps surprising symmetry:
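One way to write this down is as plain function types. This is a sketch, and the type names and the two example functions (countFrom, pushList) are assumptions made for illustration. Reversing the arrows of one type gives the other.

```fsharp
// Pull side: ask for an enumerator, then ask it for values.
type Enumerator<'a> = unit -> 'a
type Enumerable<'a> = unit -> Enumerator<'a>     // unit -> (unit -> 'a)

// Push side: hand over an observer, then get called with values.
type Observer<'a> = 'a -> unit
type Observable<'a> = Observer<'a> -> unit       // ('a -> unit) -> unit

// Hypothetical pull example: an endless counter.
let countFrom (start : int) : Enumerable<int> =
    fun () ->
        let current = ref (start - 1)
        fun () ->
            current.Value <- current.Value + 1
            current.Value

// Hypothetical push example: call the observer once per list item.
let pushList (items : 'a list) : Observable<'a> =
    fun observer -> List.iter observer items

// The program pulls three values...
let enumerator = countFrom 1 ()
let pulled = [ enumerator (); enumerator (); enumerator () ]

// ...while here the values are pushed at the callback.
let pushed = ResizeArray<int> ()
pushList [1; 2; 3] (fun x -> pushed.Add x)
```

Both pulled and pushed end up containing 1, 2 and 3. The only difference is who is in charge of moving the values.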

The inherent and close relationship between these two worlds is really amazing, and shows that Observables are really just as natural as Enumerables. They are simply the opposite of each other.

First Class Continuations

To explore how we can implement reactive streams we have to look one more time at continuations. Remember that an asynchronous function may either take a continuation ('a -> unit) as an extra parameter, or it could return a setter function ('a -> unit) -> unit that takes the continuation.

We can encapsulate such a function in a type to make the abstraction a bit higher.
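A minimal sketch of such an encapsulation could look like this. The record name Continuation is an assumption for this illustration. Cont, Run and Return are the names used in the discussion that follows.

```fsharp
// A continuation is just a callback that receives the result.
type Cont<'a> = 'a -> unit

// Hypothetical wrapper: Run takes the continuation and starts the work.
type Continuation<'a> = { Run : Cont<'a> -> unit }

// Wrap a plain value: the continuation is called right away with it.
let Return (x : 'a) : Continuation<'a> =
    { Run = fun cont -> cont x }

let results = ResizeArray<int> ()

// Nothing happens until we call Run and supply the continuation.
(Return 42).Run (fun x -> results.Add x)
```

After the last line results contains the single value 42.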

The perhaps surprising thing is that Observables are basically the exact same thing as first-class continuations. We just need to rename Run to Subscribe and Return to Single, and treat Cont<'a> as our OnNext observer.
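After the renaming, a sketch of the same idea reads like a tiny Observable. The map operator is a hypothetical addition, included only to show that such values compose like any other first-class value.

```fsharp
// The continuation, now seen as an OnNext observer.
type Observer<'a> = 'a -> unit

// The wrapper, with Run renamed to Subscribe.
type Observable<'a> = { Subscribe : Observer<'a> -> unit }

// Return, renamed to Single: a stream with exactly one value.
let Single (x : 'a) : Observable<'a> =
    { Subscribe = fun observer -> observer x }

// Hypothetical operator: transform each value before it reaches the observer.
let map (f : 'a -> 'b) (source : Observable<'a>) : Observable<'b> =
    { Subscribe = fun observer -> source.Subscribe (fun x -> observer (f x)) }

let results = ResizeArray<int> ()

let stream = Single 20 |> map (fun x -> x + 1)
stream.Subscribe (fun x -> results.Add x)
```

Subscribing pushes the single mapped value, so results contains 21.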

And using the interfaces, we can rewrite our continuation based Observable into an object representation using IObservable:
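As a sketch, a single-value Observable built on the real .NET interfaces from System could look like this. The names single and noopDisposable are assumptions for this example.

```fsharp
open System

// Hypothetical helper: a disposable with nothing to clean up.
let noopDisposable =
    { new IDisposable with
        member this.Dispose () = () }

// A single-value stream as an object implementing IObservable<'a>.
let single (x : 'a) : IObservable<'a> =
    { new IObservable<'a> with
        member this.Subscribe (observer : IObserver<'a>) =
            observer.OnNext x
            observer.OnCompleted ()
            noopDisposable }

let results = ResizeArray<string> ()

let observer =
    { new IObserver<int> with
        member this.OnNext x = results.Add (string x)
        member this.OnError err = results.Add "error"
        member this.OnCompleted () = results.Add "completed" }

let subscription = (single 42).Subscribe(observer)
subscription.Dispose ()
```

Compared with the function-based version, the observer now also learns about completion, so results contains "42" followed by "completed".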

Async Streams

But the rabbit hole goes much deeper. Recently it has become popular to combine the worlds of Enumerables and Observables with async and await (Async<'a>). We call these Async Streams, and they can be divided into:

  • Asynchronous Pull
  • Asynchronous Push

Async Enumerables

F# async sequences are pull-based. You ask for a next item and then a callback is called when the value is available. You then ask for a next item, until the end.

The current F# implementation is in FSharp.Control.AsyncSeq, but there is a new .NET proposal called Async Streams. Here are the interfaces from the async stream proposal translated to F#:
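A sketch of how these interfaces might look in F# follows. It is a simplification: the .NET proposal uses task-like return types and a cancellation token, while this version uses Async<'a> and leaves cancellation out. The helpers ofList and toList are hypothetical.

```fsharp
type IAsyncDisposable =
    abstract member DisposeAsync : unit -> Async<unit>

type IAsyncEnumerator<'a> =
    inherit IAsyncDisposable
    abstract member Current : 'a
    abstract member MoveNextAsync : unit -> Async<bool>

type IAsyncEnumerable<'a> =
    abstract member GetAsyncEnumerator : unit -> IAsyncEnumerator<'a>

// Hypothetical helper: expose a list as an async enumerable.
let ofList (items : 'a list) : IAsyncEnumerable<'a> =
    { new IAsyncEnumerable<'a> with
        member this.GetAsyncEnumerator () =
            let remaining = ref items
            let current = ref (Unchecked.defaultof<'a>)
            { new IAsyncEnumerator<'a> with
                member this.Current = current.Value
                member this.MoveNextAsync () =
                    async {
                        match remaining.Value with
                        | x :: rest ->
                            current.Value <- x
                            remaining.Value <- rest
                            return true
                        | [] ->
                            return false
                    }
              interface IAsyncDisposable with
                member this.DisposeAsync () = async { return () } } }

// Hypothetical helper: pull asynchronously until the end.
let toList (source : IAsyncEnumerable<'a>) : Async<'a list> =
    async {
        let e = source.GetAsyncEnumerator ()
        let rec loop acc =
            async {
                let! hasNext = e.MoveNextAsync ()
                if hasNext then
                    return! loop (e.Current :: acc)
                else
                    return List.rev acc
            }
        let! items = loop []
        do! e.DisposeAsync ()
        return items
    }

let pulled = toList (ofList [1; 2; 3]) |> Async.RunSynchronously
```

The consumer is still in control, but every MoveNextAsync is awaited, so a slow source does not block a thread while the next value is being produced.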

Async Enumerables are much closer to Observables than plain Enumerables, because both are async. For Observables you asynchronously call the observer, for Async Enumerables you asynchronously return the value. The difference is really just that one transfers the value as a parameter while the other transfers the value as the return value.

But Observables have another trick up their sleeve. Meet Async Observables.

Async Observables

Observables and Rx are push-based. You start by subscribing, and then they keep throwing values at you. Observables are already asynchronous, so Asynchronous Observables may sound like “butter on your bacon”:

What is Async asynchronous push anyways?

But as we will see this depends on which side of the fence we are standing.

Asynchronous Subscriptions

Subscribe is synchronous. This is a problem if you need to establish a network connection asynchronously in your subscribe logic. Wouldn’t it be nice if we could subscribe asynchronously?

Asynchronous Push

Observers with their OnNext, OnError and OnCompleted are all synchronous (for the caller). This is a problem if the observer needs to write asynchronously to disk, or contact some other web service. This is not possible with plain Observables.

The only way to handle such side-effects with Observables is to compose the side-effect into the observable stream using operators such as flatMap/SelectMany.

Wouldn’t it be great if we could do this using async and await (let!) instead? We already know that async and await is just syntactic sugar for hiding flatMap/SelectMany anyways.

Here are the Async Observable interfaces:
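The shape of these interfaces can be sketched as follows: they mirror the Observable interfaces, except that every method returns an Async. Treat the exact names as assumptions. The ofList helper is hypothetical and simplified, since its SubscribeAsync does not return before all values have been pushed.

```fsharp
type IAsyncDisposable =
    abstract member DisposeAsync : unit -> Async<unit>

type IAsyncObserver<'a> =
    abstract member OnNextAsync : 'a -> Async<unit>
    abstract member OnErrorAsync : exn -> Async<unit>
    abstract member OnCompletedAsync : unit -> Async<unit>

type IAsyncObservable<'a> =
    abstract member SubscribeAsync : IAsyncObserver<'a> -> Async<IAsyncDisposable>

// Hypothetical helper: push a list, awaiting the observer for each value.
let ofList (items : 'a list) : IAsyncObservable<'a> =
    { new IAsyncObservable<'a> with
        member this.SubscribeAsync (observer : IAsyncObserver<'a>) =
            async {
                for x in items do
                    do! observer.OnNextAsync x
                do! observer.OnCompletedAsync ()
                let disposable =
                    { new IAsyncDisposable with
                        member this.DisposeAsync () = async { return () } }
                return disposable
            } }

let log = ResizeArray<string> ()

let observer =
    { new IAsyncObserver<int> with
        member this.OnNextAsync x =
            async {
                do! Async.Sleep 10   // stands in for async I/O such as a disk write
                log.Add (sprintf "next %d" x)
            }
        member this.OnErrorAsync err = async { log.Add "error" }
        member this.OnCompletedAsync () = async { log.Add "completed" } }

let subscription = (ofList [1; 2]).SubscribeAsync(observer) |> Async.RunSynchronously
subscription.DisposeAsync() |> Async.RunSynchronously
```

Because the producer awaits each OnNextAsync, the observer is free to do asynchronous work with let! and do! without values piling up behind it.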

The only F# implementation is currently Reaction AsyncRx, which specifically targets Fable but runs on .NET as well. There is also AsyncRx.NET, which is a work in progress and might get F# wrappers in the future.

Async Enumerables vs. (Async) Observables

Async Enumerables and (Async) Observables overlap much more than Enumerables and Observables. This is because the pulling of the Enumerator is now asynchronous and may therefore implement several reactive application scenarios. Let's look more closely at the possible duality between the async stream types.
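Written as plain function types, one possible sketch of the two async stream types is the following. The type names and the example functions always and pushList are assumptions for illustration.

```fsharp
// Async pull: the value comes back as an awaited return value.
type AsyncEnumerator<'a> = unit -> Async<'a>
type AsyncEnumerable<'a> = unit -> AsyncEnumerator<'a>       // unit -> (unit -> Async<'a>)

// Async push: the value goes in as a parameter and the call is awaited.
type AsyncObserver<'a> = 'a -> Async<unit>
type AsyncObservable<'a> = AsyncObserver<'a> -> Async<unit>  // ('a -> Async<unit>) -> Async<unit>

// Hypothetical pull example: an endless stream of the same value.
let always (x : 'a) : AsyncEnumerable<'a> =
    fun () -> fun () -> async { return x }

// Hypothetical push example: await the observer once per list item.
let pushList (items : 'a list) : AsyncObservable<'a> =
    fun observer ->
        async {
            for x in items do
                do! observer x
        }

let pulled = always 7 () () |> Async.RunSynchronously

let pushed = ResizeArray<int> ()
pushList [1; 2] (fun x -> async { pushed.Add x }) |> Async.RunSynchronously
```

In both types the Async sits in the return position of the innermost function, even though the value itself has moved from the result to the parameter.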

We see that they are basically duals of each other, with the difference that the Async<_> does not change sides and stays on the right.

Converting an Observable to an Enumerable involves queuing since the Observable may produce values faster than the Enumerator is being pulled. What about converting between Async Observables and Async Enumerables?

Below is an example showing that the Observer is being awaited until the Async Enumerable is being pulled. The Async Enumerable will also await values from the Async Observable.

This shows that we can easily convert between Async Observables and Async Enumerables without using queueing. One uses the return value to transfer the data, the other uses a function parameter. Both can await the operation. Async Enumerables are driven by the application and can control the flow rate by applying so-called back-pressure. Async Observables are driven by the environment, but calls may also be back-pressured by an awaiting Async Observer.

Summary

Using Observables and reactive programming is just as fundamental as Enumerables and interactive programming. We have seen that they are in fact opposites of each other. Not using Observables for event based programming is like not using Lists and sequences with multiple values in “normal” programming.

“I like to program my applications using just single values.”

Whether you should use Async Enumerables or Async Observables depends on the scenario. Use reactive programming and Observables when you want the environment around your application to control what is happening. Use (Async) Enumerables when you want the program to be in control and to control the event rate (applying back-pressure).

Combining asynchronous (Async<'a>) programming with Observables and Enumerables gives you the best of both worlds.
