Testing gRPC methods

John Doak
6 min readApr 28, 2023

I was recently asked by a colleague on the best way to test a gRPC method for streaming. They were trying to find a simple way to test their code without spinning up a gRPC server.

gRPC testing is similar to testing the net/http packages HandleFunc and Handler types. You can use net/http/httptest , but most of the time there is no need. We don’t want to test gRPC or an HTTP server itself, we simply want to test our method’s logic.

The simple answer to this question is to de-couple gRPC’s work from the actual work.

Let’s pretend we need to receive a request and then stream back a response. This might look something like this in proto3:

service Authority {
// ProcessExporter instructs the Processor to process some data from an exporter and
// stream the results back.
rpc Servers(ServersReq) returns (stream ServerMsg) {}
}

message ServersReq {
string name_filter_re = 1;
repeated string datacenter_filter = 2;
}

message ServerMsg {
string name = 1;
}

The above code represents a simple authoritative data store for server information. You can query with a ServerReq that optionally takes a name filter (an RE2 regular expression) and a filter for data centers the servers are in.

The Servers() call returns a stream of server information held in the ServerMsg type.

In gRPC for Go, this would require you to implement this:

type AuthorityServer interface {
Servers(*ServersReq, Authority_ServersServer) error
mustEmbedUnimplementedServersServer()
}

This is autogenerated by the protobuf compiler. If you are using Google’s proto compiler, I suggest switching to the http://buf.build compiler, which is much easier to use (if you are in a single repo for all protocol buffer definitions. If not you do need their pay product, BSR).

For simplicity, this example will have all our server data in a file we access instead of the more common use case of a database.

The Naive way

The naive way to implement this is to do it in the gRPC implementation directly:

// Authority implements the gRPC AuthorityServer interface.
type Authority struct {
dataFilePath string

pb.UnimplementedAuthorityServer // required by gRPC
}

type serversRec struct {
Name string
Datacenter string
}

// Servers implements AuthorityServer.Servers .
func (a *Authority) Servers(req *pb.ServersReq, stream pb.Authority_ServersServer) error {
f, err := os.Open(a.dataFilePath)
if err != nil {
return err
}
defer f.Close()

// Compile our name filter if it was given.
var nameFilter *regexp.Regexp
if req.NameFilterRe != "" {
var err error
nameFilter, err = regexp.Compile(req.NameFilterRe)
if err != nil {
return status.Errorf(codes.InvalidArgument, "name_filter_re did not compile: %s", err)
}
}

// dcs if not nil has keys that are the datacenters we want to filter for.
var dcs map[string]bool
if len(req.DatacenterFilter) > 0 {
dcs = map[string]bool{}
for _, dc := range req.DatacenterFilter {
dcs[dc] = true
}
}

// Stream our entries if they match the various filters in the request.
dec := json.NewDecoder(f)
for {
if stream.Context().Err() != nil {
return status.Errorf(codes.DeadlineExceeded, stream.Context().Err().Error())
}

var r serversRec
if err := dec.Decode(&r); err == io.EOF {
break
} else if err != nil {
return err
}

// If the name filter was defined, reject any entries that don't match.
if nameFilter != nil && !nameFilter.MatchString(r.Name) {
continue
}

// If the DC filter was provided, reject any entries that don't match.
if dcs != nil && !dcs[r.Datacenter] {
continue
}

// If we get here, stream the ServerMsg to the caller.
if err := stream.Send(&pb.ServerMsg{Name: r.Name}); err != nil {
return err
}
}
return nil
}

This methodology is pretty straight forward. Stream all entries from a data file (encoded with JSON entries on disk) and return any entries that match our filters.

Testing this requires either faking pb.Authority_ServersServer or running the gRPC service and using the generated gRPC client. And while those are valid ways to test, it is more bootstrapping than needed and it tests gRPC, which isn’t needed (gRPC already has an entire test suite).

A Simpler Way

Since we don’t want to test gRPC and want to make this easier for ourselves to test, we simply need to divide this into two methods:

func (a *Authority) Servers(req *pb.ServersReq, stream pb.Authority_ServersServer) error {
...
}
func (a *Authority) servers(req *pb.ServersReq) chan StreamMsg[*pb.ServerMsg] {
...
}

Servers() is what is going to handle gRPC streaming. It won’t need tests.

servers() is going to hold the bulk of our code and can be easily tested.

StreamMsg is a new generic type that can hold any data and signal a stream error.

Let’s look at StreamMsg :

type StreamMsg[T any] struct {
Data T
Err error
}

This is a generic holder of streaming information that can be used for any channel. Most channels need a way to signal an error and this is an excellent way to do that using generics.

Here is our simplified Servers() implementation:

func (a *Authority) Servers(req *pb.ServersReq, stream pb.Authority_ServersServer) error {
for msg := range a.servers(stream.Context(), req) {
if msg.Err != nil {
return msg.Err
}
if err := stream.Send(&pb.ServerMsg{Name: msg.Data.Name}); err != nil {
return err
}
}
return nil
}

Now Servers() simply handles error detection and sending on the gRPC stream. This pattern is simple enough that it doesn’t require a test.

Let’s have a look at servers():

func (a *Authority) servers(ctx context.Context, req *pb.ServersReq) <-chan StreamMsg[*pb.ServerMsg] {
ch := make(chan StreamMsg[*pb.ServerMsg], 1)

go func() {
defer close(ch)

f, err := os.Open(a.dataFilePath)
if err != nil {
ch <- StreamMsg[*pb.ServerMsg]{Err: err}
return
}
defer f.Close()

// Compile our name filter if it was given.
var nameFilter *regexp.Regexp
if req.NameFilterRe != "" {
var err error
nameFilter, err = regexp.Compile(req.NameFilterRe)
if err != nil {
ch <- StreamMsg[*pb.ServerMsg]{
Err: status.Errorf(codes.InvalidArgument, "name_filter_re did not compile: %s", err),
}
return
}
}

// dcs if not nil has keys that are the datacenters we want to filter for.
var dcs map[string]bool
if len(req.DatacenterFilter) > 0 {
dcs = map[string]bool{}
for _, dc := range req.DatacenterFilter {
dcs[dc] = true
}
}

// Stream our entries if they match the various filters in the request.
dec := json.NewDecoder(f)
for {
if ctx.Err() != nil {
ch <- StreamMsg[*pb.ServerMsg]{Err: ctx.Err()}
return
}

var r serversRec
if err := dec.Decode(&r); err == io.EOF {
return
} else if err != nil {
ch <- StreamMsg[*pb.ServerMsg]{Err: err}
return
}

// If the name filter was defined, reject any entries that don't match.
if nameFilter != nil && !nameFilter.MatchString(r.Name) {
continue
}

// If the DC filter was provided, reject any entries that don't match.
if dcs != nil && !dcs[r.Datacenter] {
continue
}

select {
case <-ctx.Done():
ch <- StreamMsg[*pb.ServerMsg]{Err: ctx.Err()}
return
case ch <- StreamMsg[*pb.ServerMsg]{Data: &pb.ServerMsg{Name: r.Name}}:
}
}
}()
return ch
}

This model allows testing without using gRPC directly. Calling this method allows reading from the returned channel until you encounter an error or the channel is closed. If an error is found, the stream will be closed.

Simply switching out the dataFilePath allows using test data and calling servers() allows you to avoid spinning up a service and connecting to it with a client.

This methodology of work division can be used to test code that handles http.HandleFunc or http.Handler types as well, avoiding the necessity to spin up an HTTP server or use the httptest package.

If you’d like to see this code in action, you can see it here: code path.

Note: This package differs from our example in that I have divided the code into smaller methods. It adds a little more text to allow for better maintainability.

Until next time!

Shameless plug

If you like articles like this and want to know how to leverage Go for DevOps, checkout my book:

--

--

John Doak

Principal SWE@Microsoft, Ex-Google, Ex-Lucasfilm, Author of “Go For DevOps”, Photographer and general trouble maker