Running concurrent jobs with ordered results

A peek into different execution models using a common pattern

Devrim Şahin
Picus Security Engineering
26 min readJun 19, 2023

--

Introduction

In this post, we will discuss a pattern that keeps showing up in many concurrency problems. We will then consider some solutions to it in three different programming languages, namely Go, Rust and Java; hoping to provide some variety given how each provide different paradigms and execution models.

In Picus Security, we develop an automated security control validation platform. To measure the effectiveness of security controls and processes, we employ an agent; which is a client application that runs actions in order to validate the organization’s security posture.

For most cases, individual actions are independent of one another; so they can be run concurrently. However, the action order must be preserved during result calculation. Therefore, we want to run concurrent jobs, but retain the order of their outputs.

Concurrent jobs with ordered results. Rectangles are jobs, squares are the results being handled. Horizontal axis is time.

The figure above shows four concurrent jobs spawned in quick succession. The squares at the end depict their results being handled. Note that even though Job3 ends earlier than Job2, its result is not processed until Job2’s result is handled. In other words, each job’s result is causally dependent on not only the job itself, but also the previous job’s result.

This is a common pattern that keeps showing up in concurrent programming. Here’s another example:

Imagine we were writing a distributed actor framework (In short, an actor is a concurrent unit of execution, that receives messages and processes them sequentially). The messages to hit the “mailbox” (the input message queue of the actor) should first get deserialized (because they were sent over the wire), then the deserialized messages should be handled sequentially by the actor’s logic.

We prototype it and find out that the deserialization logic is so slow that it’s the bottleneck for the actor’s performance. We decide to concurrently deserialize the messages, since each can be deserialized independent of the others. However, we must not lose the order of messages. It’s the same problem.

One thing is different, though: In the latter example we don’t have an array of jobs handed to us at the beginning, but a stream of them that keeps coming, potentially indefinitely. For the agent case we could just decide to be lazy and wait for all results to be available before processing them (i.e. join()ing the jobs). For the actor example, that’s not possible because the messages keep flowing. So we must immediately start working on the next result as soon as it is available.

Go

Let’s implement this ‘concurrent jobs, ordered results’ pattern for different assumptions, in increasing complexity.

Fixed number of jobs

Let’s start with a very trivial case: The number of jobs to execute is known to us at the time of programming. For example, let’s say that we want to implement a very simple program that will execute exactly 3 long-running calculations:

  • Count the number of stars in the universe,
  • Count the number of grains of sand on Earth,
  • Calculate the answer to the ultimate question about life, universe and everything.

We will then print their results in this specific order.

Let’s implement the jobs in Go first. Let’s also print the elapsed time in the beginning and end of each function, so that we can analyze the logs more easily:

var startTime = time.Now()

func elapsed() int64 {
return time.Now().Sub(startTime).Milliseconds() / 1000
}

func countStars() int64 {
fmt.Printf("[%d] countStars(job0) STARTED\n", elapsed())
time.Sleep(2 * time.Second)
fmt.Printf("[%d] countStars(job0) DONE\n", elapsed())
return 1 << 62 // real number is much much higher by the way
}

func countGrainsOfSandOnEarth() int64 {
fmt.Printf("[%d] countGrainsOfSandOnEarth(job1) STARTED\n", elapsed())
time.Sleep(time.Second)
fmt.Printf("[%d] countGrainsOfSandOnEarth(job1) DONE\n", elapsed())
return 7.5e18
}

func meaningOfLife() int64 {
fmt.Printf("[%d] meaningOfLife(job2) STARTED\n", elapsed())
time.Sleep(3 * time.Second)
fmt.Printf("[%d] meaningOfLife(job2) DONE\n", elapsed())
return 42
}

We want each of these functions to run concurrently. Go provides concurrency through goroutines, which are basically green threads scheduled in user space. Goroutines then communicate by passing values through channels. We want each long-running job to run in its own goroutine, then return the result through a channel. We can implement a helper function for that:

func runInGoroutine[T any](longRunning func() T) <-chan T {
// result will be written on this channel
future := make(chan T, 1)
// long-running job is run in a goroutine
go func() {
// once result is obtained, it's sent to the channel
future <- longRunning()
}()
// we spawn the goroutine and return the channel
return future
}

This function returns a channel with a buffer of 1. We are basically using the channel as a future here (granted, it’s the bare minimum to be called a ‘future’, but it does the job for now). Since Go’s goroutines sidestep all the “red function/blue function” concept, we can simply block on these values like we used to do with actual OS-level threads in The Olden Days™, when an average server didn’t have to handle gazillion concurrent requests:

func main() {
// spawn concurrent tasks
numStarsFut := runInGoroutine(countStars)
numGrainsFut := runInGoroutine(countGrainsOfSandOnEarth)
meaningFut := runInGoroutine(meaningOfLife)
// await results in order
numStars := <-numStarsFut
fmt.Printf("[%d] result of job #0: %d\n", elapsed(), numStars)
numGrains := <-numGrainsFut
fmt.Printf("[%d] result of job #1: %d\n", elapsed(), numGrains)
meaning := <-meaningFut
fmt.Printf("[%d] result of job #2: %d\n", elapsed(), meaning)
}

(Examples in this blog post avoid handling errors, exceptions, panics etc. for the sake of simplicity.)

After we spawn three goroutines that execute the long-running computations, we start blocking on the results by reading from their respective channels.

Here is the output of the program:

[0] meaningOfLife(job2) STARTED
[0] countGrainsOfSandOnEarth(job1) STARTED
[0] countStars(job0) STARTED
[1] countGrainsOfSandOnEarth(job1) DONE
[2] countStars(job0) DONE
[2] result of job #0: 4611686018427387904
[2] result of job #1: 7500000000000000000
[3] meaningOfLife(job2) DONE
[3] result of job #2: 42
Output of the program, plotted

Several things to note here:

  • We don’t wait for all results to complete. Job0 is completed in 2 seconds, and by that time Job1’s result is also available; so we print the results of both Job0 and Job1 in second 2. Third result is printed one second later, when it is also available. If we had waited for all tasks to join, we’d have printed everything in the 3rd second.
  • We print the results in order. If the Printf statements were instead at the end of the long-running functions, the second one could be printed first at second 1. In our example, this order never changes.

Array of jobs

What if we don’t get to hard-code the jobs, but receive a dynamic (but finite) number of them? Like an array of jobs? Well, we can easily extend the Go channel approach to accomodate:

func main() {
// array of jobs
jobs := []func() int64{
countStars, countGrainsOfSandOnEarth, meaningOfLife,
}
// spawn concurrent tasks, store future results in order
orderedFutures := make([]<-chan int64, len(jobs))
for idx, job := range jobs {
orderedFutures[idx] = runInGoroutine(job)
}
// await results in order
for idx, future := range orderedFutures {
// (do this in two lines so that elapsed time is logged correctly)
value := <-future
fmt.Printf("[%d] result of job #%d: %d\n", elapsed(), idx, value)
}
}

We start all long-running functions concurrently, and store the futures in an array. As each task runs to completion, they send their results into the respective 1-sized channel; then we iterate the array of futures in order. This gives us the exact same output:

[0] meaningOfLife(job2) STARTED
[0] countStars(job0) STARTED
[0] countGrainsOfSandOnEarth(job1) STARTED
[1] countGrainsOfSandOnEarth(job1) DONE
[2] countStars(job0) DONE
[2] result of job #0: 4611686018427387904
[2] result of job #1: 7500000000000000000
[3] meaningOfLife(job2) DONE
[3] result of job #2: 42

Stream of jobs

What if the number of futures wasn’t finite? What if we had a stream of long-running jobs, and we had to handle their results while preserving order, like in the actor example? In that case, we cannot use an array; we’d need a channel of futures… A channel of channels!

We should implement a helper here; so that we can consume the stream of futures and turn it into a stream of values:

func processSequentially[T any](futStream <-chan <-chan T) <-chan T {
// output stream (of values)
output := make(chan T, 1)
go func() {
// iterate each future in the input
for fut := range futStream {
// block until the next result is ready;
// then push it to the output channel
output <- <-fut // ***
}
// if we got here, futStream is closed.
// also close the output and exit goroutine
close(output)
}()
return output
}

This function spawns its own goroutine which blocks on the head of the future stream (the future on the front). Once the future is resolved, it emits the result and blocks on the next.

The line marked with *** can be expanded like this:

// block on the future to get the value
value := <-fut
// send the value to the output channel
output <- value

Let’s use the helper function to process a stream of jobs:

func main() {
futStream := make(chan (<-chan int64), 1)
orderedOutputs := processSequentially(futStream)
// start spawning concurrent tasks, and pushing their futures to the stream
futStream <- runInGoroutine(countStars)
futStream <- runInGoroutine(countGrainsOfSandOnEarth)
futStream <- runInGoroutine(meaningOfLife)
close(futStream)
// the 'handler' part
jobIdx := 0
for output := range orderedOutputs {
fmt.Printf("[%d] result of job #%d: %d\n", elapsed(), jobIdx, output)
jobIdx++
}
}

We push the spawned jobs’ futures into the futStream, which processSequentially blocks on and converts to a stream of results. We then iterate on the output in the handler part to write the results. The output is the same.

Rust

Let’s work on a Rust equivalent. Unlike Go, Rust employs an opt-in async/await model instead of a user-level scheduler that comes with the runtime. You get to choose the async runtime among many libraries, and for this blog post we use tokio. We also grab the futures library for some very useful async building blocks:

# add to Cargo.toml
[dependencies]
futures = "0.3.28"
tokio = { version = "1.28.0", features = ["full"] }

Here are the jobs:

use std::time::{Duration, Instant};
use tokio::time::sleep;

async fn count_stars(start: Instant) -> u64 {
println!("[{}] countStars(job0) STARTED", start.elapsed().as_secs());
sleep(Duration::from_secs(2)).await;
println!("[{}] countStars(job0) DONE", start.elapsed().as_secs());
1u64 << 62
}

async fn count_grains_of_sand_on_earth(start: Instant) -> u64 {
println!("[{}] countGrainsOfSandOnEarth(job1) STARTED", start.elapsed().as_secs());
sleep(Duration::from_secs(1)).await;
println!("[{}] countGrainsOfSandOnEarth(job1) DONE", start.elapsed().as_secs());
7.5e18 as u64
}

async fn meaning_of_life(start: Instant) -> u64 {
println!("[{}] meaningOfLife(job2) STARTED", start.elapsed().as_secs());
sleep(Duration::from_secs(3)).await;
println!("[{}] meaningOfLife(job2) DONE", start.elapsed().as_secs());
42
}

Note that each of the long-running functions are defined as async fn. Instead of blocking on a channel; we will call .await on the outputs of these functions. Also note that the sleep function is that of the tokio framework, instead of the std version; so that we don’t block the whole OS thread as we sleep.

Let’s start with the fixed-number-of-jobs case. If we implement the main function like this…

// WARNING: this code is WRONG intentionally! keep reading...
#[tokio::main]
async fn main() {
let start = Instant::now();
// start both calculations concurrently
let num_stars = count_stars(start);
let num_grains = count_grains_of_sand_on_earth(start);
let meaning = meaning_of_life(start);
// first result is received
let num_stars = num_stars.await;
println!("[{}] result of job #0: {}", start.elapsed().as_secs(), num_stars);
// second result is received
let num_grains = num_grains.await;
println!("[{}] result of job #1: {}", start.elapsed().as_secs(), num_grains);
// third result is received
let meaning = meaning.await;
println!("[{}] result of job #2: {}", start.elapsed().as_secs(), meaning);
}

…here is the output:

[0] countStars(job0) STARTED
[2] countStars(job0) DONE
[2] result of job #0: 4611686018427387904
[2] countGrainsOfSandOnEarth(job1) STARTED
[3] countGrainsOfSandOnEarth(job1) DONE
[3] result of job #1: 7500000000000000000
[3] meaningOfLife(job2) STARTED
[6] meaningOfLife(job2) DONE
[6] result of job #2: 42

It’s the same ou- wait. WAIT. Everything ran sequentially! This is NOT what we were trying to do!

Jobs running sequentially

Ha, gotcha. Rust’s async model is special: In most languages, you would expect the async task to start executing immediately when invoked. Rust’s async is poll-based, therefore futures won’t do any work until actively awaited. Tokio has a fantastic tutorial summarizing what ‘poll-based’ means here, and how async/await and Futures work in Rust.

OK, interesting; but we still want to achieve the ‘concurrent run, ordered result’ behavior. How do we do that?

For that, we would need to implement a combinator.

A primer to Futures and Streams

A quick-and-dirty crash course, then. Rust has two traits that provide the async behavior: Future and Stream. Future asynchronously returns one result, and Stream can yield multiple. Since we want to receive the results of each future one by one; we want to implement a stream combinator.

// (simplified)
pub trait Future {
type Output;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output>;
}

// (simplified)
pub trait Stream {
type Item;
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>>;
}

We check if a Future is completed by calling poll. If the value is ready, the Future returns Poll::Ready(value), otherwise it returns Poll::Pending. A Future that returned Ready should not be called again. A Future does nothing unless poll is called repeatedly.

Similarly for the Stream, we call poll_next(), if the result is Poll::Pending we must poll again later. Poll::Ready(Some(value)) indicates that the next result is yielded, Poll::Ready(None) indicates that the stream is exhausted. Like Future, Stream should also be repeatedly polled to do work.

Who polls these async blocks then? That would be the executor; in our case tokio. For simplicity, we can say that the .await expression triggers the polling of a Future.

How frequently the executor polls a future is a whole other discussion. To prevent busy polling, Rust futures employs a very clever callback mechanism called Waker, which is why there is a Context parameter in the signature there. It’s out of scope for this blog post, but check that out some time; it’s very elegant.

Our own Stream combinator

Instead of defining a new Stream type ourselves; we can get lazy and inline our combinator using the futures::stream::poll_fn helper, which receives the body of poll_next as a closure and returns a stream combinator type:

use std::task::Poll;
use futures::stream::poll_fn;
use futures::task::Context;

// 'futures' is an anonymous type that implements Stream
let mut futures = poll_fn(move |cx: &mut Context<'_>| -> Poll<Option<u64>> {
// empty stream; directly returns 'None'
Poll::Ready(None)
}

Let’s think about this for a second… We have a bunch of Futures; and to do any work, each of these Futures must be repeatedly polled. We are trying to consume the output of a Stream, which means that the executor will be polling the Stream. So whenever the Stream is polled; that’s when all its Futures should be polled. Then we should call each Future’s poll in our Stream’s poll_next function!

Keep in mind though, that we don’t want to keep polling any finished Futures: calling poll() twice on a completed Future violates its contract. So we must also keep track of completed Futures, and store their values somewhere. If we keep the Futures in an array (“futs”), and their results in another (“out”), we can keep track of completed Futures. Similarly, since we want to yield results one by one and in order; we should keep track of the next Future’s index (“next_job_idx”). Put together, something like the following should work for our case:

use std::task::Poll;
use futures::stream::poll_fn;
use futures::task::Context;
use futures::{FutureExt, StreamExt};

#[tokio::main]
async fn main() {
let start = Instant::now();
let mut futs = vec![
// the call to 'boxed' is necessary because each function's
// returned Future impl. is a different type. boxing it
// makes the type dynamic (BoxFuture), and lets us put all
// these futures in a Vec like this.
count_stars(start).boxed(),
count_grains_of_sand_on_earth(start).boxed(),
meaning_of_life(start).boxed(),
];

// index of the job we should be waiting on
let mut next_job_idx = 0;
// store completed results here. incomplete results are None
let mut out = vec![None; futs.len()];
// the 'futures' variable implements the Stream trait
let mut futures = poll_fn(move |cx: &mut Context<'_>| -> Poll<Option<u64>> {
// inside of the Stream's poll_next function. this closure
// will be called multiple times, until it returns Poll::Ready(None).

// if all jobs are complete; return Ready(None)
// to indicate end of stream
if next_job_idx >= out.len() {
return Poll::Ready(None);
}

// otherwise iterate all results
for (i, o) in out.iter_mut().enumerate() {
// for each *missing* result only...
if o.is_none() {
// ...poll the future that corresponds
// (this is where the jobs do some work)
if let Poll::Ready(v) = futs[i].as_mut().poll(cx) {
// the future completed! save the result,
// we won't call this future anymore
*o = Some(v)
}
}
}

// now that all active futures are polled,
// check out the job in next_job_idx
match out.get_mut(next_job_idx) {
// if the job is complete
Some(Some(v)) => {
// yield the result and move to the next
next_job_idx += 1;
Poll::Ready(Some(*v))
}
// otherwise tell that we're pending
_ => Poll::Pending,
}

});

// result handler part
let mut idx = 0;
while let Some(value) = futures.next().await {
println!("[{}] result of job #{}: {}",
start.elapsed().as_secs(), idx, value);
idx += 1;
}
}

…yeah. Implementing combinators by hand like this can get really complicated, and ours isn’t even a good implementation. Here’s the output though!

[0] countStars(job0) STARTED
[0] countGrainsOfSandOnEarth(job1) STARTED
[0] meaningOfLife(job2) STARTED
[1] countGrainsOfSandOnEarth(job1) DONE
[2] countStars(job0) DONE
[2] result of job #0: 4611686018427387904
[2] result of job #1: 7500000000000000000
[3] meaningOfLife(job2) DONE
[3] result of job #2: 42

Finally we get ordered results for concurrently running jobs!

Fresh out of the oven

Now, we implemented the combinator ourselves, because we wanted to see how things work. Luckily, we don’t normally have to implement these things by hand: It has been implemented for us. Enter FuturesOrdered:

use futures::{FutureExt, StreamExt};
use futures::stream::FuturesOrdered;

#[tokio::main]
async fn main() {
let start = Instant::now();
// we simply push async jobs into FuturesOrdered
let mut futures = FuturesOrdered::new();
futures.push_back(count_stars(start).boxed());
futures.push_back(count_grains_of_sand_on_earth(start).boxed());
futures.push_back(meaning_of_life(start).boxed());

// result handler part is the same
let mut idx = 0;
while let Some(value) = futures.next().await {
println!("[{}] result of job #{}: {}",
start.elapsed().as_secs(), idx, value);
idx += 1;
}
}

This combinator gives us exactly the kind of behavior we need from our program with only a few lines of code! The async ecosystem in Rust is filled with such useful building blocks. The implementation of FuturesOrdered is quite sophisticated.

We can probably use it for the ‘streaming jobs’ case as well without changing anything.

Can we, though?

Well, we don’t know yet, and we really should check. Here is a scenario specific to the streaming case: What if there is a point in time where some jobs are not yet queued, but the ones that are queued are already completed?

Imagine the actor implementation scenario, where the actor receives a message every second. Most of these poll_next calls will encounter an empty Stream. What then?

In all examples so far, we push all futures in advance; so by the time we start handling their results, all futures are in the queue already. But, consider what happens if the third job (meaning_of_life) was spawned 10 seconds after the second one. The while-loop in our handler would complete the first two jobs, try to pull the next one; and see that the queue is empty. Would it wait for an extra entry, or would it simply return? How would FuturesOrdered tell those two cases apart?

Remember that in the Go example, we explicitly closed our channel using a close() call once we were done. The channel of futures blocks forever until it is closed; so it immediately gave us the behavior we wanted.

In fact, how would we even test this case in Rust? We can’t just slap a 10-second sleep in betweeen two push_back calls; because that would also delay the handler. The part where we push futures and the handler part should also take place concurrently. We would have to split these two operations into their own async code blocks first:

#[tokio::main]
async fn main() {
let start = Instant::now();
let mut futures = FuturesOrdered::new();

// async block to push the futures
let pusher = async {
futures.push_back(count_stars(start).boxed());
futures.push_back(count_grains_of_sand_on_earth(start).boxed());
sleep(Duration::from_secs(10)).await;
futures.push_back(meaning_of_life(start).boxed());
};

// another async block to handle the results
let handler = async {
// result handler part is the same
let mut idx = 0;
while let Some(value) = futures.next().await {
println!("[{}] result of job #{}: {}",
start.elapsed().as_secs(), idx, value);
idx += 1;
}
};

// run both blocks concurrently; wait until BOTH are resolved
futures::join!(pusher, handler);
}

Here we use the futures::join! combinator so that we can wait until both blocks are completed. But this simply won’t compile in Rust. The reason is that both async blocks are trying to mutably borrow the FuturesOrdered. In other words; both async blocks are trying to modify FuturesOrdered, which can cause a race condition. Rust won’t allow that.

In desperation, we slap a Mutex on the combinator and start acquiring locks everywhere:

use tokio::sync::Mutex;
use std::sync::Arc;

#[tokio::main]
async fn main() {
let start = Instant::now();

// make two counted, mutex-protected references
// one for read, one for write
let rd = Arc::new(Mutex::new(FuturesOrdered::new()));
let wr = Arc::clone(&rd);

let pusher = async move {
// each operation briefly gets the lock; pushes a future and releases
wr.lock().await.push_back(count_stars(start).boxed());
wr.lock().await.push_back(count_grains_of_sand_on_earth(start).boxed());
// 10 second wait before the 3rd job
sleep(Duration::from_secs(10)).await;
wr.lock().await.push_back(meaning_of_life(start).boxed());
};

// another async block that we handle them
let handler = async {
let mut idx = 0;
while let Some(value) = rd.lock().await.next().await {
println!("[{}] result of job #{}: {}",
start.elapsed().as_secs(), idx, value);
idx += 1;
}
// so that we can see when the handler loop terminates
println!("[{}] stream terminated", start.elapsed().as_secs());
};

// concurrently run both blocks and join when BOTH are completed
futures::join!(pusher, handler);
}

Eugh. There are many ways we could make this code prettier, but we’re trying to keep things short here. The output of this function is:

[0] countStars(job0) STARTED
[0] countGrainsOfSandOnEarth(job1) STARTED
[1] countGrainsOfSandOnEarth(job1) DONE
[2] countStars(job0) DONE
[2] result of job #0: 4611686018427387904
[2] result of job #1: 7500000000000000000
[2] stream terminated

As we have suspected; FuturesOrdered does not wait for the third future to be enqueued. In fact, Job3 is nowhere to be seen: We couldn’t even run the job, let alone print its result! (This is because there is no one polling it).

We will have to handle this case ourselves!

Let’s not get intimidated; we’ve seen worse while trying to implement a FuturesOrdered clone. We can satisfy the Stream trait with a custom struct this time. We will wrap FuturesOrdered to intervene whenever it returns Poll::Ready(None). If we keep the inner state behind a Mutex, that should make things easier:

use futures::{FutureExt, Stream};
use futures::future::BoxFuture;
use futures::stream::FuturesOrdered;
use std::task::{Context, Poll};
use std::sync::Mutex;

struct FutOrdForStream<'a, T> (Mutex<InnerState<'a, T>>);
struct InnerState<'a, T> {
inner: FuturesOrdered<BoxFuture<'a, T>>,
closed: bool,
}

impl<'a, T> FutOrdForStream<'a, T> {
fn new() -> Self {
Self(Mutex::new(InnerState {
inner: FuturesOrdered::new(), closed: false,
}))
}
fn close_write(&self) {
self.0.lock().unwrap().closed = true;
}
fn push_back<E: Future<Output=T> + Send + 'a>(&self, f: E) {
self.0.lock().unwrap().inner.push_back(f.boxed());
}
}

impl<T> Future for &FutOrdForStream<'_, T> {
type Output = Option<T>;
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let mut this = self.0.lock().unwrap();
match (Pin::new(&mut this.inner).poll_next(cx), this.closed) {
(Poll::Ready(None), false) => {
this.inner = FuturesOrdered::new();
Poll::Pending
}
(v, _) => v,
}
}
}

With this wrapper we do several things:

  • We expose a close_write function that allows us to express our intent to end the handler loop.
  • Whenever the inner FuturesOrdered declares that it is resolved; we replace it with a brand new FuturesOrdered (so that we don’t poll the resolved one again) and return Poll::Pending instead. If close_write was called; we skip this step and simply relay the resolved response.
  • Finally, we use the Mutex to present a read-only interface that does not require mutable references. This sneaky case of “internal mutability” might be a bit offputting, but it helps us keep things short for the blog post. We also very carefully avoided dealing with Pin or Waker anywhere: Let’s not make this any more in-depth than it already is. (There is an async Mutex implementation in tokio, instead of the stdlib one we’re using here; but polling that is left as an exercise for the reader).
  • One other thing: Take a look at the push_back implementation here. There is a generic type E that’s not attached to the struct, but the method itself. This type can be different for each push_back call; and it helps us get rid of the .boxed() calls in main(). Internally we still call .boxed() to make the types homogeneous for FuturesOrdered. This prevents us from eliding lifetimes (the ‘a and ‘_ markers you see around the code), but that’s not a lot of boilerplate.
  • Even though FutOrdForStream is internally a Stream; we implement the Future trait here instead. Previously, the .next().await call in the handler loop returned a specific wrapper Future type called Next (provided by the StreamExt trait); which allowed us to treat the head of the Stream as a Future. This time, we directly implement poll instead of poll_next.

Short and hacky, but does it work? Let’s see:

// async function that spawns the jobs and pushes the futures
async fn pusher(f: &FutOrdForStream<'_, u64>, start: Instant) {
f.push_back(count_stars(start));
f.push_back(count_grains_of_sand_on_earth(start));
// 10 second wait before the 3rd job
sleep(Duration::from_secs(10)).await;
f.push_back(meaning_of_life(start));
println!("[{}] all futures pushed", start.elapsed().as_secs());
// we declare the intent to close the stream,
// which eventually signals the handler loop to terminate
f.close_write();
}

// async function that handles (prints) the results
async fn handler(f: &FutOrdForStream<'_, u64>, start: Instant) {
let mut idx = 0;
while let Some(value) = f.await {
println!("[{}] result of job #{}: {}",
start.elapsed().as_secs(), idx, value);
idx += 1;
}
// to see when the handler loop terminates
println!("[{}] stream terminated", start.elapsed().as_secs());
}

#[tokio::main]
async fn main() {
let start = Instant::now();
let f = FutOrdForStream::new();
// concurrently run both blocks and join when BOTH are completed
futures::join!(pusher(&f, start), handler(&f, start));
}

Note that we got rid of the StreamExt trait’s .next().await call (the line marked with ****); because &FutOrdForStream directly implements Future, so we can call .await directly on it. Also all calls to .boxed() are gone. With handler and pusher becoming functions, the code is simple and clean again.

Let’s check the output:

[0] countStars(job0) STARTED
[0] countGrainsOfSandOnEarth(job1) STARTED
[1] countGrainsOfSandOnEarth(job1) DONE
[2] countStars(job0) DONE
[2] result of job #0: 4611686018427387904
[2] result of job #1: 7500000000000000000
[10] all futures pushed
[10] meaningOfLife(job2) STARTED
[13] meaningOfLife(job2) DONE
[13] result of job #2: 42
[13] stream terminated
Delayed job insertion case. Job2 starts at 10 sec.; the long wait is depicted with a gray bar.

There are so many things that we could improve upon here; but that should do for today.

Java

The third language we will attempt this is Java.

Java is, uh, interesting; because while it does have Futures and Executors, it does not have async/await syntax. Java programs run on the Java Virtual Machine, yet each Java thread is one-to-one mapped to an OS thread, so Java does not have green threads, nor a user-space scheduler like Go does.

There have been some attempts to implement fibers in Java in the past (like Quasar Fibers, which is apparently no longer maintained); and more recently OpenJDK’s Project Loom which seems to have made its way into Java 19. While the Java developers have been bringing many useful features into the language, your humble author did not keep up with what’s new in Java in a while, so I will primarily be talking about Java 8.

Java separates the aspects of a asynchronous computation into two interfaces; Future and CompletionStage. Its standard library provides the very useful concrete class CompletableFuture that implements both interfaces. Since there are no green threads and no async/await support in the language; we cannot await on a Future’s completion. It is of course possible to block the current thread, but the thread here is the OS-thread; which can mean trouble for your concurrent program.

CompletionStage provides many ‘modifiers’ that allow us to pass a callback to the job’s completion: We can register a Consumer to handle the result, a Function to map the result into another type, and so on.

The asynchronous operations are explicitly triggered by passing a Callable to an ExecutorService:

// the lambda function here implements Callable<Integer>
// it is invoked on the ForkJoinPool.commonPool() ExecutorService.
final CompletableFuture<Integer> fut = CompletableFuture.supplyAsync(() -> 123);
// once the result (123) is received; the registered Consumer<Integer>
// (which is another lambda function) prints the value
fut.thenAccept(value -> System.out.printf("Value is %d\n", value));

Unlike Rust, Java enqueues the job and starts running it ambiently as soon as possible, without the need for polling. One downside is that it is not possible to know which OS thread will execute the registered callbacks; so collecting the result of concurrent computations can get relatively trickier.

Let’s start by defining the jobs again:

import java.time.LocalDateTime;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.*;

public final class Entry {
private static final LocalDateTime start = LocalDateTime.now();
private static long elapsed() {
return ChronoUnit.SECONDS.between(start, LocalDateTime.now());
}

private static void sleep(int secs) {
try {
Thread.sleep(secs * 1000L);
} catch (final InterruptedException e) {
throw new RuntimeException(e);
}
}

private static long countStars() {
System.out.printf("[%d] countStars(job0) STARTED\n", elapsed());
sleep(2);
System.out.printf("[%d] countStars(job0) DONE\n", elapsed());
return 1L << 62;
}

private static long countGrainsOfSandOnEarth() {
System.out.printf("[%d] countGrainsOfSandOnEarth(job1) STARTED\n", elapsed());
sleep(1);
System.out.printf("[%d] countGrainsOfSandOnEarth(job1) DONE\n", elapsed());
return (long) 7.5e18;
}

private static long meaningOfLife() {
System.out.printf("[%d] meaningOfLife(job2) STARTED\n", elapsed());
sleep(3);
System.out.printf("[%d] meaningOfLife(job2) DONE\n", elapsed());
return 42;
}
}

Again, Thread.sleep() blocks the OS-thread; so we shouldn’t use it beyond toy examples. For a better approach, see ScheduledExecutorService.

For a fixed number of jobs; we can hard-code the sequentiality here:

public static void main(String[] args) {
// spawn all three jobs
final CompletableFuture<Long> numStarsFut = CompletableFuture.supplyAsync(Entry::countStars);
final CompletableFuture<Long> numGrainsFut = CompletableFuture.supplyAsync(Entry::countGrainsOfSandOnEarth);
final CompletableFuture<Long> meaningFut = CompletableFuture.supplyAsync(Entry::meaningOfLife);

CompletableFuture.completedFuture(null)
.thenCombine(numStarsFut, (n, v) -> System.out.printf("[%d] result of job #0: %d\n", elapsed(), v))
.thenCombine(numGrainsFut, (n, v) -> System.out.printf("[%d] result of job #1: %d\n", elapsed(), v))
.thenCombine(meaningFut, (n, v) -> System.out.printf("[%d] result of job #2: %d\n", elapsed(), v))
.join();
}

The modifier thenCombine essentially joins a future with another; and calls a BiFunction with both results. We will discard the left parameter (n in our code) because we are not interested in the previous future’s result, just that it is completed. The first, already-resolved future is there so that the example is more readable.

The calls to thenCombine ensures that both the previous result and the future itself resolved; ensuring the results do not get reordered. The join at the end blocks the main thread; ensuring that we complete all the jobs before terminating the program. We add this because we want to see the logs, obviously.

The output is as expected:

[0] countStars(job0) STARTED
[0] countGrainsOfSandOnEarth(job1) STARTED
[0] meaningOfLife(job2) STARTED
[1] countGrainsOfSandOnEarth(job1) DONE
[2] countStars(job0) DONE
[2] result of job #0: 4611686018427387904
[2] result of job #1: 7500000000000000000
[3] meaningOfLife(job2) DONE
[3] result of job #2: 42

Array of jobs

For an array of jobs, we can trivially extend this approach like this:

public static void main(String[] args) {
final ArrayList<CompletableFuture<Long>> futures = new ArrayList<>();
futures.add(CompletableFuture.supplyAsync(Entry::countStars));
futures.add(CompletableFuture.supplyAsync(Entry::countGrainsOfSandOnEarth));
futures.add(CompletableFuture.supplyAsync(Entry::meaningOfLife));

// we start by a completed future, so that the first one joins immediately
CompletableFuture<Void> f = CompletableFuture.completedFuture(null);
for (int i = 0; i < futures.size(); i++) {
// closure requires that the captured index value is effectively final
final int index = i;
f = f.thenCombine(futures.get(i), (n, v) -> {
System.out.printf("[%d] result of job #%d: %d\n", elapsed(), index, v);
// to unify future types here; we explicitly return null,
// which is the only valid value for the type Void
return null;
});
}
// join at the end to block the main thread
f.join();
}

This approach works for arrays, but has problems with streams. Mainly, the futures combined in this way keep references to the previous ones, preventing them from being garbage collected. Keep combining futures like this indefinitely, and you’ll have an OOM. Instead we should move on to a queue-based approach.

Stream of jobs

First, we need a thread-safe queue, for that we will wrap a LinkedList. We want an enqueue operation that returns the element if it was the first insertion; and a dequeue that returns the next element after remove. Let’s call it Mailbox, to allude to the actor framework example:

import java.util.LinkedList;

public final class Mailbox<T> {
private final LinkedList<T> queue = new LinkedList<>();

// returns the value if it was inserted in
// an empty queue; null otherwise
public T enqueue(final T t) {
T first = null;
if (t != null)
synchronized (queue) {
if (queue.isEmpty())
first = t;
queue.add(t);
}
return first;
}

// dequeues the first element; returns the next
public T dequeue() {
synchronized (queue) {
queue.remove();
return queue.peek();
}
}
}

Basically, both operations return the next element to process. We get null if the queue is empty after dequeue (because there is nothing left to process), or if it already wasn’t empty before enqueue (because there is no need to trigger processing). It’ll make more sense when we use it.

First, we implement a JobProcessor that receives futures of jobs that return Runnable. That is, asynchronous tasks that give us a callback when completed. This is a bit awkward, but it’s as general-purpose as it gets, and it’s how ExecutorService does it anyway. We will extend this for futures that return values.

import java.util.concurrent.*;

public class JobProcessor {
private final ExecutorService executor;
private final Mailbox<CompletionStage<Runnable>> queue = new Mailbox<>();

public JobProcessor(final ExecutorService executor) {
this.executor = executor;
}

public void enqueueJob(final CompletionStage<Runnable> future) {
process(queue.enqueue(future));
}

private void process(final CompletionStage<Runnable> future) {
if (future != null)
// when the next future in line completes, start a new task
future.thenAccept(orderedJob -> executor.execute(() -> {
// the task first runs the ordered job (i.e. handles the result)
orderedJob.run();
// then triggers the process of the next future
process(queue.dequeue());
}));
}
}

The function enqueueJob first pushes the future into Mailbox. If the mailbox was empty before; we trigger process for the newly-inserted element. Each process call tails the future by first executing the runnable on the executor; then popping the handled future and triggering the next job until the queue is drained, then stops. The next time there is a future enqueued, enqueueJob will trigger a new drain.

Note that the call to process within process here is not recursive; as we don’t invoke the inner lambda immediately. Also note that all ordered jobs are executed on the provided ExecutorService; whereas we can pick another ExecutorService for the original concurrent jobs to run. We can even separate the two executors and scale them independently if we like: “These two threads are dedicated to deserialization, and there three handle the messages”.

Our previous implementations used join() to block the main thread until the jobs are done. We need a similar mechanism. Since JobProcessor receives Futures with Runnable callbacks that run in order, we can do something like this: Enqueue an already completed future, which has a callback. The callback will then resolve another future, on which we can join(). Let’s call this a barrier:

public CompletableFuture<Void> barrier() {
final CompletableFuture<Void> completion = new CompletableFuture<>();
enqueueJob(CompletableFuture.completedFuture(() -> completion.complete(null)));
return completion;
}

This way, we can push the jobs, then push a barrier; and join on the barrier; which will not resolve until all jobs are handled.

Depiction of the ‘barrier’. Since the Future for the barrier job is already resolved; there is only the ‘handling’ part; which resolves when all previous results are handled.

This function is also why we started with CompletionStage<Runnable>.

To handle CompletionStage<T> for any T; we must specify how to handle T values. We can do that by passing a Consumer<T>. Let’s extend the class with this:

import java.util.concurrent.*;
import java.util.function.Consumer;

public class Processor<T> extends JobProcessor {
private final Consumer<T> consumer;

public Processor(final ExecutorService executor, final Consumer<T> consumer) {
super(executor);
this.consumer = consumer;
}

public void enqueue(final CompletionStage<T> future) {
// map CompletionStage<T> to CompletionStage<Runnable>
enqueueJob(future.thenApply(v -> () -> consumer.accept(v)));
}
}

Here is how we can use it:

  public static void main(String[] args) {
final ExecutorService es = ForkJoinPool.commonPool();
// we specify the result handler at the constructor
final Processor<Long> p = new Processor<>(es, Entry::handleResult);
// now we start spawning and pushing concurrent jobs
p.enqueue(CompletableFuture.supplyAsync(Entry::countStars, es));
p.enqueue(CompletableFuture.supplyAsync(Entry::countGrainsOfSandOnEarth, es));
p.enqueue(CompletableFuture.supplyAsync(Entry::meaningOfLife, es));
// then we push a barrier and join on it
p.barrier().join();
}

private static int jobIdx = 0;
private static void handleResult(long v) {
System.out.printf("[%d] result of job #%d: %d\n", elapsed(), jobIdx++, v);
}

Simple and clean. The output is the same, and we don’t break the garbage collector’s heart.

One thing to note here: handleResult is called inside another task sent to the executor; therefore each result might be handled on a different OS-thread. This solution has no concept of thread affinity. However, two tasks cannot concurrently run handleResult here (as we don’t enqueue another until the first one is finished); therefore we can treat the function as critical section (which is why we don’t need to make jobIdx atomic here). Compare this to Rust’s poll-based approach; where the Future is polled on the OS-thread that awaits it, simplifying the result handling immensely (once we survive the combinator implementations, that is).

Conclusion

In this blog post, we identified a common concurrency problem and its variants; then tried to provide in-depth solutions to it in three different programming languages, revealing how they differ from one another.

In our case, we implement the Picus cross-platform agent in Go; therefore we went ahead with an approach essentially similar to the array-of-jobs case in Go. Obviously, our version is a bit more complicated with proper handling of errors, panics, context cancellation and telemetry. Our initial prototype for the cross-platform agent was in Rust; and we used the FuturesOrdered combinator there. Finally, I had to implement an actor framework in Java in a previous job, where I had implemented something similar to the Processor here for the actors’ execution model.

In Picus Security, we regularly tackle problems that challenge us to combine the most OS-specific minutiae with the highest-level system design concepts. We meticulously innovate at a junction where many concepts like concurrency, telemetry, network protocols, OS fingerprinting, scalability, graph theory, CI/CD, cryptography and binary obfuscation can casually come together. By bringing hard work and the joy of discovery together, we have been pushing boundaries on Breach and Attack Simulation for 10 years. We invite you to witness it yourself: Get started with our Complete Security Validation Platform today!

--

--