Adding interceptor into Golang gRPC server
Previously, we discussed how to build a Golang gRPC server, which you can read about in the following link: Step-by-Step Making a Golang gRPC Server in 5 Minutes.
Now, let’s talk about interceptors in gRPC and explore how to use them. An interceptor in gRPC acts as a middleware component, intercepting and potentially modifying the stream of requests and responses between a gRPC client and server. These interceptors operate in a specific order, forming a chain. They are applicable to both the client and server sides of a gRPC connection.”
Here are some use cases interceptors are used for:
- Logging and Monitoring: Capture and log information about incoming requests and outgoing responses.
- Authentication and Authorization: Enforce security policies by verifying user identity and checking permissions.
- Tracing: Add trace information to requests for monitoring and profiling purposes.
- Timeouts and Deadlines: Enforce deadlines and timeouts for requests.
- Rate Limiter: Control the rate at which requests are allowed to be processed, preventing abuse and ensuring fair resource allocation.
- Panic Handler: Capture and handle panics that may occur during the execution of gRPC methods.
Now in this article, we are going to add logging, authentication, and panic handler interceptors to our gRPC server using go-grpc-middleware.
To proceed with the code from the previous articles, use the go get
command to download and install the required libraries. Once the libraries are successfully downloaded, navigate to the cmd
folder and edit the main.go
file.
go get github.com/go-kit/log@latest
go get go.opentelemetry.io/otel@latest
go get github.com/grpc-ecosystem/go-grpc-middleware/v2@latest
cd cmd
Authentication Interceptor
To begin, create an authenticator function. This function is responsible for reading a token or secret, depending on the authentication method used, from the Metadata. In this case, we use Bearer authentication. If the bearer token is invalid, return the ‘Unauthenticated’ error; otherwise, return the context with any additional data as needed and a nil error.
func authenticator(ctx context.Context) (context.Context, error) {
token, err := auth.AuthFromMD(ctx, "bearer")
if err != nil {
return nil, err
}
// TODO: This is example only, perform proper Oauth/OIDC verification!
if token != "yolo" {
return nil, status.Error(codes.Unauthenticated, "invalid auth token")
}
// NOTE: You can also pass the token in the context for further interceptors or gRPC service code.
return ctx, nil
}
Second, create ‘authMatcher.’ This function is used to determine which services or methods need authentication.
func authMatcher(ctx context.Context, callMeta interceptors.CallMeta) bool {
return healthpb.Health_ServiceDesc.ServiceName != callMeta.Service
}
Next, add ChainUnaryInterceptor
, which includes selector.UnaryServerInterceptor
. This selector
interceptor allows us to set check rules to allowlist or blocklist middleware such as Auth interceptors to toggle behavior on or off based on the request path. Then, use the authenticator
function as an argument for the auth.UnaryServerInterceptor
function.
s := grpc.NewServer(grpc.ChainUnaryInterceptor(
// Order matters e.g. tracing interceptor have to create span first for the later exemplars to work.
selector.UnaryServerInterceptor(
auth.UnaryServerInterceptor(authenticator),
selector.MatchFunc(authMatcher),
),
))
Logging Interceptor
Let’s create the interceptorLogger
function, a utility designed to bridge the gap between gRPC's logging system and the underlying logger typically provided by your application. It takes log.Logger
instance as an argument, representing the existing logger, and returns a gRPC-compatible logging.Logger
.
func interceptorLogger(l log.Logger) logging.Logger {
return logging.LoggerFunc(func(_ context.Context, lvl logging.Level, msg string, fields ...any) {
largs := append([]any{"msg", msg}, fields...)
switch lvl {
case logging.LevelDebug:
_ = level.Debug(l).Log(largs...)
case logging.LevelInfo:
_ = level.Info(l).Log(largs...)
case logging.LevelWarn:
_ = level.Warn(l).Log(largs...)
case logging.LevelError:
_ = level.Error(l).Log(largs...)
default:
panic(fmt.Sprintf("unknown level %v", lvl))
}
})
}
Now, let’s create a utility function named generateLogFields
that generates log fields based on tracing information from the context. This function utilizes the OpenTelemetry's trace.SpanContext
to extract the Trace ID, and if the span is sampled, it returns a map containing the Trace ID. These log fields can be added to log entries, providing valuable tracing information.
func generateLogFields(ctx context.Context) logging.Fields {
if span := trace.SpanContextFromContext(ctx); span.IsSampled() {
return logging.Fields{"traceID", span.TraceID().String()}
}
return nil
}
Next, we create a new variable rpcLogger
as the log.Logger
that will be used later.
var (
rpcLogger log.Logger
)
Now, initialize the rpcLogger
and add logging.UnaryServerInterceptor
to the ChainUnaryInterceptor
. Use our interceptorLogging
function as the handler to intercept the log, using rpcLogger
as the argument to be the logger that records various events. Use the generateLogFields
function to generate log fields like traceId
. Finally, use WithLogOnEvents
to add more events to be logged. By default, only StartCall
and FinishCall
events will be logged, but in this example, we also add PayloadReceived
and PayloadSent
events. Here is the complete code for our changes:
logger := log.NewLogfmtLogger(os.Stderr)
rpcLogger = log.With(logger, "service", "gRPC/server", "component", "grpc-example")
s := grpc.NewServer(grpc.ChainUnaryInterceptor(
// Order matters e.g. tracing interceptor have to create span first for the later exemplars to work.
logging.UnaryServerInterceptor(
interceptorLogger(rpcLogger),
logging.WithFieldsFromContext(generateLogFields),
logging.WithLogOnEvents(
logging.StartCall,
logging.PayloadReceived,
logging.PayloadSent,
logging.FinishCall,
),
),
selector.UnaryServerInterceptor(
auth.UnaryServerInterceptor(authenticator),
selector.MatchFunc(authMatcher),
),
))
Panic Handler Interceptor
For the panic handler, we need to create a single function called recoveryHandler
. This function will be triggered after a panic occurs and is recovered. Inside this function, we can perform various actions using the panic interface, such as logging the message and stack trace. Then, we return an error with the preferred error code and message.
func recoveryHandler(p any) (err error) {
level.Error(rpcLogger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
Finally, we add recovery.UnaryServerInterceptor
and recovery.WithRecoveryHandler
with recoverHandler
as the function that will be triggered later.
s := grpc.NewServer(grpc.ChainUnaryInterceptor(
// Order matters e.g. tracing interceptor have to create span first for the later exemplars to work.
logging.UnaryServerInterceptor(
interceptorLogger(rpcLogger),
logging.WithFieldsFromContext(generateLogFields),
logging.WithLogOnEvents(
logging.StartCall,
logging.PayloadReceived,
logging.PayloadSent,
logging.FinishCall,
),
),
selector.UnaryServerInterceptor(
auth.UnaryServerInterceptor(authenticator),
selector.MatchFunc(authMatcher),
),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(recoveryHandler)),
))
Below is the complete code for main.go
var (
rpcLogger log.Logger
)
func main() {
logger := log.NewLogfmtLogger(os.Stderr)
rpcLogger = log.With(logger, "service", "gRPC/server", "component", "grpc-example")
s := grpc.NewServer(grpc.ChainUnaryInterceptor(
// Order matters e.g. tracing interceptor have to create span first for the later exemplars to work.
logging.UnaryServerInterceptor(
interceptorLogger(rpcLogger),
logging.WithFieldsFromContext(generateLogFields),
logging.WithLogOnEvents(
logging.StartCall,
logging.PayloadReceived,
logging.PayloadSent,
logging.FinishCall,
),
),
selector.UnaryServerInterceptor(
auth.UnaryServerInterceptor(authenticator),
selector.MatchFunc(authMatcher),
),
recovery.UnaryServerInterceptor(recovery.WithRecoveryHandler(recoveryHandler)),
))
pb.RegisterEmployeeServer(s, &employee.Server{})
reflection.Register(s)
port := "1531"
lis, err := net.Listen("tcp", ":"+port)
if err != nil {
level.Error(logger).Log("failed to listen: %v", err)
}
level.Info(logger).Log("server listening at %v", lis.Addr())
if err := s.Serve(lis); err != nil {
level.Error(logger).Log("failed to serve: %v", err)
}
}
func authMatcher(ctx context.Context, callMeta interceptors.CallMeta) bool {
return healthpb.Health_ServiceDesc.ServiceName != callMeta.Service
}
func authenticator(ctx context.Context) (context.Context, error) {
token, err := auth.AuthFromMD(ctx, "bearer")
if err != nil {
return nil, err
}
// TODO: This is example only, perform proper Oauth/OIDC verification!
if token != "yolo" {
return nil, status.Error(codes.Unauthenticated, "invalid auth token")
}
// NOTE: You can also pass the token in the context for further interceptors or gRPC service code.
return ctx, nil
}
func interceptorLogger(l log.Logger) logging.Logger {
return logging.LoggerFunc(func(_ context.Context, lvl logging.Level, msg string, fields ...any) {
largs := append([]any{"msg", msg}, fields...)
switch lvl {
case logging.LevelDebug:
_ = level.Debug(l).Log(largs...)
case logging.LevelInfo:
_ = level.Info(l).Log(largs...)
case logging.LevelWarn:
_ = level.Warn(l).Log(largs...)
case logging.LevelError:
_ = level.Error(l).Log(largs...)
default:
panic(fmt.Sprintf("unknown level %v", lvl))
}
})
}
func generateLogFields(ctx context.Context) logging.Fields {
if span := trace.SpanContextFromContext(ctx); span.IsSampled() {
return logging.Fields{"traceID", span.TraceID().String()}
}
return nil
}
func recoveryHandler(p any) (err error) {
level.Error(rpcLogger).Log("msg", "recovered from panic", "panic", p, "stack", debug.Stack())
return status.Errorf(codes.Internal, "%s", p)
}
Now, after completing the interceptors, we need to update the existing GetById
method so that we can test the interceptors. Navigate to the services
folder and edit the service.go
file.
cd ../services/employee
func (s *Server) GetById(ctx context.Context, req *pb.GetByIdRequest) (*pb.GetByIdResponse, error) {
switch req.Id {
case 1000:
return nil, status.Error(codes.InvalidArgument, "id can't be 1000")
case 0:
panic("id can't be 0")
default:
return &pb.GetByIdResponse{
Id: req.Id,
Name: "John Doe",
}, nil
}
}
As seen in the code above, we’ve changed the logic using a switch case. You can test this method later by inputting any number, but to test our panic handler, we need to use case 0. Run the server and grpcui
with the following commands using two terminals.
Terminal 1:
cd ../..
go run cmd/main.go
Terminal 2:
grpcui --plaintext localhost:1531
Input 0
in the id
field on the grpcui
page that opens in your browser. Then, click "Invoke." Currently, we will receive an unauthenticated error, as shown in the picture below.
This indicates that our auth interceptor is working. The unauthenticated error occurs because we haven’t added our token to the Metadata. To authenticate our request, navigate to the “Request Form” tab. In the “Request Metadata” section, input “authorization” as the name and “Bearer yolo” as the value. The page will look like this:
Click “Invoke.” Now, we won’t get the unauthenticated error, indicating that our request has been successfully authenticated after adding the “authorization” metadata with a valid token. However, we will still receive an internal error with the message “id can’t be 0.” This message originally came from the panic set in case 0 in the service.go
. This signifies that the panic handler worked successfully, allowing the service to continue running and returning an internal error with the panic message.
Next, navigate back to the “Request Form” tab, change the id
to 1, and click "Invoke." Now, you will receive a success response, which is:
{
“id”: “1”,
“name”: “John Doe”
}
Open Terminal 1, which you used to run the gRPC server. You can also check the log stack trace in Terminal 1. The last four lines shown in the terminal are actually the logs printed by the logging interceptor. This indicates that our logging interceptor is also working fine.
level=info service=gRPC/server component=grpc-example msg="started call" protocol=grpc grpc.component=server grpc.service=com.codespade.proto.Employee grpc.method=GetById grpc.method_type=unary peer.address=[::1]:58391 grpc.start_time=2024-01-12T22:11:06+07:00 grpc.time_ms=0.054
level=info service=gRPC/server component=grpc-example msg="request received" protocol=grpc grpc.component=server grpc.service=com.codespade.proto.Employee grpc.method=GetById grpc.method_type=unary peer.address=[::1]:58391 grpc.start_time=2024-01-12T22:11:06+07:00 grpc.recv.duration=54.708µs grpc.request.content=id:1
level=info service=gRPC/server component=grpc-example msg="response sent" protocol=grpc grpc.component=server grpc.service=com.codespade.proto.Employee grpc.method=GetById grpc.method_type=unary peer.address=[::1]:58391 grpc.start_time=2024-01-12T22:11:06+07:00 grpc.send.duration=196.625µs grpc.response.content="id:1 name:\"John Doe\""
level=info service=gRPC/server component=grpc-example msg="finished call" protocol=grpc grpc.component=server grpc.service=com.codespade.proto.Employee grpc.method=GetById grpc.method_type=unary peer.address=[::1]:58391 grpc.start_time=2024-01-12T22:11:06+07:00 grpc.code=OK grpc.time_ms=0.469
Congratulations! We have successfully implemented logging, authentication, and panic handler interceptors into our gRPC server. Feel free to continue experimenting with these interceptors by adding more rules and options. Happy coding~
References:
- https://pkg.go.dev/github.com/grpc-ecosystem/go-grpc-middleware
- https://github.com/grpc-ecosystem/go-grpc-middleware