F# Async Guide

Leo Gorodinski
28 min read · Jun 26, 2018


This is a usage guide for asynchronous programming in F# using the Async type. The content should be helpful to existing F# Async users or those approaching F# concurrency from another programming language, and is complementary to existing material such as Asynchronous Programming by Scott Wlaschin, Async in C# and F# by Tomas Petricek and Async Programming in F# on MSDN.

Table of Contents

  • Definition — the definition of the F# Async type, its interaction with the thread pool, and async workflows.
  • Hazards — common programming hazards with F# Async and workarounds.
  • Related Programming Models — relationship to other programming models.
  • Concepts — narrative on general concepts in concurrency used throughout the post.

Definition

The F# Async type represents an asynchronous computation. It is similar in concept to System.Threading.Tasks.Task in .NET, java.util.concurrent.Future in Java, a goroutine in Go, Control.Concurrent.Async in Haskell, Event in Concurrent ML, or promise in JavaScript, with some important differences.

Overall, F# Async serves the following needs:

  1. It allows for more efficient use of OS threads by removing the need to block them while waiting.
  2. It provides constructs for concurrency and parallelism in addition to sequential computation.
  3. It indicates that a computation is long-running, or may not be expected to terminate.

Programmatically, the Async type is defined as follows:

type Async<'a> = ('a → unit) → unit

In other words, a value of type Async<'a> is a function that accepts a callback function of type 'a → unit and returns unit.

We can derive the Async type as follows. Suppose you've an operation that transmits an HTTP request and then waits for the response:

open System.Net

let download (url:string) : string =
  let client = new WebClient()
  let res = client.DownloadString url
  res

In this case, the call to DownloadString is blocking - the OS thread on which the execution is taking place becomes blocked for the duration of the IO operation. When a thread is blocked, it isn't directly consuming CPU resources; however, it continues to consume stack space, which it needs in order to resume when the operation completes. These context switches, as a thread blocks and then unblocks, are costly. We can make more efficient use of threads and processing resources by using the calling thread only to invoke the operation and, when the IO operation completes, delivering a notification to a callback, possibly on another thread. This can be done as follows:

let downloadCallback
  (url:string)
  (callback:string -> unit) : unit =
  let client = new WebClient()
  client.DownloadStringCompleted
  |> Event.add (fun args -> callback args.Result)
  client.DownloadStringAsync url

In this case, the call to downloadCallback returns immediately, and the provided callback is subscribed to an event that triggers when the invoked operation completes. This allows the callback to be called from a different thread, and allows the calling thread to continue doing useful work rather than remaining blocked. If you squint a little, you can see that the type of downloadCallback url is (string → unit) → unit and if we generalize that to a generic type 'a we end up with the definition of Async<'a> above.

Using the Async type, we have the following signature for the operation:

val downloadAsync : string → Async<string>

At this point, it is possible to understand why this computation is asynchronous. It is asynchronous because there are two core steps involved — the invocation of the operation and the receipt of the response. Furthermore, we can see how the async type allows us to manage OS threads more efficiently — rather than blocking the calling thread, the calling thread remains free to do other work. We’ll cover this in more detail below.

The actual implementation of the Async type, available in the F# repo, is more involved due to the need to support exceptions, cancellation, and a growing stack, some of which are discussed later on. The central 'constructor' for an Async value is the Async.FromContinuations function:

Async.FromContinuations :
  (('a → unit) *
   (exn → unit) *
   (OperationCanceledException → unit) → unit) → Async<'a>

In addition to the success continuation 'a → unit, it takes continuations for errors and cancellation.

Thread Pool

Rather than managing threads directly, the Async type works together with the .NET ThreadPool to schedule work. The thread pool maintains a pool of threads, growing and shrinking it as needed, and provides the following key interface:

ThreadPool.QueueUserWorkItem : (unit → unit) → unit

This operation queues an action unit → unit to be run on a thread pool thread. Conceptually, the ThreadPool is a queue of work items dispatched onto a pool of threads.
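
For example (note that the actual .NET signature takes a WaitCallback, i.e. obj → unit, and returns a bool, which the simplified signature above elides):

open System.Threading

// queue an action to be executed on a thread pool thread
ThreadPool.QueueUserWorkItem (fun _ -> printfn "running on a thread pool thread")
|> ignore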

While it is possible to simply start a new thread whenever an action needs to be scheduled, using a thread pool allows thread creation costs as well as context switching costs to be amortized. Rather than blocking threads, threads are kept busy with a queue of work maintained by the thread pool. In the example above, the DownloadStringCompleted event might be triggered from a thread pool thread. This approach to scheduling work items is sometimes referred to as green threads.

The relationship between Async computations and OS threads is not one-to-one — more Async computations does not automatically result in more threads, and in particular, increasing parallelism isn't achieved by increasing the number of threads, but rather by increasing the number of in-flight computations. In effect, the Async type encapsulates callbacks and the ThreadPool into a higher-level programming model as described below. With that said, in some cases, tuning the thread count limits on the ThreadPool can improve performance.

Async Workflows

F# async workflows provide a syntax that permits expressing sequential workflows in terms of Async computations. For example, given the downloadAsync operation above, we may want to perform another download based on the result of the first and then perform a transformation on both results:

let callApi (url:string) = async {
  let! data1 = downloadAsync url
  let! data2 = downloadAsync (url + data1)
  return data1,data2 }

While this workflow is expressed sequentially, the underlying computation runs asynchronously and avoids blocking an OS thread during the processing of downloadAsync. This is achieved by translating the workflow syntax into Bind and Return operations defined on the Async type as follows:
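
Approximately, and eliding the exception-handling machinery, the workflow above desugars into the following (a sketch):

let callApi (url:string) : Async<string * string> =
  async.Delay (fun () ->
    async.Bind (downloadAsync url, fun data1 ->
      async.Bind (downloadAsync (url + data1), fun data2 ->
        async.Return (data1, data2))))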

The result of the call to downloadAsync is passed to Bind, and the portion of the workflow after the call is passed to Bind as the continuation. The Return operation takes a value, in this case the pair of data1 and data2, and lifts it into an Async value. The async workflow builder in F# also defines operations to support other control flow constructs - loops, delayed execution and exception handling.

A chain of bind operations forms a sequential computation. Much of the theory of computation (i.e. Turing machines, the lambda calculus) models sequential computation, where steps happen one after another. Of course, things wouldn't be much fun if we were limited to sequential computation. The F# Async type also allows us to express parallel computations, using the Async.Parallel : Async<'a>[] → Async<'a[]> operation, for example. This operation takes an array of async computations and returns a single async computation that will yield their aggregated results, thereby expressing fork-join parallelism. The "fork" part is the starting of the provided computations, and the "join" part is awaiting their results.

Another way to express parallelism is with the Async.StartChild : Async<'a> → Async<Async<'a>> operation. This operation starts a computation and returns a 'handle' that can be used to rendezvous with the result at a later time. This makes it possible to start multiple computations to be run in parallel, but still cleanly gather their results without any low-level threading constructs in play. This in turn can be used to implement a pairwise parallel operation of type Async<'a> → Async<'b> → Async<'a * 'b>. Such an operation could also be implemented with the sequential Bind operation; however, the provided computations would then run sequentially rather than in parallel, changing the semantics significantly.
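
For instance, a pairwise parallel operation can be sketched with Async.StartChild (the name parallelPair is assumed):

let parallelPair (a:Async<'a>) (b:Async<'b>) : Async<'a * 'b> = async {
  // fork: start both computations
  let! ca = Async.StartChild a
  let! cb = Async.StartChild b
  // join: await both results
  let! ra = ca
  let! rb = cb
  return ra,rb }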

Hazards

There are several hazards to programming with F# Async. Some are already covered in Tomas Petricek's excellent article Async in C# and F#: Asynchronous gotchas in C#, but we discuss a few more here.

Async.RunSynchronously

The Async.RunSynchronously : Async<'a> → 'a operation provides a way to commence and then obtain the result from an async computation. The name of the operation is deliberately made cumbersome to type because it must be used judiciously. F# novices, or those new to functional programming in general, often struggle with this as they seek to access the value produced by a computation and end up "cheating" by calling Async.RunSynchronously. Ideally, Async.RunSynchronously would only be invoked once for the entire program, passed an async computation representing the program. Most importantly, calls to Async.RunSynchronously from within loops should be avoided. The reason is that Async.RunSynchronously is implemented by blocking the calling thread until the async computation completes. This in effect undoes much of the benefit of using the Async type in the first place, though it is necessary at the boundary where the async computation must take effect. If the call is made only once for the entire program, only one thread remains blocked waiting for the program to complete, which of course is fine. Frequent calls to Async.RunSynchronously, however, don't play well with the .NET ThreadPool. Blocking threads will pressure the ThreadPool to create more threads, eventually causing it to reach its limits, inducing a high number of context switches and wasted stacks. Instead of calling Async.RunSynchronously, either use async workflows or operations on the Async type such as async.Bind to access the produced value.
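
To illustrate (assuming urls : string list and the downloadAsync operation from the Definition section):

// hazard: blocks a thread-pool thread on every iteration
let pages : string list =
  urls |> List.map (fun url -> downloadAsync url |> Async.RunSynchronously)

// instead: compose within a workflow and evaluate once at the entry point
let pagesAsync : Async<string list> = async {
  let results = ResizeArray ()
  for url in urls do
    let! page = downloadAsync url
    results.Add page
  return List.ofSeq results }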

See also: Asynchronous Everything by Joe Duffy

Summary

  • Avoid calling Async.RunSynchronously except at the entry point of the executable.

Async.Start

The Async.Start : Async<unit> → unit operation starts an async computation without waiting for the result by scheduling the computation on a ThreadPool thread. This operation is what actually puts the async computation chain constructed by the operations defined above into motion. As described, Async.RunSynchronously is implemented by starting a computation which stores its result in a wait handle on which the calling thread waits. Async.Start is akin to forking a thread. The related operations Async.StartChild : Async<'a> → Async<Async<'a>> and Async.StartChildAsTask also start a computation without awaiting the result; however, they also return a handle making it possible to await the result. Care should be taken with these operations because they can result in overly non-deterministic executions. They may cause too many operations to run in parallel, potentially degrading performance. Moreover, exceptions raised by computations passed to Async.Start aren't propagated to the caller and are easily overlooked. In fact, it should rarely be necessary to use Async.Start in application code. Instead, favor calls to Async.Parallel or Async.ParallelThrottled for expressing parallelism.

For example, suppose you’ve a sequence of async computations that need to be run. One way to run them is to iterate the sequence, starting each computation with Async.Start. However, this:

  1. May cause more than the desired number of computations to be run in parallel.
  2. Doesn’t provide a way to await the completion of the sequence and
  3. Leaves exceptions thrown by individual computations unhandled.

// a sequence of computations
let comps : Async<unit> seq = ...

// start each computation,
// but do not await the results
comps |> Seq.iter Async.Start

Instead, it is possible to run the computations in parallel using a call to Async.Parallel which will address the aforementioned issues:

// run computations in parallel;
// await the results, with exceptions
// escalated to the caller
do! comps |> Async.Parallel |> Async.Ignore

Another place where a call to Async.Start may come up is in starting a background process of some sort. For example, a program may have a health check or reporting process to be run alongside the core logic. If this background process is run using Async.Start, exceptions raised by it may be left unhandled, silently preventing the program from reporting its health:

val coreProcess : Async<unit>
val backgroundProcess : Async<unit>
Async.Start backgroundProcess
Async.RunSynchronously coreProcess

If this is undesirable, the fate of the background process should be tied to the fate of the core logic of the program using Async.Parallel:

Async.Parallel [coreProcess; backgroundProcess]

With this approach, if exceptions raised by the background process should be discarded without causing the program to crash, this must be done explicitly by catching the exceptions and logging as appropriate.

Summary

  • Consider using a higher-level construct before using Async.Start.
  • Determine whether exceptions raised by computations started with Async.Start should affect the calling computation.
  • Be sure to propagate a CancellationToken to Async.Start if applicable.

Async.Parallel

As described above, Async.Parallel is a way to express fork-join parallelism. However, an important consideration when using this operation is the number of input computations provided. If the number of input computations is too high, then the call to Async.Parallel may create too much contention for both memory and IO, resulting in performance degradation. Additionally, if the sequence of computations is unbounded, the call to Async.Parallel will run out of memory before starting any of the computations because, internally, it allocates an array to store the result of each computation. Instead, consider using either Async.ParallelThrottled : int → Async<'a>[] → Async<'a[]> or Async.ParallelThrottledIgnore : int → Async<unit> seq → Async<int>. The former is like Async.Parallel except it bounds the degree of parallelism, and the latter also bounds parallelism but doesn't store the results of the computations, only the count of the number completed, making it possible to use with unbounded sequences of computations.

Care must be taken to tune for the appropriate degree of parallelism, especially for IO-bound computations where there aren't rules of thumb such as for CPU-bound computations (i.e. a thread per core). The best value may depend on the nature of the computations and may even change over time. An even more ideal scheduler would automatically control the degree of parallelism with a strategy to either maximize throughput or minimize latency.

The Async.ParallelThrottledIgnore operation can be implemented as follows:
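
One possible implementation (a sketch, not the author's library code) shares the input sequence among a fixed number of workers:

open System.Threading

// run computations from a (possibly unbounded) sequence with at most
// `parallelism` computations in flight; returns the number completed
let parallelThrottledIgnore (parallelism:int) (comps:Async<unit> seq) : Async<int> = async {
  let enum = comps.GetEnumerator ()
  let completed = ref 0
  let sync = obj ()
  // each worker takes the next computation, if any, runs it, and repeats
  let rec worker () = async {
    let next = lock sync (fun () -> if enum.MoveNext () then Some enum.Current else None)
    match next with
    | Some comp ->
      do! comp
      Interlocked.Increment (&completed.contents) |> ignore
      return! worker ()
    | None -> return () }
  // run the workers in parallel and await them all; requires parallelism >= 1
  let! _ = Async.Parallel (List.init parallelism (fun _ -> worker ()))
  return !completed }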

Summary

  • Ensure that the number of input computations passed to Async.Parallel is bounded.
  • Consider using a throttled variant as described above to reduce contention.
  • Consider using a non-Async parallelization mechanism (such as Parallel.For or PLINQ) for compute-bound computations.

Compute-Bound Computations

While it is possible to express parallelism with Async, as described in the previous section, using this approach for compute-bound computations may not be the most efficient. A compute-bound computation is one where the majority of time is spent on computational tasks rather than awaiting IO operations. In these cases, it is better to use something like Parallel.For or PLINQ to take advantage of parallelism. This avoids the overhead of the Async continuation mechanism. However, it is important to note that if a compute-bound operation does make an IO request, using Async.RunSynchronously to await it will block a thread and may degrade performance relative to using Async.Parallel.
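
For example, a compute-bound transformation parallelized with Parallel.For rather than Async (a minimal sketch):

open System.Threading.Tasks

// square each element in parallel; no Async machinery is involved
let squares (xs:int[]) : int[] =
  let out = Array.zeroCreate xs.Length
  Parallel.For (0, xs.Length, fun i -> out.[i] <- xs.[i] * xs.[i]) |> ignore
  out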

MailboxProcessor

The MailboxProcessor (MBP) provides an actor-based concurrent programming model (see the Concepts section below). However, for most applications, this model is fairly low-level and requires considerable care to avoid common pitfalls. The MBP is best suited for implementing higher-level library constructs, but it should be avoided in domain code for reasons described below. One of the most common hazards with the MBP is that it is easy to overlook exceptions thrown by the processing computations. These exceptions are published on the Error event; however, this event needs to be explicitly subscribed to in order to observe the errors. Even if the error is caught, it may not be clear how to proceed, as the context is lost. Next, the PostAndAsyncReply operation together with the AsyncReplyChannel type do not provide a way to propagate exceptions, forcing users to express exceptions using an explicit Result value or by using a TaskCompletionSource instead.

For example:

let rec proc (mbp:MailboxProcessor<string * AsyncReplyChannel<string>>) = async {
  let! (data,replyCh) = mbp.Receive ()
  let! result = ... // logic
  replyCh.Reply result
  return! proc mbp }

let mbp = MailboxProcessor.Start proc

let handle (data:string) : Async<string> =
  mbp.PostAndAsyncReply (fun replyCh -> data,replyCh)

Here, if the processing logic throws an exception, the caller in handle will be suspended indefinitely and the exception will be swallowed. Moreover, the MailboxProcessor will halt and be unable to process any additional messages. One might instead expect the exception to be escalated to the caller, and for the MailboxProcessor to continue processing. This can be done by explicitly catching exceptions inside of the processing loop and then propagating them to the caller, either using an explicit Result value or through a TaskCompletionSource rather than an AsyncReplyChannel. For example:

let postAndAwaitResult
  (mbp:MailboxProcessor<'a>)
  (f:TaskCompletionSource<'b> -> 'a) : Async<'b> = async {
  let ivar = TaskCompletionSource<_>()
  mbp.Post (f ivar)
  return! ivar.Task |> Async.AwaitTask }

let rec proc (mbp:MailboxProcessor<string * TaskCompletionSource<string>>) = async {
  let! (data,ivar) = mbp.Receive ()
  try
    let! result = ... // logic
    ivar.SetResult result
  with ex ->
    ivar.SetException ex
  return! proc mbp }

let mbp = MailboxProcessor.Start proc

// exceptions will be escalated to the caller
let handle (data:string) : Async<string> =
  postAndAwaitResult mbp (fun ivar -> data,ivar)

Another thing to keep in mind with the MBP is that the mailbox is unbounded and therefore has the potential to overflow. In a producer-consumer scenario, the producer may produce messages at a higher rate than the consumer is able to consume them, resulting in an unstable system. An explicit back-pressure mechanism is needed to coordinate the consumer and the producer to prevent overflow. One way to do this is using the BoundedMb type, which places a bound on the number of messages in the mailbox. If the bound is reached, the BoundedMb exerts back-pressure on the producer.
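
The shape of such a type might look as follows (a sketch; the member names are assumed):

type BoundedMb<'a> =
  // completes only once there is room in the mailbox, exerting back-pressure
  abstract Put : 'a -> Async<unit>
  // completes once a message is available
  abstract Take : Async<'a>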

Beyond these nuances with exceptions and back-pressure, the MailboxProcessor programming model can lead to needless layers of indirection. In the example above, if the desired outcome is to invoke the processing logic, it is much more reasonable to simply invoke the logic directly rather than routing through the MBP. Of course, the MBP can do more than simply forward messages, but if more complex behaviors are required, it is better to encapsulate these behaviors in a reusable data structure.

Examples of higher-level async structures that can be implemented with MBP are:

  • MVar — a serialized variable with lazy initialization, akin to a ref but with support for serialized, async-based mutation. Beware of deadlocks when mutating!
  • SVar — like MVar but with an additional tap operation which returns an AsyncSeq of values stored.
  • Channel — synchronizes a producer and a consumer of a message. Similar in spirit to channels in Go and Concurrent ML, however without support for selective communication.
  • BoundedMb — a bounded mailbox, similar in functionality to BlockingCollection, however using Async to represent waiting. This is an effective way to include back-pressure for produce-consumer scenarios.
  • BatchProcessingAgent — a buffer which forms and publishes batches of messages.

In many cases, it is better to rely on these data structures rather than implementing a custom MBP for a domain-specific use-case.

Another way to approach this programming model is to turn the processing logic “inside out” using AsyncSeq. First, we repurpose the MBP to act solely as a mailbox:

let mbp : MailboxProcessor<'a> =
  MailboxProcessor.Start (fun _ -> async.Return ())

Then we represent the incoming messages as a stream using AsyncSeq:

let stream : AsyncSeq<'a> =
  AsyncSeq.replicateInfiniteAsync (mbp.Receive ())

Now we can publish messages to the mailbox asynchronously, and consume the resulting AsyncSeq explicitly. This allows us to use existing operations on AsyncSeq to filter, transform and buffer the messages, it allows us to merge the stream with other streams, and represents the process explicitly as an Async operation such that we can join it with other operations:

let proc : Async<unit> =
  stream
  |> AsyncSeq.bufferByTimeAndCount 100 100
  |> AsyncSeq.iterAsync processBatch

This approach makes the processing logic explicit and provides a more convenient way to handle exceptions.

Summary

  • Beware of exceptions raised by processing logic used inside a MailboxProcessor.
  • Consider using TaskCompletionSource rather than AsyncReplyChannel to signal from within a MailboxProcessor, particularly when exceptions may be raised.
  • Consider using or implementing a higher-level component rather than using a MailboxProcessor for domain-specific code.

CancellationToken

A CancellationToken is used to cancel computations in response to cancellation requests that are external to the computation itself. Several of the operations on Async, such as Async.Start and Async.RunSynchronously, are parameterized with an optional CancellationToken, such that if a cancellation is requested on that token, the computation can be notified, allowing it to terminate. There are many reasons to cancel a computation. One of the most common is to impose a timeout on a computation. More generally, the reason could be as a response to new information, invalidating the inflight computation. Care must be taken to ensure that a computation will actually respond to a cancellation request. In many cases, this is done automatically by machinery inside Async itself. For example, before each async.Bind is invoked, the cancellation token is checked. Also, calls to Async.Sleep will be cancelled as expected. However, if an async computation has a prolonged compute-bound section, the cancellation token must be checked manually.

Each async computation is bound to a CancellationToken and is accessible with Async.CancellationToken : Async<CancellationToken>. If a token isn't provided explicitly as described above, Async.DefaultCancellationToken is used. The default cancellation token can be cancelled by calling Async.CancelDefaultToken, however this will signal a cancellation for all computations bound to this token. To explicitly bind an async computation to a token, the token can be passed along with the computation to Async.Start or other operations.

As a convenience, we can define a helper that links a provided CancellationToken with the ambient token before starting a computation.
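
A minimal sketch (the helper name startLinked is assumed; for brevity, the linked token source is not disposed):

open System.Threading

// link the given token with the ambient token and start the
// computation with the linked token
let startLinked (ct:CancellationToken) (comp:Async<unit>) : Async<unit> = async {
  let! ambient = Async.CancellationToken
  let linked = CancellationTokenSource.CreateLinkedTokenSource (ambient, ct)
  Async.Start (comp, linked.Token) }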

Note how in this case, the argument CancellationToken is linked with the ambient CancellationToken, and the linked token is passed to Async.Start. As a result, the computation will be cancelled in response to either the argument CancellationToken or in response to the ambient CancellationToken. This may not be desired in all cases.

Cancellation tokens are not a first-class concept within the Async type and require special treatment. In some cases, it is possible to use a first-class selective communication mechanism, or at least a best-effort attempt. What would it mean for cancellation to be first-class? A cancellation token establishes a race between two computations: the core computation at hand and the computation that represents a cancellation. For example, a timeout can be viewed as a race between a computation and a timer.

More generally, we can implement cancellations using the Async.Choice : Async<'a option> seq → Async<'a option> operation. Given a sequence of input computations, this operation will start all of them, return the result of the first one to complete, and cancel the others. However, cancellation is a best-effort attempt, and therefore, does not represent true selective communication. For example, if we apply Async.Choice to the Receive operations on two MailboxProcessor instances, the message received from the second one of the two to complete will be lost. A more elaborate synchronization mechanism is required to implement true selective communication wherein the message remains in the second mailbox.

Summary

  • Be explicit about propagating cancellation tokens when calling Async.Start and related operations accepting a cancellation token.
  • Avoid calling Async.CancelDefaultToken to avoid interference with unrelated computations.
  • Be sure to extract and reference the ambient cancellation token via Async.CancellationToken when a computation has an extensive compute-bound section, to ensure that it is properly cancelled.
  • Consider using Async.Choice in scenarios requiring first-class flow control.
  • Take note of the issue when using Async.AwaitTask on cancelled Task instances as described in the next section.

Async.AwaitTask

The Async.AwaitTask : Task<'a> → Async<'a> operation translates a Task value to an Async value. Many asynchronous operations in the .NET Framework return Task, and this operation is used to map them to Async. In versions of F# prior to 4.1, the implementation of Async.AwaitTask had a bug wherein cancellations of Task computations would be lost, resulting in indefinitely suspended Async computations. This led to difficult-to-find bugs; many have encountered it when using HttpClient from F#. Indefinitely suspended Async computations are a broader hazard discussed next.

Another hazard involving Task and Async is in attempting to use selective communication among them. For example, suppose you've a component such as a Socket or state representing a node's view of a cluster. We can represent the state of this component using a TaskCompletionSource which is set to the Completed state when the component is closed, or to the Faulted state when the component fails. Suppose also that you've component-dependent Async operations, such as sends and receives. We'd like to cancel an in-flight operation whenever the component is closed or faulted, so that they can be retried on a new component instance. This calls for selective communication - we'd like to select between awaiting the completion of an operation or the closing of a resource. More precisely, we're looking for a function of type chooseTaskOrAsync : Task<'a> → Async<'a> → Async<'a> where the first argument would correspond to the component state and the second to the operation. If the component is closed, we'd like to raise an exception, and to do that, we could use Task.ContinueWith. However, since for each instance of a component we might have a large number of component-dependent operations, we'd add a large number of continuations to the Task corresponding to the component. If those continuations aren't properly cleaned up, we end up with a memory leak. The Task.WhenAny operation on the other hand ensures that orphaned continuations are properly cleaned up and allows us to avoid a memory leak.
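
A sketch of chooseTaskOrAsync along these lines, using Task.WhenAny:

open System.Threading.Tasks

// await whichever completes first: the component-state task or the operation;
// Task.WhenAny ensures the orphaned continuation is cleaned up
let chooseTaskOrAsync (state:Task<'a>) (op:Async<'a>) : Async<'a> = async {
  let! opTask = Async.StartChildAsTask op
  let! winner = Task.WhenAny (state, opTask) |> Async.AwaitTask
  return! winner |> Async.AwaitTask }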

Summary

  • Ensure that you’re using a correct implementation of Async.AwaitTask to await Task instances which may be cancelled.

Indefinite Suspension

Nothing in the Async type ensures that the computation terminates. It is possible to impose a timeout, as described in the previous section, but this isn't done automatically. As a result, it is quite possible to end up with an async computation that never terminates, causing an indefinite suspension in the program. On the one hand, this accurately depicts the nature of asynchrony, but on the other hand, it can lead to some adventurous bug hunting.

A helpful operation to impose timeouts is as follows:
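
One possible implementation (a sketch) uses the optional timeout of Async.StartChild, which raises a TimeoutException if the child does not complete in time:

open System

let timeoutAfter (timeout:TimeSpan) (comp:Async<'a>) : Async<'a> = async {
  let! child = Async.StartChild (comp, int timeout.TotalMilliseconds)
  return! child }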

This operation can be applied to top-level handler functions where it isn’t certain whether internal operations take care of timeouts, but where there is an evident upper bound on the time the operation should require. Of course, some computations are deliberately non-terminating, such as a heartbeating process. In this case, timeouts aren’t needed, and it may be helpful to explicitly signal this fact by returning a constructor-less Void type from the computation.

Summary

  • Consider imposing a limit on the duration of an async computation.
  • Take care to propagate all forms of completion for an async computation, including errors and cancellations.

Laziness

While F# is, by default, eagerly evaluated, Async computations are lazy, albeit with important exceptions. Laziness implies that simply having a reference to an Async computation does not imply that that computation is running. This is in contrast to Task, for example, which usually represents a computation that is already running. In addition, unlike lazy evaluation in languages like Haskell, Async computations are not memoized, which means they will be reevaluated each time they are run. This is again in contrast to Task, which is idempotent - once it completes, the produced value is memoized. The lazy nature of Async is evident through the async.Delay : (unit → Async<'a>) → Async<'a> operation, which takes a function producing an async value and represents it as an async value. The function will be evaluated each time the Async computation is evaluated. The Delay operation is used as part of the syntactic transformation of an async workflow, making everything inside an async block lazy. However, it is possible to explicitly memoize an Async computation, and it is impossible to tell from the type alone whether a given async computation is memoized. For example, an Async computation can be memoized by using a TaskCompletionSource to store its result:
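
A minimal sketch (assumes the first evaluation runs the computation and all evaluations await the stored Task):

open System.Threading
open System.Threading.Tasks

// memoize an Async: the underlying computation runs at most once, and its
// result (or exception) is stored in a TaskCompletionSource
let memoize (comp:Async<'a>) : Async<'a> =
  let tcs = TaskCompletionSource<'a>()
  let started = ref 0
  async {
    // only the first evaluation actually runs the computation
    if Interlocked.CompareExchange (&started.contents, 1, 0) = 0 then
      try
        let! r = comp
        tcs.SetResult r
      with ex ->
        tcs.SetException ex
    return! tcs.Task |> Async.AwaitTask }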

Another example where an Async computation is already in flight is the result of the Async.StartChild : Async<'a> → Async<Async<'a>> operation. When the outer Async computation is bound, the input computation is started, and the inner Async computation is a handle to the started computation, which, when bound, awaits its result. Awaiting the inner computation multiple times does not reevaluate the input computation.

The (mostly) lazy nature of Async can lead to unexpected results. For example, suppose you want to run two Async computations in parallel, and be notified when the first one completes, but also be able to retrieve the result of the second computation once it completes. Using the Async.Choice operation as defined above would cause the second computation to be cancelled, and if the calling code were then to await it, the computation would be reevaluated. Instead, the following operation might be better suited to this task:
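
A sketch of the Async.race operation referenced below, defined here as a standalone function:

open System.Threading.Tasks

// returns the result of the first computation to complete, together with a
// handle to the other; tasks memoize their results, so awaiting the loser
// later does not reevaluate it (note: winner.Result wraps any exception in
// an AggregateException)
let race (a:Async<'a>) (b:Async<'a>) : Async<'a * Async<'a>> = async {
  let! ta = Async.StartChildAsTask a
  let! tb = Async.StartChildAsTask b
  let! winner = Task.WhenAny (ta, tb) |> Async.AwaitTask
  let loser = if obj.ReferenceEquals (winner, ta) then tb else ta
  return winner.Result, (loser |> Async.AwaitTask) }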

The Async.race operation explicitly memoizes the result of the second computation. We can compare this with the Task.WhenAny operation, which also returns the first computation to complete; however, the other computations are not cancelled and can still be awaited by the caller.

Thread Local Storage

As described in the Thread Pool section, async computations aren’t bound to specific threads, and a given workflow may execute across several thread pool threads throughout its lifecycle. As such, the Thread Local Storage (TLS) mechanism can’t be used to store contextual data for a workflow. However, cross-cutting concerns often require a notion of workflow-local storage, for example to store a tracing context. Even though this mechanism isn’t provided out of the box, it is possible to implement it explicitly by building a workflow for the following type:

open System.Collections.Generic

type Context = Dictionary<string, obj>

// an async computation explicitly depending on a context
type AsyncEnv<'a> = Context -> Async<'a>

This type can be treated in the same way as the existing Async type by implementing a computation workflow, however it can also provide operations for reading and writing into the context. In fact, the existing Async type already stores the ambient CancellationToken in its context and it should be possible to extend the implementation to support arbitrary data items. Note that workflow context should be used judiciously as it can lead to unexpected results and leaks.

Summary

  • Don’t rely on thread-local storage from within Async computations.
  • If you need workflow-local storage, consider implementing an extended Async computation workflow.

Related Programming Models

In this section, we compare the Async type to similar concepts in .NET and other programming languages.

.NET System.Threading.Tasks.Task

The System.Threading.Tasks.Task type in the .NET Framework serves a very similar purpose to Async. It also represents a computation that eventually produces a value. Async has operations to map to and from Task. However, there are some important differences. First, a Task is idempotent (monotonic): once it produces a value, the task is completed and will no longer perform additional computation. Async on the other hand can be evaluated many times. It is possible to cache the result of an Async computation, however this must be done explicitly. Second, in most cases, a Task represents an in-progress computation, whereas an Async represents a computation which must be explicitly evaluated. The Task.ContinueWith operation is similar to async.Bind - it binds a continuation to the result of the computation. Since Task is monotonic and idempotent, it is important to note that Task.ContinueWith adds the continuations to a list in the target computation, whereas async.Bind returns a copy of the workflow which will be reevaluated. As a shoutout to the monad people, Task.ContinueWith is actually the comonadic extend operation, whereas async.Bind is the monadic bind operation. Task has the additional Unwrap operation corresponding to the monadic join. It is possible to map between Async and Task using the Async.StartAsTask and Async.AwaitTask operations. In F# this is commonly done to interact with existing C# libraries, or to take advantage of Task in scenarios where it is a better fit.
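
For example, mapping in both directions (using downloadAsync from the Definition section):

open System.Threading.Tasks

// start an Async as a (hot, memoized) Task
let t : Task<string> = Async.StartAsTask (downloadAsync "http://example.com")

// adapt a Task back into an Async
let a : Async<string> = Async.AwaitTask t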

Java java.util.concurrent.Future

The Future type in Java is essentially the same as the Task type above.

Akka

Akka is an actor framework for the JVM. It is heavily inspired by Erlang, and in addition to the actor model itself, it provides facilities for routing, fault tolerance and distribution. As described in the MailboxProcessor section, the actor model is too low-level for many use-cases, making it easy to make mistakes. To that end, Akka also provides a Future type to express request-reply interactions. The Akka.FSharp library provides an F# API for Akka.NET, the .NET port of Akka.

Go Goroutine

A Goroutine is very similar to F# Async. The Go concurrency model is heavily inspired by CSP, and in addition to goroutines, it includes channels. A channel is a junction across which goroutines can exchange messages. The select statement provides selective communication amongst channels. Note that selective communication is not an entirely trivial concept.

JavaScript Promise

A JavaScript promise is essentially the same thing as Task and Future, and also similar to Async. NodeJS users are familiar with the pain of callback-style programming, and JavaScript promises adapt it to the more convenient sequential flow control style.

Haskell Control.Concurrent.Async

The Haskell Async type is a thin layer atop the IO monad and is very similar to the F# Async type. There are additional constructs in the Control.Concurrent namespace, such as MVar, IVar and Chan. IVar is essentially TaskCompletionSource and MVar is described above in the MailboxProcessor section. Chan is similar to channels in Go and Concurrent ML. In addition, Haskell has other concurrent programming models such as Software Transactional Memory (STM) and Transactional Events. Simon Marlow's book Parallel and Concurrent Programming in Haskell offers a wealth of information on concurrent programming in Haskell.

Concurrent ML

Concurrent ML is a concurrency library for Standard ML. The Event construct is very similar to F# Async; however, on closer inspection, it supports a richer set of operations. In particular, Event and the accompanying Channel construct in Concurrent ML support selective communication. Selective communication forms a proper disjunction between computations, committing to one and ensuring the other is not committed to. Hopac is an implementation of Concurrent ML in F#, with a vast array of operations and types. In essence, it is an implementation of the pi-calculus.

Joinads

Joinads is a research extension of F# based on the join-calculus programming model. Joinads also include a syntactic construct extending the existing match syntax in F#, allowing the expression of join patterns among multiple channels. This provides a richer and more convenient set of synchronization mechanisms beyond F# Async - in particular, selective communication. With any luck, the programming model will make it into the core F# language at some point.

Hopac

Hopac is an implementation of Concurrent ML in F#. It provides a much richer set of operations than the F# Async type, in particular for selective communication. It is also more efficient than F# Async or Task for many workloads. In addition, the library is accompanied by a wealth of documentation which is useful for programmers in any language.

Clojure Async

F# Async is similar to Clojure's core.async and is motivated by many of the same concerns.

Concepts

This section is a narrative on concepts of concurrent and parallel programming used throughout the post.

Concurrency & Parallelism

Concurrency refers to the absence of ordering information among events. In other words, given two events, if we don’t know which came first, we call the events concurrent. Furthermore, even if we impose a total order on the events in the system, operations, consisting of an invocation and completion event, are regarded as concurrent when they overlap. Even though one operation may start before the other, overlap in their spans makes the ordering between operations a partial order. Concurrent programming refers to programming in the face of absence of ordering information among some subset of events in the system. Various models of concurrency have been developed in order to better understand the semantics of concurrency and/or to provide a programming model suited to concurrent domains. We shall discuss a few of these models and relate them to F#.

One model of concurrency from the process calculi family is called Communicating Sequential Processes (CSP). CSP models a concurrent system as a collection of independent, sequential processes (i.e. threads) which interact at explicit junctions. An interaction event is a point of synchronization between processes, allowing the exchange of information. Another model of concurrency is the actor model, wherein actors, which are sequential threads of control, are a core computational primitive. Both processes in CSP and actors in the actor model interact using explicit message passing, rather than through shared memory, such as in the PRAM model. Note however that this distinction between shared memory and message passing becomes blurred since interactions with shared memory can also be modeled using message passing. Indeed, it takes a non-negligible amount of time to send a read request across the memory bus, and moreover, modern memory systems rely on cache coherence protocols in order to provide consistency guarantees. Both CSP and the actor model are notable because they’ve been very influential in the design of programming models for concurrency. The actor model is well known through the Erlang programming language, or the Akka actor framework on the JVM. CSP influenced the Concurrent ML programming model as well as the concurrency model in Go.

In .NET, we’ve the fundamental synchronization primitives which include locks, synchronization events, wait handles, interlocked operations, etc. A lock or mutex, for example, facilitates interaction among threads by delimiting a section of code — called the critical section — that can only be accessed by one thread at a time, providing mutual exclusion. Multiple threads can execute a critical section, but just one at a time, which makes it much easier to reason about memory access and mutation. Synchronization events also facilitate interaction among threads by allowing one thread to wait on a signal from another thread or process. Interlocked operations are essentially locks at the hardware level. The introduction of concurrent collections in .NET provided access to the higher-level producer-consumer pattern. The TaskCompletionSource type is similar to a synchronization event, however the signal can be accompanied by data, and waiting is expressed using the Task type.

In F# we also have the MailboxProcessor (MBP) which, as the name suggests, consists of a mailbox and a processor. The mailbox can be posted to and received from, and the processor is a thread of control interacting with the mailbox. Semantically, the MailboxProcessor corresponds to the actor model of concurrency, though typical actor model implementations (such as Akka.NET) are accompanied by support for distribution as well as a range of facilities for routing and fault-tolerance. The MBP manages concurrency by (FIFO) ordering the messages posted to the mailbox. The thread of control processes a single message at a time, without any need to consider parallelism in the implementation, as only a single message is processed from the queue at any point in time. MBPs are particularly useful for implementing higher-level constructs such as producer-consumer queues, buffers, channels, etc.

Concurrency and parallelism are related notions and are often used interchangeably. However, upon a closer inspection, their relationship is more of a duality. Parallelism is the idea of launching operations to be run in parallel. This in turn results in events, generated by those operations, which are concurrent, because ordering information is absent. Concurrency, on the other hand, typically refers to synchronization among concurrent events. Speaking loosely, parallelism generates disorder and concurrency synchronizes it. As an example, the Async.Parallel operation involves both - it first parallelizes the input computations, but then it synchronizes the parallel computations into a single converged result.

Asynchronous & Synchronous

The Async type is so called because it enables controlled use of asynchrony by decoupling the invocation of an action from the handling of its result, while retaining sequential flow control. Asynchrony allows for more efficient use of threads, as well as for expression of parallelism and concurrency. A related notion is that of an asynchronous network wherein there is no bound on message transmission delay. The underlying substrate is that of asynchrony — the event that represents a message being transmitted is decoupled from the event representing receipt or completion, resulting in temporal decoupling. However, complete asynchrony wouldn’t be of much use without synchronization. In terms of events, synchronization is the act of combining multiple events into one. For example, an interaction between two processes can be represented by two events, one at each process. In the theory of concurrency this is known as synchronous rendezvous. In .NET, TaskCompletionSource is a way to implement a form of rendezvous between threads, with one thread waiting for a value and another signaling the value. In Go and Hopac, for example, channels are used as a rendezvous mechanism. It should be noted that synchronization requires coordination among participants. This can be costly in the context of a single process and even more so across network boundaries. As such, systems should be designed to be asynchronous to the extent possible, but with principled use of synchronization where it is required, keeping locality in mind.

Selective Communication

Selective communication is a concept involving channels, as seen in Go, Haskell, Concurrent ML, and F# Hopac. Selective communication is the idea of selecting a message from a set of channels, picking the first one to produce a message, while leaving the others intact. A critical component of selective communication is that only one channel is picked and received from, with the others left intact. Simply invoking a receive operation from multiple channels in parallel doesn’t quite do the trick since it may cause multiple channels to dequeue a message where only one will be received by the caller. F# Async doesn’t provide a selective communication mechanism out of the box. More broadly in .NET, we’ve the BlockingCollection.TakeFromAny operation, but of course BlockingCollection uses blocking as its synchronization mechanism. The need for selective communication is quite common. Whenever a choice needs to be made among a set of possible events, there's a need for selective communication. In this sense, selective communication is the dual to parallelism. However, selective communication is typically implemented in ad-hoc ways; in .NET it is usually done using CancellationToken.

See also: The Hopac Programming Manual.

Acknowledgements

Thanks to Gustavo Leon, Eirik Tsarpalis, Ruben Bartelink and many others at Jet for comments, edits, suggestions.
