gRPC Interceptors in Python
A Quick Introduction (which you can skip)
Like many other startups, as we grew in scale we needed to change tactics and move from a monolith to a microservices architecture. We chose gRPC as our main protocol because we liked its high performance, strongly typed contracts, support for multiple languages, async support, and bi-directional streaming (which we eventually didn’t use because of performance issues specific to gRPC in Python, but more on that another time).
With Python as our primary coding language, we worked on formalizing the internal communication between our gRPC services and needed a way to implement behaviors that apply to all RPC methods. That’s where gRPC interceptors came into our microservices ecosystem.
In this article, we’ll go over what gRPC interceptors are and how to implement them, in both sync and async flavors, using a simple example. By the end of the article, you’ll also get a real-life example of a monitoring interceptor whose code you can view and use.
What are Interceptors?
You may already be familiar with the concept of middleware — adding a “bridge” of custom functionality between two points.
gRPC interceptors are just that, but for gRPC: they intercept requests and responses on the client and server and let you add behaviors and manipulations. You can configure multiple interceptors, and they will be executed in order. Common uses for interceptors are authentication, authorization, logging, error handling, and metrics.
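To make “executed in order” concrete, here is a minimal, grpc-free model of how a chain of interceptors composes: each interceptor receives a continuation that invokes the next one, and the last continuation performs the actual call. It’s a simplified mental model under our own assumptions, not grpcio’s internal implementation, and every name in it (make_chain, tagging_interceptor, fake_rpc) is hypothetical.

```python
from functools import partial
from typing import Callable, List

# In this model an interceptor is any function (continuation, request) -> response.
Interceptor = Callable[[Callable[[str], str], str], str]


def make_chain(interceptors: List[Interceptor], final_call: Callable[[str], str]) -> Callable[[str], str]:
    """Compose interceptors so the first one in the list is the outermost (runs first)."""
    call = final_call
    # Wrap from the innermost (last) interceptor outwards.
    for interceptor in reversed(interceptors):
        call = partial(interceptor, call)
    return call


def tagging_interceptor(tag: str, log: List[str]) -> Interceptor:
    """Builds a hypothetical interceptor that records when it runs around the call."""
    def intercept(continuation: Callable[[str], str], request: str) -> str:
        log.append(f"{tag}:before")
        response = continuation(request)  # hand off to the next interceptor (or the RPC)
        log.append(f"{tag}:after")
        return response
    return intercept


def fake_rpc(request: str) -> str:
    """Stands in for the real RPC at the end of the chain."""
    return request.upper()


log: List[str] = []
call = make_chain([tagging_interceptor("auth", log), tagging_interceptor("metrics", log)], fake_rpc)
print(call("ping"))  # PING
print(log)  # ['auth:before', 'metrics:before', 'metrics:after', 'auth:after']
```

The first interceptor in the list wraps all the others, so it runs first on the way in and last on the way out. That is why ordering matters when, for example, authentication should happen before metrics are recorded.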
As gRPC has two implementations, sync and async (using asyncio), interceptors have two different implementations for sync and async clients and servers.
Implementing Interceptors In Python
There are 2 types of interceptors — channel and server. They differ in where they’re implemented:
- Channel interceptors are on the client side and intercept each call made by the client.
- Server interceptors are on the server side and intercept each call the server receives.
Here’s a rough diagram of where the channel and server interceptors are placed in the path of requests and responses from client to server:
Now let’s see how to implement them!
We’ll implement a simple retry interceptor that, on gRPC errors, retries with exponentially growing waits and gives up after a given number of attempts. We’ll use the following code to manage the waiting:
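import asyncio
import time


class WaitExponential:
    def __init__(self, multiplier: float, min: float, max: float):
        self.multiplier = multiplier
        self.min = min  # shortest wait, in seconds
        self.max = max  # longest wait, in seconds

    def calculate_exponential_waiting_time(self, attempt: int) -> float:
        # multiplier * 2**attempt, clamped to the [min, max] range
        exp = 2 ** attempt
        result = self.multiplier * exp
        return max(0, self.min, min(result, self.max))

    def wait(self, seconds: float):
        # Blocking wait, for sync clients and servers
        time.sleep(seconds)

    async def wait_async(self, seconds: float):
        # Non-blocking wait, for asyncio-based code
        await asyncio.sleep(seconds)


class InterceptorRetry:
    def __init__(self, wait: WaitExponential, stop: int):
        self.wait = wait  # how long to wait between attempts
        self.stop = stop  # number of attempts after which we give up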
WaitExponential’s calculate_exponential_waiting_time returns how long we need to wait for a given attempt number, wait does the actual (blocking) waiting, and wait_async does the same for async implementations without blocking the event loop. InterceptorRetry bundles the wait strategy and the stop count, and we’ll use it to initialize our interceptors.
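As a quick sanity check of the backoff math, the sketch below reuses the waiting-time formula and prints the schedule for the configuration used later in this article (multiplier=0.5, min=0.5, max=3, stop=5), assuming stop is the total number of attempts as in the sync channel interceptor. It uses a trimmed, self-contained copy of WaitExponential; backoff_schedule is a hypothetical helper.

```python
from typing import List


class WaitExponential:
    """Trimmed copy of the helper above: only the waiting-time calculation."""

    def __init__(self, multiplier: float, min: float, max: float):
        self.multiplier = multiplier
        self.min = min
        self.max = max

    def calculate_exponential_waiting_time(self, attempt: int) -> float:
        # multiplier * 2**attempt, clamped to [min, max] (and never below 0)
        result = self.multiplier * (2 ** attempt)
        return max(0, self.min, min(result, self.max))


def backoff_schedule(wait: WaitExponential, stop: int) -> List[float]:
    """Waits before each retry when `stop` is the total number of attempts (hypothetical helper)."""
    return [wait.calculate_exponential_waiting_time(attempt) for attempt in range(stop - 1)]


schedule = backoff_schedule(WaitExponential(multiplier=0.5, min=0.5, max=3), stop=5)
print(schedule)       # [0.5, 1.0, 2.0, 3]
print(sum(schedule))  # 6.5
```

So with this configuration, a call that keeps failing spends at most 6.5 seconds waiting across its four retries, and the cap at max keeps later waits from growing further.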
Channel Interceptor
Here’s the complete code for the interceptor. Don’t worry, we’ll go over what it does in a bit:
import grpc

# Hypothetical module name: wherever WaitExponential and InterceptorRetry live
from retry_utils import InterceptorRetry


class RetryOnErrorClientInterceptor(grpc.UnaryUnaryClientInterceptor,
                                    grpc.StreamUnaryClientInterceptor,
                                    grpc.UnaryStreamClientInterceptor,
                                    grpc.StreamStreamClientInterceptor):
    def __init__(self, interceptor_retry: InterceptorRetry):
        self.waiting = interceptor_retry.wait
        self.stop = interceptor_retry.stop

    def _intercept_call(self, continuation, client_call_details, request_or_iterator, retry=True):
        response = continuation(client_call_details, request_or_iterator)
        if not retry:
            return response
        # The first call plus up to (stop - 1) retries
        for retry_attempt in range(self.stop - 1):
            # For unary responses, continuation returns a Call-Future:
            # exception() is None on success and an RpcError on failure
            if response.exception() is None:
                return response
            waiting_time = self.waiting.calculate_exponential_waiting_time(retry_attempt)
            self.waiting.wait(seconds=waiting_time)
            # Note: a request iterator consumed by the failed attempt can't be replayed
            response = continuation(client_call_details, request_or_iterator)
        return response

    def intercept_unary_unary(self, continuation, client_call_details, request):
        return self._intercept_call(continuation, client_call_details, request)

    def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        return self._intercept_call(continuation, client_call_details, request_iterator)

    # Streaming responses surface errors while they're iterated, so we pass them
    # through instead of blocking here to check whether they failed
    def intercept_unary_stream(self, continuation, client_call_details, request):
        return self._intercept_call(continuation, client_call_details, request, retry=False)

    def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        return self._intercept_call(continuation, client_call_details, request_iterator, retry=False)
With channel interceptors, we need to explicitly intercept the types of calls we want out of the possible options: unary-unary, stream-unary, unary-stream, and stream-stream.
We want our interceptor to intercept all possible calls, so RetryOnErrorClientInterceptor implements all four gRPC client interceptor interfaces: UnaryUnaryClientInterceptor, StreamUnaryClientInterceptor, UnaryStreamClientInterceptor, and StreamStreamClientInterceptor. The four interfaces provide us with four interceptor methods to implement: intercept_unary_unary, intercept_stream_unary, intercept_unary_stream, and intercept_stream_stream.
Each interceptor method gets the following arguments:
- continuation: proceeds with the RPC by invoking the next interceptor in the chain or, for the last one, making the call to the server.
- client_call_details: holds details of the RPC call (such as the method name, timeout, and metadata), which can be inspected and manipulated before they’re passed on in continuation.
- request/request_iterator: the request the client is sending, depending on whether the client’s request is unary or streaming.
RetryOnErrorClientInterceptor also receives its retry configuration, an InterceptorRetry, through interceptor_retry when it’s initialized.
In each interceptor method, we call _intercept_call, which holds our implementation for intercepting the call. First, it makes the server call with continuation and gets a response. Then, in a loop bounded by the stop count from interceptor_retry, it checks whether the call failed; if it did, it calculates the waiting time for that attempt, waits, and tries the call again. The response is returned from the interceptor either when a call succeeds or once all attempts have failed.
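To see this control flow without a server, here is a grpc-free sketch that drives the same retry-then-wait loop with a fake continuation. FakeCall imitates only the exception() and result() methods of the Call-Future that a unary-response continuation returns (exception() is None on success), RuntimeError stands in for grpc.RpcError, and retry_call, FakeCall and flaky_continuation are hypothetical names. The wait function is injected so the example records the waits instead of sleeping.

```python
from typing import Callable, List, Optional


class FakeCall:
    """Stands in for grpc's Call-Future: exception() is None when the call succeeded."""

    def __init__(self, error: Optional[Exception] = None, result: Optional[str] = None):
        self._error = error
        self._result = result

    def exception(self) -> Optional[Exception]:
        return self._error

    def result(self) -> Optional[str]:
        if self._error is not None:
            raise self._error
        return self._result


def retry_call(continuation: Callable[[], FakeCall], stop: int,
               wait_times: Callable[[int], float], sleep: Callable[[float], None]) -> FakeCall:
    """Same loop shape as _intercept_call: up to `stop` attempts, waiting between them."""
    response = continuation()
    for retry_attempt in range(stop - 1):
        if response.exception() is None:
            return response  # success: stop retrying
        sleep(wait_times(retry_attempt))
        response = continuation()
    return response  # a late success or the last failure


# A hypothetical backend that fails twice before answering.
outcomes = [FakeCall(error=RuntimeError("UNAVAILABLE")),
            FakeCall(error=RuntimeError("UNAVAILABLE")),
            FakeCall(result="pong")]
calls: List[int] = []
slept: List[float] = []


def flaky_continuation() -> FakeCall:
    calls.append(1)
    return outcomes[len(calls) - 1]


final = retry_call(flaky_continuation, stop=5,
                   wait_times=lambda attempt: 0.5 * 2 ** attempt, sleep=slept.append)
print(final.result(), len(calls), slept)  # pong 3 [0.5, 1.0]
```

The third call succeeds, so the loop returns early after two waits instead of using all five attempts.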
Intercepting the channel:
interceptor_retry = InterceptorRetry(wait=WaitExponential(multiplier=0.5, min=0.5, max=3),
                                     stop=5)
interceptor = RetryOnErrorClientInterceptor(interceptor_retry)
channel = grpc.insecure_channel('localhost:50051')
intercept_channel = grpc.intercept_channel(channel, interceptor)
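We initialize RetryOnErrorClientInterceptor with InterceptorRetry, and after creating the gRPC channel, we wrap it with our interceptor by calling grpc.intercept_channel. Calls only go through the interceptor when stubs are created from the returned intercept_channel rather than the original channel.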
The async interceptor version:
import grpc
from grpc import aio

# Hypothetical module name: wherever WaitExponential and InterceptorRetry live
from retry_utils import InterceptorRetry


class RetryOnErrorClientInterceptorAsync(aio.UnaryUnaryClientInterceptor,
                                         aio.StreamUnaryClientInterceptor,
                                         aio.UnaryStreamClientInterceptor,
                                         aio.StreamStreamClientInterceptor):
    def __init__(self, interceptor_retry: InterceptorRetry):
        self.waiting = interceptor_retry.wait
        self.stop = interceptor_retry.stop

    async def _intercept_call(self, continuation, client_call_details, request_or_iterator):
        response = await continuation(client_call_details, request_or_iterator)
        # The first call plus up to (stop - 1) retries, matching the sync version
        for retry_attempt in range(self.stop - 1):
            try:
                # Raises an RpcError if the RPC failed to connect
                await response.wait_for_connection()
                return response
            except grpc.RpcError:
                waiting_time = self.waiting.calculate_exponential_waiting_time(retry_attempt)
                # Non-blocking wait, so other coroutines keep running
                await self.waiting.wait_async(seconds=waiting_time)
                response = await continuation(client_call_details, request_or_iterator)
        return response

    async def intercept_unary_unary(self, continuation, client_call_details, request):
        return await self._intercept_call(continuation, client_call_details, request)

    async def intercept_stream_unary(self, continuation, client_call_details, request_iterator):
        return await self._intercept_call(continuation, client_call_details, request_iterator)

    async def intercept_unary_stream(self, continuation, client_call_details, request):
        return await self._intercept_call(continuation, client_call_details, request)

    async def intercept_stream_stream(self, continuation, client_call_details, request_iterator):
        return await self._intercept_call(continuation, client_call_details, request_iterator)
After RetryOnErrorClientInterceptor, this should look familiar. The concept is the same: RetryOnErrorClientInterceptorAsync implements the four gRPC client interceptor interfaces, only this time we import them from grpc.aio.
_intercept_call is also slightly different. We now need to await the response from continuation, and we call response.wait_for_connection(), which ensures the RPC has been successfully connected and otherwise raises an exception that we can catch with except, instead of checking the response type. We also wait asynchronously with WaitExponential’s wait_async (through self.waiting.wait_async), so the event loop isn’t blocked while we wait.
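The async loop can be exercised the same way using asyncio alone. In the sketch below, FakeAsyncCall imitates only wait_for_connection(), raising a stand-in error class (FakeRpcError, in place of grpc.RpcError) until the third attempt; all names are hypothetical, and the wait is set to zero seconds so the example runs instantly.

```python
import asyncio
from typing import Awaitable, Callable, List


class FakeRpcError(Exception):
    """Stand-in for grpc.RpcError / grpc.aio.AioRpcError."""


class FakeAsyncCall:
    def __init__(self, connects: bool):
        self._connects = connects

    async def wait_for_connection(self) -> None:
        # Like the real method, raise if the RPC could not be established.
        if not self._connects:
            raise FakeRpcError("connection failed")


async def retry_call_async(continuation: Callable[[], Awaitable[FakeAsyncCall]],
                           stop: int, wait_time: float = 0.0) -> FakeAsyncCall:
    """Mirrors the async _intercept_call: up to `stop` attempts in total."""
    response = await continuation()
    for _ in range(stop - 1):
        try:
            await response.wait_for_connection()
            return response
        except FakeRpcError:
            await asyncio.sleep(wait_time)  # non-blocking, unlike time.sleep
            response = await continuation()
    return response


attempts: List[int] = []


async def flaky_continuation() -> FakeAsyncCall:
    attempts.append(1)
    return FakeAsyncCall(connects=len(attempts) >= 3)  # connects on the third attempt


result = asyncio.run(retry_call_async(flaky_continuation, stop=5))
print(len(attempts))  # 3
```

Because the wait goes through asyncio.sleep, other coroutines on the same event loop keep running while a call is backing off.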
The async version of intercepting the channel:
interceptor = RetryOnErrorClientInterceptorAsync(interceptor_retry)
channel = grpc.aio.insecure_channel('localhost:50051', interceptors=[interceptor])
interceptor_retry is created the same way as in the sync version, but this time it’s passed to RetryOnErrorClientInterceptorAsync. The difference is in creating the channel: with an async channel, we can configure the interceptors when creating the channel with grpc.aio.insecure_channel.
Server Interceptor
The server interceptor implementation is more straightforward. Our interceptor needs to implement the ServerInterceptor interface and its intercept_service method. Calling continuation with handler_call_details returns the RpcMethodHandler the server will use to execute the RPC (or None if no handler matches) rather than the RPC’s response, so logic that has to run around the method call itself, like a retry, must wrap that handler’s behavior. You already know what the retry code looks like, so I’ll only show you the important bits:
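from grpc import ServerInterceptor

# Hypothetical module name: wherever WaitExponential and InterceptorRetry live
from retry_utils import InterceptorRetry


class RetryOnErrorServerInterceptor(ServerInterceptor):
    def __init__(self, interceptor_retry: InterceptorRetry):
        self.waiting = interceptor_retry.wait
        self.stop = interceptor_retry.stop

    # Sync servers call intercept_service synchronously, so it's a plain def
    def intercept_service(self, continuation, handler_call_details):
        # Retrying stuff...
        handler = continuation(handler_call_details)  # an RpcMethodHandler, or None
        # Retrying stuff (wrapping the handler's behavior)...
        return handler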
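Because continuation hands back a method handler rather than a response, a retry has to live inside the handler’s behavior function. The grpc-free sketch below models that idea under our own assumptions: FakeHandler stands in for grpc.RpcMethodHandler (only its unary_unary field), a plain string stands in for handler_call_details, and wrap_unary_with_retry, flaky_method and the other names are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Callable, Optional

Behavior = Callable[[Any, Any], Any]  # (request, context) -> response


@dataclass
class FakeHandler:
    """Stand-in for grpc.RpcMethodHandler; only the unary_unary behavior is modeled."""
    unary_unary: Optional[Behavior]


def wrap_unary_with_retry(behavior: Behavior, stop: int) -> Behavior:
    """Returns a behavior that runs `behavior` up to `stop` times (stop >= 1) while it raises."""
    def wrapped(request: Any, context: Any) -> Any:
        for _ in range(stop - 1):
            try:
                return behavior(request, context)
            except Exception:
                pass  # a real interceptor would wait here, e.g. with WaitExponential
        return behavior(request, context)  # last attempt: errors propagate to the server
    return wrapped


def intercept_service(continuation: Callable[[Any], Optional[FakeHandler]],
                      handler_call_details: Any) -> Optional[FakeHandler]:
    """Shape of ServerInterceptor.intercept_service: look up the handler, then wrap it."""
    handler = continuation(handler_call_details)
    if handler is None or handler.unary_unary is None:
        return handler  # unknown or streaming method: leave untouched
    return FakeHandler(unary_unary=wrap_unary_with_retry(handler.unary_unary, stop=3))


failures = [RuntimeError("transient"), RuntimeError("transient")]


def flaky_method(request: str, context: Any) -> str:
    # Hypothetical service method that fails twice before succeeding.
    if failures:
        raise failures.pop()
    return f"hello {request}"


wrapped_handler = intercept_service(lambda details: FakeHandler(unary_unary=flaky_method),
                                    "/Greeter/SayHello")
print(wrapped_handler.unary_unary("world", None))  # hello world
```

With real grpcio, the wrapped function would go into a new handler built with grpc.unary_unary_rpc_method_handler, passing along the original handler’s request_deserializer and response_serializer; treat that as a pointer to check against the grpc docs rather than a tested recipe.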
Intercepting the server:
from concurrent import futures
import grpc
interceptor_retry = InterceptorRetry(wait=WaitExponential(multiplier=0.5, min=0.5, max=3),
                                     stop=5)
interceptor = RetryOnErrorServerInterceptor(interceptor_retry)
server = grpc.server(futures.ThreadPoolExecutor(), interceptors=[interceptor])
Async server interceptor:
For async servers, we import ServerInterceptor from grpc.aio, define intercept_service with async def, and await the continuation:
handler = await continuation(handler_call_details)
Intercepting an async server:
interceptor_retry = InterceptorRetry(wait=WaitExponential(multiplier=0.5, min=0.5, max=3),
                                     stop=5)
# Hypothetical name for the grpc.aio version of RetryOnErrorServerInterceptor
interceptor = RetryOnErrorServerInterceptorAsync(interceptor_retry)
server = grpc.aio.server(interceptors=[interceptor])
Real-World Example
While simple examples are great for learning, it’s always nice to have a real-world example where you can see production-ready code. Well, we’ve got you covered! We’ve created an interceptor for monitoring Python async gRPC services using Prometheus, which you can view here: https://github.com/taranisag/py-async-grpc-prometheus, and we’ve also written an article on how to use the interceptor in your service.
We’ve seen how to implement channel and server interceptors, both sync and async, with a simple retry example, and we’ve also seen a real-world use case of an elaborate interceptor for monitoring gRPC services which you can view and use.
Thank you for reading and I hope you found this article useful in your gRPC endeavors!