Performance Optimization in Clojure Using Reducers and Transducers — A FORMCEPT Exclusive

Abhishek Jangid
FORMCEPT
May 10, 2018

Why Reducers

Reducers give us an alternative to using sequences/sequence functions to manipulate standard Clojure collections. Sequence functions are typically applied lazily (execution of operations is delayed until the results are forced, for higher performance), in order (to get consistent results), with intermediate results (because of the nested execution structure), and on a single thread.

Yet, some of the sequence functions (like map, filter, etc.) can be applied in parallel, which improves performance by letting the machine use more of its cores to execute the process.

Exploring Reducers

A reducer is basically the combination of a reducible collection (a collection which knows how to reduce itself) and a reducing function (what needs to be done during the reduction/execution of operations). As mentioned earlier, sequence functions are generally applied lazily; in a reducer's case, execution of the operations is deferred until the final reduction is performed. This eliminates the intermediate results and lazy evaluation seen with sequences.
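As a minimal sketch (assuming clojure.core.reducers is required under the alias r, as it is throughout this post), calling r/map only builds a reducer; no work happens until a reduce is actually performed:

(require '[clojure.core.reducers :as r])
(def red (r/map inc [1 2 3])) ;; returns a reducer; no work has been done yet
(r/reduce + red)              ;; => 9, the mapping and the reduction happen here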

Apart from that, some collections (persistent vectors and maps) are foldable, which lets a reducer execute the reduction in parallel (on multiple threads) by:

  • Partitioning the reducible collection at a specified granularity (default = 512 elements).
  • Applying the reducing function to each of the partitions.
  • Recursively combining the partition results using Java’s fork/join framework.

Note: If a collection does not support folding, the operation falls back to a non-parallel reduce.
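For example, persistent vectors are foldable while lazy sequences and ranges are not, so only the first call below can actually run in parallel (a sketch; the results are the same either way):

(r/fold + (vec (range 1000000))) ;; => 499999500000, vector is foldable, may be reduced in parallel
(r/fold + (range 1000000))       ;; => 499999500000, not foldable, silently falls back to a sequential reduce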

Fork/Join Framework

This framework was designed to speed up the execution of tasks that can be divided into smaller subtasks, by executing the subtasks in parallel and then combining their results back into a single one. The process is detailed below:

  • It applies a divide-and-conquer principle, recursively breaking the task up into smaller parts/subtasks until a given threshold is reached. This is the fork part.
  • These subtasks are processed independently of each other on multiple threads.
  • The results of these subtasks are recursively combined into an accumulated one. This is the join part.
  • To execute the tasks in parallel, the framework uses a pool of threads, with a number of threads equal to the number of processors available to the Java Virtual Machine (JVM) by default.

Fig. The working of Java’s fork/join framework

  • The most important feature here is the work-stealing algorithm:
  • Each thread maintains its own queue of tasks to perform.
  • With work stealing, a thread that has no tasks left can steal tasks from other threads that are still busy, by removing tasks from the tail of the other thread’s queue.
  • This approach makes processing more efficient, increasing throughput when there are many tasks to process or when threads execute tasks at varying rates.
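To make the fork/join idea concrete, here is a conceptual sketch in plain Clojure (illustrative only; the name fork-join-sum and the threshold value are made up here, and the real framework additionally schedules subtasks on a work-stealing thread pool):

(defn fork-join-sum
  "Recursively split a vector until it is below threshold, sum the pieces,
   then combine the partial sums."
  [v threshold]
  (if (<= (count v) threshold)
    (reduce + v)                                                    ;; small enough: just reduce
    (let [mid   (quot (count v) 2)
          left  (future (fork-join-sum (subvec v 0 mid) threshold)) ;; "fork" one half onto another thread
          right (fork-join-sum (subvec v mid) threshold)]
      (+ @left right))))                                            ;; "join": combine the results

(fork-join-sum (vec (range 1000)) 128) ;; => 499500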

Now, let us dive into examples to demonstrate that we can optimize our code significantly by using reducers. We will also walk through the r/reduce and r/fold functions.

reduce and fold

The clojure.core.reducers namespace (aliased as r here) provides an alternative r/reduce function.

(r/reduce f coll)
(r/reduce f init coll)

Here, f is the reducing function, init is the base or initial accumulator, and coll is the reducible collection.

The reducers version differs in that:

  • Map collections are reduced with reduce-kv; other collections are reduced through the CollReduce protocol (which is what collections such as persistent vectors use to reduce themselves).
  • When init is not provided, f is called with no arguments to produce an identity value.
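A quick illustration of both points (a sketch, assuming the r alias from above):

(r/reduce + [1 2 3 4 5])                          ;; => 15, no init given, so (+) => 0 is used as the identity
(r/reduce (fn [acc k v] (+ acc v)) 0 {:a 1 :b 2}) ;; => 3, maps go through reduce-kv, so f receives [acc k v]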

But, in general, instead of reduce we use fold, which performs the reduce and combine steps in parallel and makes execution faster.

Sometimes it is also useful to execute an eager reduce with fewer nested structures or intermediate results, since lazy evaluation can hold large chunks of unevaluated data (thunks) in memory.

(r/fold reducef coll)
(r/fold combinef reducef coll)
(r/fold n combinef reducef coll)

Here, n is the granularity or partition size (default = 512 elements) for the collection, reducef is the reducing function applied to each partition, combinef (which defaults to reducef) is used to combine the results coming from each thread, and finally coll is our reducible collection. This operation may be performed in parallel, but the results preserve the order of the input.
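One detail worth illustrating: combinef is also called with no arguments to produce the initial value for each partition, which is why + (whose zero-arity returns 0) works directly as both functions. A sketch (the granularity of 2048 below is just an illustrative choice):

(r/fold + (r/map inc (vec (range 10)))) ;; => 55, default granularity, + serves as both reducef and combinef
(r/fold 2048 + + (vec (range 100000)))  ;; => 4999950000, explicit partition size and combine function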

The following functions create reducers from a reducible or foldable collection: r/map, r/mapcat, r/filter, r/remove, r/flatten, r/take, r/take-while and r/drop. As discussed, execution of the operations is deferred until the final reduction is performed, so none of these functions transforms the source collection by itself. To produce an accumulated result, we have to use r/reduce or r/fold.

To produce an output collection, use clojure.core/into to choose a collection type, or r/foldcat to produce a collection that is foldable, reducible, seqable and counted.
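For instance (a small sketch, again assuming the r alias):

(into [] (r/filter even? (r/map inc [1 2 3 4 5]))) ;; => [2 4 6], realized into a vector
(r/foldcat (r/map inc [1 2 3 4 5]))                ;; => a foldable, seqable collection containing (2 3 4 5 6)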

Let us compare the performance of r/fold with reduce when adding a large range of numbers:
(reduce + (map inc (range 1 100000000))) ;; => 11341.426846 msecs
(r/fold + (r/map inc (range 1 100000000))) ;; => 8442.76924 msecs

Here, we are dealing with the addition of 100 million positive integers. In the first example, we are using map and reduce with plus on a single thread, so only one queue is maintained to perform the task. In the second example, we are using r/map and r/fold with plus, running on a machine with a 4-core processor. The computation can be faster still if the machine has more cores for the data to be divided among. r/map creates a reducer from the given reducible collection and holds off execution of the operations. When r/fold with plus gets invoked, it partitions this reducer (based on the default granularity of 512 elements) and divides the task among the cores available on the machine. Now that the task has been divided into smaller subtasks, the reducing function (plus here) is applied to each part, and then the partial results are combined using the combine function (also plus here, since it defaults to the reducing function).

Since this whole execution is done in parallel on smaller subtasks, it is much faster than the single-thread approach (as the execution times above show).

But if we have a collection with a small number of elements, an eager reduce is the better option, since dividing tasks among threads and combining them is an overhead in itself.

Let us take one more example, this time counting the frequencies of elements in a collection. We will implement both reduce and fold versions and then compare their performance.

(def data (into [] (take 100000000 (repeatedly #(rand-int 1000)))))

(defn frequencies-v1 [coll]
  (reduce
    (fn [counts x]
      (merge-with + counts {x 1}))
    {} coll))

(defn frequencies-v2 [coll]
  (r/fold
    (fn combinef
      ([] {})
      ([x y] (merge-with + x y)))
    (fn reducef
      ([counts x] (merge-with + counts {x 1})))
    coll))

(frequencies-v1 data) ;; => 57001.849834 msecs
(frequencies-v2 data) ;; => 33784.687088 msecs

In this particular example, we have taken 100 million random numbers as our base collection, and we calculate the frequency of each element in it. The first version is implemented with reduce on a single thread, and the second uses fold to make it multi-threaded. Here we also define our combine function, because this time it cannot be the same as the reducing function: on each thread we reduce the collection by incrementing the count of each element by 1, but while combining we have to add up the values for the same keys in the hash-maps (using merge-with). Here too, we get better performance by using reducers.

We have seen that reducers are much faster, but we should only use them when the work is pure computation (i.e. no blocking I/O), the data is sufficiently large (partitioning small data is an overhead in itself), and the source data can be generated and held in memory. Now, let us explore transducers in the subsequent sections.

Exploring Transducers

Transducers are basically composable algorithmic transformations, a chain of functions wrapping each other. They are independent of the context of their input and output, and because of that they can be used in many different execution contexts such as collections, streams, channels, etc. A transducer is a transformation from one reducing function to another.
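To make "a transformation from one reducing function to another" concrete, here is a minimal hand-written mapping transducer (a sketch for illustration; in practice (map f) already returns this for you, and the name mapping is made up here):

(defn mapping [f]
  (fn [rf]                              ;; takes a reducing function...
    (fn                                 ;; ...and returns a new reducing function
      ([] (rf))                         ;; init arity
      ([result] (rf result))            ;; completion arity
      ([result x] (rf result (f x)))))) ;; step arity: transform x before handing it to rf

(transduce (mapping inc) + 0 [1 2 3]) ;; => 9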

I recommend watching Rich Hickey’s talk on Transducers to get a better idea about them.

Most of the sequence functions in Clojure have an arity that produces a transducer.

Ex. (filter odd?) ;; => returns a transducer that filters odd
(map inc)  ;; => returns a mapping transducer for incrementing
(take 5)   ;; => returns a transducer that will take the first 5 values

For writing transducers, the recommended way is to use the clojure.core/comp function to compose algorithmic transformations over data. Let us define a transducer using comp:

(def xf
  (comp
    (filter odd?)
    (map inc)
    (take 5)))

The transducer xf is a stack of transformations which will be applied to our series of elements/collection. Each function in this stack is performed before the function it wraps. Composition of transducers reads right-to-left, but it builds a stack that runs left-to-right. So in this case, filtering happens before mapping, and mapping happens before taking 5 elements. This transformation is therefore equivalent to the sequence transformation:

(->> coll
     (filter odd?)
     (map inc)
     (take 5))
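Both forms give the same answer. For example (a quick sketch using sequence to apply the transducer):

(sequence xf (range 20))                          ;; => (2 4 6 8 10)
(->> (range 20) (filter odd?) (map inc) (take 5)) ;; => (2 4 6 8 10)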

The transduce function is a common way of applying transducers, and it is analogous to reduce.

(transduce xform f coll)
(transduce xform f init coll)

transduce follows eager evaluation: it reduces over coll immediately, with the transducer xform applied to the reducing function f, using init as the initial value (if provided; otherwise f is called with no arguments to produce an identity value).

We defined the transducer xf above, which keeps the odd elements of a collection, then increments each element by one and finally takes the first 5 elements of the result. Let us use transduce to apply this transducer to a collection.

Ex. (transduce xf + (range)) ;; => 30
(transduce xf + 100 (range)) ;;=> 130

In the first example, we are using range, which gives us a lazy sequence of infinite numbers. We then reduce this lazy sequence with the transducer xf and the reducing function plus (+). We can use the eduction function to capture the process of applying a transducer. Ex: (eduction xf (range)) ;; => (2 4 6 8 10)

So, applying the transducer xf over range gives us a collection of 5 even elements from 2 to 10, and the reducing function plus (+) then accumulates them into the result.

Much Clojure code relies on applying nested transformations to sequences:

(reduce + 0 (filter odd? (map inc (range 100000000))))

Conceptually, this takes an input sequence of 100 million elements (range 100000000), creates an intermediate sequence with (map inc), creates another intermediate sequence with (filter odd?), and then reduces that with +.

Transducers let us represent the transformation part as an independent and reusable thing.

(def xf
  (comp
    (map inc)
    (filter odd?)))

We will then apply the composite transformation in a single pass over the input:

(transduce xf + 0 (range 100000000))

Here, transduce combines (a) iterating the input, (b) the transformations, and (c) what to do with the results; in this case, applying + as a reduce. There are other functions for building results as well: into collects results into a collection, sequence can incrementally produce a result, and eduction can delay the execution.
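For example (a sketch, reusing the xf just defined):

(into [] xf (range 100))  ;; eagerly collects the transformed values into a vector
(sequence xf (range 100)) ;; lazily/incrementally produces a sequence of the transformed values
(eduction xf (range 100)) ;; delays the work; the transformation runs each time the eduction is reduced or seq'd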

Why Transducers

Transducers offer a number of benefits:

  • Composability — As discussed earlier, transducers are independent of their input and output contexts, so they are reusable in many different contexts such as core.async channels, collections, streams, etc.
  • Performance — The sequence example above allocates two large intermediate sequences (one from map and one from filter). The transduce version does no intermediate allocation, and the difference becomes dramatic as the input size or the number of transformations grows. If the data already lives in an input collection (like a vector), into with transducers can move data directly from one vector to another without ever producing a sequence. Since transducers produce no intermediate sequences, they are faster than our simple nested execution of functions over a series of elements.
Ex. (reduce + (filter odd? (map inc (range 100000000))))
;; => 3960.99076 msecs
(transduce xf + 0 (range 100000000))
;; => 1368.760545 msecs
  • Eagerness — With transducers we process the entire input into the entire output, and we can do that eagerly to avoid the overhead of holding large chunks of unevaluated data (thunks) in memory.
  • Resource Management — Because transducers can process the input eagerly, one knows when the process is done and can manage resources accordingly.
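As a small illustration of the composability point, the same transducer can be reused unchanged in a collection context and in a core.async channel (a sketch; it assumes org.clojure/core.async is on the classpath, and xform here simply mirrors the xf defined earlier):

(require '[clojure.core.async :as async])

(def xform (comp (map inc) (filter odd?)))

(into [] xform (range 10))     ;; collection context: => [1 3 5 7 9]
(def ch (async/chan 10 xform)) ;; channel context: values are transformed as they are put onto the channel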

So that is how reducers and transducers play a massive role in code optimization in Clojure. If you would like to read more such tech blogs from us, please follow us on http://medium.com/@formcept and visit our blog here. Stay tuned!
