Go Concurrency: Producer/Consumer

Guy J Grigsby
Published in Star Gazers
4 min read · Oct 9, 2020
Go Gopher used under Creative Commons 3.0 license.

I promised my readers, all three of them, more on concurrency in Go a long time ago and I figure it’s about time to pay up.

This next pattern, also frequently called ranging over a channel, is one that I like a lot, but I have had trouble with it. I always had to look it up because I kept forgetting how to close the channel at the right time. As an interesting aside, you don’t have to close a channel unless the receiver needs to know that it has been closed. Otherwise you can just leave it open and it will get garbage collected. See this comment from Rob Pike for more about that. We’ll start with the case where we know exactly how many “things” the producer will produce, because that’s the least complex.

To start we need a producer.
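A minimal sketch of such a producer, assuming an unbuffered channel of ints. The name `producer` and the count of 10 are illustrative choices, not fixed by anything above:

```go
package main

// producer sends the numbers 0..n-1 on ch, one at a time.
// On an unbuffered channel each send blocks until a receiver is ready.
func producer(ch chan<- int, n int) {
	for i := 0; i < n; i++ {
		ch <- i
	}
}

func main() {
	ch := make(chan int)

	// Start the producer in its own goroutine.
	go producer(ch, 10)

	// Nothing reads from ch and nothing makes main wait,
	// so main returns here and the program ends.
}
```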

This bit is incomplete, and because we have no synchronization method, it just exits immediately. The producer is started in a separate goroutine, but it blocks at the first send on our channel. Nothing else happens because there are no goroutines reading from the channel and nothing makes main wait. So let’s make some consumers.
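Roughly, adding consumers could look like the sketch below. The `consumer` function and its `id` parameter are hypothetical names used for illustration, and there is still no synchronization with main:

```go
package main

import "fmt"

// producer sends the numbers 0..n-1 on ch.
func producer(ch chan<- int, n int) {
	for i := 0; i < n; i++ {
		ch <- i
	}
}

// consumer ranges over ch and prints every value it receives.
// The loop only ends once ch is closed and drained.
func consumer(id int, ch <-chan int) {
	for v := range ch {
		fmt.Printf("consumer %d got %d\n", id, v)
	}
}

func main() {
	ch := make(chan int)
	go producer(ch, 10)

	// Ten consumers share the same channel; each value goes to exactly one.
	for i := 0; i < 10; i++ {
		go consumer(i, ch)
	}

	// main still does not wait for anyone, so the program can exit
	// before any consumer has printed anything.
}
```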

Let’s go over what this code is doing, even though it doesn’t work quite yet. We are creating 10 consumers and they are all ranging over the same channel. The range loop on a channel will handle the synchronization between consumers for us, since each value sent is received by exactly one of them. The program will still exit right away though, because we don’t have a way to make the main goroutine wait until all of the consumers are done. One way to fix this is to use a sync.WaitGroup. A wait group lets us block until all the other running goroutines are finished. The following code has a wait group added.

The first draft:
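A sketch of what such a first draft could look like. It assumes the wait group counts items rather than consumers, which is one way to get the "known number of things" behavior described here; the `run` helper and the `processed` counter are additions for illustration only:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// producer sends the numbers 0..n-1 on ch. It never closes the channel.
func producer(ch chan<- int, n int) {
	for i := 0; i < n; i++ {
		ch <- i
	}
}

// consumer marks one unit of work as done for every item it receives.
func consumer(ch <-chan int, wg *sync.WaitGroup, processed *int64) {
	for range ch {
		atomic.AddInt64(processed, 1)
		wg.Done()
	}
}

// run starts one producer and several consumers, then waits until
// exactly `items` items have been handled.
func run(items, workers int) int64 {
	var wg sync.WaitGroup
	var processed int64

	// We know the number of items up front, so we wait on items.
	wg.Add(items)

	ch := make(chan int)
	go producer(ch, items)
	for i := 0; i < workers; i++ {
		go consumer(ch, &wg, &processed)
	}

	wg.Wait()
	// The consumers are still blocked in their range loops here because
	// the channel was never closed; they go away when the program exits.
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println("processed:", run(10, 10))
}
```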

Basic producer/consumer pattern: Go Playground

This is elegant and lovely, but because it is the simplest case, it doesn’t account for everything that we may want to do. It doesn’t collect errors and it only works if we know ahead of time the number of things the producer will produce. Depending on your use case, this may be ok, but let’s dig a little deeper in case you need other features.

First we’ll add the ability to produce an arbitrary number of things. In a real world situation, you will often not know how many things the producer will produce. This is the part that always got me in the past. I was overthinking it. For a long time I used a separate goroutine as a kind of “coordinator” because I misunderstood how the channel was working. It turns out that the producer can close the channel when it is done, and the consumers will still receive everything sent before the close, including the final send, before their range loops end.
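That idea can be sketched as follows, assuming the wait group now counts consumers instead of items. The `run` helper and the `processed` counter are illustrative additions:

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// producer sends n items and then closes the channel to signal
// that nothing more is coming.
func producer(ch chan<- int, n int) {
	defer close(ch)
	for i := 0; i < n; i++ {
		ch <- i
	}
}

// consumer processes items until the channel is closed and drained.
func consumer(ch <-chan int, wg *sync.WaitGroup, processed *int64) {
	defer wg.Done()
	for range ch {
		atomic.AddInt64(processed, 1)
	}
}

// run no longer needs to know the item count to know when to stop waiting.
func run(items, workers int) int64 {
	var wg sync.WaitGroup
	var processed int64

	// One wg slot per consumer, not per item.
	wg.Add(workers)

	ch := make(chan int)
	go producer(ch, items)
	for i := 0; i < workers; i++ {
		go consumer(ch, &wg, &processed)
	}

	// Returns once every consumer has left its range loop.
	wg.Wait()
	return atomic.LoadInt64(&processed)
}

func main() {
	fmt.Println("processed:", run(25, 4))
}
```

Only the producer closes the channel here. Closing from the sending side avoids a send on a closed channel, which would panic.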

Producer/Consumer with arbitrary production: Go Playground

Now the producer will exit when it’s done and close the channel so that all the consumers will exit as soon as they are finished. :)

There is one more common case that I want to visit. That is the case where we need to capture a return value from the consumers. That could be an error or some piece of data or both. The implementations are slightly different depending on what you want to gather. We will start with the case where we want to check for errors, because Go has a type for that called errgroup.Group (it lives in the golang.org/x/sync/errgroup package rather than the standard library). It’s a lot like a WaitGroup, but its Wait method also returns the first non-nil error returned by one of the goroutines. You can read more about it here.

Producer/Consumer w/errgroup: Go Playground

Look at that! It’s pretty similar to a WaitGroup. The biggest change, I think, is that you pass a function to the errgroup.Group rather than manually handling the goroutines. As I mentioned, Wait gives you back the first error once the goroutines have returned, and if you create the group with errgroup.WithContext, the context is cancelled on that first error so the others can stop early. If you need to collect all the errors for some reason, you will have to do that manually using the method we are about to go over.

What if we need to return arbitrary data from the consumers? When I think of producer/consumer, I think of using the consumers as independent workers that don’t usually return data, but there are cases where we need to collect something from them, so we’ll go over that.

We’ll remove the error group for this example because if we think of the consumers as independent workers, then it doesn’t really affect all of them if one has an error. We will gather any errors that occur from all consumers in addition to the data that each “worker” could return. So basically, a consumer can return either an error or data about the completed job. We’ll keep the wait group, but we have to make some changes to how it works. We will use the “coordinator” that I mentioned earlier because we need to make sure we get one and only one response per goroutine. We’ll use a pretty sweet construct in Go, the select.
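One way this could be put together is sketched below. It assumes separate unbuffered channels for data and errors plus a `done` channel closed by the coordinator; the summing workload, the `bad` item, and the `run` helper are invented for illustration:

```go
package main

import (
	"fmt"
	"sync"
)

// producer sends 0..n-1 and closes ch when it is done.
func producer(ch chan<- int, n int) {
	defer close(ch)
	for i := 0; i < n; i++ {
		ch <- i
	}
}

// consumer sums the items it receives and sends exactly one response:
// an error if it saw the hypothetical bad item, otherwise its sum.
func consumer(ch <-chan int, bad int, sums chan<- int, errs chan<- error, wg *sync.WaitGroup) {
	defer wg.Done() // runs after the single response has been received
	sum := 0
	var err error
	for v := range ch {
		if v == bad {
			err = fmt.Errorf("could not process item %d", v)
			continue // keep draining so the producer is never stuck
		}
		sum += v
	}
	if err != nil {
		errs <- err
		return
	}
	sums <- sum
}

// run gathers one response per consumer and returns the combined results.
func run(items, workers, bad int) (total int, errList []error) {
	ch := make(chan int)
	sums := make(chan int)
	errs := make(chan error)
	done := make(chan struct{})

	var wg sync.WaitGroup
	wg.Add(workers)
	go producer(ch, items)
	for i := 0; i < workers; i++ {
		go consumer(ch, bad, sums, errs, &wg)
	}

	// The coordinator: once every consumer has reported, signal done.
	go func() {
		wg.Wait()
		close(done)
	}()

	for {
		select {
		case s := <-sums:
			total += s
		case err := <-errs:
			errList = append(errList, err)
		case <-done:
			return total, errList
		}
	}
}

func main() {
	total, errList := run(10, 3, 5)
	fmt.Println("total:", total, "errors:", errList)
}
```

Because `sums` and `errs` are unbuffered, a consumer only reaches its deferred `wg.Done()` after its response has been received, so `done` cannot win the select while a response is still pending.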

Producer/Consumer with returns Go Playground

Tada! Now we can concurrently process data from our producer and gather the returns as well. By this point, we’ve gone beyond what we set out to do and ended up with a scatter-gather pattern again!

If we were writing this for production, we would want to include a context and add another case to our select that waits on ctx.Done() and then checks ctx.Err(). That way we could include a timeout that would clean up all the goroutines if the work took too long. I encourage anyone reading this to try to add that part.
