Reflecting on Worker Pools in Go

Cody Oss
6 min readMay 6, 2019

--

The worker pool pattern in Go is a pattern many Gophers have implemented at least once in their career. It is a great way to play with some of Go’s awesome concurrency primitives and to parallelize work. As I typed out this pattern for the Nth time I wonder to myself , is there a better way? I have implement similar patterns in other languages, and have been able to abstractly build these worker pipelines via generics. But Go does not have generics, at least not yet… So what does Go have that I could use in its place? Reflection.

Is there a better way?

Let’s not put the cart before the horse though, taking a step back…

What is the Worker Pool Pattern?

The snippet below shows an example of a three step worker pool. Lets break down the sections of it.

  1. This is all the setup. We pick a parallelism factor (in this case 5) we want to work with, we create some channels for our worker pools to communicate over, and we create WaitGroups to keep track of all of the goroutines we are about to spawn.
  2. We launch our worker pools and have the proceeding worker pool read from the output channel of the previous pool launched.
  3. We insert some input and wait for the the result to be printed. This could also be done as its own pool as the channel also has a buffer of 5, but to keep things simple for now this work is being kept in the main goroutine.
  4. This is the cleanup portion. Close the first channel, let its input drain and naturally break out of the range loop, wait on the WaitGroup, and repeat until everything has been gracefully shutdown.

This all works, and to be very clear there is absolutely nothing wrong with this code (unless you tell me otherwise in the comments). Again though, I was curious, is there a better way?

Enter flo

So I decided to build an abstraction over this pattern. I call the project flo (short for workflow). Let me first show you what the equivalent code looks like in flo.

  1. To keep the example similar to the one above I decided to still have an input channel used to feed the flo and an output channel used to pull the final results from the flo.
  2. This is the equivalent to section three of the first example. I needed to launch this before the next section of code because that section will block the current goroutine. Also, because I am launching this in a goroutine I added a WaitGroup to keep track of this code.
  3. This is the major difference from the previous example. We start building a flo with the NewBuilder function. This takes some optional arguments; in this case the registration of an input and an output channel to feed and receive data from the flo. Also, a parallelism factor is chosen. This will configure how many workers to launch for each pool and the size of the buffered channel they communicate over.
  4. Here you will notice that the signatures to the function changed a little bit from our first example:
func add1(i int)int

changed to:

func add1(ctx context.Context, i int) (int, error)

A context is added as the first parameter so that the context that is used to start the flo when BuildAndExecute is called can be properly propagated to all of the worker goroutines. This allows flo to support context cancellation from top-to-bottom. An error is also added to the output parameters. Although it is not being used in this example, it allows to stop processing a stream of data at any step in the pipeline. If add2 returned an error, stringify would never be called for that stream of data. This does not stop the execution of any worker goroutines. Optionally, flo allows you to register error handlers should you want to do anything when an error does occur at any step.

How does flo use reflect?

Well, lets take a look at the signatures of some of those methods that were being called in the example above.

func WithInput(ch interface{}) Option { ... }
func WithOutput(ch interface{}) Option { ... }
type Step interface{}
func (b *Builder) Add(s Step, options ...StepOption) *Builder { .. }

I think you get the picture. The only way to support a generic api in Go, that I know of, is interface{}. But to make the the bits of code that the user of the library writes be able to use strong types instead of the empty interface, flo does a bunch of reflection to assert that all the types line up properly. This is much better explained in the godoc if you are curious. At a high level though, everything is validated before the flo processes any data.

What kind of reflection is required to make this work?

  • Get the reflect.Value and reflect.Type of many things
  • invoking functions
  • converting things to empty interfaces
  • looking at the types of inputs and outputs of functions
  • Comparing against reflect.Kind
  • Sending and receiving data from channels
  • Looking at the direction data can flow through a channel
  • Checking if types implement interfaces

I know what you are thinking…

That sounds like a lot of reflection, doesn’t that slow down your code?

Yup. Although a lot of the reflection done in the library is executed before any data processing occurs, a fair bit is still needed at runtime.

Benchmarking flo vs Native Code

In the flo Github repo I wrote a benchmark to compare the performance hit for all of this reflection. The code used to test this is more-or-less the code from the examples above. If you want to checkout the benchmark for yourself look in flo_test.go. The results looked like this when running the benchmark from my laptop:

$ go test -bench=. -benchmem
goos: linux
goarch: amd64
pkg: github.com/codyoss/flo
BenchmarkFlo-8 200000 6577 ns/op 520 B/op 12 allocs/op
BenchmarkNonFlo-8 1000000 1057 ns/op 0 B/op 0 allocs/op
PASS
ok github.com/codyoss/flo 2.462s

I may be wrong, as I have not looked too deeply into this yet, but I believe most if not all of the slowdown and the allocations come from the reflect package. Some of these allocations seem legit but it also looks like there are a couple places the reflect package could be cleaned up for a bit of a performance boost. The main thing I saw was this todo:

func valueInterface(v Value, safe bool) interface{} { 
...
// TODO: pass safe to packEface so we don't need to copy if
// safe==true?
return packEface(v)
}

But at the end of the day, I knew this code was not going to be blazingly fast. I think it is a basic understanding that when you play with reflection, you pay the price.

The real reason I created this project was to explore the reflect package more and that much I did accomplish. I will tell you that it is not the most straight-forward package to use. A lot of the methods will panic if they are used on a value of the wrong underlying type. The api expects you to have clearly read the docs. This is not such a bad thing, but I think it does discourage many from exploring it. I encourage you to checkout flo and see how I used reflection to accomplish abstracting away the worker pattern.

Takeaways

  • Was there a better way? Meh… I wrote a library that traded, what I find to be, a fun syntax that reduces lines of code for performance. I have no regrets!
  • Reflection is powerful, but comes with a performance cost.
  • I need to look into the todo listed above and see if fixing it would improve the performance of this package. If it does maybe I will open an issue/PR.
  • Learning more about the packages in the standard library can give you good perspective into new patterns and ways of doing things. Don’t be afraid!
  • I need to use the reflect package a little more still before I feel comfortable.
  • I am curious to see if the performance of this code would improve should it be re-implemented with generics/contracts once that feature comes out. I will follow-up when this happens.
  • Checkout the repo:

If you made it this far, Thank You.

--

--

Cody Oss

Developer Programs Engineer @ Google, Go enthusiast, lover of music. GH: https://github.com/codyoss