Of umbrellas, transducers, reactive streams & mushrooms (Pt.2)

thi.ng
thi.ng
Mar 7 · 21 min read
// Higher-order function: captures the factor `n` and hands back a
// one-argument function multiplying its input by that factor
const mul = (n) => {
  const scale = (x) => x * n;
  return scale;
};
// build a times 10 multiplier function
const mul10 = mul(10);
// use
mul10(4.2);
// 42
// Array.map() itself is a HOF which applies the given
// user function to each array value and produces a new array
[1, 2, 3, 4].map(mul10);
// [10, 20, 30, 40]
// Returns a new array of the same length as `arr` in which every
// slot holds the corresponding source value multiplied by `x`.
// The source array is left untouched.
const mulArray = (arr, x) =>
  Array.from({ length: arr.length }, (_, idx) => arr[idx] * x);
mulArray([1, 2, 3, 4], 10);
// [10, 20, 30, 40]
// Simple transformation pipeline using built-in HOFs
[1, 2, 3, 4]
.filter((x) => (x & 1)) // keep odd numbers only
.map(mul(14)) // multiply (using fn from above)
.filter((x) => x > 40) // apply lower threshold
.map((x) => `answer: ${x}`) // format
// ["answer: 42"]
input  -> [1, 2, 3, 4]
filter -> [1, 3]
map -> [14, 42]
filter -> [42]
map -> ["answer: 42"]

Reductions

const sum = (acc, x) => acc + x;// Sum all array items
[1,2,3,4,5,6,6,5,4,3,2,1].reduce(sum, 0)
// 42
[].reduce(sum, 100)
// 100
"to be or not to be"
.split(" ")
.reduce(
// reducing function
(acc, x) => ((acc[x] ? acc[x]++ : (acc[x] = 1)), acc),
// initial (empty) reduction result
{}
);
// { to: 2, be: 2, or: 1, not: 1 }
// takes an array, reduction function and initial result
const reductions = (src, rfn, initial) =>
src.reduce(
(acc, x) => (acc.push(rfn(acc[acc.length-1], x)), acc),
[initial]
);
// returns all results
reductions([1, 2, 3, 4, 5], (acc, x) => acc + x, 0);
// [ 0, 1, 3, 6, 10, 15 ]
// returns only last result
[1, 2, 3, 4, 5].reduce((acc, x) => acc + x, 0);
// 15
const map = (src, f) =>
src.reduce((acc, x) => (acc.push(f(x)), acc), []);
const filter = (src, f) =>
src.reduce((acc, x) => (f(x) && acc.push(x), acc), []);
map(filter([1,2,3,4], (x) => x & 1), (x) => x * 10)
// [ 10, 30 ]

Transducers & Reducers

;; Clojure reduction w/ early termination once result >= 10
;; (the last 2 values 5,6 will not be processed)
(reduce
(fn [acc x]
(let [sum (+ acc x)]
(if (>= sum 10) (reduced sum) sum)))
[1 2 3 4 5 6])
;; 10
/**
 * A reducer is represented as an array of three functions which are
 * addressed by index: `[0]` initialization, `[1]` completion and
 * `[2]` the reduction step. `A` is the accumulator (result) type,
 * `B` the type of the inputs consumed by the reduction step.
 *
 * NOTE(review): `Reduced` is not declared in this snippet; it is
 * presumably the wrapper type produced by `reduced()` — confirm.
 */
interface Reducer<A, B> extends Array<any> {
/**
* Initialization, e.g. to provide a suitable initial
* accumulator value, only called when no initial result
* has been provided by the user.
*/
[0]: () => A,
/**
* Completion. When called usually just returns `acc`,
* but stateful transformers should flush/apply their
* outstanding results.
*/
[1]: (acc: A) => A,
/**
* Reduction step. Combines new input with accumulator.
* If reduction should terminate early, wrap result via
* `reduced()`
*/
[2]: (acc: A, x: B) => A | Reduced<A>,
}
const sum = () => [
() => 0,
(acc) => acc,
(acc, x) => acc + x
];
const push = () => [
() => [],
(acc) => acc,
(acc, x) => (acc.push(x), acc)
];
const histogram = () => [
() => ({}),
(acc) => acc,
(acc, x) => ((acc[x] ? acc[x]++ : (acc[x]=1)), acc)
];
const reduce = (reducer, initial, xs) => {
// use reducer's default init if not user provided
let acc = initial != null ? initial : reducer[0]();
// reduce all inputs
for(let x of xs) {
acc = reducer[2](acc, x);
}
// call completion fn to post-process final result
return reducer[1](acc);
}
// then use like:
// no initial result provided, so use reducer default init
reduce(sum(), null, [1,2,3,4])
// 10
// with initial result of 100
reduce(sum(), 100, [1,2,3,4])
// 110
// reduction of a ES6 Set
reduce(sum(), 0, new Set([1,2,2,1,1,3,4,3,2]))
// 10
// strings are iterable too
reduce(histogram(), null, "reducers")
// { r: 2, e: 2, d: 1, u: 1, c: 1, s: 1 }
// ES6 generator yielding `n` values, each picked at a random index
// from the array-like `opts` (strings qualify as well)
function* choices<T>(opts: ArrayLike<T>, n: number) {
  for (let remaining = n - 1; remaining >= 0; remaining--) {
    // `| 0` truncates the scaled random number to an integer index
    const idx = (Math.random() * opts.length) | 0;
    yield opts[idx];
  }
}
[...choices("abcd", 10)]
// [ 'a', 'c', 'd', 'c', 'a', 'b', 'c', 'b', 'b', 'b' ]
reduce(histogram(), null, choices("abcd", 100))
// { a: 29, b: 27, c: 25, d: 19 }
// A transducer is a function which receives a reducer and returns
// a new reducer
type Transducer<A, B> = (r: Reducer<any, B>) => Reducer<any, A>;
// Builds a mapping transducer: the resulting reducer re-uses the
// init & completion functions of the wrapped reducer `r`, but its
// reduction step first transforms each input via `fn` before
// passing it on to `r`'s own step function
function map<A, B>(fn: (x: A) => B): Transducer<A, B> {
  const xform: Transducer<A, B> = (r) => {
    const init = r[0];
    const complete = r[1];
    return [init, complete, (acc, x) => r[2](acc, fn(x))];
  };
  return xform;
}
// or without types
map = (f) => (r) => [r[0], r[1], (acc, x) => r[2](acc, f(x))];
// Applies transducer `xform` to reducer `rfn` to obtain the
// transformed reducer, then runs `reduce` with it over `xs`
const transduce = (xform, rfn, initial, xs) => {
  const transformed = xform(rfn);
  return reduce(transformed, initial, xs);
};
// pre-build a times 10 standalone transducer for re-use
const mul10 = map((x) => x * 10)

// Replicate Array.map()
transduce(mul10, push(), null, [1, 2, 3, 4])
// [ 10, 20, 30, 40 ]
// or sum up values
transduce(mul10, sum(), null, [1, 2, 3, 4])
// 100
import * as tx from "@thi.ng/transducers";

// Parses the CSV string `csv` into an array of objects, one per
// non-empty body line. The first line (after trimming the input)
// is treated as header and supplies the column names used as
// object keys.
// - `fieldMappers`: optional object of per-column transformation
//   functions, keyed by column name
// - `delim`: column delimiter, applied to BOTH the header line and
//   all body lines (default: ",")
const parseCSV = (csv, fieldMappers = {}, delim = ",") => {
  const [header, ...body] = csv.trim().split("\n");
  return tx.transduce(
    tx.comp(
      // filter out empty lines
      tx.filter((x) => x.length > 0),
      // tokenize (must use the same delimiter as the header,
      // otherwise columns & names no longer line up)
      tx.map((x) => x.split(delim)),
      // convert to object using column names from header
      tx.rename(header.split(delim)),
      // apply any column transformations
      tx.mapKeys(fieldMappers, false),
    ),
    // collect all results into an array (final reduction)
    tx.push(),
    // input lines
    body
  );
};
const src = `
id,lang,rating
js,JavaScript,6
ts,TypeScript,8
clj,Clojure,7.5
go,Go,7
c,C,6.5
cpp,C++,5`;
// parse src and transform `ratings` field
const doc = parseCSV(src, { rating: (x) => parseFloat(x) });
// [ { rating: 6, lang: 'JavaScript', id: 'js' },
// { rating: 8, lang: 'TypeScript', id: 'ts' },
// { rating: 7.5, lang: 'Clojure', id: 'clj' },
// { rating: 7, lang: 'Go', id: 'go' },
// { rating: 6.5, lang: 'C', id: 'c' }
// { rating: 5, lang: 'C++', id: 'cpp' } ]
tx.transduce(
tx.rename(["a","b","c"]),
tx.push(),
[[10, 11, 12], [13, 14, 15]]
)
// [ { c: 12, b: 11, a: 10 }, { c: 15, b: 14, a: 13 } ]
// or rename & extract object keys
tx.transduce(
tx.rename({aa: "a", cc: "c"}),
tx.push(),
[{a: 1, b: 1}, {a: 1, b: 2, c: 3}]
);
// [ { aa: 1 }, { cc: 3, aa: 1 } ]
tx.transduce(
tx.mapKeys({ id: (x) => `id-${x}`, name: (x)=>x.toUpperCase() }),
tx.push(),
[
{id: 1, name: "alice", age: 84 },
{id: 2, name: "bob", age: 66 }
]
)
// [ { id: 'id-1', name: 'ALICE', age: 84 },
// { id: 'id-2', name: 'BOB', age: 66 } ]

ES6 iterables

[...tx.range(10)]
// [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
[...tx.range(10, 20, 2)]
// [ 10, 12, 14, 16, 18 ]
[...tx.range(10,0)]
// [ 10, 9, 8, 7, 6, 5, 4, 3, 2, 1 ]
// Hand-written equivalent of the spread operator: consumes the
// iterable `src` and appends each of its values to `acc`.
// `acc` is mutated in place and also returned.
const spread = <T>(acc: T[], src: Iterable<T>) => {
  for (const value of src) {
    acc.push(value);
  }
  return acc;
};
spread([], tx.range(10))
// [ 0, 1, 2, 3, 4, 5, 6, 7, 8, 9 ]
[...tx.range(5), ...tx.range(10,15)]
// [ 0, 1, 2, 3, 4, 10, 11, 12, 13, 14 ]
spread(spread([], tx.range(5)), tx.range(10,15))
// [ 0, 1, 2, 3, 4, 10, 11, 12, 13, 14 ]
[...tx.range2d(4,3)]
// [ [ 0, 0 ], [ 1, 0 ], [ 2, 0 ], [ 3, 0 ],
// [ 0, 1 ], [ 1, 1 ], [ 2, 1 ], [ 3, 1 ],
// [ 0, 2 ], [ 1, 2 ], [ 2, 2 ], [ 3, 2 ] ]
[...tx.normRange(10)]
// [ 0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1 ]
import { TAU, cossin } from "@thi.ng/math";

// Builds an array of `n` points, computed via `cossin()` for `n`
// normalized positions (each scaled to an angle via TAU) and
// radius `r`. See the example result below for n = 3, r = 100.
const makePoly = (n, r) =>
  tx.transduce(
    // map normalized position to a point; note the closing paren
    // of `tx.map(...)` before the next transduce argument
    tx.map((i) => cossin(i * TAU, r)),
    // collect all points into an array
    tx.push(),
    // input positions (second arg `false` as in the original call)
    tx.normRange(n, false)
  );
makePoly(3, 100);
// [ [ 100, 0 ],
// [ -49.99999999999998, 86.60254037844388 ],
// [ -50.00000000000004, -86.60254037844385 ] ]
// Same point computation as a lazy iterator: passes the input
// iterable directly as the last argument of `tx.map()`, so no
// result array is built here
const makePolyIter = (n, r) => {
  const toPoint = (i) => cossin(i * TAU, r);
  return tx.map(toPoint, tx.normRange(n, false));
};
[...makePolyIter(3, 100)]
// [ [ 100, 0 ],
// [ -49.99999999999998, 86.60254037844388 ],
// [ -50.00000000000004, -86.60254037844385 ] ]

Multiple inputs

[...tx.zip(tx.range(), "abcd", tx.choices("xyz"))]
// [ [ 0, 'a', 'y' ],
// [ 1, 'b', 'x' ],
// [ 2, 'c', 'z' ],
// [ 3, 'd', 'x' ] ]

Multiple outputs

// repeat each input
tx.transduce(tx.duplicate(2), tx.push(), [1,2,3])
// [ 1, 1, 1, 2, 2, 2, 3, 3, 3 ]
// chunk input into non-overlapping groups
tx.transduce(tx.partition(3), tx.push(), tx.range(10))
// [ [ 0, 1, 2 ], [ 3, 4, 5 ], [ 6, 7, 8 ] ]
// overlaps can be achieved via optional step size
tx.transduce(tx.partition(3, 1), tx.push(), tx.range(10))
// [ [ 0, 1, 2 ], [ 1, 2, 3 ], [ 2, 3, 4 ],
//   [ 3, 4, 5 ], [ 4, 5, 6 ], [ 5, 6, 7 ],
//   [ 6, 7, 8 ], [ 7, 8, 9 ] ]
// process chunks & re-flatten results
tx.transduce(
tx.comp(
tx.partition(3, 3, true),
tx.mapIndexed((i, p) => [i, p]),
tx.mapcat(([i, p]) => tx.map((x) => x + i * 100, p))
),
tx.push(),
tx.range(10)
)
// [ 0, 1, 2, 103, 104, 105, 206, 207, 208, 309 ]
import { juxt } from "@thi.ng/compose";const neighbors = juxt((x) => x - 1, (x) => x, (x) => x + 1);neighbors(100)
// [ 99, 100, 101 ]
// import moving averages transducers
import { sma, ema, wma } from "@thi.ng/transducers-stats";
tx.transduce(
tx.comp(
// compute simple, exponential and weighted MA
tx.multiplex(sma(5), ema(5), wma(5)),
// drop/skip the first 4 values (due to MA lag)
tx.drop(4)
),
tx.push(),
// generate 10 random values as input
tx.repeatedly(() => Math.random() * 100, 10)
)
// [ [ 64.54710049259242, 64.54710049259242, 63.446878766675205 ],
// [ 67.95324272855098, 76.16013450864521, 75.05991278272799 ],
// [ 64.28349996851921, 70.18377375668821, 71.81918262413573 ],
// [ 63.93406328379683, 48.96377633421058, 52.56594313104779 ],
// [ 49.85688603190547, 37.937601265093676, 36.549672412068794 ],
// [ 42.86881994710021, 36.73100495201606, 31.36998117672058 ] ]

Infinity & early termination

[...tx.range()]
[...tx.iterate((x) => x + 1, 0)]
[...tx.cycle([1,2,3])]
[...tx.take(5, tx.range())]
// [ 0, 1, 2, 3, 4 ]
// nested iterators of 1st 5 positive odd ints
[...tx.take(5, tx.filter((x) => !!(x & 1), tx.range()))]
// [ 1, 3, 5, 7, 9 ]
// same, but via transducer composition
const first5odds = tx.comp(
tx.filter((x) => !!(x & 1)),
tx.take(5)
);
// use `iterator` for composed transducers
[...tx.iterator(first5odds, tx.range())]
// or via standard transduce
tx.transduce(first5odds, tx.add(), tx.range())
// 25
[...tx.takeWhile((x) => x < 2000, tx.iterate((x) => x * 2, 1))]
// [ 1, 2, 4, 8, 16, 32, 64, 128, 256, 512, 1024 ]
// leaky integrator which stops when difference between
// prev/curr goes below 1
// (each step moves half of the remaining distance towards 100;
// the threshold must be 1 to yield the result shown below:
// 98.4375 -> 99.21875 is the first step smaller than 1)
[...tx.converge(
  (a, b) => Math.abs(a - b) < 1,
  tx.iterate((x) => x + (100 - x) * 0.5, 0)
)]
// [ 0, 50, 75, 87.5, 93.75, 96.875, 98.4375, 99.21875 ]
[...tx.map((x) => x < 5 ? x * 10 : tx.reduced(x * 10), tx.range())]
// [ 0, 10, 20, 30, 40, 50 ]

Laziness & stepwise execution

iterator<A, B>(xform: Transducer<A, B>, xs: Iterable<A>): IterableIterator<B>;
const xf = tx.step(
tx.comp(
tx.filter((x) => x >= 0),
tx.map((x) => x * 10)
)
);
xf(1)
// 10
xf(-2)
// undefined
xf(3)
// 30
const xf = tx.step(tx.partition(2));xf(1)
// undefined (waiting for partition to fill)
xf(2)
// [1, 2]
xf(3)
// undefined
xf(4)
// [3, 4]
const rules = { a: "ab", b: "a" };// iterator recursively expanding / replacing rule "a"
const expanded = tx.last(
tx.take(8,
tx.iterate(
(syms) => tx.mapcat((x)=> rules[x] || x, syms),
"a"
)
)
);
// Object [Generator]{}
[...expanded].join("")
// "abaababaabaababaababaabaababaabaab"

Outlook

thi.ng

Written by

thi.ng

Computational design, data, TypeScript, Clojure/script, C

Welcome to a place where words matter. On Medium, smart voices and original ideas take center stage - with no ads in sight. Watch
Follow all the topics you care about, and we’ll deliver the best stories for you to your homepage and inbox. Explore
Get unlimited access to the best stories on Medium — and support writers while you’re at it. Just $5/month. Upgrade