Code branching in Flux

MarcoDB
3 min read · Jun 14, 2022


There are many situations where you need to branch between one query flow and a different query flow depending on some condition. Yet Flux doesn’t seem to offer a syntax to branch inline between functions.

Say for example that you’re writing a Flux query to show data in Grafana. You want to use aggregateWindow() to make sure the number of points received by Grafana stays under control. However, as your users start zooming in, the number of points naturally becomes smaller, and people start noticing artifacts introduced by aggregateWindow(), like timestamps not exactly matching the original data. So you want to be able to selectively include aggregateWindow() in your query only when the time range is over a certain threshold, while leaving your original data below that threshold.

In a nutshell, you want to do this:

myData
    |> if (v.timeRangeStop - v.timeRangeStart) > 1h then
        aggregateWindow(...) // Else no-op

Let’s be clear: the snippet above is not Flux, it’s pseudocode. Flux doesn’t support subtracting times (it returns the error time is not Subtractable), it doesn’t support comparing durations (it returns the error unsupported binary expression duration > duration), and of course it doesn’t support if blocks in a stream of |> operators, which is the subject of this post. It’s a head-scratcher for me that a language focused on time series can’t do operations on times and durations, but OK, let’s get that out of the way with some simple functions:

timeDiff = (t1, t2) => duration(v: uint(v: t1) - uint(v: t2))

// Returns:
// - `1` if `d1` is bigger than `d2`
// - `-1` if `d1` is smaller than `d2`
// - `0` if they are the same duration
durationCmp = (d1, d2) =>
    if uint(v: d1) > uint(v: d2) then
        1
    else if uint(v: d1) < uint(v: d2) then
        -1
    else
        0

useAggrWin = durationCmp(
    d1: timeDiff(t1: v.timeRangeStop, t2: v.timeRangeStart),
    d2: 1h,
) > 0

myData
    |> if useAggrWin then aggregateWindow(...) // Else no-op

Now we’re left with the crux of the problem: how do you conditionally execute your aggregateWindow()?

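Flux offers conditional expressions, but they only produce a value, for example on the right-hand side of a variable assignment; they can’t act as a step in a stream of |> operators. Luckily for us, functions are values that can be assigned to variables, and that’s the basic workaround we need to do what we want: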

noOp = (every, fn, column="_value", tables=<-) => tables

aggrWinFn = if useAggrWin then aggregateWindow else noOp

myData
    |> aggrWinFn(every: x, fn: sum)
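Put together, the pieces above might look like the following sketch. The bucket name, the measurement filter, and the 1m/mean arguments are placeholders I made up for illustration, and `v.timeRangeStart` / `v.timeRangeStop` are assumed to be supplied by Grafana. The duration check is written as a direct comparison of nanosecond counts, a compact variant of durationCmp() rather than a replacement for it.

```flux
// Difference between two times, as a duration.
timeDiff = (t1, t2) => duration(v: uint(v: t1) - uint(v: t2))

// True when the dashboard time range is wider than one hour.
// Durations are compared as unsigned nanosecond counts.
useAggrWin =
    uint(v: timeDiff(t1: v.timeRangeStop, t2: v.timeRangeStart)) > uint(v: 1h)

// Pass-through that accepts the same arguments we pass to aggregateWindow() below.
noOp = (every, fn, column="_value", tables=<-) => tables

// Choose the function once, then call it inline in the pipeline.
aggrWinFn = if useAggrWin then aggregateWindow else noOp

from(bucket: "example-bucket") // hypothetical bucket name
    |> range(start: v.timeRangeStart, stop: v.timeRangeStop)
    |> filter(fn: (r) => r._measurement == "example-measurement") // hypothetical filter
    |> aggrWinFn(every: 1m, fn: mean)
```

The conditional is evaluated once, when aggrWinFn is assigned, so the pipeline itself stays a plain chain of |> calls.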

This works fine, but it’s verbose, hard to read, and full of caveats. As you can see, I’ve given noOp a signature that is not the full signature of aggregateWindow(). I took this shortcut because I knew that aggrWinFn() would always be called with a subset of the arguments available to aggregateWindow(). But if I later try to pass more arguments to aggrWinFn(), this lazy approach will come back to bite me. A more robust choice would have been:

noOp = (
    every=1m,
    period=1m,
    fn=mean,
    column="_value",
    timeSrc="_stop",
    timeDst="_time",
    location={offset: 0h, zone: "UTC"},
    createEmpty=true,
    tables=<-,
) => tables

(based on the documented definition of aggregateWindow() as of 6/14/2022).

Given that noOp() doesn’t use any of its arguments, it’s OK for those defaults to get out of sync with future definitions of aggregateWindow(). That might not be true for other use cases, though, like branching between code-that-does-something and code-that-does-something-different instead of a no-op.

Also, this definition of noOp() can only be used as a replacement for aggregateWindow(); it’s not a general-purpose no-op function for the broader branching pattern of “do something or do nothing”. So you probably want to choose a more specific name.
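One way to sidestep the signature matching, offered as a sketch rather than something I’ve battle-tested, is to wrap each branch in a small function that takes only the piped-forward stream. The names passThrough, downsample, and maybeDownsample, the bucket name, and the 1m/mean arguments are all hypothetical; useAggrWin is hard-coded here only to keep the example short.

```flux
// General-purpose pass-through: takes only the piped-forward stream.
passThrough = (tables=<-) => tables

// The "do something" branch, wrapped so that it has the same minimal signature.
// The every/fn values are placeholders for illustration.
downsample = (tables=<-) => tables |> aggregateWindow(every: 1m, fn: mean)

// Hypothetical condition; in practice, derive it from the dashboard time range.
useAggrWin = true

// Both branches take the same arguments, so either can fill the same slot.
maybeDownsample = if useAggrWin then downsample else passThrough

from(bucket: "example-bucket") // hypothetical bucket name
    |> range(start: -6h)
    |> maybeDownsample()
```

The trade-off is that the arguments to aggregateWindow() are fixed inside the wrapper instead of at the call site. In exchange, passThrough no longer has to mirror anyone’s parameter list, and the same pattern covers “do something or do something different” by swapping passThrough for a second wrapper.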

These are some of the reasons why an inline implementation of conditional expressions within a stream of |> operators (as in the initial pseudocode) would be far better than this hack. But until that’s an option, the hack will have to do.
