Getting started with channels and the core.async library
Core.async is Clojure's main library for asynchronous programming and communication. It makes asynchronous programming much easier by doing most of the heavy lifting for you. The library has a few stated objectives, but chiefly it aims to:
- Provide facilities for independent threads of activity, communicating via queue-like channels
- Support both real threads and shared use of thread pools (in any combination), as well as ClojureScript on JS engines
The basic building block is the channel. Core.async provides a number of channel types off the shelf: unbuffered channels, fixed-buffer channels, dropping-buffer channels and sliding-buffer channels.
You can think of a channel as a conveyor belt: you add items at one end and take them from the other. You can only add or take one item at a time, and occasionally you might have to queue up beside the belt if other people are ahead of you adding items. Sometimes you might have to queue to collect your items too. Belts vary in size as well; small conveyor belts can cause large queues to build up outside, especially once the belt is full and nobody is collecting items.
The system works well most days, but sometimes the whole thing shuts down if too many people are queuing on either side of it. One strategy for dealing with this is simply to turn people away if too many are already queuing to add their items onto the belt. Another might be to throw items off the belt if they haven't been collected, on a one-in-one-out basis.
Before we get started, it is important to understand that operations on channels can sometimes block. Blocking occurs when a thread interacting with a channel has to wait for something, e.g. a return value or an acknowledgement. No further computation can happen on that thread, so subsequent operations are blocked until something changes. In our slightly tedious conveyor-belt analogy, a blocked thread is analogous to a person waiting to collect an item from an empty belt, or a person waiting to add an item onto an already full one.
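To make this concrete, here is a minimal sketch using the blocking operations >!! and <!! (c is just a throwaway channel name, and we spawn the put on a helper thread so the REPL itself never deadlocks):

```clojure
(require '[clojure.core.async :refer [chan thread >!! <!!]])

(def c (chan)) ; unbuffered: every put must meet a take

;; >!! blocks this helper thread until someone takes the value.
(thread (>!! c 42))

;; <!! blocks the calling thread until the put above arrives.
(println (<!! c)) ; prints 42 once the put and take rendezvous
```

If we had called (>!! c 42) directly on the REPL thread with no taker around, that thread would simply hang, which is exactly the blocking behaviour described above.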
What is an Inversion of Control thread?
Inversion of Control is essentially the idea of handing responsibility for execution over to something else, e.g. a library or framework. Normally we control precisely what the software is doing at any given moment, but with the Inversion of Control pattern we effectively ask the software to call us back in response to some event.
To avoid blocking our thread and pausing program execution, we can instead create an IOC thread using the go macro, which returns control to our program immediately. Internally it does this by parking the go block's state, which saves us from manually writing additional callbacks and events. The internal state machine keeps track of the channel operations for us and resumes the block when they can complete.
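As a brief sketch (c is our own throwaway channel), a go block returns immediately and parks instead of blocking:

```clojure
(require '[clojure.core.async :refer [chan go >! <!]])

(def c (chan)) ; an unbuffered channel for the demo

;; The go block returns a channel immediately; inside it, <! parks
;; the block (not a real thread) until a value is available.
(go (println "received:" (<! c)))

;; A second go block parks on >! until the taker above is ready.
(go (>! c "hello"))
```

Neither call ties up a real thread while waiting; the state machine resumes each block once its channel operation can complete.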
To keep things simple we will be working with Clojure's ordinary threads in this tutorial, so keep in mind that some of the operations we use here may not be appropriate inside IOC threads.
Creating a Channel
Let's start playing around with the library to get a feel for how it works. Using Leiningen, create a new app:
lein new app async-basics
We should then add the core.async library to our dependencies in project.clj:
:dependencies [[org.clojure/clojure "1.10.0"]
               [org.clojure/core.async "0.4.500"]]
Finally, adjust our core.clj namespace declaration to require the library:
(ns async-basics.core
  (:require [clojure.core.async :as a
             :refer [>! <! >!! <!! go chan buffer close! thread
                     alts! alts!! take! put! timeout]]))
Run the repl with lein repl. We can then start by creating a basic channel, which has no buffer by default:
(def simplechan (chan))
Now that we have defined our simple channel, simplechan, we can write to it as follows:
(put! simplechan "1")
(put! simplechan "2")
(put! simplechan "3")
; Or:
; (put! port val)
; (put! port val fn1)
; (put! port val fn1 on-caller?)
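As an aside, the three-argument arity lets us observe the result of the put; here accepted? is just our own name for the boolean the callback receives (true unless the channel is already closed):

```clojure
(put! simplechan "4"
      (fn [accepted?] (println "delivered?" accepted?)))
```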
We can also supply a function, fn1, to call when the put completes, and optionally specify whether to call it on the same thread or a different one. Now what happens when we retrieve a value from a channel using take!? We will pass println as the function to call once we have the value:
(take! simplechan println)
(take! simplechan println)
(take! simplechan println)
(take! simplechan println)
(take! simplechan println)
The key thing to remember is that both take! and put! send or receive one item at a time.
How much data can we put on the channel?
; Create a new channel again so that we know it is empty
(def simplechan (chan))
; What happens if we try and call it 1024 times?
(dotimes [i 1024] (put! simplechan i))
; nil - Seems to be working ok
; What if we try adding one more value?
(put! simplechan "Exceeded Capacity")
; Execution error (AssertionError) at clojure.core.async.impl.channels.ManyToManyChannel/put_BANG_ (channels.clj:152).
; Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
; The internal assertion that fails:
; (< (.size puts) impl/MAX-QUEUE-SIZE)
It looks like core.async can only handle 1024 pending puts on our simple channel before it topples over. Because our channel has no buffer, every put! goes straight into the put queue, which holds at most 1024 pending puts. We should perhaps try a different type of channel. What if we don't really need to keep 100% of the data going into the channel?
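Before reaching for a lossy buffer, one option worth sketching (with a hypothetical bufferedchan) is a plain fixed buffer, where puts land in the buffer first and only once it is full do further puts start consuming the 1024-slot pending-put queue:

```clojure
(def bufferedchan (chan 2000)) ; fixed buffer of 2,000 slots

;; All 2,000 puts land in the buffer; the put queue stays empty.
(dotimes [i 2000] (put! bufferedchan i))

;; Roughly 1,024 more puts would queue as pending puts before the
;; familiar AssertionError appears.
```

A fixed buffer still keeps every value, though, so it only delays the problem when producers outpace consumers.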
Dropping Buffer Channels
We will create a dropping-buffer channel, which drops values when the buffer receives too many. Let's set the buffer to 2,000 as well, way past the 1,024 puts we managed last time:
(def droppingchan2 (chan (a/dropping-buffer 2000)))
; What happens if we try and call it 20,000 times?
(dotimes [i 20000] (put! droppingchan2 i))
; nil (nothing fell over)
(take! droppingchan2 println)
That seems to have solved our crashing problem, and we now have up to 2,000 items in the channel buffer at any given moment. To understand why this doesn't crash, we should examine how the dropping buffer actually works. The channel we created has an internal buffer 2,000 items long. If we try to add more than 2,000 items at any point in time, the newest values are simply dropped until more space in the buffer becomes available.
Internally, a core.async channel actually consists of three queues: the buffer, which determines how many items can remain on the channel at any given time; a put queue for pending puts onto the channel; and a take queue for pending takes off the channel. The assertion error occurs when we fill either the put queue or the take queue with more than 1024 pending handlers. In our earlier example, the unbuffered channel had no internal buffer, so every put request landed in the put queue, which has a hard limit of 1024 items.
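A quick sketch of that arithmetic, assuming a small buffered channel (bufchan is our own name) of size 5: the first 5 puts fill the buffer, the next 1,024 wait in the put queue, and one more triggers the assertion:

```clojure
(def bufchan (chan 5))

;; 5 puts fill the buffer, 1,024 more sit in the put queue.
(dotimes [i 1029] (put! bufchan i))

;; One more pending put exceeds the 1,024 limit.
(put! bufchan :one-more) ; AssertionError: no more than 1024 pending puts
```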
(def droppingchan (chan (a/dropping-buffer 2000)))
; Write 20,000 items of data to the channel
(dotimes [i 20000] (put! droppingchan i))
; nil - all seemed ok when adding to the buffer
; What if we try overloading the take queue with 20,000 requests?
(dotimes [i 20000] (take! droppingchan println))
; Assert failed: No more than 1024 pending takes are allowed on a single channel.
; The internal assertion that fails:
; (< (.size takes) impl/MAX-QUEUE-SIZE)
Our dropping-buffer channel worked quite happily when we bombarded it with data: it simply filled the 2,000-item buffer and then started dropping values. However, when we tried to read from it 20,000 times we ran into the assertion error again. The first 2,000 takes are satisfied from the buffer, but the remaining take requests stack up as pending takes until we hit the 1,024 hard limit, at which point we receive the assertion failure.
Hmm, what if we really bombarded our channel? Could we overload it with put requests too?
(def droppingchan (chan (a/dropping-buffer 2000)))
; Write 5,000,000 items of data to the channel!
(dotimes [i 5000000] (put! droppingchan i))
; nil - Our puts succeed, but only the first 2,000 items land in the buffer
; The 2,001st through 5,000,000th items never make it onto the channel!
The answer is no. The dropping buffer does its job and handles our 5,000,000 requests without falling over; by handling, I mean it doesn't crash regardless of how many times we put to it. The dropping buffer simply maintains a buffer of 2,000 items and discards any further data instead of putting it onto the channel, i.e. it only accepts data when there is space in the buffer.
What if we were more interested in the most recent items instead? We can use a sliding-buffer for this.
(def slidingchan (chan (a/sliding-buffer 2000)))
; Write 5,000,000 items of data to the channel!
(dotimes [i 5000000] (put! slidingchan i))
; nil - All 5,000,000 puts succeed, but the buffer keeps only the newest 2,000
(take! slidingchan println)
Again, similar to our dropping-buffer, we can put to it as many times as we like without an error. The channel maintains a sliding buffer of 2,000 items, so when we call take! we get the oldest item still in the buffer, which in this case is the value 4998000 (the buffer holds the last 2,000 values, 4998000 through 4999999). However, if we called take! 5,000,000 times we would get another AssertionError, as we would soon exceed the 1,024 hard limit on pending requests in the take queue.
The key thing to remember about both the dropping-buffer and the sliding-buffer is that they fill the buffer and drop items when bombarded with put requests. The sliding buffer keeps the most recent n items, whilst the dropping buffer keeps the first n items until more space is made available (via a take operation). These are two helpful mechanisms for consumers dealing with onslaughts of data from producers.
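The difference is easy to see side by side with two small buffers (d and s are our own throwaway names):

```clojure
(def d (chan (a/dropping-buffer 3)))
(def s (chan (a/sliding-buffer 3)))

(dotimes [i 10] (put! d i))
(dotimes [i 10] (put! s i))

(take! d println) ; prints 0 - the dropping buffer kept the first 3 items (0, 1, 2)
(take! s println) ; prints 7 - the sliding buffer kept the last 3 items (7, 8, 9)
```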
Finally, we can also close our channels:
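For example, using close! from our earlier require (closablechan is a hypothetical channel of our own):

```clojure
(def closablechan (chan 10))
(put! closablechan 1)
(close! closablechan)

(put! closablechan 2)        ; returns false - puts to a closed channel are rejected
(take! closablechan println) ; prints 1 - buffered values can still be taken
(take! closablechan println) ; prints nil - a closed, empty channel yields nil
```

Closing is how a producer signals "no more data": consumers drain whatever remains in the buffer and then receive nil.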
Core.async channels are great. We have only covered the basics here, but you can do all sorts of fun things with them: mix multiple channels together, distribute processing across channels with pipelines, or transform values as they flow through a channel using transducers.
To learn more about core.async channels and the philosophy behind them, it is worth watching Rich Hickey’s talk on Clojure core.async channels.
Another really helpful introduction to the core.async library is the video below, in which Timothy Baldridge explains how to use it. It is well worth a watch if you can spare 40 minutes of your time, but if not, we have covered some of the basic concepts in this article.
If you are interested in working with core.async in production then I would also highly recommend this video by Zach Tellman, which gives some interesting insight into handling queues. You can also use Zach’s open-source library, durable-queue to write data to disk as a cache for when a queue is being bombarded with requests. This will help your systems to recover in the event of a process crash.
Finally, the core.async reference sheet from Purelyfunctional.tv is a great resource, as is the official documentation. I would also recommend Clojure for the Brave and True which offers a fantastic introduction if you are getting started.