Introducing vice: Go channels across many machines

Concurrency is a great way to get more stuff done faster. Go channels are perfect for enabling multiple concurrent goroutines to safely communicate within a single process, but if we want to let multiple machines/nodes communicate in a similar way, we have to write completely different code and integrate with messaging queues, gRPC, or something else. Until now.

Vice abstracts message queue technologies and gives you plain old Go channels through which you can communicate instead.

Specifically, you can:

  • Use Go channels transparently over a messaging queue technology of your choice (Currently NATS, Redis, NSQ, or Amazon SQS)
  • Swap vice.Transport types to change underlying queue technologies transparently
  • It’s just Go code: write idiomatic Go code instead of learning queue specific APIs
  • Easy to test: write independent unit tests with no dependencies
  • Develop against an in-memory implementation before putting it into the wild
At GrayMeta, we’ve been using Vice successfully for a couple of years; so it’s pretty well battle tested in production.

The remainder of this article will explain the basics of Vice, and show some examples of how to build real services.

Writing a simple service

Let’s write my favourite kind of service; your friendly neighbourhood greeter.

A greeter takes a name, turns that into a greeting (by prepending "Hello "), and returns the new string.

Our Greeter is just going to be a simple Go function. It will take a context.Context (to let people stop it), a receive-only names channel, a send-only greetings channel, and a channel for errors.

If you’ve never used context to cancel things before, then check out Using contexts to avoid leaking goroutines.

Using a select block, we will wait for someone to send a name through the names channel, at which point we’ll create a greeting and send it down the greetings channel.

func Greeter(ctx context.Context, names <-chan []byte,
greetings chan<- []byte, errs <-chan error) {
    for {
select {
case <-ctx.Done():
log.Println("finished")
return
        case err := <-errs:
log.Println("an error occurred:", err)
        case name := <-names:
greeting := "Hello " + string(name)
greetings <- []byte(greeting)
}
  }
}

The channels all communicate slices of bytes []byte, which means we can deal with plain strings for simple cases, or use one of the many encoding options (like JSON or Gob) to transmit more complex data.

So far, this is all idiomatic Go code, and that is particularly exciting when we look at the unit test for our Greeter service/function.

Use normal Go channels in tests

Here is a unit test for our Greeter function:

func TestGreeter(t *testing.T) {

// setup a context that will stop the Greeter once this
// test finishes
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// create test channels (no need for a real queue)
names := make(chan []byte)
greetings := make(chan []byte)
errs := make(chan error)

// run the greeter
go Greeter(ctx, names, greetings, errs)

// send the name and get the greeting
names <- []byte("Vice")
greeting := <-greetings

if string(greeting) != "Hello Vice" {
t.Error("unexpected message")
}
}

To test our Greeter function, we create the channels it requires and kick it off with the go keyword, so that it runs in the background. We defer the cancellation of the context so that the greeter will stop when our test function is finished with it.

Then we send the string "Vice" through the names channel, and read the greeting from the greetings channel, which we then check and fail if it’s wrong. (We’re converting to and from []byte and string types here, but nothing more complicated than that).

This test is a perfect unit test, it requires no external dependencies and contains everything it needs to run.

Hmm, but this can block forever…

One addition we might decide to include is to cover the situation where the Greeter doesn’t do what it’s meant to to do. If it never sends the greeting down the greetings channel, our test code will block forever (or until go test decides to give up for us).

Since this is all just normal Go code, a select block in our test code is all we need to add a timeout:

select {
case <-time.After(500 * time.Millisecond):
t.Error("timed out waiting for greeting")
case greeting := <-greetings:
if string(greeting) != "Hello Vice" {
t.Error("unexpected message")
}
}

This code will block until one of two things happens; either 1/2 second passes and we report an error, or else we receive a greeting. Either way, we don’t have to wait long to know whether our code is working as expected, and we don’t have to worry about our tests taking too long to run.

Adding queues

Now we’ve got our fully tested Greeter service implemented, it’s time to add the plumbing to turn it into a service that can operate over a messaging queue.

Vice relies on underlying transports (they implement the vice.Transport interface) to do the messy job of interacting with the queues.

See the supported queues in the GitHub repo

Let’s take a quick look at the vice.Transport interface:

// Transport provides message sending and receiving
// capabilities over a messaging queue technology.
type Transport interface {
    // Receive gets a channel on which to receive messages
// with the specified name.
Receive(name string) <-chan []byte
    // Send gets a channel on which messages with the
// specified name may be sent.
Send(name string) chan<- []byte
    // some bits omitted for simplicity sake
}

Notice that the Send and Receive methods return channels; these are the channels that we can pass into our services.

All we have to do in our program is:

  1. Setup a context to handle cancellation
  2. Create a Transport through which we want our service to communicate
  3. Create the necessary channels, and call the Greeter function
import (
"github.com/matryer/vice/queues/nsq"
)
func main() {
ctx := context.Background()
ctx, cancel := context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
transport, err := nsq.New()
if err != nil {
log.Fatalln(err)
}
defer func() {
transport.Stop()
<-transport.Done()
}()
names := transport.Receive("names")
greetings := transport.Send("greetings")
Greeter(ctx, names, greetings, transport.ErrChan())

}

We’ve configured our context to last for 1 minute, but it makes more sense to have it be cancelled by catching the termination signal, like when the user presses Ctrl+C.

We create an NSQ transport (NSQ is Bitly’s messaging queue, written entirely in Go) by importing the appropriate package and ask that Transport to create our names and greetings channels.

We use defer to stop the Transport, which is important because some clients register and de-register their interest in the queue and we need to make sure everything is cleaned up. After calling Stop, we wait for the transport to fully stop by waiting for the Done() channel to close. This helps ensure the clean-up has finished before we exit our program.

To learn more about signal channels, read Starting and stopping things with a signal channel.

We then pass those channels — along with the transport error channel — into the Greeter function.

And that’s it. Now, our program will listen on the NSQ topic called “names”, and send responses to the “greetings” topic.

Each transport implementation has queue specific configuration or customisation that you can do, the details of which are in the documentation for that transport.

Trade-offs

Vice offers a very simple abstraction, but it’s worth understanding the trade-offs and compromises.

Channels blocking and closing?

If a Go channel has no buffer, it will block until someone else reads from it. We use this nature as part of the design of our programs, but the same doesn’t apply when sending those messages over message queues. When a send down a Vice channel unblocks, it doesn’t necessarily mean the message has been delivered like it does with in-process Go channels.

As for closing, the channels will close when the transport is stopped, so you will know when things have shut down.

Low-latency use cases

If you need ultra quick low-latency messages (like stock trading, or gameplay, etc.), then you’ll probably want to build something more low level and work directly with a specific queue technology. You’ll lose the benefits of Vice, but it might be a trade-off worth making.

What about more complicated use cases?

When you start to dig into your needs for a messaging queue, you realise you have a few things to consider, such as:

  • Are messages guaranteed to be delivered?
  • Can messages be delivered more than once?
  • What happens is delivery fails, and I need to retry?

David Hernandez (and actually the whole Go project) is a big advocate of sensible defaults, and this was taken to heart when Vice was designed. So by default, it just works, if you need to tweak the behaviour you can.

When you create the transport, you have the option of configuring and tuning it to suit your needs. For example, NSQ has the concept of connecting to consumers, so the nsq.Transport type exposes the ConnectConsumer function, which ships with a sensible default but you can override if you want to configure it.

Not every queue technology has the same concepts, so it makes no sense for those concepts to make it into the abstraction.

Conclusion

Vice lets you write services using only idiomatic Go channels, and later plug them into highly scalable messaging queue technologies achieving massive scale. It has a good selection of queue technologies to choose from, so you can get started today, but if your favourite is missing, you can always be a hero and add it.

Speaking of which, there’s one more feature of this project that really comes in handy when it comes to contributing…

A side note; Interface tests

One nice feature about this codebase is its use of interface tests. An interface test is test code (usually a single function) that takes an interface like vice.Transport and uses it in various ways asserting that it behaves as expected.

The test code knows nothing about the details of the implementation; it doesn’t care. It just cares that the interface methods and returned types do what they’re supposed to.

In order for any transport to be considered good, it must pass all the tests.

So if you fancy writing a Vice implementation for your favourite messaging queue technology, provided you add the following test (and it passes), it will likely be accepted.

import (
"testing"
"github.com/matryer/vice/test"
)
// TestTransport runs the Vice interface tests for this
// implementation.
func TestTransport(t *testing.T) {
transport := New()
test.Transport(t, transport)
}

What next?