gRPC Interceptors in Python

Shay Yacobinski
Published in Taranis Tech
6 min read · Aug 18, 2024

A Quick Introduction (which you can skip)

Like many other startups, as we grew we needed to change tactics and move from a monolith to a microservices architecture. We chose gRPC as our main protocol because we liked its high performance, strongly typed contracts, multi-language support, async support, and bi-directional streaming (which we eventually didn’t use because of performance issues specific to gRPC in Python, but more on that another time).

With Python as our primary coding language, we’ve worked on formalizing the internal communication between the different gRPC services and needed a way to implement behaviors that will apply to all RPC methods — that’s where we introduced gRPC interceptors to our microservices ecosystem.

In this article, we’ll go over what gRPC interceptors are and how to implement them in both sync and async flavors, using a simple example. At the end of the article, you’ll also find a real-life monitoring interceptor whose code you can read and use.

What are Interceptors?

You may already be familiar with the concept of middleware — adding a “bridge” of custom functionality between two points.

gRPC Interceptors are just that but for gRPC — they intercept the requests and responses on the client and server and allow adding additional behaviors and manipulations. You can configure multiple interceptors and they will be executed in order. Common usages for interceptors are authentication, authorization, logging, error handling, and metrics.
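
To make the ordering concrete, here is a toy model in plain Python. It does not use the gRPC API: make_chain, tagging_interceptor, and handler are hypothetical names invented for this sketch. Each interceptor receives a continuation that hands the request to the next interceptor (or to the final handler), which is the same shape gRPC's interceptor methods have.

```python
from typing import Callable, List

Handler = Callable[[str], str]
Interceptor = Callable[[Handler, str], str]


def _bind(interceptor: Interceptor, continuation: Handler) -> Handler:
    """Fix the continuation an interceptor will call to pass the request on."""
    def call(request: str) -> str:
        return interceptor(continuation, request)
    return call


def make_chain(interceptors: List[Interceptor], handler: Handler) -> Handler:
    """Wrap `handler` so that the first listed interceptor runs first."""
    chained = handler
    # Wrap from the inside out: the last interceptor sits closest to the handler.
    for interceptor in reversed(interceptors):
        chained = _bind(interceptor, chained)
    return chained


def tagging_interceptor(name: str, log: List[str]) -> Interceptor:
    """Build an interceptor that records when it sees the request and the response."""
    def intercept(continuation: Handler, request: str) -> str:
        log.append(f"{name}: request")
        response = continuation(request)
        log.append(f"{name}: response")
        return response
    return intercept


def handler(request: str) -> str:
    """Stand-in for the code that actually serves the call."""
    return request.upper()


log: List[str] = []
chain = make_chain(
    [tagging_interceptor("auth", log), tagging_interceptor("metrics", log)],
    handler,
)
result = chain("ping")
```

After running this, log shows the request passing through auth and then metrics, and the response unwinding through them in the reverse order.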

gRPC for Python has two implementations, sync and async (the latter built on asyncio and exposed under grpc.aio), so interceptors come in two variants as well: one for sync clients and servers and one for async clients and servers.

Implementing Interceptors In Python

There are 2 types of interceptors — channel and server. They differ in where they’re implemented:

  1. Channel interceptors are on the client side and intercept each call made by the client.
  2. Server interceptors are on the server side and intercept each call the server receives.

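Here’s roughly where the channel and server interceptors sit in the path of requests and responses from client to server:

client -> channel interceptor(s) -> network -> server interceptor(s) -> server handler (responses travel back along the same path in reverse)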

Now let’s see how to implement them!

We’ll implement a simple retry interceptor that waits with exponential backoff after gRPC errors and gives up after a given number of attempts. We’ll use the following code to manage the waiting:

import asyncio
import time


class WaitExponential:
    def __init__(self, multiplier, min, max):
        self.multiplier = multiplier
        self.min = min  # lower bound for the wait, in seconds
        self.max = max  # upper bound for the wait, in seconds

    def calculate_exponential_waiting_time(self, attempt: int) -> float:
        exp = 2 ** attempt
        result = self.multiplier * exp

        # Clamp the result to the range [self.min, self.max]
        return max(0, self.min, min(result, self.max))

    def wait(self, seconds: float):
        time.sleep(seconds)

    async def wait_async(self, seconds: float):
        await asyncio.sleep(seconds)


class InterceptorRetry:
    def __init__(self, wait: WaitExponential, stop: int):
        self.wait = wait  # backoff strategy
        self.stop = stop  # maximum number of attempts

Class WaitExponential's calculate_exponential_waiting_time returns how long we need to wait depending on the attempt count, and wait does the actual waiting, while wait_async does the same for async implementations. Class InterceptorRetry will help us with the interceptor initialization.
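
As a quick sanity check, the sketch below prints the waiting times this calculation produces for the first five attempts. It repeats only the calculation part of the class so that it runs on its own, and it uses max=3.0 instead of 3 purely so that every value comes out as a float.

```python
class WaitExponential:
    def __init__(self, multiplier, min, max):
        self.multiplier = multiplier
        self.min = min
        self.max = max

    def calculate_exponential_waiting_time(self, attempt: int) -> float:
        exp = 2 ** attempt
        result = self.multiplier * exp
        # Clamp the raw value into [self.min, self.max]; max/min here are the builtins.
        return max(0, self.min, min(result, self.max))


waiting = WaitExponential(multiplier=0.5, min=0.5, max=3.0)
schedule = [waiting.calculate_exponential_waiting_time(attempt) for attempt in range(5)]
print(schedule)  # [0.5, 1.0, 2.0, 3.0, 3.0]
```

The wait doubles with each attempt until it reaches the configured maximum, and stays there afterwards.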

Channel Interceptor

Here’s the complete code for the interceptor. Don’t worry, we’ll go over what it does in a bit:

import grpc

# Assumes the helper classes above are saved in a module named interceptor_retry.py
from interceptor_retry import InterceptorRetry


class RetryOnErrorClientInterceptor(grpc.UnaryUnaryClientInterceptor,
                                    grpc.StreamUnaryClientInterceptor,
                                    grpc.UnaryStreamClientInterceptor,
                                    grpc.StreamStreamClientInterceptor):
    def __init__(self, interceptor_retry: InterceptorRetry):
        self.waiting = interceptor_retry.wait
        self.stop = interceptor_retry.stop

    def _intercept_call(self, continuation, client_call_details, request_or_iterator):
        # First attempt
        response = continuation(client_call_details, request_or_iterator)

        # Up to (stop - 1) retries, so at most `stop` attempts in total
        for retry_attempt in range(self.stop - 1):
            if not isinstance(response, Exception):
                break  # the call succeeded, no need to retry

            waiting_time = self.waiting.calculate_exponential_waiting_time(
                retry_attempt)

            self.waiting.wait(seconds=waiting_time)

            response = continuation(client_call_details,
                                    request_or_iterator)

        return response

    def intercept_unary_unary(self, continuation, client_call_details, request):
        return self._intercept_call(continuation, client_call_details, request)

    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        return self._intercept_call(continuation, client_call_details, request_iterator)

    def intercept_unary_stream(self, continuation, client_call_details, request):
        return self._intercept_call(continuation, client_call_details, request)

    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        return self._intercept_call(continuation, client_call_details, request_iterator)

With channel interceptors, we need to explicitly intercept the types of calls we want out of the possible options: unary-unary, stream-unary, unary-stream, and stream-stream.

We want our interceptor to intercept all possible calls, so RetryOnErrorClientInterceptor implements all four gRPC client interceptor interfaces: UnaryUnaryClientInterceptor, StreamUnaryClientInterceptor, UnaryStreamClientInterceptor, and StreamStreamClientInterceptor. The four interfaces provide us with four interceptor methods to implement: intercept_unary_unary, intercept_stream_unary, intercept_unary_stream, and intercept_stream_stream.

Each interceptor method gets the following arguments:

  • continuation: a function that proceeds with the RPC, invoking the next interceptor in the chain or, after the last one, making the call to the server.
  • client_call_details: holds metadata on the RPC call which can be inspected and manipulated before it is passed to continuation and sent to the server.
  • request/request_iterator: the request sent by the client, either a single message or an iterator of messages, depending on whether the client's request is unary or streaming.

RetryOnErrorClientInterceptor also gets retry configurations of type InterceptorRetry in interceptor_retry when it’s initialized.

Each interceptor method delegates to _intercept_call, which holds our interception logic. It first makes the server call with continuation and gets a response. Then, in a loop bounded by the stop count from interceptor_retry, it checks whether the response is an exception; if so, it calculates the waiting time for the current attempt, waits, and tries the call again. The interceptor returns the response either when the call succeeds or when all attempts have failed.
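
The retry loop can be exercised without a running server. The sketch below is only a simulation under stated assumptions: FlakyContinuation and retry_call are hypothetical names, the fake continuation hands back an exception object as a value (the behavior the interceptor above relies on), and the delay is recorded instead of slept.

```python
from typing import Callable, List


class FlakyContinuation:
    """Hypothetical stand-in for continuation: fails a fixed number of times."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    def __call__(self, client_call_details, request):
        self.calls += 1
        if self.calls <= self.failures:
            # The error comes back as a value, it is not raised.
            return RuntimeError(f"attempt {self.calls} failed")
        return f"ok: {request}"


def retry_call(continuation, client_call_details, request,
               stop: int, wait: Callable[[float], None]):
    """One initial call plus up to (stop - 1) retries."""
    response = continuation(client_call_details, request)
    for retry_attempt in range(stop - 1):
        if not isinstance(response, Exception):
            break
        wait(0.5 * 2 ** retry_attempt)  # record the delay instead of sleeping
        response = continuation(client_call_details, request)
    return response


# Fails twice, then succeeds on the third call.
delays: List[float] = []
flaky = FlakyContinuation(failures=2)
response = retry_call(flaky, None, "ping", stop=5, wait=delays.append)

# Never succeeds: the loop gives up after `stop` calls and returns the last error.
always_failing = FlakyContinuation(failures=100)
last = retry_call(always_failing, None, "ping", stop=5, wait=lambda _: None)
```

With stop=5, the always-failing continuation is called five times in total, and the caller receives the last error object rather than a response.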

Intercepting the channel:

interceptor_retry = InterceptorRetry(wait=WaitExponential(multiplier=0.5, min=0.5, max=3),
                                     stop=5)
interceptor = RetryOnErrorClientInterceptor(interceptor_retry)

channel = grpc.insecure_channel('localhost:50051')
intercept_channel = grpc.intercept_channel(channel, interceptor)

We initialize RetryOnErrorClientInterceptor with an InterceptorRetry, and once we have the gRPC channel we wrap it with our interceptor by calling grpc.intercept_channel. Stubs should then be created from the returned intercept_channel rather than from the original channel; otherwise the interceptor is bypassed.

The async interceptor version:

import grpc
from grpc import aio

# Assumes the helper classes above are saved in a module named interceptor_retry.py
from interceptor_retry import InterceptorRetry


class RetryOnErrorClientInterceptorAsync(aio.UnaryUnaryClientInterceptor,
                                         aio.StreamUnaryClientInterceptor,
                                         aio.UnaryStreamClientInterceptor,
                                         aio.StreamStreamClientInterceptor):
    def __init__(self, interceptor_retry: InterceptorRetry):
        self.waiting = interceptor_retry.wait
        self.stop = interceptor_retry.stop

    async def _intercept_call(self, continuation, client_call_details, request_or_iterator):
        # First attempt
        response = await continuation(client_call_details, request_or_iterator)

        # Up to (stop - 1) retries, so at most `stop` attempts in total,
        # matching the sync interceptor
        for retry_attempt in range(self.stop - 1):
            try:
                await response.wait_for_connection()
                return response
            except grpc.RpcError:
                waiting_time = self.waiting.calculate_exponential_waiting_time(
                    retry_attempt)

                await self.waiting.wait_async(seconds=waiting_time)

                response = await continuation(client_call_details, request_or_iterator)

        return response

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        return await self._intercept_call(continuation, client_call_details, request)

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        return await self._intercept_call(continuation, client_call_details, request_iterator)

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        return await self._intercept_call(continuation, client_call_details, request)

    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        return await self._intercept_call(continuation, client_call_details, request_iterator)

After RetryOnErrorClientInterceptor this should look familiar. The concept is the same: RetryOnErrorClientInterceptorAsync implements the four gRPC client interceptor interfaces, only this time we’re importing them from grpc.aio.

_intercept_call is also slightly different. We now need to await the response from continuation, and we call response.wait_for_connection(), which waits until the RPC is connected to the peer and raises an exception if the connection fails. That lets us catch the failure with except instead of checking the response type. We also wait asynchronously, using WaitExponential’s wait_async through self.waiting.
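
The same control flow can be tried out with asyncio alone. In the sketch below, FakeCall, FlakyAsyncContinuation, and retry_call_async are hypothetical names; ConnectionError stands in for grpc.RpcError, and asyncio.sleep(0) stands in for the exponential wait. The sketch makes at most stop calls in total.

```python
import asyncio


class FakeCall:
    """Hypothetical stand-in for the call object returned by continuation."""

    def __init__(self, connects: bool):
        self.connects = connects

    async def wait_for_connection(self):
        if not self.connects:
            raise ConnectionError("peer unavailable")


class FlakyAsyncContinuation:
    """Returns calls that fail to connect for the first `failures` attempts."""

    def __init__(self, failures: int):
        self.failures = failures
        self.calls = 0

    async def __call__(self, client_call_details, request):
        self.calls += 1
        return FakeCall(connects=self.calls > self.failures)


async def retry_call_async(continuation, client_call_details, request, stop: int):
    response = await continuation(client_call_details, request)
    for retry_attempt in range(stop - 1):
        try:
            await response.wait_for_connection()
            return response
        except ConnectionError:  # the real interceptor catches grpc.RpcError
            await asyncio.sleep(0)  # stand-in for the exponential wait
            response = await continuation(client_call_details, request)
    return response


flaky = FlakyAsyncContinuation(failures=2)
call = asyncio.run(retry_call_async(flaky, None, "ping", stop=5))
```

Here the failure surfaces as a raised exception when the call is awaited, which is why the async version uses try/except where the sync version inspects the returned value.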

The async version of intercepting the channel:

channel = grpc.aio.insecure_channel('localhost:50051', interceptors=[interceptor])

The initialization of interceptor is the same as for the sync interceptor, except that we instantiate RetryOnErrorClientInterceptorAsync. The difference is in creating the channel: with an async channel, we pass the interceptors directly when creating it with grpc.aio.insecure_channel.

Server Interceptor

The server interceptor implementation is more straightforward. Our interceptor needs to implement the ServerInterceptor interface and its intercept_service method to intercept the call. Calling continuation with handler_call_details passes the call on to the next interceptor in the chain, or to the server’s handler lookup, and returns the RPC method handler that will serve the call. You already know how to wrap it with the retry code so I’ll only show you the important bits:

import grpc
from grpc import ServerInterceptor


class RetryOnErrorServerInterceptor(ServerInterceptor):
    def __init__(self, interceptor_retry: InterceptorRetry):
        ...

    # In the sync API, intercept_service is a regular method (no async)
    def intercept_service(self, continuation, handler_call_details):
        # Retrying stuff...
        # continuation returns the RPC method handler for this call
        response = continuation(handler_call_details)
        # Retrying stuff...

        return response
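
For a fuller picture of what a server interceptor can do with the value returned by continuation, here is a minimal sketch that times unary-unary calls. In grpc's Python API, continuation(handler_call_details) returns an RPC method handler object rather than the RPC's result, so per-call behavior is usually added by wrapping that handler. TimingServerInterceptor and the method name /demo.Echo/Shout are hypothetical, and the last few lines drive the interceptor by hand instead of starting a server.

```python
import time
from types import SimpleNamespace

import grpc


class TimingServerInterceptor(grpc.ServerInterceptor):
    """Hypothetical example: records how long each unary-unary handler takes."""

    def __init__(self):
        self.timings = []  # list of (method name, seconds)

    def intercept_service(self, continuation, handler_call_details):
        handler = continuation(handler_call_details)
        # Leave unknown methods and streaming RPCs untouched in this sketch.
        if handler is None or handler.request_streaming or handler.response_streaming:
            return handler

        method = handler_call_details.method

        def timed_behavior(request, context):
            start = time.perf_counter()
            try:
                return handler.unary_unary(request, context)
            finally:
                self.timings.append((method, time.perf_counter() - start))

        # Rebuild the handler around the wrapped behavior, keeping the (de)serializers.
        return grpc.unary_unary_rpc_method_handler(
            timed_behavior,
            request_deserializer=handler.request_deserializer,
            response_serializer=handler.response_serializer,
        )


# Exercise the interceptor without starting a server.
interceptor = TimingServerInterceptor()
inner = grpc.unary_unary_rpc_method_handler(lambda request, context: request.upper())
details = SimpleNamespace(method="/demo.Echo/Shout", invocation_metadata=())
wrapped = interceptor.intercept_service(lambda _details: inner, details)
reply = wrapped.unary_unary("ping", None)
```

The same wrapping approach is where retry, logging, or error-handling logic around the actual handler would go.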

Intercepting the server:

from concurrent import futures

interceptor_retry = InterceptorRetry(wait=WaitExponential(multiplier=0.5, min=0.5, max=3),
                                     stop=5)
interceptor = RetryOnErrorServerInterceptor(interceptor_retry)

# Note the trailing comma: interceptors expects a sequence, not a single object
server = grpc.server(futures.ThreadPoolExecutor(), interceptors=(interceptor,))

Async server interceptor:

In the async version, we take ServerInterceptor from grpc.aio, declare intercept_service with async def, and do:

response = await continuation(handler_call_details)

Intercepting an async server:

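interceptor_retry = InterceptorRetry(wait=WaitExponential(multiplier=0.5, min=0.5, max=3),
                                     stop=5)
# RetryOnErrorServerInterceptorAsync is an assumed name for the grpc.aio
# variant of the server interceptor described above
interceptor = RetryOnErrorServerInterceptorAsync(interceptor_retry)

server = grpc.aio.server(interceptors=[interceptor])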

Real-World Example

While simple examples are great for learning, it’s always nice to have a real-world example where you can see production-ready code. Well, we’ve got you covered! We’ve created an interceptor for monitoring Python async gRPC services using Prometheus, which you can view here: https://github.com/taranisag/py-async-grpc-prometheus, and we’ve also written an article on how to use the interceptor in your service.

We’ve seen how to implement channel and server interceptors, both sync and async, with a simple retry example, and we’ve also seen a real-world use case of an elaborate interceptor for monitoring gRPC services which you can view and use.

Thank you for reading and I hope you found this article useful in your gRPC endeavors!


Father, Husband, Coder, Occasional Writer of Medium articles. Backend Architect @ Taranis