Day 6: Conduit — Streamy Co-routines for Piping Together Computations

Ben Clifford
Twelve Monads of Christmas
Dec 13, 2016

Lazy evaluation lets us compose two different computations without thinking too much about who is “in control”: we can generate primes lazily and take their square roots lazily, and evaluation alternates back and forth between the “producer” and the “consumer”. If we try to take an element of rootsOfPrimes, that computation causes a bit more of primes to be computed, but just enough to give the next rootsOfPrimes element. Neither the primes nor the rootsOfPrimes computation needs to know what is happening inside the other.

primes = computeAllPrimes
rootsOfPrimes = map sqrt (filter isOdd primes)
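To make the back-and-forth concrete, here is a minimal sketch that loads in GHCi with only base. It assumes a naive trial-division sieve in place of computeAllPrimes, and uses the standard odd in place of isOdd; firstRoots is a name introduced purely for illustration.

```haskell
-- A naive, unbounded list of primes (trial division), computed lazily.
-- Assumption: this stands in for the article's computeAllPrimes.
primes :: [Integer]
primes = sieve [2 ..]
  where
    sieve (p : xs) = p : sieve [x | x <- xs, x `mod` p /= 0]
    sieve [] = []

-- Square roots of the odd primes; still an infinite, lazy list.
rootsOfPrimes :: [Double]
rootsOfPrimes = map (sqrt . fromIntegral) (filter odd primes)

-- Taking three roots forces only as many primes as are needed
-- (2, 3, 5 and 7 here); the rest of the list is never computed.
firstRoots :: [Double]
firstRoots = take 3 rootsOfPrimes
```

Evaluating firstRoots in GHCi terminates even though both lists are infinite, because the consumer only ever demands a finite prefix from the producer.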

This gets messier when our producer and consumer (and perhaps intermediate steps) are monadic processes; and conduit aims to help.

Imagine we want to grab a very long list of posts from some site, and process each one.

Sites like Facebook or reddit return paged results: each HTTP request returns a handful of results and invites you to make another request for the next pageful. So you might write (pseudo)code like this to apply action to every post:

forPosts action = do
  (ps, key) <- getFirstPage
  mapM_ action ps
  processNextPage key action

processNextPage key action = do
  (ps, key') <- getNextPage key
  mapM_ action ps
  processNextPage key' action

forPosts is in charge, and action has to sit back and let itself be called exactly once per post, from one of two different locations in the above code.

But our post processing code might want to be in charge because it has its own complex control flow, like this contrived example:

consumeAll = do
  p <- drainUntilLastYear
  processPost p
  processRemaining

processRemaining = do
  p <- getPost
  processPost p
  processRemaining

drainUntilLastYear = do
  p <- getPost
  if isThisYear p
    then drainUntilLastYear
    else return p

The first code block wants the consumer to supply action that can simply be called with the next post; and the second code block wants the producer to supply getPost which can simply be called to get the next post. These interfaces don’t fit together at all.

Conduits attempt to make this work.

You can write a producer and a consumer, and stick them together like this:

producer .| consumer

Both consumer and producer can be written as if they are in charge, but with two primitives await and yield to pass values between them.

When a consumer wants to get the next post, it can call await right where getPost is in the above code. Magically the producer will start churning away until it produces a result, at which point it should call yield post right where action is called in the above code… and the consumer will start running again, returning from the await call with the value yielded by the producer. The two processes can yield / await back and forth between each other whenever they want, at any point in their respective code.
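As a rough sketch of how the two earlier pseudocode blocks might look with conduit (version 1.3 or later, for the ConduitT type), the following replaces the HTTP paging with a hard-coded list of pages. The Post type, the pages data, the fixed year 2016 and the name processedTitles are all assumptions made for illustration. One difference from the pseudocode: await returns a Maybe, with Nothing signalling that the producer has finished, so the consumer has to handle the end of the stream.

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, yield, (.|))

-- Hypothetical post type: just a year and a title.
data Post = Post { postYear :: Int, postTitle :: String }
  deriving (Eq, Show)

-- Stand-in for the paged HTTP API: each inner list is one "page".
pages :: [[Post]]
pages =
  [ [Post 2016 "a", Post 2016 "b"]
  , [Post 2015 "c", Post 2016 "d"]
  , [Post 2014 "e"]
  ]

-- Producer, written as if it were in charge: walk the pages and
-- yield each post where the pseudocode called action.
producer :: Monad m => ConduitT i Post m ()
producer = mapM_ (mapM_ yield) pages

isThisYear :: Post -> Bool
isThisYear p = postYear p == 2016

-- Skip posts from this year; return the first older post, if any.
drainUntilLastYear :: Monad m => ConduitT Post o m (Maybe Post)
drainUntilLastYear = do
  mp <- await
  case mp of
    Just p | isThisYear p -> drainUntilLastYear
    _ -> pure mp

-- "Process" every remaining post; here that just collects titles.
processRemaining :: Monad m => ConduitT Post o m [String]
processRemaining = do
  mp <- await
  case mp of
    Nothing -> pure []
    Just p -> (postTitle p :) <$> processRemaining

-- Consumer, also written as if it were in charge.
consumeAll :: Monad m => ConduitT Post o m [String]
consumeAll = do
  mp <- drainUntilLastYear
  case mp of
    Nothing -> pure []
    Just p -> (postTitle p :) <$> processRemaining

-- Fuse the two and run them: ["c","d","e"]
processedTitles :: [String]
processedTitles = runConduitPure (producer .| consumeAll)
```

The pipeline is pure here only because the pages are hard-coded; with real HTTP requests the base monad would be something like IO and the pipeline would be run with runConduit instead.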

If you’re used to multithreaded programming, you might imagine this like two threads, communicating back and forth — and it is a bit like that. There are clearly two “threads” of control, but they aren’t separate OS or Haskell level threads. Instead, conduit sits as a monad transformer over something like IO, and allows one or the other of the threads to run until it needs to pass control — a bit like the scheduler in an OS but only allowing context switching in very restricted circumstances.

It is straightforward to stick a filter in the middle: producer .| filter .| consumer where filter is another process that both awaits (from the producer) and yields (to the consumer) in whatever complex pattern it wants. There is no need, for example, to have the same number of awaits or yields.
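A small sketch of such a middle stage, again assuming conduit 1.3 or later: sumPairs (a made-up name, as is pairSums) awaits twice for every value it yields, so the counts of awaits and yields differ. sourceList and consume come from Data.Conduit.List.

```haskell
import Data.Conduit (ConduitT, await, runConduitPure, yield, (.|))
import qualified Data.Conduit.List as CL

-- Middle stage: awaits two inputs for each output and yields their
-- sum. A trailing unpaired input is passed through unchanged.
sumPairs :: Monad m => ConduitT Int Int m ()
sumPairs = do
  mx <- await
  case mx of
    Nothing -> pure ()
    Just x -> do
      my <- await
      case my of
        Nothing -> yield x
        Just y -> yield (x + y) >> sumPairs

-- Five inputs in, three outputs out: [3,7,5]
pairSums :: [Int]
pairSums = runConduitPure (CL.sourceList [1 .. 5] .| sumPairs .| CL.consume)
```

Neither the source nor the sink needs to change when the middle stage is added or removed; each is still written as if it alone were in control.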

The conduit README has a lot more information on other operations available at a higher level than await and yield, and plenty of examples and motivation.
