Getting started with js-csp
The TL;DR of CSP (Communicating Sequential Processes) is that it’s another way of modelling concurrency, just like other models you might be familiar with: Observables in ReactiveX implementations, Promises, or even callbacks (which I believe is a.k.a. CPS, Continuation Passing Style).
CSP uses a message passing style of communicating between processes. “Processes in JS!?” you might say — sure we might not be running more than one thread at a time, but the single thread we are running is typically in no shortage of asynchronous concerns to deal with — something CSP can help us out with.
Perhaps, in the context of JS, “communicating between generator functions” might be easier to follow.
In any case:
I hear and I forget. I see and I remember. I do and I understand. — Confucius
There… I can feel my beard growing longer already…
App requirements
We’ll be talking about CSP in the context of an example CLI app. Which means… this might not be the most focused discussion of CSP you’ll come across. But hey, my editor approves it (pats self on back).
We’ll be going through most of the code here but not all of it, so check out the repo on Github.
We want:
- Given: Github owner, Github repo, your Github username, your Github password
- Output: a JSON document with all contributors of the selected repository (i.e. the given owner + repo), as well as all followers of each contributor of said repository.
Some things to note:
- We need to exhaust Github’s pagination for the followers API.
- We need to buffer found results in memory (as opposed to spitting data out ASAP), since we also want to do some head counting and make a record of totals in our output JSON.
- We will only console.log the output JSON so we can redirect the program’s stdout to a file. Errors can use console.error.
Another thing to note is that you might want to put the following in your .bash_profile (or similar):
export HISTCONTROL=ignorespace
That way, you’ll be able to prefix any commands you run through your shell with a space and have that command omitted from your history. (Of course, you might want to check if your shell actually supports this: HISTCONTROL is a bash setting, and in zsh the equivalent is setopt HIST_IGNORE_SPACE.)
Now you can export your Github password as an environment variable (GITHUB_PASS) once (prefixed by a space); not have that stored in your history; and not have to supply it every time to our CLI app (running the risk of forgetting to whitespace-prefix that command). Phew… that was a lot shorter to explain in my mind…
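On the Node side, picking that variable up could look roughly like this. It is only a sketch: GITHUB_PASS is the name used above, while the readPassword helper and its error message are made up for illustration.

```javascript
// Hypothetical helper: read the password exported as GITHUB_PASS.
// The environment is a parameter so the function can be tried out
// without touching the real process.env.
function readPassword(env = process.env) {
  const pass = env.GITHUB_PASS;
  if (!pass) {
    // Fail early with a hint rather than sending an empty password.
    throw new Error('GITHUB_PASS is not set; export it first (with a leading space)');
  }
  return pass;
}

// Example with a fake environment object.
const password = readPassword({ GITHUB_PASS: 'hunter2' });
```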
Channels and processes
In CSP, we pass messages through channels. Unlike message passing in some other models of concurrency (like the Actor model), channels are not a simple “fire and forget” mechanism: you can’t just put a message on a channel and walk away from it. Think:
- Phoning someone and waiting for them to pick up to tell them a message (synchronous message passing)
- Versus sending them an email (asynchronous message passing)
Another way you’ll hear this put is that channels come with “back pressure” built-in. Through back pressure, a channel’s consumer can communicate its readiness for consumption to the channel’s producer, which in turn can stop producing messages until the consumer is ready.
(Note: there are variations to this (like buffered channels) — but we’ll only be covering the basics here).
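To make the “phone call” idea concrete, here is a toy unbuffered channel with a tiny scheduler. This is not js-csp and not how js-csp is implemented; chan, take, put and go are hand-rolled stand-ins with similar names so the example runs on its own. The point to watch is that the producer cannot offer its second value until the first one has been taken.

```javascript
// Toy unbuffered channel and scheduler. NOT js-csp, just a stand-in
// with similarly named functions so the example is self-contained.
const chan = () => ({ takers: [], putters: [] });
const take = (ch) => ({ op: 'take', ch });
const put = (ch, value) => ({ op: 'put', ch, value });

function go(genFn) {
  const gen = genFn();
  const step = (input) => {
    const { value: instr, done } = gen.next(input);
    if (done) return;
    if (instr.op === 'take') {
      const putter = instr.ch.putters.shift();
      if (!putter) { instr.ch.takers.push(step); return; } // park until a put arrives
      step(putter.value); // hand the value over...
      putter.resume();    // ...then let the parked producer continue
    } else {
      const taker = instr.ch.takers.shift();
      if (!taker) { // nobody is listening: park the producer
        instr.ch.putters.push({ value: instr.value, resume: () => step(true) });
        return;
      }
      taker(instr.value); // a consumer was waiting: hand over and carry on
      step(true);
    }
  };
  step();
}

const log = [];
const ch = chan();

go(function* producer() {
  for (const n of [1, 2]) {
    log.push(`offer ${n}`);
    yield put(ch, n); // the "phone call": waits here until someone picks up
    log.push(`delivered ${n}`);
  }
});

// No consumer yet, so the producer is stuck on its first put.
const beforeConsumer = log.slice();

go(function* consumer() {
  while (true) {
    const n = yield take(ch);
    log.push(`got ${n}`);
  }
});
```

Before the consumer starts, the log only contains the first offer. Once it is running, offers and deliveries alternate one at a time, which is the back pressure described above.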
That’s pretty much all we need to know about CSP to start using it. Let’s now have a look at our app’s processes and the channels they’ll be using.
Request process
We’ll have one process solely dedicated to making HTTP requests:
- We’re using an infinite loop (a common pattern when programming with generators) to csp.take messages from a requestCh channel. This operation blocks until there is a message on the channel. That doesn’t mean it blocks our single thread (our program). We’re yielding out of this generator function, allowing anything else on the event loop to execute.
- We’re checking we got the right kind of message (more on this later).
- Using the message’s content, we’re making an HTTP request and replying back on the given successCh channel with the parsed JSON we got back from Github’s API.
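The shape of such a process can be sketched without the library by letting the generator yield plain “instructions” and stepping through it by hand. Everything here is an assumption made for illustration: take and put are stand-ins for csp.take and csp.put, channels are just strings, the message fields (type, url, successCh) are guesses at a reasonable shape, and fetchJson is a synchronous stub where the real thing would be an asynchronous HTTP call.

```javascript
// Stand-ins for csp.take / csp.put: they only describe what the process wants.
const take = (ch) => ({ op: 'take', ch });
const put = (ch, value) => ({ op: 'put', ch, value });

// fetchJson is a hypothetical helper. It is synchronous here so the sketch
// runs offline; a real HTTP request is asynchronous.
function* requestProcess(requestCh, fetchJson) {
  while (true) {
    const msg = yield take(requestCh);    // wait for the next request
    if (msg.type !== 'Request') continue; // ignore anything unexpected
    const json = fetchJson(msg.url);
    yield put(msg.successCh, { type: 'Response', json }); // reply to the caller
  }
}

// Step through the process by hand, playing the part of the scheduler.
const fakeApi = { '/contributors': [{ login: 'alice' }] };
const proc = requestProcess('requestCh', (url) => fakeApi[url]);

// 1. The process asks to take from requestCh.
const first = proc.next().value;
// 2. We hand it a request; it answers with a put on the channel we named.
const second = proc.next({
  type: 'Request',
  url: '/contributors',
  successCh: 'contributorCh',
}).value;
// 3. After replying, it goes back to waiting for the next request.
const third = proc.next(true).value;
```

Note how the loop never ends: between requests the generator is simply parked at the take, which is the “blocking without blocking the thread” behaviour described in the first bullet.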
Contributor process
Another process will handle what to do with the response from the “get me all contributors request”:
- I only skimmed the Github API but I saw no reference to pagination there, so I’m assuming the contributors request gets us all contributors.
- We’re taking a message from the channel (put there by the requestProcess) and, for each contributor in the JSON response, we’re adding the contributor to our output by sending another message to the outputCh.
- We’re “spawning” another process (i.e. running a generator function) to handle getting the contributor’s followers. Apart from the initial URL to fetch a given contributor’s followers from (and the contributor’s login name), we’re also passing this new process a dedicated channel it can use to communicate with requestProcess.
- Notice that spawning a new process gives us back another channel (doneCh). This channel receives the process’s return value. Since we’re taking from this channel, it’s kind of important that our followerProcess generator function actually returns (and doesn’t loop infinitely), as that will unblock the contributorProcess and allow it to process the next contributor.
- Once all contributors are processed (and, because of the way we set it up, all followers too), we’re ready to put a FlushMsg on our outputCh to write our JSON to stdout.
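The ordering described in these bullets can be sketched by stepping a generator by hand and recording what it asks for. As before, this is an illustration under assumptions, not the code from the repo: take and put are stand-ins for csp.take and csp.put, channels are plain strings, the message types are invented, and spawnFollowers is a hypothetical function standing in for the csp.go call that starts the follower process and returns its done channel.

```javascript
const take = (ch) => ({ op: 'take', ch });
const put = (ch, value) => ({ op: 'put', ch, value });

// spawnFollowers (hypothetical) starts the follower process for one
// contributor and returns that process's done channel.
function* contributorProcess(contributorCh, outputCh, spawnFollowers) {
  const { json } = yield take(contributorCh);
  for (const contributor of json) {
    yield put(outputCh, { type: 'Contributor', login: contributor.login });
    const doneCh = spawnFollowers(contributor);
    yield take(doneCh); // blocks until the follower process returns
  }
  yield put(outputCh, { type: 'Flush' });
}

// Drive the process by hand and record what it asks for, in order.
const response = { json: [{ login: 'alice' }, { login: 'bob' }] };
const proc = contributorProcess(
  'contributorCh',
  'outputCh',
  (contributor) => `done:${contributor.login}`
);

const trace = [];
let result = proc.next();
while (!result.done) {
  const instr = result.value;
  trace.push(instr.op === 'take' ? `take ${instr.ch}` : `put ${instr.value.type}`);
  // Answer the first take with the HTTP response; everything else gets undefined.
  result = proc.next(instr.ch === 'contributorCh' ? response : undefined);
}
```

The trace shows one contributor being fully handled (including the wait on its done channel) before the next one starts, with the flush message last. That strictly sequential ordering is also why nothing here runs concurrently.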
Follower process
The followerProcess will be spawned for each contributor and its purpose will be to request all followers for a given contributor (exhausting pagination):
- Right off the bat we put a message on requestCh to make an HTTP request and get our first page of followers.
- Again, we enter an infinite loop… but remember, unlike the requestProcess, we must eventually exit this loop as the contributorProcess is waiting for a done signal on the doneCh. However, since we don’t know how many pages we’ll be requesting, we start off in an infinite loop.
- After getting our JSON back, we send all followers to the outputProcess.
- We check for the existence of the Link header in the response. If it exists we must put another message on the requestCh to make our next HTTP request. Otherwise, we’ll just return since we’re done paginating. Returning ends the generator function and the doneCh returned by csp.go in our contributorProcess gets a message (supposedly undefined since we’re not returning a value here).
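A possible shape for the pagination loop is sketched below. It goes one step further than a bare existence check and looks for a rel="next" entry in the Link header, on the assumption that a last page may still carry a Link header pointing back at earlier pages. The message shapes, the link field on the response, the api.example URLs and the nextPageUrl helper are all invented for the sketch; take and put are stand-ins for csp.take and csp.put.

```javascript
const take = (ch) => ({ op: 'take', ch });
const put = (ch, value) => ({ op: 'put', ch, value });

// Pull the rel="next" URL out of a Link header such as
//   <https://api.example/p2>; rel="next", <https://api.example/p9>; rel="last"
// Returns null when there is no next page (or no header at all).
function nextPageUrl(linkHeader) {
  const match = /<([^>]+)>\s*;\s*rel="next"/.exec(linkHeader || '');
  return match ? match[1] : null;
}

function* followerProcess(login, url, requestCh, replyCh, outputCh) {
  yield put(requestCh, { type: 'Request', url, successCh: replyCh });
  while (true) {
    const { json, link } = yield take(replyCh);
    for (const follower of json) {
      yield put(outputCh, { type: 'Follower', of: login, login: follower.login });
    }
    const next = nextPageUrl(link);
    if (!next) return; // done paginating; returning is what unblocks the caller
    yield put(requestCh, { type: 'Request', url: next, successCh: replyCh });
  }
}

// Drive it by hand with two fake pages.
const pages = [
  { json: [{ login: 'carol' }], link: '<https://api.example/p2>; rel="next", <https://api.example/p2>; rel="last"' },
  { json: [{ login: 'dave' }], link: '<https://api.example/p1>; rel="prev", <https://api.example/p1>; rel="first"' },
];
const proc = followerProcess('alice', 'https://api.example/p1', 'requestCh', 'replyCh', 'outputCh');

const requested = [];
const followers = [];
let result = proc.next();
while (!result.done) {
  const instr = result.value;
  if (instr.op === 'put' && instr.ch === 'requestCh') requested.push(instr.value.url);
  if (instr.op === 'put' && instr.ch === 'outputCh') followers.push(instr.value.login);
  result = proc.next(instr.op === 'take' ? pages.shift() : true);
}
```

The generator finishes after the second page because that page has no rel="next" entry, so the loop that started out “infinite” does end.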
Output process
Our final process, outputProcess, is responsible for buffering all collected values and outputting our result when we tell it to. If you’re familiar with Redux, you should be pretty familiar with what’s going on:
- We define our initial state and enter an infinite loop (since we don’t know how many messages we’ll be processing).
- The idea then is to wait for messages and change the state accordingly. The reducers (which change our state) are not included here as they’re not quite relevant to the subject matter (see final code).
- Upon receiving a FlushMsg we make our final reduction to our state (to add totals to the output JSON) and console.log it.
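Stripped of the channel plumbing, the fold that such a process performs might look like the sketch below. The state shape, the message types and the reduce, withTotals and runOutput functions are all assumptions made for illustration; they are not the reducers from the repo.

```javascript
const initialState = { project: null, contributors: {} };

// Hypothetical reducer: returns a new state for each message, never mutating.
function reduce(state, msg) {
  switch (msg.type) {
    case 'Project':
      return { ...state, project: msg.name };
    case 'Contributor':
      return { ...state, contributors: { ...state.contributors, [msg.login]: [] } };
    case 'Follower': {
      const existing = state.contributors[msg.of] || [];
      return {
        ...state,
        contributors: { ...state.contributors, [msg.of]: [...existing, msg.login] },
      };
    }
    default:
      return state;
  }
}

// Final reduction applied on flush: add the head counts.
function withTotals(state) {
  const lists = Object.values(state.contributors);
  return {
    ...state,
    totalContributors: lists.length,
    totalFollowers: lists.reduce((sum, followers) => sum + followers.length, 0),
  };
}

// Fold messages until a Flush arrives, then produce the JSON to print.
function runOutput(messages) {
  let state = initialState;
  for (const msg of messages) {
    if (msg.type === 'Flush') return JSON.stringify(withTotals(state), null, 2);
    state = reduce(state, msg);
  }
  return null; // no Flush seen, so nothing would be printed
}

const json = runOutput([
  { type: 'Project', name: 'owner/repo' },
  { type: 'Contributor', login: 'alice' },
  { type: 'Follower', of: 'alice', login: 'carol' },
  { type: 'Follower', of: 'alice', login: 'dave' },
  { type: 'Contributor', login: 'bob' },
  { type: 'Flush' },
]);
```

In the real process the for loop would be the infinite loop taking from outputCh, and the returned string is what gets passed to console.log.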
Our messages are some type of sum types — ye?
When sending messages around, it stands to reason that you want to be able to distinguish between different types of messages. You might be familiar with the use of the type field in Redux actions, or some other ad-hoc way of tagging your messages. That’s cool, it works (even Erlang uses it).
However, I have a preference for using sum types if at all possible. JS doesn’t have (static) types, so we’ll be using the nifty daggy npm package to simulate them.
Why? Because I want to be able to create an enumeration of messages, each with its own constructor (no typos etc…), and check whether a given value (message) is of a particular kind.
The following example should give you a good idea of how to use daggy if you’ve never used it before:
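For readers without the package at hand, the idea behind tagged sums can be illustrated with a small hand-rolled helper. To be clear, this taggedSum function is a stand-in written for this sketch and is not daggy’s actual API; the OutputMsg type and its constructors are invented too.

```javascript
// Tiny stand-in for a tagged-sum helper (NOT daggy's API).
// Each constructor checks its arity and gets an `is` predicate.
function taggedSum(typeName, constructors) {
  const type = {};
  for (const [tag, fields] of Object.entries(constructors)) {
    const construct = (...values) => {
      if (values.length !== fields.length) {
        throw new TypeError(`${typeName}.${tag} expects ${fields.length} argument(s)`);
      }
      const msg = { typeName, tag };
      fields.forEach((field, i) => { msg[field] = values[i]; });
      return Object.freeze(msg);
    };
    construct.is = (value) =>
      Boolean(value) && value.typeName === typeName && value.tag === tag;
    type[tag] = construct;
  }
  return type;
}

// An enumeration of the messages a process understands.
const OutputMsg = taggedSum('OutputMsg', {
  Contributor: ['login'],
  Follower: ['contributor', 'login'],
  Flush: [],
});

const msg = OutputMsg.Follower('alice', 'carol');
const isFollower = OutputMsg.Follower.is(msg); // true
const isFlush = OutputMsg.Flush.is(msg);       // false

// A typo such as OutputMsg.Folower(...) fails immediately, because
// OutputMsg.Folower is undefined, instead of creating a bad message.
```

Compared with tagging messages by a hand-typed string, a misspelled constructor or a missing field fails at the point where the message is created rather than somewhere downstream.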
A couple of loose ends & conclusion
CSP-unrelated, we’re using meow for CLI arg parsing and then calling the following main function:
Notice how we’re calling our initial go-routines (a.k.a processes, a.k.a generator functions — wow such names).
More importantly, notice that you cannot just yield in any old function: it must be a generator function. What’s more, you cannot just csp.take / put in any old generator function: it must be a generator function kicked off with csp.go. Hence the use of the helper function put in main, which allows us to send the first few messages to kick off the whole thing (setting the project name and making our first HTTP request for the list of contributors).
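One way such a helper could work is to wrap the put in a throwaway process. The sketch below shows that idea with a toy runtime: chan, take, put and go are hand-rolled stand-ins rather than js-csp itself, and putFromOutside is a hypothetical name for the helper.

```javascript
// Toy stand-in for js-csp (unbuffered channels, synchronous scheduler).
const chan = () => ({ takers: [], putters: [] });
const take = (ch) => ({ op: 'take', ch });
const put = (ch, value) => ({ op: 'put', ch, value });

function go(genFn) {
  const gen = genFn();
  const step = (input) => {
    const { value: instr, done } = gen.next(input);
    if (done) return;
    const { op, ch, value } = instr;
    if (op === 'take') {
      const putter = ch.putters.shift();
      if (!putter) { ch.takers.push(step); return; }
      step(putter.value);
      putter.resume();
    } else {
      const taker = ch.takers.shift();
      if (!taker) { ch.putters.push({ value, resume: () => step(true) }); return; }
      taker(value);
      step(true);
    }
  };
  step();
}

// Helper for ordinary (non-generator) code: wrap the put in a throwaway process.
function putFromOutside(ch, msg) {
  go(function* () { yield put(ch, msg); });
}

const received = [];
const requestCh = chan();

go(function* listener() {
  while (true) received.push(yield take(requestCh));
});

function main() {
  // In a plain function this only builds a description; nothing is sent.
  put(requestCh, 'lost');
  // Wrapped in a process, the message is actually delivered.
  putFromOutside(requestCh, 'first request');
}
main();
```

Only the wrapped message reaches the listener. The bare put in main is just an object that nobody interprets, which mirrors why take and put need to live inside a process.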
Yes it takes a while
Note: if you’re actually going to try running this, it does take a while. The default owner/repo combo is ubolonton/js-csp and the output took about 1min 35sec to complete (I used time for this).
If you see the reducer’s code on Github, you’ll notice I’m not mutating the state and opting to copy over values instead. You might think that mutation would speed things up, but I tried doing so and there was only around a 1sec difference between the two.
In hindsight, the real issue is that not much is happening concurrently here folks lol… hey you gotta start somewhere right? At least you now know the basics of js-csp. Perhaps you could write a concurrent version of this as an exercise? Or perhaps I’ll do just that next time. (liking the post doesn’t hurt :P)
Edit: Read part 2 here.
Got some js-csp examples in mind?
As an FYI, the people behind js-csp are currently looking to document some pragmatic use of the CSP model in JS, particularly:
how channels can help synchronize state in an interactive web app.