Applying Modern Go Concurrency Patterns to Data Pipelines

Resilient, fast, efficient and concise? Pick all four! How to apply old and new concurrency patterns to pipelines in Golang

This article was written by Florian Beeres, Software Engineer at AMBOSS.

Golang is known for its built-in facilities for writing concurrent programs, most notably channels. These channels let concurrent processes synchronize by sending messages to each other instead of sharing memory. Even with that built-in support for concurrent code, writing concurrent and parallel programs that are bug-free can be a challenge. You still need to come up with your own strategy for error handling and context cancelation, and there are also a few “gotchas” surrounding the creation and cleanup of Go routines.

The official Go blog has a post from 2014 that showcases a principled approach to building pipelines in Go. At GopherCon 2018, Bryan C. Mills improved upon some of the techniques from the blog post, especially with an eye towards efficient resource utilization.

In this blog post we will build a simple pipeline, first using the techniques from the 2014 blog post and later on redoing the pipeline — this time with the techniques from the GopherCon video. Along the way, I’ll highlight common mistakes and how to solve them. Finally, I will show how the upcoming generics feature can be put to use to greatly cut down on boilerplate code.

A basic understanding of Go channels is required to understand the code and follow along.

The final code for this blog post can be found on GitHub and the commit history tries to stick to the chapters in this post. But you can also follow along by applying the Git diffs to the initial main.go file that is shown at the start of the next chapter.

A Simple Pipeline

To kick things off, we will implement a simple pair of producer and consumer. The producer goes over a list of words and sends them to a channel, while the consumer is receiving values from that channel and printing them to the console.

If you run this with go run main.go you'll see a deadlock. Off to a good start!

Luckily this is easy to fix. The channel returned by producer is not buffered, meaning you can only send values to the channel if someone is receiving values on the other end. But since sink is called later in the program, there is no receiver at the point where outChannel <- s is called, causing the deadlock.

You can easily fix this by either making the channel buffered, in which case the deadlock will occur once the buffer is full, or by running the producer in a Go routine. The producer will still block that Go routine, but it will no longer block the main Go routine.
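To make the difference between the two kinds of channel tangible, here is a minimal sketch. The helper `trySend` is hypothetical (it is not part of the pipeline); it attempts a send without blocking so the program can report what would have happened instead of deadlocking.

```go
package main

import "fmt"

// trySend is a hypothetical helper for this sketch: it attempts a send
// without blocking and reports whether the channel accepted the value.
func trySend(ch chan string, s string) bool {
	select {
	case ch <- s:
		return true
	default:
		return false
	}
}

func main() {
	unbuffered := make(chan string)
	buffered := make(chan string, 1)

	fmt.Println(trySend(unbuffered, "foo")) // false: no receiver is waiting
	fmt.Println(trySend(buffered, "foo"))   // true: the buffer has room
	fmt.Println(trySend(buffered, "bar"))   // false: the buffer is now full
}
```

A plain `ch <- s` would block in exactly the cases where `trySend` returns false, which is why buffering only postpones the deadlock.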

Now we see some values being logged, but Go still complains about a deadlock!

This time the deadlock happens because outChannel is never closed, and therefore our sink will be waiting for new values until the end of time.

The solution is simple: close outChannel!

Whenever you write concurrent Go code, you should have a strategy for closing channels that you apply consistently throughout your program. One good strategy, which is mentioned in the linked blog post, is that whoever creates the channel is also in charge of closing it. This makes it easy to avoid sending to a closed channel, which would result in a panic.
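Putting both fixes together, a minimal version of the pipeline could look like the sketch below. It assumes the producer takes a plain slice of words; `collect` is a hypothetical stand-in for the sink that returns the values instead of printing them, so the result is easy to inspect.

```go
package main

import "log"

// producer creates the channel, so it is also in charge of closing it.
func producer(words []string) <-chan string {
	outChannel := make(chan string)
	go func() {
		// Closing tells receivers that no more values will arrive.
		defer close(outChannel)
		for _, s := range words {
			outChannel <- s
		}
	}()
	return outChannel
}

// collect drains the channel; the range loop ends once the channel is closed.
func collect(values <-chan string) []string {
	var got []string
	for v := range values {
		got = append(got, v)
	}
	return got
}

func main() {
	for _, v := range collect(producer([]string{"FOO", "BAR", "BAX"})) {
		log.Println(v)
	}
}
```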

Graceful Shutdown With Context

Especially in Go web development it’s common to thread a context value through all of your long running functions, so that you can cancel those functions gracefully and perform cleanup if necessary. In my experience pipeline code will almost always require this functionality, so let’s implement it!

But first, we’ll introduce an artificial delay so we have something to play around with.

Now it takes producer 3 seconds to send a value to the channel, so the entire program will take at least 9 seconds.

Next, we’ll add context.Context to our pipeline.

This Git diff is a bit bigger than the last ones, so let’s go through it one chunk at a time starting at the bottom in main.

We start by initializing a background context that can be canceled. Notice the deferred cancel call on the very next line. It's usually a good idea to place these deferred calls right on the next line so it's easy to see, at a glance, when cleanup happens.

The context is then passed to both producer and consumer. In both cases we replace the body of our loop with a select statement. A select without a default clause blocks until at least one of its cases can proceed; if several are ready at the same time, Go picks one of them at random. Inside producer this means that each loop iteration waits until either a value can be received from the channel returned by ctx.Done(), or the send to outChannel, which is the same code as before, can go through.

The loop will therefore traverse the list and push each value into the channel, unless a cancellation signal is sent across the background context channel. In that case the function returns and runs its deferred calls, which ultimately closes the outChannel.

Notice that we’re using two case clauses on lines 25 and 27, instead of replacing the second with a default clause. If we moved the send into a default clause and outChannel was not ready to receive values, we’d be stuck blocking on outChannel with no way to react to a cancellation. With a second case, on the other hand, select waits on ctx.Done() and outChannel <- s at the same time and proceeds with whichever becomes ready first.
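As a rough sketch of the context-aware producer described above: the `run` helper is hypothetical and only exists so that both the normal and the canceled path can be exercised in one program.

```go
package main

import (
	"context"
	"log"
)

func producer(ctx context.Context, words []string) <-chan string {
	outChannel := make(chan string)
	go func() {
		defer close(outChannel)
		for _, s := range words {
			// Blocks until the context is canceled or a receiver takes s.
			select {
			case <-ctx.Done():
				return
			case outChannel <- s:
			}
		}
	}()
	return outChannel
}

// run is a hypothetical helper: it drains the producer and can cancel the
// context before the first receive.
func run(words []string, cancelEarly bool) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()
	if cancelEarly {
		cancel()
	}
	var got []string
	for v := range producer(ctx, words) {
		got = append(got, v)
	}
	return got
}

func main() {
	log.Println(run([]string{"FOO", "BAR", "BAX"}, false)) // all three values
	// With an already canceled context the result may be empty or partial,
	// because select chooses randomly when both cases are ready.
	log.Println(run([]string{"FOO", "BAR", "BAX"}, true))
}
```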

As a final sanity check, let’s comment out lines 47–50, so that the program runs to completion.

And what do you know, it never finishes. It does log the three values, but then it just hangs, without any indication of what it’s doing. What gives?

This can be really tedious to debug if you don’t know what you’re looking for. Through the arcane art of sprinkling random print statements over our code, we might eventually end up with a print statement in the case val, ok := <-values: branch inside sink.

This will flood our terminal with empty log messages, like this: 2021/09/08 12:29:30. Apparently the for loop in sink keeps running forever. This is surprising considering that we definitely closed the outChannel.

The answer to this question is displayed prominently in the official documentation on channels:

A receive operation on a closed channel can always proceed immediately, yielding the element type’s zero value after any previously sent values have been received.

Our loop runs forever and on every iteration it’ll receive from the closed outChannel, yielding the zero value, which happens to be an empty string. To avoid this behavior, we need to stop receiving values once the channel is drained. That can be accomplished by making use of the boolean value we get from the receive operation.

The value of ok is true if the value received was delivered by a successful send operation to the channel, or false if it is a zero value generated because the channel is closed and empty.

It’s really important to know how channels work, because these bugs can be very hard to figure out. Imagine that your pipeline receives a cancelation signal from an external service, but not in every run. This means that the bug will appear in some, but not all runs, and that makes debugging quite challenging.

Anyway, with the addition of an else branch our program now finishes without any hiccups.
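For reference, a sink with that else branch might look roughly like this. The returned count and the `sinkWords` helper are additions for this sketch only, so the behavior can be checked without reading the log.

```go
package main

import (
	"context"
	"log"
)

// sink logs values until the context is canceled or the channel is closed
// and drained. The returned count is an addition for this sketch.
func sink(ctx context.Context, values <-chan string) int {
	count := 0
	for {
		select {
		case <-ctx.Done():
			log.Print(ctx.Err().Error())
			return count
		case val, ok := <-values:
			if ok {
				log.Printf("sink: %s", val)
				count++
			} else {
				// ok is false: the channel is closed and empty, so we stop
				// instead of receiving zero values forever.
				log.Print("done")
				return count
			}
		}
	}
}

// sinkWords is a hypothetical helper that feeds a closed, pre-filled channel
// into sink.
func sinkWords(words []string) int {
	values := make(chan string, len(words))
	for _, w := range words {
		values <- w
	}
	close(values)
	return sink(context.Background(), values)
}

func main() {
	log.Println(sinkWords([]string{"foo", "bar", "bax"}))
}
```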

Adding Parallelism with Fan-Out and Fan-In

Going straight from producer to consumer isn’t really a pipeline, so let’s add a second stage that transforms all strings to lower case. We can pretty much copy/paste the producer stage and add strings.ToLower. But that's not very educational, so we'll use this opportunity to add more parallelism to our program.

Imagine that this second stage took 10 seconds to run, but our producer can go through the entire list of strings in a fraction of a second. With only a single Go routine processing the items, we’d be waiting for 30s to process the three strings. On a machine with multiple CPU cores, we can do better!

Let’s create a bunch of Go routines that all run the same pipeline step. That way we should be able to run the same function in parallel and bring total execution time down to around 10 seconds — the duration of a single transformation function.

It’s tempting to write a short for loop that spawns, let’s say, 10 Go routines and then passes a single output channel to each routine. The problem with this approach is that you need to be very careful not to cause a panic when you close the output channel.

Remember, sending values to a closed channel causes a panic.

It’s therefore much simpler (but not necessarily easier) to have every spawned Go routine create and close its own output channel, as mentioned earlier. The downside is that you need extra code to merge those channels together. But you’ll sleep much better knowing that your program is far less likely to panic.
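To see why a shared output channel is risky, here is a small demonstration of that panic. The `sendPanics` helper is hypothetical; it uses recover so the program can report the panic instead of crashing.

```go
package main

import "fmt"

// sendPanics is a hypothetical helper: it reports whether sending on ch
// panics, using recover to turn the panic into a boolean.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	open := make(chan int, 1)
	closed := make(chan int, 1)
	close(closed)

	fmt.Println(sendPanics(open))   // false
	fmt.Println(sendPanics(closed)) // true
}
```

In a real pipeline nobody recovers from this panic, so one Go routine closing a channel that others still write to takes the whole program down.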

The idea here is to run a loop that spawns as many Go routines as we have CPU cores available. In each loop iteration, we create a Go routine that runs the same pipeline step function. That step function returns a channel, which we append to a variable that will contain all channels thus created. Finally, we merge all of those channels together and pass the resulting single channel to sink.

The implementation of the channel merging function is copied more or less verbatim from various existing blog posts out there. The number one disadvantage of this approach is that you’ll have to create an almost exact copy of this function for every type your pipeline has to process.
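The following sketch shows one way the fan-out loop and a string-only merge function could fit together. The names `lowerStage`, `mergeStrings` and `lowerAll` are made up for this example and the original program may be structured differently.

```go
package main

import (
	"context"
	"log"
	"runtime"
	"strings"
	"sync"
)

// lowerStage is one worker. It creates and closes its own output channel.
func lowerStage(ctx context.Context, in <-chan string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for s := range in {
			select {
			case <-ctx.Done():
				return
			case out <- strings.ToLower(s):
			}
		}
	}()
	return out
}

// mergeStrings forwards values from all input channels into a single one and
// closes it once every input channel is drained.
func mergeStrings(ctx context.Context, cs ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan string) {
			defer wg.Done()
			for s := range c {
				select {
				case out <- s:
				case <-ctx.Done():
					return
				}
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// lowerAll fans out to one worker per CPU core and merges their results.
func lowerAll(words []string) []string {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan string)
	go func() {
		defer close(in)
		for _, w := range words {
			in <- w
		}
	}()

	var stages []<-chan string
	for i := 0; i < runtime.NumCPU(); i++ {
		stages = append(stages, lowerStage(ctx, in))
	}

	var got []string
	for s := range mergeStrings(ctx, stages...) {
		got = append(got, s)
	}
	return got
}

func main() {
	log.Println(lowerAll([]string{"FOO", "BAR", "BAX"})) // order may vary
}
```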

Right now, the most idiomatic way of coping with this is either manual copy and paste or code generation. But with generics, you can write the function once and let the compiler generate a specialized version for every type you use it with. Please see the last two chapters of this post for a working example that uses generics.

Lastly, I added the second transformation and reduced the delays a bit. Instead of a diff, I’ll give you the complete program, so we’re all on the same page.

Before we move on to the next chapter, here’s a link to the full source code at this point in the blog post.

Error Handling

There are various ways of handling errors in a pipeline, especially once you start running the same function in parallel. You can gather all errors and let the pipeline continue, but you can also abort the pipeline on the first error, which is what we’re going to do.

The most common way of propagating errors that I’ve seen is through a separate error channel. Unlike the value channels that connect pipeline stages, the error channels are not passed to downstream stages.

Instead we gather up all error channels, merge them and pass them to sink. If we receive any error in any channel we cancel the context, thereby closing all channels that are still active, and we log the error.
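A condensed sketch of this idea with a single stage is shown below. The failure condition (an empty string) and the `run` helper are invented for the example; in the real pipeline the sink receives the merged error channels of all stages.

```go
package main

import (
	"context"
	"errors"
	"log"
	"strings"
)

// lower reports failures on a separate error channel instead of passing
// them downstream. Treating "" as invalid is an assumption of this sketch.
func lower(ctx context.Context, in <-chan string) (<-chan string, <-chan error) {
	out := make(chan string)
	errc := make(chan error)
	go func() {
		defer close(out)
		defer close(errc)
		for s := range in {
			if s == "" {
				select {
				case errc <- errors.New("empty input"):
				case <-ctx.Done():
				}
				return
			}
			select {
			case out <- strings.ToLower(s):
			case <-ctx.Done():
				return
			}
		}
	}()
	return out, errc
}

// run plays the role of sink: it collects values and aborts on the first error.
func run(words []string) ([]string, error) {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	in := make(chan string, len(words))
	for _, w := range words {
		in <- w
	}
	close(in)

	values, errc := lower(ctx, in)
	var got []string
	for {
		select {
		case err, ok := <-errc:
			if ok {
				cancel() // stop every stage that is still running
				return got, err
			}
			errc = nil // closed without an error: stop selecting on it
		case v, ok := <-values:
			if !ok {
				return got, nil
			}
			got = append(got, v)
		}
	}
}

func main() {
	got, err := run([]string{"FOO", "", "BAR"})
	log.Println(got, err)
}
```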

Removing Boilerplate With Generics

💡 Go 1.18 is now available and supports generics out of the box. You therefore no longer need the special -gcflags parameter when using Go 1.18.

One unfortunate consequence of merging channels is that you’ll need a separate version of the merge function for every type that your pipeline deals with. This is precisely where generics can help! If you can install Go 1.17, then you can try the current generics implementation by running your program like this: go run -gcflags=-G=3 main.go. Nix users can use the Go version from the repository, by running nix develop in the directory.

And indeed, since the only thing that changes from one merge function to another is the type parameter, it is really straightforward to remove all but one merge function.

We first declare a type parameter T, which is a bit like a variable. We use this T in two places. First, it's the type of the channels passed to the merge function, and secondly it's the type of the returned channel. In other words the channels passed to the function as an argument, and the returned channel must have the same type since we use the same type parameter in both places.

When we compile our program, T will be replaced with the concrete type at the call site. In our case, T will either refer to string or error.

In that sense, the generic function is like a blueprint for more specific versions:
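The sketch below illustrates the idea with a simplified merge that leaves out the context parameter for brevity. `fromSlice` and `count` are hypothetical helpers, and the specialized signatures in the comment are only a mental model, not something the compiler exposes.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// merge is written once, with T standing in for the element type.
func merge[T any](cs ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)
	wg.Add(len(cs))
	for _, c := range cs {
		go func(c <-chan T) {
			defer wg.Done()
			for v := range c {
				out <- v
			}
		}(c)
	}
	go func() {
		wg.Wait()
		close(out)
	}()
	return out
}

// Conceptually, the two call sites in main behave as if we had written:
//
//	func mergeString(cs ...<-chan string) <-chan string
//	func mergeError(cs ...<-chan error) <-chan error

// fromSlice is a hypothetical helper that turns a slice into a closed channel.
func fromSlice[T any](items []T) <-chan T {
	c := make(chan T, len(items))
	for _, it := range items {
		c <- it
	}
	close(c)
	return c
}

// count is a hypothetical helper that drains a channel and counts its values.
func count[T any](c <-chan T) int {
	n := 0
	for range c {
		n++
	}
	return n
}

func main() {
	strs := merge(fromSlice([]string{"foo"}), fromSlice([]string{"bar", "bax"}))
	errs := merge(fromSlice([]error{errors.New("oh no")}))
	fmt.Println(count(strs), count(errs)) // 3 1
}
```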

Maximum Efficiency With Semaphores

Another downside of the original program, apart from the code repetition, is that we potentially do more work than necessary. What if our input list only had a single element in it? Then we only need a single Go routine, not NumCPU() Go routines. This is what Bryan C. Mills mentioned repeatedly in his talk.

To limit the number of Go routines to the available work, we need to completely change the loop that creates Go routines. Instead of creating a fixed number of Go routines, we will employ our usual select strategy and wait until we’ve either received a new value over the input channel or a cancel signal through context. For every value we receive, we spawn a new Go routine (see the example in the semaphore package).

The snippet below is not the complete main.go file, but since producer and sink are the same as before, we'll focus on the parts that have changed. One thing you’ll have to change, after replacing the main() function, is to add “golang.org/x/sync/semaphore” to the import list. Notice how we are now ranging over the input channel to spawn Go routines instead of having a loop go from 0 to NumCPU(). This lets us defer doing the next unit of work until one is actually available.

The second thing to watch out for is that we call sem.Acquire before we start the Go routine. Otherwise, you create a Go routine that will immediately block until sem.Acquire succeeds. That's just wasting memory so we want to avoid this.

The third and final piece of the puzzle is the last call to sem.Acquire, where we try to acquire all tokens at once. We get there once we’ve drained inputChannel, at which point we will be receiving zero values over the channel, ok will be false and we’ll enter the else branch. We now wait until all currently running operations are completed, similar to WaitGroup.Wait(), and then we close our two remaining channels, so sink knows when to exit. If you comment out close(outputChannel) the program will go through all values but then deadlock.

The order of operations is therefore:

  1. Wait until the input channel has a value (is there something to do?)
  2. Try to acquire a semaphore (do we have resources to work on something now?)
  3. Start a Go routine (perform work)
  4. Release semaphore (we’re done with one value)
  5. Wait until all semaphores are released (we’re done with all values)
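The steps above can be sketched with nothing but the standard library. Note that this example swaps in a buffered channel as a simple counting semaphore instead of golang.org/x/sync/semaphore, so that it runs without extra dependencies; `lowerBounded` is a hypothetical name and `limit` must be at least 1.

```go
package main

import (
	"fmt"
	"strings"
)

// lowerBounded processes words with at most limit Go routines at a time and
// only spawns a Go routine when there is a value to work on.
func lowerBounded(words []string, limit int) []string {
	in := make(chan string)
	go func() {
		defer close(in)
		for _, w := range words {
			in <- w
		}
	}()

	out := make(chan string, len(words)) // buffered so workers never block on it
	sem := make(chan struct{}, limit)    // stand-in for semaphore.Weighted

	for w := range in { // 1. wait until there is something to do
		sem <- struct{}{} // 2. acquire a token before spawning
		go func(w string) { // 3. perform the work
			defer func() { <-sem }() // 4. release the token
			out <- strings.ToLower(w)
		}(w)
	}

	// 5. Acquire every token. This only succeeds once all workers have
	// released theirs, so afterwards it is safe to close out.
	for i := 0; i < limit; i++ {
		sem <- struct{}{}
	}
	close(out)

	var got []string
	for s := range out {
		got = append(got, s)
	}
	return got
}

func main() {
	fmt.Println(lowerBounded([]string{"FOO", "BAR", "BAX"}, 2)) // order may vary
}
```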

You could now extract the whole part where we range over inputChannel and create a slight variation of this function for every pipeline stage. But as we saw in the previous bonus chapter, Go generics have the potential to make copy and pasting entirely unnecessary.

Can we apply the same technique here as well? Absolutely!

The only thing that changes from one pipeline stage to the next is the function that we call on the input. Here it’s strings.ToLower, but in the next it could be something that returns integers for example.

What we need is a function that accepts values of type A and returns values of type B. Let's also throw in the potential for returning errors, since we'll probably need it at some point. This gives us func (input A) (B, error). The pipeline step itself takes this transformation function as an input, in addition to an input channel that delivers values of type A, and it returns an output channel of type B.
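To show that A and B really can differ, here is a deliberately simplified, sequential stage that turns strings into integers with strconv.Atoi. The names `stage` and `parseAll` are hypothetical, and the sketch leaves out the context and the semaphore.

```go
package main

import (
	"fmt"
	"strconv"
)

// stage applies fn to every input value. Results go to the first returned
// channel, failures to the second. Values are processed one at a time.
func stage[A any, B any](in <-chan A, fn func(A) (B, error)) (<-chan B, <-chan error) {
	out := make(chan B)
	errc := make(chan error)
	go func() {
		defer close(out)
		defer close(errc)
		for v := range in {
			res, err := fn(v)
			if err != nil {
				errc <- err
				continue
			}
			out <- res
		}
	}()
	return out, errc
}

// parseAll is a hypothetical consumer: it sums the parsed numbers and counts
// the inputs that failed to parse.
func parseAll(inputs []string) (sum int, failures int) {
	in := make(chan string, len(inputs))
	for _, s := range inputs {
		in <- s
	}
	close(in)

	// A is string and B is int here.
	out, errc := stage[string, int](in, strconv.Atoi)
	for out != nil || errc != nil {
		select {
		case n, ok := <-out:
			if !ok {
				out = nil // closed: stop selecting on it
				continue
			}
			sum += n
		case _, ok := <-errc:
			if !ok {
				errc = nil
				continue
			}
			failures++
		}
	}
	return sum, failures
}

func main() {
	fmt.Println(parseAll([]string{"1", "x", "2"})) // 3 1
}
```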

The snippet below demonstrates what the new step function looks like. Note that the step function still creates and closes the channels it writes to. This is in line with our earlier credo:

Whoever creates the channel is also in charge of closing it.

You might be wondering why we have nested Go routines in this function. The outer Go routine is necessary because writing to outputChannel is a blocking operation. By writing to it from a Go routine, our program continues to execute code until it gets to a stage where someone is actually listening to outputChannel. Essentially this avoids a deadlock.

The inner Go routines are the ones we create on-demand.

This gives us a pretty neat looking main function:

We can run the whole snippet to verify that it actually works:

package main

import (
	"context"
	"errors"
	"log"
	"strings"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"
)

// producer sends every word to a channel that it creates and closes itself.
func producer(ctx context.Context, words []string) (<-chan string, error) {
	outChannel := make(chan string)

	go func() {
		defer close(outChannel)

		for _, s := range words {
			select {
			case <-ctx.Done():
				return
			case outChannel <- s:
			}
		}
	}()

	return outChannel, nil
}

// sink logs every result and cancels the pipeline on the first error.
func sink(ctx context.Context, cancelFunc context.CancelFunc, values <-chan string, errs <-chan error) {
	for {
		select {
		case <-ctx.Done():
			log.Print(ctx.Err().Error())
			return
		case err := <-errs:
			if err != nil {
				log.Println("error: ", err.Error())
				cancelFunc()
			}
		case val, ok := <-values:
			if ok {
				log.Printf("sink: %s", val)
			} else {
				log.Print("done")
				return
			}
		}
	}
}

// step runs fn on every input value, spawning a Go routine per value while
// never running more than limit of them at the same time.
func step[In any, Out any](
	ctx context.Context,
	inputChannel <-chan In,
	fn func(In) (Out, error),
) (chan Out, chan error) {
	outputChannel := make(chan Out)
	errorChannel := make(chan error)

	// Use all CPU cores to maximize efficiency. We'll set the limit to 2 so you
	// can see the values being processed in batches of 2 at a time, in parallel
	// limit := int64(runtime.NumCPU())
	limit := int64(2)
	sem1 := semaphore.NewWeighted(limit)

	go func() {
		defer close(outputChannel)
		defer close(errorChannel)

		for {
			select {
			case <-ctx.Done():
				// A bare break would only leave the select, not the loop.
				return
			case s, ok := <-inputChannel:
				if ok {
					// Acquire before spawning so we never create a Go routine
					// that would immediately block.
					if err := sem1.Acquire(ctx, 1); err != nil {
						log.Printf("Failed to acquire semaphore: %v", err)
						return
					}

					go func(s In) {
						defer sem1.Release(1)
						time.Sleep(time.Second * 3)

						result, err := fn(s)
						if err != nil {
							errorChannel <- err
						} else {
							outputChannel <- result
						}
					}(s)
				} else {
					// The input is drained: wait for all running workers.
					if err := sem1.Acquire(ctx, limit); err != nil {
						log.Printf("Failed to acquire semaphore: %v", err)
					}
					return
				}
			}
		}
	}()

	return outputChannel, errorChannel
}

// Merge forwards the values of all input channels into a single channel.
func Merge[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)

	output := func(c <-chan T) {
		defer wg.Done()
		for n := range c {
			select {
			case out <- n:
			case <-ctx.Done():
				return
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

func transformA(s string) (string, error) {
	log.Println("transformA input: ", s)
	return strings.ToLower(s), nil
}

func transformB(s string) (string, error) {
	log.Println("transformB input: ", s)

	// Comment this out to see the pipeline finish successfully
	if s == "foo" {
		return "", errors.New("oh no")
	}

	return strings.Title(s), nil
}

func main() {
	source := []string{"FOO", "BAR", "BAX"}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	readStream, err := producer(ctx, source)
	if err != nil {
		log.Fatal(err)
	}

	step1results, step1errors := step(ctx, readStream, transformA)
	step2results, step2errors := step(ctx, step1results, transformB)
	allErrors := Merge(ctx, step1errors, step2errors)

	sink(ctx, cancel, step2results, allErrors)
}

$ time go run -gcflags=-G=3 main.go
2021/09/09 13:51:17 sink: Bax
2021/09/09 13:51:17 sink: Bar
2021/09/09 13:51:17 error: oh no
2021/09/09 13:51:17 context canceled
________________________________________________________
Executed in    6.60 secs      fish           external
   usr time  142.51 millis    0.12 millis  142.40 millis
   sys time  282.23 millis    1.81 millis  280.42 millis

Summary

In the classic pipeline pattern, you spawn a fixed number of Go routines, pass an input channel to each, and then merge the output channels together. Each Go routine is in charge of its own output channel, which makes it much harder to accidentally send on a closed channel.

A newer technique uses the semaphore concept, most likely with the semaphore package, to make sure we only spawn Go routines if we have work to do.

We can apply generics to both approaches to eliminate the need for code repetition. In the first pipeline, we avoid having multiple copies of the function that merges channels. In the second pipeline, we use generics to create a generic pipeline step function.

I hope that you now feel empowered to write concurrent and parallel programs in Go, if it makes sense for the problem you’re trying to solve.

**This article was updated on May 20, 2022.**

**This article was updated on September 12, 2022.**
