Applying Modern Go Concurrency Patterns to Data Pipelines

Resilient, fast, efficient and concise? Pick all four! How to apply old and new concurrency patterns to pipelines in Golang

AMBOSS · Sep 20 · 11 min read

This article was written by Florian Beeres, Software Engineer at AMBOSS.


Golang is known for its built-in facilities for writing concurrent programs, most notably channels. These channels let concurrent processes synchronize by sending messages to each other instead of sharing memory. Even with that built-in support, writing concurrent and parallel programs that are bug-free can be a challenge. You still need to come up with your own strategy for error handling and context cancellation, and there are also a few "gotchas" surrounding the creation and cleanup of Go routines.

The official Go blog has a post from 2014 that showcases a principled approach to building pipelines in Go. At GopherCon 2018, Bryan C. Mills improved upon some of the techniques from the blog post, especially with an eye towards efficient resource utilization.

In this blog post we will build a simple pipeline, first using the techniques from the 2014 blog post and later on redoing the pipeline — this time with the techniques from the GopherCon video. Along the way, I’ll highlight common mistakes and how to solve them. Finally, I will show how the upcoming generics feature can be put to use to greatly cut down on boilerplate code.

A basic understanding of Go channels is required to understand the code and follow along.

The final code for this blog post can be found on GitHub and the commit history tries to stick to the chapters in this post. But you can also follow along by applying the Git diffs to the initial main.go file that is shown at the start of the next chapter.

A Simple Pipeline
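
To have something concrete to work with, here's a minimal sketch of such a pipeline: a producer that pushes a slice of strings into a channel and a sink that logs whatever it receives (the exact input strings are illustrative).

```go
package main

import "log"

// producer sends every string from source into a new channel.
func producer(source []string) (<-chan string, error) {
	outChannel := make(chan string)

	for _, s := range source {
		outChannel <- s // blocks: nobody is receiving yet
	}

	return outChannel, nil
}

// sink logs every value it receives until the channel is closed.
func sink(values <-chan string) {
	for value := range values {
		log.Println(value)
	}
}

func main() {
	source := []string{"foo", "bar", "bax"}

	outChannel, err := producer(source)
	if err != nil {
		log.Fatal(err)
	}

	sink(outChannel)
}
```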

If you run this with go run main.go you'll see a deadlock. Off to a good start!

Luckily this is easy to fix. The channel returned by producer is unbuffered, meaning a send can only proceed once another Go routine is ready to receive on the other end. But since sink is called later in the program, there is no receiver at the point where outChannel <- s runs, causing the deadlock.

You can fix this either by making the channel buffered, which merely postpones the deadlock until the buffer fills up, or by running the producer loop in a Go routine. The loop will still block that Go routine, but it will no longer block the main Go routine.
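
A sketch of the second option, moving the send loop into its own Go routine:

```go
func producer(source []string) (<-chan string, error) {
	outChannel := make(chan string)

	go func() {
		for _, s := range source {
			outChannel <- s // now blocks only this Go routine
		}
	}()

	return outChannel, nil
}
```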

Now we see some values being logged, but Go still complains about a deadlock!

This time the deadlock happens because outChannel is never closed, and therefore our sink will be waiting for new values until the end of time.

The solution is simple: close outChannel!
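
Since the Go routine inside producer is the only sender, it is also the right place to close the channel once the loop is done:

```go
go func() {
	// The creator of the channel is in charge of closing it.
	defer close(outChannel)

	for _, s := range source {
		outChannel <- s
	}
}()
```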

Whenever you write concurrent Go code, you should have a strategy for closing channels that you apply consistently throughout your program. One good strategy, mentioned in the linked blog post, is that whoever creates a channel is also in charge of closing it. This makes it easy to avoid sending to a closed channel, which would result in a panic.

Graceful Shutdown With Context

But first, we’ll introduce an artificial delay so we have something to play around with.
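
A sketch of the delay, a plain time.Sleep inside the producer loop (remember to import time):

```go
for _, s := range source {
	time.Sleep(time.Second * 3) // artificial delay per value

	outChannel <- s
}
```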

Now it takes producer 3 seconds to send each value to the channel, so with three values the entire program will take at least 9 seconds.

Next, we’ll add context.Context to our pipeline.

This Git diff is a bit bigger than the last ones, so let’s go through it one chunk at a time starting at the bottom in main.

We start by initializing a background context that can be canceled. Notice the deferred cancel call on the very next line. It's usually a good idea to place these deferred calls right on the next line so it's easy to see, at a glance, when cleanup happens.

The context is then passed to both producer and sink. In both cases we replace the body of our loop with a select statement. A select blocks until one of its cases can proceed, and if several are ready it picks one at random. Inside producer, each loop iteration therefore either receives a cancellation signal from the channel returned by ctx.Done() or sends the next value to outChannel, whichever becomes possible first, so a pending send can no longer block the shutdown indefinitely.

The loop will therefore traverse the list and push each value into the channel, unless the context is canceled. In that case the function returns and runs its deferred calls, which ultimately close outChannel.
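
Putting it together, here's a sketch of the context-aware producer and sink, plus the new setup in main; the early cancel after 5 seconds is an illustrative stand-in for whatever would normally trigger a shutdown:

```go
func producer(ctx context.Context, source []string) (<-chan string, error) {
	outChannel := make(chan string)

	go func() {
		defer close(outChannel)

		for _, s := range source {
			time.Sleep(time.Second * 3)

			select {
			case <-ctx.Done(): // canceled: stop producing
				return
			case outChannel <- s:
			}
		}
	}()

	return outChannel, nil
}

func sink(ctx context.Context, values <-chan string) {
	for {
		select {
		case <-ctx.Done():
			log.Print(ctx.Err().Error())
			return
		case val, ok := <-values:
			if ok {
				log.Printf("sink: %s", val)
			}
		}
	}
}

func main() {
	source := []string{"foo", "bar", "bax"}

	ctx := context.Background()
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	outChannel, err := producer(ctx, source)
	if err != nil {
		log.Fatal(err)
	}

	// Cancel early so we can watch the graceful shutdown (illustrative).
	go func() {
		time.Sleep(time.Second * 5)
		cancel()
	}()

	sink(ctx, outChannel)
}
```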

As a final sanity check, let’s comment out lines 49–52, so that the program runs to completion.

And what do you know, it never finishes. It does log the three values, but then it just hangs, without any indication of what it’s doing. What gives?

This can be really tedious to debug if you don’t know what you’re looking for. Through the arcane art of sprinkling random print statements over our code, we might eventually end up with a print statement in the case val, ok := <-values: branch inside sink.

This will flood our terminal with empty log messages, like this: 2021/09/08 12:29:30. Apparently the for loop in sink keeps running forever. This is surprising considering that we definitely closed the outChannel.

The answer to this question is displayed prominently in the official documentation on channels:

A receive operation on a closed channel can always proceed immediately, yielding the element type’s zero value after any previously sent values have been received.

Our loop runs forever, and on every iteration it receives from the closed outChannel, yielding the zero value, which happens to be an empty string. To avoid this behavior, we need to stop receiving values once the channel is drained. That can be accomplished by making use of the boolean value we get from the receive operation.

The value of ok is true if the value received was delivered by a successful send operation to the channel, or false if it is a zero value generated because the channel is closed and empty.

It's really important to know how channels work, because these bugs can be very hard to figure out. Imagine that your pipeline receives a cancellation signal from an external service, but not on every run. The bug will then appear in some runs but not others, and that makes debugging quite challenging.

Anyway, with the addition of an else branch our program now finishes without any hiccups.
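
Here is the fixed sink:

```go
func sink(ctx context.Context, values <-chan string) {
	for {
		select {
		case <-ctx.Done():
			log.Print(ctx.Err().Error())
			return
		case val, ok := <-values:
			if ok {
				log.Printf("sink: %s", val)
			} else {
				// The channel is drained and closed: we're done.
				return
			}
		}
	}
}
```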

Adding Parallelism with Fan-Out and Fan-In

Let's add a second pipeline stage that transforms each string. Imagine that this second stage takes 10 seconds per item, while our producer can go through the entire list of strings in a fraction of a second. With only a single Go routine processing the items, we'd be waiting 30 seconds to process the three strings. On a machine with multiple CPU cores, we can do better!
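
Here's a sketch of such a stage; the name transformToLower and the use of strings.ToLower are borrowed from later in this post, and the 10-second sleep stands in for expensive work:

```go
func transformToLower(ctx context.Context, values <-chan string) <-chan string {
	outChannel := make(chan string)

	go func() {
		defer close(outChannel)

		for v := range values {
			time.Sleep(time.Second * 10) // simulate expensive work

			select {
			case <-ctx.Done():
				return
			case outChannel <- strings.ToLower(v):
			}
		}
	}()

	return outChannel
}
```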

Let’s create a bunch of Go routines that all run the same pipeline step. That way we should be able to run the same function in parallel and bring total execution time down to around 10 seconds — the duration of a single transformation function.

It’s tempting to write a short for loop that spawns, let’s say, 10 Go routines and then passes a single output channel to each routine. The problem with this approach is that you need to be very careful not to cause a panic when you close the output channel.

Remember, sending values to a closed channel causes a panic.

It’s therefore much simpler (but not necessarily easier) to have every spawned Go routine create and close its own output channel, as mentioned earlier. The downside is that you need extra code to merge those channels together. But you’ll sleep much better knowing that your program is far less likely to panic.

The idea here is to run a loop that spawns as many Go routines as we have CPU cores available. In each loop iteration, we create a Go routine that runs the same pipeline step function. That step function returns a channel, which we append to a variable that will contain all channels thus created. Finally, we merge all of those channels together and pass the resulting single channel to sink.
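
In main, that idea might look like this (mergeStringChans is sketched below):

```go
inputChannel, err := producer(ctx, source)
if err != nil {
	log.Fatal(err)
}

outChannels := []<-chan string{}
for i := 0; i < runtime.NumCPU(); i++ {
	// Each Go routine creates and closes its own output channel.
	outChannels = append(outChannels, transformToLower(ctx, inputChannel))
}

sink(ctx, mergeStringChans(ctx, outChannels...))
```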

The implementation of the channel merging function is copied more or less verbatim from various existing blog posts out there. The number one disadvantage of this approach is that you’ll have to create an almost exact copy of this function for every type your pipeline has to process.
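
For reference, here is the usual shape of that merge function, specialized to string channels (adapted from the 2014 pipelines post):

```go
func mergeStringChans(ctx context.Context, cs ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)

	// Drain one input channel into the shared output channel.
	output := func(c <-chan string) {
		defer wg.Done()
		for v := range c {
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	// Close the output channel once all inputs are drained.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
```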

Right now, the most idiomatic way of coping with this is either manual copy and paste or code generation. But in the future, you'll be able to use generics and let the compiler generate specialized versions of this function for you. Please see the last two chapters of this post for a working example that uses generics.

Lastly, I added the second transformation and reduced the delays a bit. Instead of another diff, here's a link to the full source code at this point in the blog post, so we're all on the same page.

Error Handling

The most common way of propagating errors that I’ve seen is through a separate error channel. Unlike the value channels that connect pipeline stages, the error channels are not passed to downstream stages.

Instead we gather up all error channels, merge them, and pass the result to sink. If we receive an error on any of them, we log it and cancel the context, which in turn shuts down all the stages that are still running.
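
As a sketch, a stage now returns an error channel alongside its value channel, and sink receives the cancel function so it can shut everything down on the first error (the "oh no" failure is illustrative):

```go
func transformToLower(ctx context.Context, values <-chan string) (<-chan string, <-chan error) {
	outChannel := make(chan string)
	errorChannel := make(chan error, 1)

	go func() {
		defer close(outChannel)
		defer close(errorChannel)

		for v := range values {
			if v == "foo" { // illustrative failure case
				errorChannel <- errors.New("oh no")
				continue
			}

			select {
			case <-ctx.Done():
				return
			case outChannel <- strings.ToLower(v):
			}
		}
	}()

	return outChannel, errorChannel
}

func sink(ctx context.Context, cancel context.CancelFunc, values <-chan string, errs <-chan error) {
	for {
		select {
		case <-ctx.Done():
			log.Print(ctx.Err().Error())
			return
		case err, ok := <-errs:
			if !ok {
				errs = nil // stop selecting on the closed channel
			} else if err != nil {
				log.Println("error:", err.Error())
				cancel() // shut down the rest of the pipeline
			}
		case val, ok := <-values:
			if ok {
				log.Printf("sink: %s", val)
			} else {
				return
			}
		}
	}
}
```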

Removing Boilerplate With Generics

And indeed, since the only thing that changes from one merge function to another is the element type of the channels, it is really straightforward to remove all but one merge function.
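
Here's a sketch of that single generic merge function; apart from the type parameter, the body is identical to the string version above:

```go
func mergeChans[T any](ctx context.Context, cs ...<-chan T) <-chan T {
	var wg sync.WaitGroup
	out := make(chan T)

	output := func(c <-chan T) {
		defer wg.Done()
		for v := range c {
			select {
			case out <- v:
			case <-ctx.Done():
				return
			}
		}
	}

	wg.Add(len(cs))
	for _, c := range cs {
		go output(c)
	}

	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}
```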

We first declare a type parameter T, which is a bit like a variable. We use this T in two places: it is the element type of the channels passed to the merge function, and it is the element type of the returned channel. In other words, the input channels and the returned channel must carry the same type, since we use the same type parameter in both places.

When we compile our program, T will be replaced with the concrete type at the call site. In our case, T will either refer to string or error.

In that sense, the generic function is like a blueprint for more specific versions:
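
For example (valueChannels and errorChannels are hypothetical variables):

```go
mergedValues := mergeChans[string](ctx, valueChannels...)
mergedErrors := mergeChans[error](ctx, errorChannels...)

// The explicit instantiation is usually optional, since the
// compiler can infer T from the arguments:
mergedValues = mergeChans(ctx, valueChannels...)
```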

Maximum Efficiency With Semaphores

To limit the number of Go routines to the available work, we need to completely change the loop that creates Go routines. Instead of creating a fixed number of Go routines, we will range over the input channel. For every value we receive from it, we will spawn a Go routine (see the example in the semaphore package).

This means that we will only have a single channel per pipeline stage, and must therefore pay attention to when we close that channel.

The snippet below is not the complete main.go file, but since producer and sink are the same as before, we'll focus on the parts that have changed. Notice how we are now ranging over the input channel to spawn Go routines, instead of having a loop go from 0 to NumCPU(). This lets us defer spawning a Go routine until there is actually a unit of work for it.

The second thing to watch out for is that we call sem.Acquire before we start the Go routine. Otherwise, you create a Go routine that immediately blocks until sem.Acquire succeeds. That's just wasted memory, so we want to avoid it.

The third and final piece of the puzzle is the last call to sem.Acquire, where we try to acquire all tokens at once. This effectively waits until we've drained inputChannel and have completed all operations. Only then can we close our two remaining channels so sink knows when to exit. If you comment out close(outputChannel) the program will go through all values but then deadlock.
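
Here's a sketch of that middle section, assuming the golang.org/x/sync/semaphore package and a stage that lower-cases strings; channel and variable names are illustrative:

```go
limit := int64(runtime.NumCPU())
sem := semaphore.NewWeighted(limit)
outputChannel := make(chan string)
errorChannel := make(chan error, 1)

go func() {
	for s := range inputChannel {
		s := s // capture the loop variable for the closure

		// Acquire before spawning, so we never create a Go routine
		// that just sits there blocked on the semaphore.
		if err := sem.Acquire(ctx, 1); err != nil {
			log.Printf("failed to acquire semaphore: %v", err)
			break
		}

		go func() {
			defer sem.Release(1)

			time.Sleep(time.Second) // simulate work

			select {
			case <-ctx.Done():
			case outputChannel <- strings.ToLower(s):
			}
		}()
	}

	// Acquiring all tokens at once waits until every in-flight
	// Go routine has called Release, i.e. until all work is done.
	if err := sem.Acquire(ctx, limit); err != nil {
		log.Printf("failed to acquire semaphore: %v", err)
	}

	close(outputChannel)
	close(errorChannel)
}()
```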

You could now extract the whole part where we range over inputChannel and create a slight variation of this function for every pipeline stage. But as we saw in the previous bonus chapter, Go generics have the potential to make copy and pasting entirely unnecessary.

Can we apply the same technique here as well? Absolutely!

The only thing that changes from one pipeline stage to the next is the function that we call on the input. Here it's strings.ToLower, but the next stage could just as well use a function that returns integers, for example.

What we need is a function that accepts values of type A and returns values of type B, plus an error, since we'll probably need that at some point. This gives us func(input A) (B, error). The pipeline step itself takes this transformation function as an argument, in addition to an input channel carrying values of type A and an output channel of type B.

What does that look like in actual code? Here you go:
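
A sketch, with the error channel passed in so that several stages can share it:

```go
func step[In any, Out any](
	ctx context.Context,
	inputChannel <-chan In,
	outputChannel chan Out,
	errorChannel chan error,
	fn func(In) (Out, error),
) {
	defer close(outputChannel)

	limit := int64(runtime.NumCPU())
	sem := semaphore.NewWeighted(limit)

	for s := range inputChannel {
		s := s // capture the loop variable for the closure

		if err := sem.Acquire(ctx, 1); err != nil {
			log.Printf("failed to acquire semaphore: %v", err)
			break
		}

		go func() {
			defer sem.Release(1)

			result, err := fn(s)
			if err != nil {
				errorChannel <- err
				return
			}

			select {
			case <-ctx.Done():
			case outputChannel <- result:
			}
		}()
	}

	// Wait for all in-flight Go routines before closing outputChannel.
	if err := sem.Acquire(ctx, limit); err != nil {
		log.Printf("failed to acquire semaphore: %v", err)
	}
}
```

Note that the shared errorChannel is deliberately not closed here; no single stage owns it.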

This gives us a pretty neat looking main function:
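
A sketch of it, where transformA and transformB are illustrative func(string) (string, error) transformations, the second of which fails on purpose to produce the output below:

```go
func main() {
	source := []string{"Foo", "Bar", "Bax"}

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	inputChannel, err := producer(ctx, source)
	if err != nil {
		log.Fatal(err)
	}

	stage1 := make(chan string)
	stage2 := make(chan string)
	errorChannel := make(chan error, 1)

	go step(ctx, inputChannel, stage1, errorChannel, transformA)
	go step(ctx, stage1, stage2, errorChannel, transformB)

	sink(ctx, cancel, stage2, errorChannel)
}
```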

We can run the whole snippet to verify that it actually works:

```
$ time go run -gcflags=-G=3 main.go
2021/09/09 13:51:17 sink: Bax
2021/09/09 13:51:17 sink: Bar
2021/09/09 13:51:17 error: oh no
2021/09/09 13:51:17 context canceled

________________________________________________________
Executed in    6.60 secs      fish           external
   usr time  142.51 millis    0.12 millis  142.40 millis
   sys time  282.23 millis    1.81 millis  280.42 millis
```

Summary

The classic approach spawns a fixed number of Go routines, each of which creates and closes its own output channel, and merges the resulting channels back into one. A newer technique uses the semaphore concept, most likely via the golang.org/x/sync/semaphore package, to make sure we only spawn Go routines when there is actual work to do. In this version the Go routines no longer create and return their own output channels. That requires a bit more attention to when channels are closed, but it's more efficient.

We can apply generics to both approaches to eliminate the need for code repetition. In the first pipeline, we avoid having multiple copies of the function that merges channels. In the second pipeline, we use generics to create a generic pipeline step function.

I hope that you now feel empowered to write concurrent and parallel programs in Go, if it makes sense for the problem you’re trying to solve.
