Asynchronous Clojure

Getting started with channels and the core.async library

Functional Human
Jun 17 · 9 min read
Photo by nima hatami on Unsplash

Channels

What is an Inversion of Control thread?

Creating a Channel

lein new app async-basics
:dependencies [[org.clojure/clojure "1.10.0"]
[org.clojure/core.async "0.4.500"]]
(ns async-basics.core
(:require [clojure.core.async
:as a
:refer [>! <! >!! <!! go chan buffer close! thread
alts! alts!! take! put! timeout]])
(:gen-class))
(def simplechan (chan))
(put! simplechan "1")
(put! simplechan "2")
(put! simplechan "3")
; Or
; (put! port val)
; (put! port val fn1)
; (put! port val fn1 on-caller?)
(take! simplechan println)
; 1
(take! simplechan println)
; 2
(take! simplechan println)
; 3
(take! simplechan println)
; nil
(take! simplechan println)
; nil
; Create a new channel again so that we know it is empty
(def simplechan (chan))
; What happens if we try and call it 1024 times?
(dotimes [i 1024] (put! simplechan i))
; nil - Seems to be working ok
; What if we try adding on one more value?
(put! simplechan "Exceeded Capacity")
; Execution error (AssertionError) at Clojure.core.async.impl.channels.ManyToManyChannel/put_BANG_ (channels.clj:152).;Assert failed: No more than 1024 pending puts are allowed on a single channel. Consider using a windowed buffer.
(< (.size puts) impl/MAX-QUEUE-SIZE)

Dropping Buffer Channels

(def droppingchan2 (chan (a/dropping-buffer 2000))); What happens if we try and call it 20,000 times?
(dotimes [i 20000] (put! droppingchan2 i))
; nil (nothing fell over)
(take! droppingchan println)
; 0
Anatomy of a Core.async queue from Rich Hickey’s Channel talk
(def droppingchan (chan (a/dropping-buffer 2000))); Write 20,000 items of data to the channel
(dotimes [i 20000] (put! droppingchan i))
; nil - all seemed ok when adding to the buffer
; What if we try overloading the take queue with 20,000 requests?
(dotimes [i 20000] (take! droppingchan println))
; Assert failed: No more than 1024 pending takes are allowed on a single channel.
(< (.size takes) impl/MAX-QUEUE-SIZE)
(def droppingchan (chan (a/dropping-buffer 2000))); Write 5,000,000 items of data to the channel!
(dotimes [i 5000000] (put! droppingchan i))
; nil - Our put's succeed but only the first 2000 items are put
; The 2001 - 5,000,000th item never makes the channel!

Sliding-Buffer Channels

(def slidingchan (chan (a/sliding-buffer 2000))); Write 5,000,000 items of data to the channel!
(dotimes [i 5000000] (put! slidingchan i))
; nil - We successfully wrote 5,000,000 items
(take! slidingchan println)
; 4998000
(close! simplebuffer)

Further Reading…

The Startup

Medium's largest active publication, followed by +479K people. Follow to join our community.

Functional Human

Written by

Functional Programming for Humans

The Startup

Medium's largest active publication, followed by +479K people. Follow to join our community.