Better Abstractions With core.async



core.async is a Clojure library implementing communicating sequential processes (CSP), an approach that structures code as producers and consumers of messages passed through channels. CSP is a way of dealing with concurrent activity in a program, and is a strong alternative to the kind of callback-oriented programming found in Node.js (for example).
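To make the producer/consumer idea concrete, here is a minimal sketch (not from the original post) of one process putting values onto an unbuffered channel while another takes them until the channel is closed. The names `producer` and `consumer` are purely illustrative.

```clojure
(ns example.csp
  (:require [clojure.core.async :refer [chan go-loop >! <! <!! close!]]))

(defn producer
  "Puts each value of xs onto ch, in order, then closes ch."
  [ch xs]
  (go-loop [xs (seq xs)]
    (if xs
      (do (>! ch (first xs))          ; parks until a consumer takes the value
          (recur (next xs)))
      (close! ch))))

(defn consumer
  "Takes values from ch until it is closed. The returned channel
  yields all collected values as a vector."
  [ch]
  (go-loop [acc []]
    (if-some [v (<! ch)]              ; nil means the channel was closed
      (recur (conj acc v))
      acc)))

;; Neither process knows anything about the other; they share only the channel.
(let [ch (chan)]
  (producer ch [1 2 3])
  (<!! (consumer ch)))
;; => [1 2 3]
```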

I’ve been a big fan of Clojure’s core.async library since I first heard about it, and have been eagerly using it in a number of ways. I see core.async as fulfilling two roles: enabler and simplifier.

Enablers allow something to be accomplished that otherwise could not be, at least not without a large amount of work and uncertainty. Most of the technologies we use are enablers, from the language itself to the libraries and frameworks. For example, after using Clojure for a while, you will find that expressing algorithms using lazy evaluation is clear, simple, and desirable. Clojure enables lazy evaluation by building all the necessary support into the core language and library. You wouldn’t want to build all that yourself.

Likewise, core.async gives your application many desirable features: breaking your processes into sequential building blocks, connected by channels, is a terrific way to build servers with great throughput and responsiveness. That’s most of what core.async is about, and it paints a very appealing picture of your server humming along, always keeping a small pool of threads hot (fully loaded into memory and ready to run) and busy (doing useful work).

But core.async can also act as a simplifier. It’s not just about performance and scalability, it’s about expressing yourself more clearly.

I’ve been working on a bit of code to allocate payment codes (for a project at Aviso). The codes are associated with an invoice that needs to be paid; the payment code acts as a temporary alias that can be texted to a user’s phone.

The codes are short: six alphabetic characters, such as UWESHL. Essentially, each code is a number in base 26, ranging from 0 (AAAAAA) to 26^6 − 1 = 308,915,775 (ZZZZZZ).
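The post doesn’t show how a number becomes a six-letter code, so here is a minimal sketch assuming A = 0 through Z = 25, most significant digit first, zero-padded with A. The functions `code->string` and `string->code` are hypothetical names.

```clojure
(ns example.payment-code)

(def code-length 6)

(defn code->string
  "Renders n (0 <= n < 26^6) as a six-letter, zero-padded base-26 string,
  where A = 0 ... Z = 25."
  [n]
  {:pre [(<= 0 n 308915775)]}
  (loop [n n, i 0, chars ()]
    (if (= i code-length)
      (apply str chars)
      ;; conj onto a list prepends, so the least significant digit ends up last
      (recur (quot n 26)
             (inc i)
             (conj chars (char (+ (int \A) (rem n 26))))))))

(defn string->code
  "Inverse of code->string: parses a six-letter code back into its number."
  [s]
  (reduce (fn [acc c] (+ (* acc 26) (- (int c) (int \A)))) 0 s))

(code->string 0)          ;; => "AAAAAA"
(code->string 26)         ;; => "AAAABA"
(code->string 308915775)  ;; => "ZZZZZZ"
```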

To allocate these easily across a cluster of servers with minimal coordination, the range of possible numbers is broken up into 64K blocks of 4K codes each. That adds up to 28 bits (2^16 × 2^12 = 268,435,456 codes), which covers most (about 87%) of the 308,915,776 values that six base-26 digits can express. There’s no requirement that the payment codes be sequential … just that they are unique. Servers use the database to allocate blocks; once a server owns a block, it can allocate that block’s 4,096 codes without touching the database again.
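The post doesn’t say exactly how a block id and an index are combined into a code number. One natural assumption, sketched below, is to put the block id in the high 16 bits and the index in the low 12 bits; `code-number` and `block-code-numbers` are hypothetical helpers.

```clojure
(ns example.payment-blocks)

(def block-bits 16)                                  ; 2^16 = 65,536 blocks
(def index-bits 12)                                  ; 2^12 = 4,096 codes per block
(def codes-per-block (bit-shift-left 1 index-bits))

(defn code-number
  "Combines a block id and an index within that block into one code number."
  [block-id index]
  {:pre [(< -1 block-id (bit-shift-left 1 block-bits))
         (< -1 index codes-per-block)]}
  (bit-or (bit-shift-left block-id index-bits) index))

(defn block-code-numbers
  "Lazy sequence of every code number in the given block."
  [block-id]
  (map #(code-number block-id %) (range codes-per-block)))

;; The largest possible code number is 2^28 - 1, comfortably below 26^6 - 1.
(code-number 65535 4095)  ;; => 268435455
```

Because distinct blocks never share a code number, two servers holding different blocks can never hand out the same code.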

OK, so here’s the issue: my code reaches a point where it needs one of these payment codes, so there’s a function that gets called to provide the next one. My first pass looked something like this:

https://gist.github.com/hlship/8271a2744dc4705a9a46

At its core is an atom that tracks the state: the id of the payment code block, and the most recently allocated index. I don’t show generate-payment-code, but it has to do several things: allocate a new block when needed, advance to the next payment code index, generate the six-character string … and return both the new value and the new state (needed to generate the next payment code). And, of course, the code must incorporate a spin loop in case multiple threads allocate codes at the exact same instant.
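Since the gist isn’t reproduced here, the following is only a rough reconstruction of the shape described above, not the original code. `allocate-block!` is a hypothetical function standing in for the database call, and the result is left as a plain number rather than a six-letter string.

```clojure
(ns example.atom-allocator)

(def codes-per-block 4096)

(defn next-state
  "Given the current state {:block-id b :index i} (nil at startup), returns
  the state after allocating one more code. Calls the hypothetical
  allocate-block! whenever a fresh block is needed."
  [allocate-block! {:keys [block-id index] :as state}]
  (if (or (nil? state) (= index (dec codes-per-block)))
    {:block-id (allocate-block!) :index 0}
    {:block-id block-id :index (inc index)}))

(defn next-payment-code!
  "Spin loop: retries until this thread wins the compare-and-set!.
  Note that every caller must pass in both the database function and the atom."
  [allocate-block! state-atom]
  (loop []
    (let [old @state-atom
          nxt (next-state allocate-block! old)]
      (if (compare-and-set! state-atom old nxt)
        (+ (* (:block-id nxt) codes-per-block) (:index nxt))
        (recur)))))
```

One more wart shows up here: a thread that loses the race right at a block boundary has already called `allocate-block!`, so that block is simply wasted. Uniqueness still holds, since codes only need to be unique, not dense, but it’s the kind of subtlety this design invites.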

This is ugly in a number of ways, not least because, as a function, it forces every consumer of payment codes to supply specific arguments (the database, the state-holding atom) that only the generator of payment codes actually needs. That feels complected, and fragile.

So, as I was figuring out what the code inside generate-payment-code would look like, I had that delightful spark of insight: if I looked at this problem differently, not as a function to invoke but as a channel to take values from, I might end up with a better design that was also easier to implement.

A core.async channel is a perfect way to isolate the generation of payment codes from the consumption of them. But that leaves the question of how many payment codes should be generated. As it turns out, the answer is … all of them! Let’s take a look:

https://gist.github.com/hlship/bd3938545395be7e0545

The returned feed channel has a buffer size of 10 … this means that the go block will always try to have 10 payment codes “in the queue” ahead of any consumers.

The loop is endless: it allocates a block from the database, then generates all the codes for that block; the onto-chan function reads from a seq and puts each successive value into the channel. At first glance, it looks like it will immediately generate every possible payment code and stuff them all into the feed, filling up all available memory. Fortunately, that’s not the case.
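For readers who can’t see the gist, here is a sketch of the same structure, assuming a hypothetical `allocate-block!` function for the database call and leaving codes as plain numbers. Two details are worth calling out. First, `onto-chan` closes the target channel by default once its seq runs out, so the sketch passes `false` to keep the feed open across blocks. Second, the blocking database call is wrapped in `async/thread` so it doesn’t tie up one of the go pool’s threads; that’s a choice made here, not necessarily what the original does. (In core.async 1.2 and later, `onto-chan` is deprecated in favor of `onto-chan!`, which takes the same arguments.)

```clojure
(ns example.payment-feed
  (:require [clojure.core.async :as async :refer [chan go-loop <! <!!]]))

(def codes-per-block 4096)

(defn create-payment-code-feed
  "Returns a channel that always tries to keep 10 payment codes buffered.
  allocate-block! is a hypothetical stand-in for the database call that
  reserves a block id."
  [allocate-block!]
  (let [feed (chan 10)]
    (go-loop []
      (let [;; run the blocking database call on a real thread, park for its result
            block-id (<! (async/thread (allocate-block!)))
            ;; lazy: nothing is computed until onto-chan asks for it
            codes    (map #(+ (* block-id codes-per-block) %)
                          (range codes-per-block))]
        ;; false => do NOT close feed when this block is exhausted.
        ;; Parks here until every code in the block has been put onto feed.
        (<! (async/onto-chan feed codes false))
        (recur)))
    feed))
```

A consumer now needs nothing but the channel: `(<!! feed)` from an ordinary thread, or `(<! feed)` inside a go block.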

Visualize what this code does: at startup it allocates the first block id and generates a lazy sequence of all the payment codes for that block. Immediately, the feed channel is filled with the first 10 codes from the lazy sequence. Since the feed channel is now full, the go loop inside onto-chan parks.

Eventually, some other thread takes a value from the feed; once that happens, the process created by onto-chan un-parks, takes one more value from the lazy sequence, and puts that into the feed channel.
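The laziness and back-pressure described above can be observed directly. This standalone sketch (not from the original post) counts how many elements of a 4,096-element lazy sequence are actually realized when it is fed into a 10-slot channel with nobody taking from it. The `Thread/sleep` is a crude way of letting the go block reach its parked state. Expect far fewer than 4,096 realized elements, though typically more than 10: `range` produces a chunked sequence, and `map` realizes chunked sequences 32 elements at a time.

```clojure
(ns example.backpressure
  (:require [clojure.core.async :as async :refer [chan <!!]]))

(def realized (atom 0))

(defn counting-codes
  "Lazy seq of the numbers 0..n-1 that records how many have been realized."
  [n]
  (map (fn [i] (swap! realized inc) i) (range n)))

(def feed (chan 10))

;; Start pushing 4096 lazily generated values into a 10-slot buffer.
(async/onto-chan feed (counting-codes 4096) false)

;; Give the go block time to fill the buffer and park on the 11th put.
(Thread/sleep 200)

@realized   ; small: one chunk, not the whole block

;; Taking a value un-parks onto-chan, which puts the next value into the buffer.
(<!! feed)  ;; => 0
```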

Much later, once enough payment codes have been taken out of the feed channel, the 4096th payment code from the block is pushed into the feed channel, exhausting the lazy sequence of codes from the block. At that point, the create-payment-code-feed process, parked at line 16 of the gist (waiting on the channel returned by onto-chan), un-parks … and the process begins again, with a new block immediately allocated.

Perhaps the most confusing part is what happens when the application shuts down … what happens to the two processes (the one directly inside create-payment-code-feed, and the nested one inside onto-chan)?

The answer is: everything just cleans itself up. Underneath the covers, core.async channels rely on the same kind of callbacks that are so derided when they appear in JavaScript. When a process is parked, what’s really happening is that a callback has been added to a channel. For example, at line 16 of the gist, a callback is added to the channel returned by onto-chan, and inside onto-chan there’s a callback on the feed channel. Once the system shuts down, the outside references to the feed channel are released; eventually, the callbacks related to the feed channel and the onto-chan channel form a little island of self-reference, with nothing outside pointing in … at which point the channels, the callbacks, the lazy sequence, and everything else become eligible for garbage collection.
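The idea that a parked process is “really” a callback registered on a channel can be illustrated with core.async’s own callback API. The sketch below (illustrative names `via-go` and `via-callback`) shows two ways of waiting for a value: a go block that parks on `<!`, and an explicit callback registered with `async/take!`. The go macro’s actual state-machine transformation is more involved than this; the point is only that the observable behavior is the same, and that in both cases the only thing holding on to the pending work is the channel itself.

```clojure
(ns example.callbacks
  (:require [clojure.core.async :as async :refer [chan go <! >!!]]))

;; Style 1: a go block that parks until a value arrives on ch.
(defn via-go [ch result]
  (go (deliver result (<! ch))))

;; Style 2: the same wait, written as an explicit callback on ch.
(defn via-callback [ch result]
  (async/take! ch (fn [v] (deliver result v))))

(let [ch (chan)
      p  (promise)]
  (via-callback ch p)   ; registers the callback; nothing runs yet
  (>!! ch :hello)       ; delivering a value fires the callback
  (deref p 1000 :timeout))
;; => :hello
```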

This whole approach is very appealing … it lets you express stateful and asynchronous operations using the same approach that would normally apply to the stateless and synchronous operations elsewhere in your code. It teases apart logic that would normally be combined due to short-term convenience or necessity.

Just as when I was first learning object-oriented programming, and later functional programming, I suspect I will continue to learn better and more elegant ways to make use of channels. It’s something to look forward to.