Promises + FP = Beautiful Streams

Yassine Elouafi
18 min read · Jul 17, 2015

--

We’ve seen that Futures/Promises emerge as a logical meaning for asynchronous functions, like in this Node example:

fs.readFile('…') // => a Future/Promise

Similarly, we’d like to represent the meaning of functions like this:

someEventSource.on('event', ...)  // => ?

The second snippet shares the asynchronous semantics of the first, hence the use of callbacks in both cases. But instead of a single future value, we are dealing here with multiple future values, more precisely a sequence of successive future values.

In this article, we will talk about an abstraction that captures the above meaning: the famous concept of a Stream.

Look at the Wikipedia definition of streams

a sequence of data elements made available over time

A Stream can be viewed as a standard list. But like Promises/Futures, it enriches the meaning of its elements with the notion of time of occurrence.

Streams are becoming a central abstraction in JavaScript, especially on the server side. There are of course the builtin Node streams. But recent years saw the rise of another kind of stream, Reactive Streams; they are based on the Reactive Programming paradigm and were made popular in JavaScript by libraries like Rx.js and Bacon.js (not to be confused with the Reactive Streams initiative, which is another newcomer).

The basic idea is that you can represent different asynchronous data, like events or HTTP responses, by a unifying concept called observable collections. Then you can manipulate those collections with standard array operations like map, filter, reduce, forEach, etc.

This article is about an interesting experiment: we will use functional programming and algebraic data types to derive a pure functional definition of RP-like streams. You’ll see that it’s quite simple to roll your own, beautiful [Reactive] Stream implementation in a surprisingly extensible way.

The key difference is that for our Streams, Promises are the unit of asynchrony. The other characteristic is that they will be much easier to implement using a pure FP style (if you are familiar with Haskell or similar, you’ll recognize it immediately).

Note: Full sources are in this GitHub repo.

On the meaning of Streams

OK, how can functional thinking help us derive a nice and clean definition of streams?

The first thing is to come up with the right representation for the stream type. By right I mean one that can describe all possible variants of a stream. What can a Stream be?

To represent our concept, we’ll use the Algebraic Data Types (ADTs) we saw in the previous post. We need to find all the possible variants of streams that capture our intent.

Since Streams are a generalization of lists, let’s examine the standard list variants. So first:

Stream a = Empty

Naturally a stream can be empty, meaning it has no occurrences.

Second, the standard recursive definition:

Stream a = Cons a (Stream a)

You can think of it in this sense: at the moment we have an occurrence (head), there could be more occurrences in the future or there could be nothing more (empty); in either case the following occurrences (tail) also form a Stream.

Stream a = Abort Object

We’d like to consider the case when a data stream, like an incoming HTTP message, is interrupted due to some error, in which case we say the stream aborts with a given error.

Now time for our VIP guest

Stream a = Future (Promise (Stream a))

Literally, a Stream can be of type ‘Future’; the type holds a value of type ‘Promise (Stream a)’, i.e. a Promise that will eventually resolve to a Stream.

It looks like a strange variant; how can a stream be a future of itself?

Imagine the following case: we’d like to get a sequence of mouse move events until a mouse press event occurs. If the user presses the mouse immediately, no mouse moves will occur, so the stream will be empty. If the user moves the mouse some time before pressing it, then the stream will have some occurrences, i.e. it is not empty.

You may see the problem: we don’t know whether the stream is empty or not until we get the first mouse move or the first mouse press. Put another way, a Stream can be a future value that will resolve to another Stream representation (possibly a more concrete Empty or Cons). It happens that we already have Promises for just this purpose.

Here is our complete definition

Stream a = Empty
| Abort Object
| Future (Promise (Stream a))
| Cons a (Stream a)

Our representation now complete, we implement it using the adt helper from the previous post.
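For illustration, the four variants can also be encoded as plain tagged objects (a hypothetical encoding of my own; the actual sources use the adt helper and method-style operations):

```javascript
// Hypothetical tagged-object encoding of the four Stream variants
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

// A finite, fully known stream: 1, then 2, then the end
const s12 = Cons(1, Cons(2, Empty));

// A stream whose continuation is not known yet
const later = Cons(1, Future(Promise.resolve(Cons(2, Empty))));
```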

Before we move on to programming our Stream, let’s first see some examples (all the factory functions below can be found in the GitHub sources).

Asynchronous Recursion

The nice thing about ADTs is that implementing functions that operate on them tends to follow the same recipe: pattern matching and recursion. Our Stream is nothing more than a good old recursive ADT. It means we can program it as we do any other recursive data type, like List.

As a first demonstration, let’s define the effectful (with side effects) ‘forEach’ method. We’d like to execute an action on all occurrences of a stream, another action when the stream aborts, and finally one when the stream ends.
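A sketch of what such a ‘forEach’ can look like, written here as a standalone function over tagged objects (the encoding and all names are my own):

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

// One chained conditional expression matching each variant
const forEach = (s, onNext, onError, onComplete) =>
    s.type === 'Empty' ? onComplete()
  : s.type === 'Abort' ? onError(s.error)
  : s.type === 'Cons'  ? (onNext(s.head), forEach(s.tail, onNext, onError, onComplete))
  : /* Future */         s.promise.then(next => forEach(next, onNext, onError, onComplete));

const log = [];
forEach(Cons(1, Cons(2, Empty)),
        x => log.push(x),
        err => log.push('error: ' + err),
        () => log.push('done'));
// log is now [1, 2, 'done']
```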

The whole method is a long chained ternary expression that matches each possible variant of the stream type. We can write such chained code because the conditional operator (?:) is right-associative.

The big news: all stream operations we will define in this article will be nothing more than conditional expressions (yes, I’m serious, all of them). FP developers will recognize that this construct maps directly to pattern matching in FP languages.

First, let’s also define a helper debug method

Like arrays/lists, Stream is also a Functor: the ‘map’ operation, applied on a stream, returns a stream of the successive results transformed from the successive occurrences of the input stream.
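A sketch of ‘map’ in a tagged-object style (the encoding and names are mine, not the article's adt helper):

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const map = (s, f) =>
    s.type === 'Cons'   ? Cons(f(s.head), map(s.tail, f))
  : s.type === 'Future' ? Future(s.promise.then(next => map(next, f)))
  : s; // Empty and Abort pass through unchanged

const doubled = map(Cons(1, Cons(2, Empty)), x => x * 10);
```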

Note how in the case of a promised stream, we’re doing a sort of asynchronous recursion, because the promise callback will execute asynchronously in a microtask queue. It means we get the benefits of clean and concise functional/recursive code without fear of stack overflow problems.

The error handling equivalent of map, called ‘mapError’, extends the lifetime of an aborted stream with another stream.
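A sketch of ‘mapError’ (same hypothetical tagged-object style, names mine): the Abort case is replaced by whatever stream the handler returns.

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const mapError = (s, f) =>
    s.type === 'Abort'  ? f(s.error)  // continue with the replacement stream
  : s.type === 'Cons'   ? Cons(s.head, mapError(s.tail, f))
  : s.type === 'Future' ? Future(s.promise.then(next => mapError(next, f)))
  : s; // Empty

// An aborted stream recovered into one more occurrence
const recovered = mapError(Cons(1, Abort('boom')), err => Cons(err, Empty));
```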

From Haskell to JavaScript (in no time)

If you are a Haskeller (if you are not it’s OK, you can keep reading) then you have certainly recognized the similarities with the definitions of functions on standard lists. The nice thing is that you can easily adapt most of the concise FP definitions to define the same functions on streams.

For example, take the Haskell definition of filter, which returns the list of elements that satisfy a given predicate (if you don’t know Haskell, JavaScript code will follow immediately).

And here is the equivalent definition on Streams: instead of filtering elements of a list, we filter incoming occurrences as they happen
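A sketch in a tagged-object style (encoding and names are mine):

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const filter = (s, pred) =>
    s.type === 'Cons'   ? (pred(s.head)
                             ? Cons(s.head, filter(s.tail, pred))
                             : filter(s.tail, pred))
  : s.type === 'Future' ? Future(s.promise.then(next => filter(next, pred)))
  : s; // Empty and Abort

const odds = filter(Cons(1, Cons(2, Cons(3, Cons(4, Empty)))), x => x % 2 === 1);
```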

The only difference is the Abort and Future cases, which are trivially defined.

A slightly more elaborate example is the ‘span’ function, which takes 2 inputs, a predicate and a list, and returns a tuple of:

1- the first elements that satisfy the predicate, and
2- the remainder of the list

Haskell code

Applied to streams, the function returns a pair of streams: the first yields the first occurrences that satisfy the predicate and the second yields the remainder of the stream. The JavaScript code is derived straight from the Haskell equivalent.
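A sketch of ‘span’ over tagged objects (encoding and names mine); the Future case splits one promised pair into a pair of promised streams:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const span = (s, pred) => {
  if (s.type === 'Cons') {
    if (!pred(s.head)) return [Empty, s];          // the prefix ends here
    const [pre, rest] = span(s.tail, pred);
    return [Cons(s.head, pre), rest];
  }
  if (s.type === 'Future') {
    const pairP = s.promise.then(next => span(next, pred));
    return [Future(pairP.then(pair => pair[0])),
            Future(pairP.then(pair => pair[1]))];
  }
  return [s, s]; // Empty or Abort terminates both components
};

const [pre, rest] = span(Cons(1, Cons(2, Cons(5, Cons(3, Empty)))), x => x < 3);
```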

Taking a more elaborate example, the ‘groupBy’ function, which groups occurrences into nested sublists/substreams given a predicate, is defined in terms of span.

Then the equality grouping function is trivially derived

Using the same declarative approach, we can define all imaginable list operations on streams. If you know the Haskell definition (or any equivalent FP definition), then you know the Stream definition. No hard-to-follow async code, no event listeners, no error-prone async state manipulation, just pure FP code and promises.

Here are some other functions that are defined in the source code. You may try to define them by yourself before looking at the sources; just think about the meaning of the resulting stream for each case and the answer comes quite naturally.

  • take(n) takes the first n occurrences from the stream
  • skip(n) skips the first n occurrences of the stream
  • takeWhile(predicate) takes the longest prefix that satisfies the predicate
  • skipWhile(predicate) returns the suffix remaining after takeWhile
  • break(predicate) like span, but the first stream is the longest prefix of occurrences that do not satisfy the predicate

Streams and Promises interplay

Our ADT Streams play nicely with Promises in various ways. In this section we’ll see some useful operations that combine the two concepts.

1- Streams reduce to Promises

As a first example, let’s take the well known ‘reduce’ method (known as ‘foldl’ in Haskell). When applied to arrays, the method yields a single value representing the result of the accumulation, like the sum of an array of numbers:

[1,2,3].reduce( (prev, cur) => prev + cur ) // => 6

Now applied to streams (for example the sum of a stream of numbers), the final result is a future value because we’d have to wait for all future occurrences of the stream. So the meaning of ‘Stream.reduce(…)’ is effectively a promise that will resolve to the final result of the accumulation, or will be rejected in case the stream aborts.

I find this meaning more logical than the one found in libraries like Rx or Bacon where the result of the accumulation is a single occurrence stream. It feels to me as if normal functions returned a single element array [x] instead of simply returning x itself. As a rule of thumb, we can say that Streams reduce to Promises

Higher abstract operations can be defined on top of ‘reduce’:

‘length’ yields the number of occurrences in a stream.

‘all’ checks whether all occurrences satisfy a predicate.
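A sketch of ‘reduce’ and the two helpers above (tagged objects and standalone functions, names mine); note the result really is a Promise:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

// Streams reduce to Promises
const reduce = (s, f, acc) =>
    s.type === 'Empty' ? Promise.resolve(acc)
  : s.type === 'Abort' ? Promise.reject(s.error)
  : s.type === 'Cons'  ? reduce(s.tail, f, f(acc, s.head))
  : s.promise.then(next => reduce(next, f, acc));

const length = s => reduce(s, (n, _x) => n + 1, 0);
const all    = (s, pred) => reduce(s, (ok, x) => ok && !!pred(x), true);

reduce(Cons(1, Cons(2, Cons(3, Empty))), (a, b) => a + b, 0)
  .then(sum => console.log(sum)); // logs 6
```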

2- Streams reduce as soon as possible

We may be tempted, as we did for ‘all’, to define the peer function ‘any’, which yields true if any occurrence satisfies the predicate, in terms of reduce like this:

this.reduce( (prev, cur) => prev || !!pred(cur), false )

While the above code will yield the correct value, we must be aware of the meaning of time, because ‘reduce’ will wait for all occurrences of the stream before completing the promise. But we know that the result of an ‘or’ operation can be deduced as soon as we get a ‘true’ operand:

false || /* stop here */ true || false || true

Similarly on streams, we must yield a result on the first occurrence that satisfies the predicate.
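A sketch of such an ‘any’ (tagged objects, names mine): it short-circuits on the first match, even if the rest of the stream never arrives.

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const any = (s, pred) =>
    s.type === 'Empty' ? Promise.resolve(false)
  : s.type === 'Abort' ? Promise.reject(s.error)
  : s.type === 'Cons'  ? (pred(s.head) ? Promise.resolve(true)   // stop here
                                       : any(s.tail, pred))
  : s.promise.then(next => any(next, pred));

// The tail is a promise that never resolves, yet `any` still answers
const pending = Cons(1, Cons(2, Future(new Promise(() => {}))));
const found = any(pending, x => x === 2);
```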

Hence the 2nd rule of thumb: Streams reduce as soon as possible.

3- *Until

A key interplay between Streams and Promises is the *until operators, which act on a Stream based on the completion time of a Promise.

For example, ‘takeUntil(promise)’ will take occurrences from the stream until a given promise is resolved, or will abort the resulting stream if the promise is rejected.

Here is our first attempt
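In a tagged-object sketch (encoding and names mine), a direct translation could look like this:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const takeUntil = (s, untilP) =>
    s.type === 'Cons'   ? Cons(s.head, takeUntil(s.tail, untilP))
  : s.type === 'Future' ? Future(Promise.race([
                            untilP.then(() => Empty, err => Abort(err)),
                            s.promise.then(next => takeUntil(next, untilP))
                          ]))
  : s; // Empty and Abort are already terminated
```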

Let’s review the code: the first variant is trivial. For the 2nd variant, if the stream already has a head then we yield the head and forward the until request to the tail (this way we yield all past occurrences).

The third variant deserves some discussion: Promise.race(arr) returns a promise that will complete as soon as one of the input promises completes, so if the ‘untilP’ promise completes first we stop taking by yielding an empty stream, and if the future stream completes first we continue taking.

There is a subtle performance problem above: if ‘Promise.race’ completes with an empty Stream (‘untilP’ wins), the second promise ‘this.promise.then( s => s.takeUntil(promise), …)’ is still active. ‘Promise.race’ just returns the winning promise; it does not stop the other competing promises from continuing (Promise cancellation is an under-discussion topic and no standard way has emerged yet).

If we can’t cancel the fulfillment of the future stream itself, we can at least stop the fulfillment propagation and thus prevent the wasteful execution of the chained blocks (like ‘s => s.takeUntil(promise)’ above). And we’d like to keep our pure FP approach, so we’ll have to avoid any state manipulation.

A simple and still pure FP solution is lazy evaluation: instead of the competing promises resolving themselves to the desired value, we’ll make them resolve to a lazy value and delay the evaluation until we get the winning promise. So the code will look something like this:
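Concretely, in the tagged-object sketch (names mine), both racers resolve to thunks and only the winner is forced:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const takeUntil = (s, untilP) =>
    s.type === 'Cons'   ? Cons(s.head, takeUntil(s.tail, untilP))
  : s.type === 'Future' ? Future(Promise.race([
                            untilP.then(() => () => Empty, err => () => Abort(err)),
                            s.promise.then(next => () => takeUntil(next, untilP))
                          ]).then(lazy => lazy()))  // force only the winner
  : s;
```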

You can convert a promise callback returning a normal value to another returning a lazy one by simply inserting an ‘() =>’ in front of the callback body, so a callback with the form ‘x => y’ will become ‘x => () => y’.

So now if the ‘untilP’ promise resolves first, no further execution will happen in the loser’s chained blocks, because evaluation is postponed until we know which way to go.

Similarly to takeUntil, can you derive skipUntil, which skips occurrences until a promise completes?

4- Asynchronous mapping (and filtering, taking…)

Previously, we defined the ‘map’ operation as taking a function of type ‘a -> b’, which results in a ‘Stream b’ whose occurrences are values transformed from the input ‘Stream a’. The direct consequence of the type signature ‘a -> b’ is that the resulting occurrences will happen as soon as the source occurrences occur. Put another way, we’re only acting on the value meaning of occurrences, not their time meaning.

What if the mapping function can’t return a value immediately? Imagine for example that we have a stream of button clicks that is mapped to a stream of ajax requests.

// result : Stream (Promise aResponseType)
result = Stream.fromDomEvent(fetchButton, 'click')
.map( e => $.ajax(...) )

The logical meaning of the above code is that we want to map button clicks to the future responses from the ‘$.ajax(…)’ calls and not to the ajax promises themselves.

We can define an asynchronous version of ‘map’ (something like ‘asyncMap’) to handle this case. Another solution is to extend the codomain (i.e. the range of the permissible outputs) of the mapping function to accept promises besides normal values, so the type signature will go from ‘a -> b’ to ‘a -> b | Promise b’ and the mapping can now return either present or future values. This way, we can have a single function that handles both the sync and async cases.
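In the tagged-object sketch (names mine), the Cons case wraps the mapped head in Promise.resolve, so plain values and promises are treated uniformly:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

// f may return a plain value or a Promise
const map = (s, f) =>
    s.type === 'Cons'   ? Future(Promise.resolve(f(s.head))
                            .then(head => Cons(head, map(s.tail, f))))
  : s.type === 'Future' ? Future(s.promise.then(next => map(next, f)))
  : s;

const r = map(Cons(1, Empty), x => Promise.resolve(x * 2));
```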

The trick is in ‘Promise.resolve( f(this.head) )’: Promise.resolve() can take a normal value as well as another promise, in which case it’ll follow the passed-in promise and wait for its completion.

The above reasoning is not restricted to ‘map’; it can be extended to any stream function that takes a callback (like filter, span, takeWhile, etc.).

Taking the filter example, we can make the predicate asynchronous, thus extending the range of its use cases (if the filtering decision depends, for example, on some asynchronous IO operation).
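A sketch of the promise-aware filter (tagged objects, names mine); the predicate may return a boolean or a Promise of one:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const filter = (s, pred) =>
    s.type === 'Cons'   ? Future(Promise.resolve(pred(s.head)).then(keep =>
                            keep ? Cons(s.head, filter(s.tail, pred))
                                 : filter(s.tail, pred)))
  : s.type === 'Future' ? Future(s.promise.then(next => filter(next, pred)))
  : s;

// The filtering decision itself is asynchronous
const evens = filter(Cons(1, Cons(2, Empty)), x => Promise.resolve(x % 2 === 0));
```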

As said above, the same pattern can be extended to all other stream operations, opening the door to a full range of new use cases. And all this without any state manipulation or any buffering mechanism. Just simple conditional expressions.

Combining multiple Streams

In imperative code, defining operations on multiple streams is a non-trivial task. Because elements of different streams can occur at different moments and at different rates, we have to solve synchronization problems between the different sources (ever tried to drive a state machine whose transitions depend on multiple time-separated inputs?).

In the case of ADT Streams the only additional complication is that you’ll have to pattern-match against combined cases. Usually, combined operations involve 2 streams, which leads to pattern matching on at most 4 x 4 cases; but, as we’ll see, most cases reduce to the same answer and you’ll generally get fewer variants to check.

Take for example the ‘concat’ operation which, like in arrays, joins 2 streams in a sequential manner: the concatenation will first yield all occurrences of the first stream, then pass to the 2nd.
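A sketch (tagged objects, names mine):

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const concat = (s1, s2) =>
    s1.type === 'Empty' ? s2
  : s1.type === 'Abort' ? s1                      // s2 never happens
  : s1.type === 'Cons'  ? Cons(s1.head, concat(s1.tail, s2))
  : Future(s1.promise.then(next => concat(next, s2)));

const joined = concat(Cons(1, Cons(2, Empty)), Cons(3, Empty));
```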

Because all decisions depend only on the first stream, we don’t have to pattern-match on all the possible combinations, just on the first one. So the 4x4 cases reduce to only 4 cases.

Note that the concatenation of 2 streams delays the occurrences of the second stream even if they occur before, or interleaved with, the first one. It’s as if they were buffered and released after the end of the first stream. Even worse, they will never occur if the first stream is aborted.

If we want to suppress the delay effect and yield occurrences from both streams at their own time, then what we need is a ‘merge’ operation that takes into account the position in time of each occurrence.
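A sketch of ‘merge’ (tagged objects, names mine); only when both sides are still futures do we race, and as with the *until functions the racers resolve to thunks:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const merge = (s1, s2) =>
    s1.type === 'Empty' ? s2
  : s2.type === 'Empty' ? s1
  : s1.type === 'Abort' ? s1
  : s2.type === 'Abort' ? s2
  : s1.type === 'Cons'  ? Cons(s1.head, merge(s1.tail, s2))  // past values first
  : s2.type === 'Cons'  ? Cons(s2.head, merge(s1, s2.tail))
  : /* both Future */     Future(Promise.race([
                            s1.promise.then(n1 => () => merge(n1, s2)),
                            s2.promise.then(n2 => () => merge(s1, n2))
                          ]).then(lazy => lazy()));

// A present value is yielded before a still-pending future
const m = merge(Future(new Promise(() => {})), Cons(9, Empty));
```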

When merging 2 streams, past values (the ‘Cons’ case) always take precedence (with a bias toward the 1st stream). But if the 2 streams are both promises, we run a race between the two and the winner takes precedence. As with the *until functions, we use lazy values to prevent unnecessary and wasteful merging on the loser promise.

Another famous multi-list operation is ‘zip’, which pairs elements from 2 lists based on their positions ([1,2,3] zip [‘a’, ‘b’] => [ [1, ‘a’], [2, ‘b’] ]). On streams, it means each occurrence of the 2 input streams has to wait for its peer to happen before yielding the whole pair:
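A sketch (tagged objects, names mine); a Cons on one side waits for the other side's future before the pair can be yielded:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const zip = (s1, s2) =>
    s1.type === 'Empty' || s2.type === 'Empty' ? Empty    // shortest side wins
  : s1.type === 'Abort'  ? s1
  : s2.type === 'Abort'  ? s2
  : s1.type === 'Future' ? Future(s1.promise.then(n1 => zip(n1, s2)))
  : s2.type === 'Future' ? Future(s2.promise.then(n2 => zip(s1, n2)))
  : Cons([s1.head, s2.head], zip(s1.tail, s2.tail));

const pairs = zip(Cons(1, Cons(2, Cons(3, Empty))), Cons('a', Cons('b', Empty)));
```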

Monadic Streams

Until now we have considered occurrences of a Stream as normal JavaScript values. What if the incoming occurrences are Streams themselves? i.e. what is the meaning of a Stream of Streams? Or put another way: how do we flatten a 2-layered stream?

There is no single answer to the above question as we can combine nested streams in different ways.

We can merge the nested streams: we yield occurrences from the 1st incoming stream (that comes with the 1st occurrence), then when the 2nd stream appears we merge it with the 1st stream, and so on. For example, imagine we have a chat room with new users coming over time, and each user can type multiple messages in the chat room. The chat room can be viewed as a stream of incoming users, and each incoming user can himself be viewed as a stream of incoming messages, so we end up with a stream whose occurrences (users) are themselves streams (messages from each user). The log of all messages from all users, present and future, is the merge of messages from all users.

Another way to flatten a 2-layered Stream is to concatenate the nested streams side by side, i.e. we take all occurrences from the 1st incoming stream, then we take the occurrences from the 2nd incoming stream, etc. As a practical example, imagine now we’re sending an ajax request each time the user hits a button, and suppose we can get, for each request, a stream representing the response body, so we end up with a stream whose occurrences (ajax responses) are themselves streams (the body of each response). The log of all bodies, printed one after another, is the concatenation of bodies from all ajax responses.

In fact, if you think a little about it, flattening a stream is quite similar to the ‘reduce’ operation: we combine nested occurrences with a binary function as they appear on the stream. So we can define a generic flattening version.

Then we can define specialized versions in terms of the generic flatten. For example the flatten-by-merge operation can be defined like this

Similarly, the flatten-by-concat is

And so you can easily flatten by any binary operation.
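Putting the generic flatten and its two specializations into the tagged-object sketch (names mine; ‘merge’ and ‘concat’ are the binary operations from the previous section):

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const concat = (s1, s2) =>
    s1.type === 'Empty' ? s2
  : s1.type === 'Abort' ? s1
  : s1.type === 'Cons'  ? Cons(s1.head, concat(s1.tail, s2))
  : Future(s1.promise.then(next => concat(next, s2)));

const merge = (s1, s2) =>
    s1.type === 'Empty' ? s2
  : s2.type === 'Empty' ? s1
  : s1.type === 'Abort' ? s1
  : s2.type === 'Abort' ? s2
  : s1.type === 'Cons'  ? Cons(s1.head, merge(s1.tail, s2))
  : s2.type === 'Cons'  ? Cons(s2.head, merge(s1, s2.tail))
  : Future(Promise.race([
      s1.promise.then(n1 => () => merge(n1, s2)),
      s2.promise.then(n2 => () => merge(s1, n2))
    ]).then(lazy => lazy()));

// Generic flatten: combine each nested stream with the flattened rest
const flatten = (s, combine) =>
    s.type === 'Cons'   ? combine(s.head, flatten(s.tail, combine))
  : s.type === 'Future' ? Future(s.promise.then(next => flatten(next, combine)))
  : s; // Empty and Abort

const mergeAll  = s => flatten(s, merge);
const concatAll = s => flatten(s, concat);

const nested = Cons(Cons(1, Cons(2, Empty)), Cons(Cons(3, Empty), Empty));
const flat = concatAll(nested);
```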

The meaning of the monadic operation ‘flatMap’ (or bind, or chain) is less obvious: we map the stream with a function that transforms each occurrence of the source stream into a child stream, then flatten all the generated streams.

For example, the meaning of ‘mergeMap’ is mapping with a function that transforms occurrences into sub-streams, then merging all the generated streams.

Similarly, ‘concatMap’ concatenates generated sub-streams
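A sketch (tagged objects, names mine): ‘flatMap’ is literally flatten after map, parameterized by the combining operation:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const map = (s, f) =>
    s.type === 'Cons'   ? Cons(f(s.head), map(s.tail, f))
  : s.type === 'Future' ? Future(s.promise.then(next => map(next, f)))
  : s;

const concat = (s1, s2) =>
    s1.type === 'Empty' ? s2
  : s1.type === 'Abort' ? s1
  : s1.type === 'Cons'  ? Cons(s1.head, concat(s1.tail, s2))
  : Future(s1.promise.then(next => concat(next, s2)));

const flatten = (s, combine) =>
    s.type === 'Cons'   ? combine(s.head, flatten(s.tail, combine))
  : s.type === 'Future' ? Future(s.promise.then(next => flatten(next, combine)))
  : s;

const flatMap   = (s, f, combine) => flatten(map(s, f), combine);
const concatMap = (s, f) => flatMap(s, f, concat);

// Each occurrence expands into a two-element sub-stream
const expanded = concatMap(Cons(1, Cons(2, Empty)),
                           x => Cons(x, Cons(x * 10, Empty)));
```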

Throttling and Debouncing

I did not want to conclude before testing how ADT streams can handle some advanced operations found in RP Stream libraries; I’ll take the famous examples of throttling and debouncing.

You may be aware of the terms from libraries like jQuery or Underscore. They are also present, with multiple variants, in most RP libraries.

The simpler of the two is throttling. A simple illustration is handling mouse move events: instead of running the handler on each mouse event, you may prefer to skip some move events for performance reasons. For example, you may choose to handle a single mouse move event once every 50 milliseconds.

If you look at a conventional/imperative implementation of throttling (for example in the Underscore sources), you’ll see a bunch of state variables and timeout operations along with the required synchronization code. As I intend to keep the promise made at the beginning of this article, the actual Stream definition will be no more than a single conditional expression.
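A sketch of ‘throttle’ (tagged objects, names mine), leaning on a ‘skipUntil’ written the same way:

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

// Drop occurrences until untilP completes, then pass the rest through
const skipUntil = (s, untilP) =>
    s.type === 'Cons'   ? skipUntil(s.tail, untilP)   // still inside the window
  : s.type === 'Future' ? Future(Promise.race([
                            untilP.then(() => () => Future(s.promise),
                                        err => () => Abort(err)),
                            s.promise.then(next => () => skipUntil(next, untilP))
                          ]).then(lazy => lazy()))
  : s;

// event: () => Promise, generates a new throttling window on demand
const throttle = (s, event) =>
    s.type === 'Cons'   ? Cons(s.head, throttle(skipUntil(s.tail, event()), event))
  : s.type === 'Future' ? Future(s.promise.then(next => throttle(next, event)))
  : s;

// With a window that never closes, only the first occurrence survives
const never = () => new Promise(() => {});
const t = throttle(Cons(1, Cons(2, Cons(3, Empty))), never);
```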

Let’s review the code: throttle takes a generator function ‘() => Promise’; we’ll use it to generate a new event whenever we start a new throttling iteration. This way we can throttle by any event, not just timeouts.

As soon as we get an input, we yield it, then skip all subsequent inputs until the throttling event terminates (‘skipUntil’ will skip occurrences from the tail until the passed-in promise completes).

The case of debouncing is a little more complicated. Basically, debouncing is a technique that can be used to optimize the handling of events that occur at a rapid rate.

For example, if you’re handling the window ‘resize’ event in the browser, you’ll get all ‘resize’ events as the user keeps dragging the mouse. If you’re doing some heavy DOM manipulations inside your handler, you may prefer to let the user finish resizing and react directly to the final result (the window size). We can suppose that if a period of 500ms passes without receiving any resize event, then the user may have finished dragging, and then we handle the last event that occurred before this quiet period. We say that we’ve debounced the resize event by a delay of 500ms.

Implementing debouncing can be a bit tricky using imperative stateful code: we have to run timeouts on each event occurrence and do some state housekeeping in order to keep track of the delay between successive occurrences.

As usual, in our ADT streams, this is just another conditional expression
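A sketch in the same tagged-object style (names mine; the ‘undef’ sentinel is a Symbol here so it cannot collide with stream values):

```javascript
const Empty  = { type: 'Empty' };
const Abort  = error => ({ type: 'Abort', error });
const Cons   = (head, tail) => ({ type: 'Cons', head, tail });
const Future = promise => ({ type: 'Future', promise });

const UNDEF = Symbol('no pending input');

// event: () => Promise, generates a new quiet-period timer on demand
const debounce = (s, event, last = UNDEF) =>
    s.type === 'Empty' || s.type === 'Abort'
      ? (last === UNDEF ? s : Cons(last, s))     // flush the pending input
  : s.type === 'Cons'
      ? debounce(s.tail, event, s.head)          // a new input replaces the pending one
  : last === UNDEF
      ? Future(s.promise.then(next => debounce(next, event)))
      : Future(Promise.race([
          // quiet period elapsed: yield the pending input, restart
          event().then(() => () => Cons(last, debounce(Future(s.promise), event))),
          // the stream moved first: keep debouncing with the completed tail
          s.promise.then(next => () => debounce(next, event, last))
        ]).then(lazy => lazy()));

// On a fully synchronous burst, only the final value survives
const never = () => new Promise(() => {});
const d = debounce(Cons(1, Cons(2, Cons(3, Empty))), never);
```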

‘debounce’ also takes a generator function to make new debouncing events. We use an accumulating input ‘last’ to mark whether we’re inside an ongoing debounce operation (i.e. we have a last input to be checked). The ‘undef’ special value lets us know if there was any preceding input (we can’t use null or undefined because streams can yield any JavaScript value).

As soon as we’ve got an input (the Cons case) we start debouncing using its value. And if the tail is a future, we run a race between a newly generated event (the call to ‘event()’ inside Promise.race) and the tail itself. If the debouncing event wins, it means no occurrences happened on the stream during this period, so we include the last input and start a new debouncing iteration. If instead the future tail wins, then there were occurrence(s) before the debouncing event, so the last input is not (yet) included and we continue debouncing with the completed tail.

Note we do not discard the last input yet because we need to know the type of the completed tail: if it is Empty or Abort then we include the last input (because we’re sure there will be no more occurrences). If it is a Cons, then we effectively discard it and restart debouncing with the new head.

As I said, that’s a little more complicated, but still, we’ve managed to do it with a single (slightly longer) conditional expression, meaning no state and no imperative code.

Conclusion

You can find the complete sources on GitHub along with usage instructions.

Note it’s more about experimenting with a concept than releasing a new library. The main intention of the GitHub sources is to provide a starting point for anyone willing to go further with this. Hopefully, there will be people providing some useful feedback on the concept and its real world feasibility. What follows are some general thoughts on the practical issues.

Testing vs proof

I put some unit tests in the sources (obviously this is far from enough). For a lot of reasons, it’s not a good idea to test this kind of code against real timers, so I used the concept of a Virtual Scheduler borrowed from the Rx library. The scheduler can generate virtual and precise timeout events so we can test all Stream operations in a precise and deterministic environment.

But I still think that, speaking of unit tests, it’s never enough. We feed our functions some inputs and make assertions on the output. Unit testing, while important, is just a poor, probabilistic substitute for proof.

While we can’t go for a formal proof method, we can do better than standard unit tests. I’m thinking of something like QuickCheck in Haskell: we’re still in testing land, but now instead of testing a few inputs we test for invariants, i.e. general properties that our function should satisfy. This is best motivated by this quote from the above link:

(…) you might be able to guess some more interesting cases to check for “edge” conditions (…). But another issue is figuring out what the right answers should be for such cases. This is not too difficult for a simple function, but can be much harder for more complex problems, even for experienced programmers.

So the idea is to think of properties a function should verify for each possible input, write the property assertion, and the testing framework will try to fail the assertion by checking it against a large number of generated inputs. A simple example is the ‘reverse’ function on lists: logically, reversing an already reversed list should yield the original list.

reverse( reverse(list) ) == list

Obviously this is more reliable than testing against 2 or 3 sample inputs.

I think there are some QuickCheck-based libraries in JavaScript, so the idea should be feasible. The trick is, of course, to come up with pertinent assertions for each stream function.

Resource consumption

In reality, I’m less worried about the correctness of the definitions (they are derived from Haskell after all) than I am about resources: time and memory.

For the time question, I think the key factor is the promise case: promises execute their then blocks asynchronously in a microtask queue; this prevents stack issues but can introduce some latency, especially when we’re dealing with occurrences happening at an extremely rapid rate. For most use cases, I think this is less of an issue, because in a true microtask queue (not a timeout-based emulation) we’re still far below the millisecond magnitude, but maybe I’m missing something here.

For the memory issue, ADT Streams are in themselves very lightweight objects (one or two fields), and the question translates directly into a more general garbage collection/object retention problem. If you hold a long-living reference to a huge Stream (like a global variable in the browser), then the entire Stream chain will be kept in memory. So we have to avoid globals or other kinds of long-living references (like module properties).

IMO, the really cool thing about ADT Streams is that they open a full, and infinite, range of possibilities for stream programmability.

The ADT concept simplifies the programming model a lot, and the Promise variant makes handling asynchronous cases much simpler. Instead of providing a big monolithic library with a huge list of operations, we can provide just a tiny core with a small set of operations, and anyone can define the operations they need in a clean and beautiful FP style.
