Building an Unbounded Channel in Go

Post One in a series on Go

Concurrency in Go is built on three interdependent features: goroutines, channels, and the select statement. Granted, there are helpers like sync.WaitGroup and sync.Once that make concurrency simpler. Also, under the covers you have standard lower-level features, like mutexes, conds, and atomics. But when people solve problems with concurrency in Go, they are mostly looking at goroutines, channels and selects. Let’s explore channels a bit more.

Channels come in two flavors: buffered and unbuffered. The default — unbuffered channels — pass values from one goroutine to another, one at a time. The goroutine that writes to the channel blocks until another goroutine reads from the same channel. If the read happens before the write, the reading goroutine blocks until there is data in the channel.

A buffered channel gives writers a bit more flexibility. When a channel is buffered, a set number of values can be written to the channel and not read before the channel blocks. It behaves like a synchronized queue with a bounded size.
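To make the difference concrete, here is a minimal sketch that counts how many writes a channel accepts while nobody is reading. The helper name fillBuffered is made up for this illustration; it uses a select with a default case to detect the point where a plain write would block.

```go
package main

import "fmt"

// fillBuffered (hypothetical helper) writes to a channel that has no reader
// and reports how many writes succeeded before the next one would block.
func fillBuffered(capacity int) int {
	ch := make(chan int, capacity)
	written := 0
	for {
		select {
		case ch <- written:
			written++
		default:
			// The buffer is full (or the channel is unbuffered and nobody
			// is reading), so a plain write would block here.
			return written
		}
	}
}

func main() {
	fmt.Println(fillBuffered(0)) // unbuffered: prints 0
	fmt.Println(fillBuffered(3)) // buffered with room for 3: prints 3
}
```

A capacity of 0 is an unbuffered channel, so not even one write gets through without a reader.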

But what if you want a goroutine to be able to write an infinite amount of data without waiting for a reader? Buffered channels seem to offer a solution to this since they allow one goroutine to keep writing without waiting for another goroutine to read. The problem is that buffered channels are always bounded; if you use buffered channels in this way, you will potentially (and perhaps eventually) hit a point where the writing goroutine is blocked waiting for a reader. That leaves the question: How do I create a writer that never blocks on a write to a channel?

This problem is harder than it looks. Since Go won’t automatically store an unbounded amount of data in a buffered channel, we need to put an intermediary between writing to a channel and reading from a channel. This imposes some limitations.

The first is that we are limited to channels of type interface{}. This is because, without type parameters (which only arrived in Go 1.18), we cannot write our own generic functions in Go, and we need to write a function to provide, well, functionality. The arguments over generics in the Go community have been long-ranging, and rather than revisit the issue here, we’re going to just explore what works.

The second limitation is that we need to turn our single channel into two channels. Since Go does not allow operator overloading, there’s no way to hide our functionality behind an overloaded <- operator. Unlike the built-in make function that returns a single channel, we will need to return two channels, one that we write to (let’s call this channel in), and a second channel that’s used for reading from our unbounded channel (let’s call this channel out).

While you can have multiple readers of the out channel and multiple writers to the in channel, we need to enforce some limitations:

  • You cannot read from the in channel
  • You cannot write to the out channel
  • You cannot close the out channel

Doing any of these things will break the unbounded channel semantics and potentially trigger a panic.

Given these requirements, here’s a proposed function signature:
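A sketch of such a signature follows. The name MakeInfinite is an assumption for illustration, and the body is only a placeholder: it hands back both ends of one ordinary unbuffered channel so that the sketch compiles and runs. It is not unbounded yet.

```go
package main

import "fmt"

// MakeInfinite (hypothetical name) returns a write-only channel and a
// read-only channel. Placeholder body: both results are the same unbuffered
// channel, so writes still block until a reader shows up.
func MakeInfinite() (chan<- interface{}, <-chan interface{}) {
	ch := make(chan interface{})
	return ch, ch
}

func main() {
	in, out := MakeInfinite()
	go func() {
		in <- "hello"
		close(in) // allowed: a send-only channel can be closed
	}()
	fmt.Println(<-out) // prints hello

	// These lines would be rejected by the compiler because of the
	// direction markers on the returned channels:
	//   <-in        // receive from a send-only channel
	//   out <- 1    // send to a receive-only channel
	//   close(out)  // close of a receive-only channel
}
```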

Our function will take in nothing and return two channels. We don’t need any input parameters, because there’s only one possible channel type, interface{}. The first output parameter is the in channel that is write-only (and closable) and the second output parameter is the read-only out channel. By using the channel direction markers on the returned channels, we prevent our users from performing actions that will break our unbounded channel. This eliminates sources of bugs for our users.

Testbed

Before we write the code, we should write a test that tells us if the code is working correctly. We’re going to test our function with the following code:
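What follows is a sketch of such a check rather than the original test file. To keep it self-contained and runnable as a program, it is written as a plain function instead of a testing.T test; the names maker, checkOrder, and passthrough are placeholders invented here, and passthrough is just an ordinary unbuffered channel standing in for the constructor under test.

```go
package main

import (
	"fmt"
	"sync"
)

// maker is the shape of the constructor being checked (hypothetical type).
type maker func() (chan<- interface{}, <-chan interface{})

// checkOrder writes 0..n-1 to in, closes it, and verifies that out yields
// the same values in the same order before it is closed.
func checkOrder(mk maker, n int) error {
	in, out := mk()
	var wg sync.WaitGroup
	var err error
	lastVal := -1
	wg.Add(1)
	go func() {
		defer wg.Done()
		for v := range out {
			vi, ok := v.(int)
			if !ok || vi != lastVal+1 {
				if err == nil {
					err = fmt.Errorf("expected %d, got %v", lastVal+1, v)
				}
				continue // keep draining so the writer is never stuck
			}
			lastVal = vi
		}
	}()
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)
	wg.Wait()
	if err != nil {
		return err
	}
	if lastVal != n-1 {
		return fmt.Errorf("last value was %d, expected %d", lastVal, n-1)
	}
	return nil
}

// passthrough is a stand-in constructor: one plain unbuffered channel.
func passthrough() (chan<- interface{}, <-chan interface{}) {
	ch := make(chan interface{})
	return ch, ch
}

func main() {
	fmt.Println(checkOrder(passthrough, 100)) // prints <nil>
}
```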

This code sends 100 values, 0 through 99, and then makes sure they all come out the other side in the correct order. It does not make sure that the writes and reads are asynchronous. In order to do that we need a test that slows down the writing side and a test that slows down the reading side. We can do that by creating two more tests, one inserting time.Sleep(50*time.Millisecond) into the writing loop and one inserting the same code into the reading loop instead. The code is otherwise identical.

Now that we have figured out what our interface looks like and how to test it, let’s see how we can build this function up.

Design

In order to not block when writing, we need to read as soon as the in channel is written to and store the values in the order they are received. The number of values stored needs to be unbounded, in case the reader is slower than the writer. An unbounded, in-order list of values implies that we should use a slice to store the queued data. We also need to launch our own goroutine to do the reads from the in channel.

So far, this isn’t difficult. The pattern for the first part of the problem is:

1) Create the in and out channels
2) Launch a goroutine
   a) While the in channel is open
      i) Read from the in channel
      ii) Put the value from the in channel into a slice
   b) Close the out channel
   c) Exit the goroutine
3) Return the in and out channels

Code to implement this would look like this:
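Here is a sketch of those steps, using the assumed name MakeInfinite and an assumed slice name inQueue. It only queues values; nothing is written to out yet.

```go
package main

import "fmt"

// MakeInfinite (hypothetical name): first step only. Values written to in
// are queued in a slice; out is closed once in is closed.
func MakeInfinite() (chan<- interface{}, <-chan interface{}) {
	in := make(chan interface{})
	out := make(chan interface{})
	go func() {
		var inQueue []interface{}
	loop:
		for {
			select {
			case v, ok := <-in:
				if !ok {
					// in was closed: leave the for loop, not just the select.
					break loop
				}
				inQueue = append(inQueue, v)
			}
		}
		close(out)
	}()
	return in, out
}

func main() {
	in, out := MakeInfinite()
	for i := 0; i < 3; i++ {
		in <- i // never waits on a reader of out
	}
	close(in)
	_, ok := <-out
	fmt.Println(ok) // prints false: out was closed without delivering anything
}
```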

We are taking advantage of the , ok idiom in Go to detect if a channel is closed. Reading from a channel usually looks like v := <-ch.

This will block until there is a value to read from the channel ch. The value will then be assigned to v. When you try to read from a closed channel, the channel always returns a value, which is the zero value for the channel. Since it’s legal to write a zero value to a channel, how do you tell if the value read from a channel is a zero value that was written, or a zero value because the channel is closed? You do so by using a second, boolean parameter as part of the read from the channel:
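As a small illustration (the function name readTwice is invented for this sketch), the two-value form distinguishes a zero that was actually written from the zero that a closed channel hands back:

```go
package main

import "fmt"

// readTwice (hypothetical helper) writes a real zero value, closes the
// channel, and then reads twice using the two-value form.
func readTwice() (firstOK, secondOK bool) {
	ch := make(chan int, 1)
	ch <- 0 // a legitimately written zero value
	close(ch)

	v1, ok1 := <-ch // the buffered zero that was written
	v2, ok2 := <-ch // the channel is now closed and empty
	fmt.Println(v1, ok1) // prints 0 true
	fmt.Println(v2, ok2) // prints 0 false
	return ok1, ok2
}

func main() {
	readTwice()
}
```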

If ok is true, the value was written to the channel by a sender. If ok is false, the channel is closed and empty, and the value is just the zero value. We use this information to know when to break out of the loop.

One other thing to note: we need to use a label on the for loop so we can break out of it from inside the case clause. If a label isn’t used, the break will only exit the select statement, and the for loop will run forever.

If we run our test now, it will fail as expected, because we never write anything back out again.

Writing Without Blocking

Now that we are queuing up all of the values that are written, we need some way to send them back out, and select helps us out here. While we usually use select to read from multiple channels at the same time, you can also use select to write to channels. The cases with channel writes are handled just like the cases with channel reads; if a write succeeds (the channel that is written to is read by another goroutine), the code in the case clause is executed. So the first pass at getting writing working adds a write case to the select alongside the read case.

We can just write the first value in inQueue to the out channel, and when it succeeds, we pull that value off of the head of the queue and loop again. Let’s run our test and see what happens.

Oops. We get an error when we try to read the first value from inQueue, because inQueue starts off empty. This isn’t just a startup problem. If the reader is faster than the writer, the queue will empty again and we will have the same problem. How do we solve it?
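The reason this fails even when the write case is never chosen is that Go evaluates the right-hand side of every send case when the select statement is entered. The sketch below isolates that behavior in a single goroutine so the panic can be recovered and reported; selectPanics is a name made up for this illustration, and the default case exists only so the demonstration returns instead of blocking.

```go
package main

import "fmt"

// selectPanics (hypothetical helper) enters a select whose send case uses
// inQueue[0] and reports whether entering the select panicked.
func selectPanics(inQueue []interface{}) (panicked bool) {
	defer func() {
		if r := recover(); r != nil {
			panicked = true
		}
	}()
	in := make(chan interface{})
	out := make(chan interface{})
	select {
	case v := <-in:
		inQueue = append(inQueue, v)
	case out <- inQueue[0]: // inQueue[0] is evaluated on entry to the select
		inQueue = inQueue[1:]
	default:
		// Nobody is reading or writing; return rather than block.
	}
	return false
}

func main() {
	fmt.Println(selectPanics(nil))              // prints true: index out of range
	fmt.Println(selectPanics([]interface{}{1})) // prints false
}
```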

Like many computer science questions, the solution is another layer of indirection. Rather than read inQueue directly, we can wrap it in a function call that checks if the queue is empty before trying to pull off the first value. Closures in Go make it easy to inline the function:
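A sketch of this step, with the assumed names MakeInfinite, inQueue, and curVal. It no longer panics, but note what it sends when the queue is empty.

```go
package main

import "fmt"

// MakeInfinite (hypothetical name): second step. curVal guards the read of
// the head of the queue, returning nil when there is nothing queued.
func MakeInfinite() (chan<- interface{}, <-chan interface{}) {
	in := make(chan interface{})
	out := make(chan interface{})
	go func() {
		var inQueue []interface{}
		curVal := func() interface{} {
			if len(inQueue) == 0 {
				return nil
			}
			return inQueue[0]
		}
	loop:
		for {
			select {
			case v, ok := <-in:
				if !ok {
					break loop
				}
				inQueue = append(inQueue, v)
			case out <- curVal():
				// Only drop the head if there really was one.
				if len(inQueue) > 0 {
					inQueue = inQueue[1:]
				}
			}
		}
		close(out)
	}()
	return in, out
}

func main() {
	in, out := MakeInfinite()
	v := <-out              // nothing has been written yet
	fmt.Println(v == nil)   // prints true: a spurious nil was sent
	close(in)
}
```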

Let’s see how this works.

Huh. It looks like writing out that nil value is a bad idea. It was picked up by our reading goroutine and when it tried to convert to an int, the test panicked. We don’t want to write nil to the out channel whenever there is nothing in the queue; we don’t want to write at all. Is there a way to block writing to the out channel when there’s nothing to send?

We’re going to combine two tricks, nil channels and wrapping with a function. If you try to write to a nil channel, the write attempt will block forever. Normally, this would seem to be bad behavior but there’s one place where this is okay — inside of a select statement. When there’s no data to send, we should write to a nil channel, so that the case blocks and doesn’t send anything.
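The nil channel behavior is easy to check in isolation. In this sketch (sendOrSkip is an invented helper), a default case stands in for "the other cases of the select", so the function reports whether the write case was ever ready.

```go
package main

import "fmt"

// sendOrSkip (hypothetical helper) attempts a write inside a select and
// reports whether the write case was chosen.
func sendOrSkip(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		// The write case was not ready.
		return false
	}
}

func main() {
	var nilCh chan int            // the zero value of a channel is nil
	buffered := make(chan int, 1) // has room for one value

	fmt.Println(sendOrSkip(nilCh, 1))    // prints false: a nil channel is never ready
	fmt.Println(sendOrSkip(buffered, 1)) // prints true
}
```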

In order to get different behavior depending on the length of the queue, we need a layer of indirection around the channel; this sounds like another function. As a bonus, our code gets a little simpler. Since we’re no longer potentially writing when inQueue is empty, we no longer need an if statement wrapping the removal of the head from the queue.

This produces the code:
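A sketch of that version follows, with the assumed names MakeInfinite, outCh, and curVal, plus an invented helper countReceived that writes n values with no reader, closes in, and then counts what comes out. How many values survive depends on goroutine scheduling, so no particular count is claimed here.

```go
package main

import "fmt"

// MakeInfinite (hypothetical name): third step. outCh returns nil when the
// queue is empty, which disables the write case in the select.
func MakeInfinite() (chan<- interface{}, <-chan interface{}) {
	in := make(chan interface{})
	out := make(chan interface{})
	go func() {
		var inQueue []interface{}
		outCh := func() chan interface{} {
			if len(inQueue) == 0 {
				return nil
			}
			return out
		}
		curVal := func() interface{} {
			if len(inQueue) == 0 {
				return nil
			}
			return inQueue[0]
		}
	loop:
		for {
			select {
			case v, ok := <-in:
				if !ok {
					break loop // exits even if values are still queued
				}
				inQueue = append(inQueue, v)
			case outCh() <- curVal():
				inQueue = inQueue[1:]
			}
		}
		close(out)
	}()
	return in, out
}

// countReceived (hypothetical helper) writes n values before any reader
// exists, closes in, and counts how many values out delivers.
func countReceived(n int) int {
	in, out := MakeInfinite()
	for i := 0; i < n; i++ {
		in <- i
	}
	close(in)
	received := 0
	for range out {
		received++
	}
	return received
}

func main() {
	fmt.Println("received", countReceived(5), "of 5")
}
```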

Let’s see how this works.

Uh-oh. Now we have a different problem. We can read and write, but we shut down too soon because the writer was faster than the reader. How do we get all of the values?

Draining the Queue

Clearly, there’s a bug in this code. Any time the writer is faster than the reader, there will be multiple values in inQueue waiting to be read. If we exit the for loop without waiting for the queue to be emptied, those queued values will never get written out. We need to keep looping until two things are true: the channel is closed and the queue is empty.
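A sketch of a version that keeps looping until both conditions hold, again using the assumed names MakeInfinite, inQueue, outCh, and curVal:

```go
package main

import "fmt"

// MakeInfinite (hypothetical name): writes to in never wait on a reader of
// out, and out is closed only after in is closed and the queue is drained.
func MakeInfinite() (chan<- interface{}, <-chan interface{}) {
	in := make(chan interface{})
	out := make(chan interface{})
	go func() {
		var inQueue []interface{}
		outCh := func() chan interface{} {
			if len(inQueue) == 0 {
				return nil // disables the write case
			}
			return out
		}
		curVal := func() interface{} {
			if len(inQueue) == 0 {
				return nil
			}
			return inQueue[0]
		}
		for len(inQueue) > 0 || in != nil {
			select {
			case v, ok := <-in:
				if !ok {
					in = nil // disables the read case from now on
				} else {
					inQueue = append(inQueue, v)
				}
			case outCh() <- curVal():
				inQueue = inQueue[1:]
			}
		}
		close(out)
	}()
	return in, out
}

func main() {
	in, out := MakeInfinite()
	for i := 0; i < 100; i++ {
		in <- i // no reader exists yet
	}
	close(in)
	count := 0
	for v := range out {
		if v.(int) != count {
			panic("value out of order")
		}
		count++
	}
	fmt.Println(count) // prints 100
}
```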

Most of this code is straightforward, and we are able to get rid of the label on the for statement, but the channel handling and the condition on the for statement might require some explanation. In particular, what’s going on with the nil?

Just like writing to a nil channel blocks forever, reading from a nil channel also blocks forever. By setting in to nil, the select statement will never try to read from in again. This is good, because we only set in to nil when the channel is closed, and a closed channel always returns a value. Reading from a closed channel inside of a for-select loop will cause the channel to be read over and over; this will severely impact performance and CPU usage.
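The contrast between a closed channel and a nil channel can be seen with a small counter. In this sketch, readyReads is an invented helper and the default case stands in for the rest of the loop.

```go
package main

import "fmt"

// readyReads (hypothetical helper) tries to read from ch the given number of
// times and counts how often the read case was ready.
func readyReads(ch chan int, attempts int) int {
	ready := 0
	for i := 0; i < attempts; i++ {
		select {
		case <-ch:
			ready++
		default:
			// The read case was not ready on this pass.
		}
	}
	return ready
}

func main() {
	closed := make(chan int)
	close(closed)
	var nilCh chan int

	fmt.Println(readyReads(closed, 1000)) // prints 1000: always ready
	fmt.Println(readyReads(nilCh, 1000))  // prints 0: never ready
}
```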

Given this, the for condition makes sense. We are going to stay in the for loop as long as we can potentially still read a value (in != nil) or there are values still queued up (len(inQueue) > 0). If the queue is empty and the channel is closed, we are done, so close the out channel and exit the goroutine.

This code finally passes our tests.

Huh?

As it turns out, if you want a channel that never blocks on writes, buffered channels don’t figure into the implementation. So what are they good for? Why was this feature added to Go? I’ll cover the uses of buffered channels in the next blog post in this series.

