Day 6: Conduit — Streamy Co-routines for Piping Together Computations
Lazy evaluation lets us compose two computations without thinking too much about who is “in control”. We can generate primes lazily and take their square roots lazily, and evaluation alternates back and forth between the “producer” and the “consumer”: if we try to take an element of rootsOfPrimes, that forces a bit more of primes to be computed, but only enough to give the next rootsOfPrimes element. Neither the primes nor the rootsOfPrimes computation needs to know what is happening inside the other.
primes = computeAllPrimes
rootsOfPrimes = map sqrt (filter isOdd primes)
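To make the two definitions above concrete, here is a minimal runnable sketch using only base. The trial-division sieve is an illustrative stand-in for computeAllPrimes (it is simple, not fast), and the standard odd replaces isOdd; neither choice comes from the original code.

```haskell
-- Unbounded list of primes by naive trial division.
-- Assumption: a stand-in for computeAllPrimes, chosen for brevity.
primes :: [Integer]
primes = sieve [2 ..]
  where
    sieve (p : xs) = p : sieve [x | x <- xs, x `mod` p /= 0]
    sieve [] = []

-- Square roots of the odd primes, also an unbounded lazy list.
rootsOfPrimes :: [Double]
rootsOfPrimes = map (sqrt . fromIntegral) (filter odd primes)

main :: IO ()
main =
  -- Asking for three roots forces only as many primes as are needed.
  print (take 3 rootsOfPrimes)
```

Both lists are infinite, so the program only terminates because take 3 demands a finite prefix and laziness computes nothing beyond it.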
This gets messier when our producer and consumer (and perhaps intermediate steps) are monadic processes; conduit aims to help.
Imagine we want to grab a very long list of posts from some site, and process each one.
Sites like Facebook or reddit return paged results: each HTTP request returns a handful of results and invites you to make another request for the next pageful. So you might write (pseudo)code like this to apply action to every post:
forPosts action = do
    (ps, key) <- getFirstPage
    mapM_ action ps
    processNextPage key action

processNextPage key action = do
    (ps, key') <- getNextPage key
    mapM_ action ps
    processNextPage key' action
forPosts is in charge, and action has to sit back and let itself be called exactly once per post, from one of two different locations in the above code.
But our post processing code might want to be in charge because it has its own complex control flow, like this contrived example:
consumeAll = do
    p <- drainUntilLastYear
    processPost p
    processRemaining

processRemaining = do
    p <- getPost
    processPost p
    processRemaining

drainUntilLastYear = do
    p <- getPost
    if isThisYear p
        then drainUntilLastYear
        else return p
The first code block wants the consumer to supply action, which can simply be called with the next post; the second code block wants the producer to supply getPost, which can simply be called to get the next post. These interfaces don’t fit together at all.
Conduits attempt to make this work.
You can write a producer and a consumer, and stick them together like this:
producer .| consumer
Both consumer and producer can be written as if they are in charge, but with two primitives, await and yield, to pass values between them.
When a consumer wants to get the next post, it can call await right where getPost is in the above code. Magically, the producer will start churning away until it produces a result, at which point it should call yield post right where action is called in the above code. The consumer then starts running again, returning from the await call with the value yielded by the producer. The two processes can yield / await back and forth between each other whenever they want, at any point in their respective code.
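As a rough sketch of how the two pseudocode blocks could look once joined by conduit, the example below keeps the producer's paging loop and the consumer's control flow, replacing action with yield and getPost with await. The Post type, the in-memory pages list, fetchPage, and the fixed "current year" of 2024 are all hypothetical stand-ins for a real site API. Unlike getPost, await returns a Maybe, because the producer may run out of posts.

```haskell
import Conduit
import Data.Void (Void)

-- Hypothetical post type; a real one would come from the site's API.
data Post = Post { postYear :: Int, postTitle :: String }
  deriving (Eq, Show)

-- Hypothetical paged data, newest first, standing in for HTTP responses.
pages :: [[Post]]
pages =
  [ [Post 2024 "newest", Post 2024 "recent"]
  , [Post 2023 "older"]
  , [Post 2022 "oldest"]
  ]

-- Stand-in for one HTTP request: a page plus the key of the next page, if any.
fetchPage :: Monad m => Int -> m ([Post], Maybe Int)
fetchPage key =
  pure (pages !! key, if key + 1 < length pages then Just (key + 1) else Nothing)

-- Producer: written as if in charge of paging; yields where action was called.
postsSource :: Monad m => ConduitT () Post m ()
postsSource = go 0
  where
    go key = do
      (ps, next) <- lift (fetchPage key)
      mapM_ yield ps
      maybe (pure ()) go next

-- Assumption: "this year" is fixed at 2024 to keep the example deterministic.
isThisYear :: Post -> Bool
isThisYear p = postYear p == 2024

-- Consumer helper: skip this year's posts, return the first older one (if any).
drainUntilLastYear :: Monad m => ConduitT Post o m (Maybe Post)
drainUntilLastYear = do
  mp <- await
  case mp of
    Just p | isThisYear p -> drainUntilLastYear
    _ -> pure mp

-- Consumer helper: collect the titles of all remaining posts.
processRemaining :: Monad m => ConduitT Post Void m [String]
processRemaining = do
  mp <- await
  case mp of
    Nothing -> pure []
    Just p -> (postTitle p :) <$> processRemaining

-- Consumer: written as if in charge of its own control flow.
consumeAll :: Monad m => ConduitT Post Void m [String]
consumeAll = do
  firstOld <- drainUntilLastYear
  rest <- processRemaining
  pure (maybe rest (\p -> postTitle p : rest) firstOld)

main :: IO ()
main = print (runConduitPure (postsSource .| consumeAll))
-- prints ["older","oldest"]
```

Here "processing" a post just means collecting its title, so that the result is easy to inspect; a real consumer would do its work in IO instead.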
If you’re used to multithreaded programming, you might imagine this as two threads communicating back and forth, and it is a bit like that. There are clearly two “threads” of control, but they aren’t separate OS-level or Haskell-level threads. Instead, conduit sits as a monad transformer over something like IO, and allows one or the other of the threads to run until it needs to pass control: a bit like the scheduler in an OS, but only allowing context switches in very restricted circumstances, namely at await and yield.
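One way to watch control pass between the two “threads” is to have each side print a message around its yield and await calls. The sketch below runs a conduit over IO and uses liftIO for the printing; the producer, consumer, and runDemo names and the messages are made up for illustration.

```haskell
import Conduit
import Data.Void (Void)

-- Producer: announces each value before handing it over.
producer :: ConduitT () Int IO ()
producer = do
  liftIO (putStrLn "producer: about to yield 1")
  yield 1
  liftIO (putStrLn "producer: about to yield 2")
  yield 2
  liftIO (putStrLn "producer: finished")

-- Consumer: announces each await and sums whatever it receives.
consumer :: ConduitT Int Void IO Int
consumer = go 0
  where
    go total = do
      liftIO (putStrLn "consumer: awaiting")
      mx <- await
      case mx of
        Nothing -> pure total
        Just x -> do
          liftIO (putStrLn ("consumer: got " ++ show x))
          go (total + x)

-- Run the pipeline in IO and return the consumer's result.
runDemo :: IO Int
runDemo = runConduit (producer .| consumer)

main :: IO ()
main = runDemo >>= print
```

When run, the producer and consumer messages should appear interleaved rather than in two separate batches, all on a single Haskell thread: each side runs only until its next yield or await.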
It is straightforward to stick a filter in the middle: producer .| filter .| consumer, where filter is another process that both awaits (from the producer) and yields (to the consumer) in whatever complex pattern it wants. There is no need, for example, to have the same number of awaits and yields.
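A small sketch of such a middle stage follows. The hypothetical sumPairs awaits two values for every one it yields, so the counts of awaits and yields differ; numbers is an illustrative producer, and sinkList (from conduit's combinators) collects the results into a list.

```haskell
import Conduit

-- Illustrative producer: yields 1 through 5.
numbers :: Monad m => ConduitT () Int m ()
numbers = mapM_ yield [1 .. 5]

-- Middle stage: awaits values two at a time and yields their sum.
-- A leftover unpaired value at the end is passed through unchanged.
sumPairs :: Monad m => ConduitT Int Int m ()
sumPairs = do
  ma <- await
  case ma of
    Nothing -> pure ()
    Just a -> do
      mb <- await
      case mb of
        Nothing -> yield a
        Just b -> yield (a + b) >> sumPairs

main :: IO ()
main = print (runConduitPure (numbers .| sumPairs .| sinkList))
-- prints [3,7,5]
```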
The conduit README has a lot more information on other operations available at a higher level than await and yield, and plenty of examples and motivation.
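As a small taste of those higher-level operations, the sketch below builds a pipeline entirely from combinators exported by the Conduit module (yieldMany, filterC, mapC, takeC, sinkList), with no explicit await or yield. The name firstThreeDoubledEvens and the numbers are made up for illustration.

```haskell
import Conduit

-- Keep the even numbers, double them, and stop after three results.
firstThreeDoubledEvens :: [Int]
firstThreeDoubledEvens =
  runConduitPure $
    yieldMany [1 .. 10]
      .| filterC even
      .| mapC (* 2)
      .| takeC 3
      .| sinkList

main :: IO ()
main = print firstThreeDoubledEvens
-- prints [4,8,12]
```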