A Go Func Scheduler With A Cap

Hanalei Pier, Kauai, Hawaii. Photo by Saurabh Deoras

Several years ago I was working on an interesting problem, but the software design it required was clearly outside my expertise. At the time I mostly wrote scientific computing programs dealing with matrix algebra and hardware communication over GPIB, serial ports, and such. None of that required concurrent execution designs or libraries for running for-loops in parallel. However, things changed when we had to scale the code to run over tens (and sometimes hundreds) of hardware devices from a central communicating host.

In particular, the goal was to produce a binary with minimal dependencies that would issue concurrent communication requests to multiple devices attached to a system. The software design was therefore more about managing these concurrent requests than about managing the actual compute, which would happen on the remote devices. In other words, it was like making concurrent HTTP GET calls and waiting… a quintessential problem domain for trying out Go’s concurrency features!

I was new to Go at that time, but the idea of spawning a go-routine seemed both powerful and dangerous. While it was very easy to spawn a go-routine, questions remained about how to control the rate of execution given the available host resources.

go func() {}() // spawns a new go-routine and executes it right away

This post is about my journey into controlling the execution of go-routines, and I am going to describe a code pattern I have been using lately. Let’s start with the interface definition.

Dispatcher Interface

The interface should be as simple and minimal as possible, with details hidden behind the scenes in the implementation. In this case the idea is to be able to schedule a func for execution and to check if anything is running in the scheduler. So the interface would look something like this:

// Dispatcher defines how to dispatch a func
type Dispatcher interface {
	// Do schedules f for execution
	Do(f func())
	// IsRunning tells whether any funcs are running
	IsRunning() bool
}
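
One small trick worth mentioning (my own addition, not part of the original listing) is a compile-time assertion that the concrete type we are about to define actually satisfies this interface:

// compile-time check that *dispatcher satisfies Dispatcher;
// the build fails right here if a method is missing
var _ Dispatcher = (*dispatcher)(nil)

If a method on *dispatcher goes missing or its signature drifts, the compiler complains at this line instead of at some distant call site.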

As you can see, there is no mention of capacity in the interface definition. We will capture the notion of capacity in the implementation shortly.

Receiver Type

A typical pattern for implementing an interface is to define a struct type and capture the state of the implementation in it. This struct becomes the receiver for the methods defined in the interface.

So what would that state look like for the behavior we want to capture? It would need the following elements:

// dispatcher implements Dispatcher interface.
type dispatcher struct {
	queue *queue
	mu    sync.Mutex
}

We will soon implement the methods on this struct but let’s first understand what these fields would do.

queue is essentially a queue of functions scheduled for execution. We define the queue type simply as a slice of func() and protect it with a mutex.

type queue []func()
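
The post’s listings do not show the queue’s push and pop methods, so here is a minimal sketch of how they might look, assuming pop returns nil when the queue is empty and that callers hold the dispatcher’s mutex (the actual implementation in the linked repository may differ):

// push appends f to the back of the queue.
// callers are expected to hold the dispatcher's mutex.
func (q *queue) push(f func()) {
	*q = append(*q, f)
}

// pop removes and returns the func at the front of the queue,
// or nil if the queue is empty.
func (q *queue) pop() func() {
	if len(*q) == 0 {
		return nil
	}
	f := (*q)[0]
	*q = (*q)[1:]
	return f
}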

So the idea is that we put incoming function literals into this queue and draw from it only as many as we allow to be scheduled as go-routines. The next question is: how do we implement the notion of a cap on the number of gophers?

Hawaiian gopher with a cap, made by gopherize.me

Implementing Capacity

In order to implement a notion of capacity for the number of go-routine executions, we need a few additional constructs. Let’s define cap as an int32 value indicating the maximum number of go-routines we want running at any given moment. Let’s also define active as a pointer to an int32 counter that gets updated atomically from the various concurrently executing go-routines. Finally, a poke channel (whose behavior we will define shortly) lets finishing go-routines signal the scheduler. Using these fields we can express the notion of capacity for controlling the number of go-routines that are executing.

type dispatcher struct {
	queue  *queue
	cap    int32
	active *int32
	poke   chan struct{}
	mu     sync.Mutex
}

Implementing Do

Now let’s look at implementing the Do method to satisfy the interface we defined earlier. Do pushes the incoming function literal into the receiver’s queue and immediately triggers a call to the so-called dispatch method, which executes function literals drawn from the queue only if a certain criterion is met. We will look at this criterion later.

func (d *dispatcher) Do(f func()) {
	// lock
	d.mu.Lock()
	defer d.mu.Unlock()

	// push into queue
	d.queue.push(f)

	d.dispatch()
}

So far so good! We have ensured that an incoming function literal f gets pushed into the receiver’s queue under a mutex lock, and that an attempt is made to execute it right away… In other words, if nothing is pending, the behavior is equivalent to running the same func as go f().

However, the whole point of this exercise is to handle the case where we have reached capacity and must defer the execution of go-routines until some capacity frees up. The implementation I show below uses a polling model, which is perhaps not ideal; I should probably try doing the same with buffered channels, but that will be for a future post.

So what we have is a bot that runs in the background and monitors the queue:

func (d *dispatcher) bot() {
	go func() {
		// run an infinite loop, waking every second or on a poke
		for {
			d.mu.Lock()
			d.dispatch()
			d.mu.Unlock()
			select {
			case <-d.poke:
			case <-time.After(time.Second):
			}
		}
	}()
}

The bot runs for the lifetime of the parent program. In every cycle of the for loop it waits for one second (arbitrarily chosen) or until it receives a ping on the so-called poke channel, whichever comes first. We will define the behavior of the poke channel in a bit, but what we have so far ensures that any pending function literals in the queue get dispatched as and when the criterion is met. The logic of dispatching against this criterion lives in the dispatch implementation.

Dispatching Criteria

Every call to dispatch tries to execute as many function literals from the queue as possible. At each step it pops from the queue, increments the active counter atomically (that is what atomic.AddInt32 does), and then executes the function in a go-routine by wrapping it in another function literal that decrements the counter and signals the poke channel after the function finishes. Note that the counter must also be read atomically, via atomic.LoadInt32, so that the loop condition does not race with those concurrent updates.

func (d *dispatcher) dispatch() {
	for atomic.LoadInt32(d.active) < d.cap {
		f := d.queue.pop()
		if f == nil {
			break
		}

		// increment the active counter
		atomic.AddInt32(d.active, 1)

		go func(active *int32, poke chan struct{}) {
			f()
			// decrement the counter and poke the bot to dispatch more
			atomic.AddInt32(active, -1)
			poke <- struct{}{}
		}(d.active, d.poke)
	}
}

Status Check

Finally, let’s look at implementing a status check for our scheduler. This is a simple atomic read of the receiver’s active field; note that it reports funcs that are currently executing, not ones still waiting in the queue.

func (d *dispatcher) IsRunning() bool {
	return atomic.LoadInt32(d.active) > 0
}

And that is essentially what allows us to implement a capacity for executing go-routines. We can capture the user’s requested capacity via a New function that instantiates the dispatcher.

Implementing New

// New provides a new instance of dispatcher
func New(numConcurrent int32) Dispatcher {
	d := new(dispatcher)
	d.queue = new(queue)
	d.cap = numConcurrent
	d.active = new(int32)
	d.poke = make(chan struct{})
	d.bot() // starts a daemon that will schedule pending funcs
	return d
}
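
To see all the pieces working together, here is a small usage sketch, assuming the dispatcher code lives in the same package (the simulated workload and the polling drain loop are my own illustration, not part of the original post):

package main

import (
	"fmt"
	"time"
)

func main() {
	// allow at most 4 funcs to run concurrently
	d := New(4)

	// schedule 20 funcs; only 4 run at any given moment
	for i := 0; i < 20; i++ {
		i := i // capture the loop variable for the closure
		d.Do(func() {
			time.Sleep(100 * time.Millisecond) // simulate remote device I/O
			fmt.Println("done:", i)
		})
	}

	// crude drain loop; note that IsRunning can briefly report false
	// while funcs are still queued, so a real program would want a
	// sturdier completion signal
	for d.IsRunning() {
		time.Sleep(50 * time.Millisecond)
	}
}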

Summary

I wanted to write this post to share my perspective on how a function scheduler with a capacity might be written in Go. It is probably not the best design, but it was a nice exercise in learning about Go’s concurrency features. All the code shown here is available at: https://github.com/sdeoras/dispatcher


Hanalei Pier, Kauai, Hawaii

I shot the featured image of Hanalei Pier during sunrise. I love this pier, particularly when seen against the backdrop of those mountains. The sunrise glow is always nice to capture.