Project Reactor + MDC logging
This post describes a technique for propagating the MDC context through a reactive pipeline of Mono/Flux handlers with little impact on performance and code readability. The suggested solution is implemented in Kotlin (as everything should be), but it can be adapted for Java with some loss of readability.
At the time of writing, two solutions have been described:
1. https://simonbasle.github.io/2018/02/contextual-logging-with-reactor-context-and-mdc/ which looks ugly and is not very useful;
2. https://www.novatec-gmbh.de/en/blog/how-can-the-mdc-context-be-used-in-the-reactive-spring-applications/ which instruments every onNext call and is therefore likely to be very expensive.
The problem
SLF4J provides the MDC (Mapped Diagnostic Context) feature, which lets you put an arbitrary Map<String, String> into thread-local storage; its entries are then attached to every log message the application prints. It can be used to log context-specific data such as session IDs, OpenTracing IDs, and other meta-information that is too meta to be passed around as an argument.
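As a rough illustration of the thread-per-request case, the sketch below fills and clears MDC around a log call. The function name handleRequest, the logger name, and the traceId key are made up for this example; the %X{traceId} pattern mentioned in the comment is Logback syntax and assumes Logback is the logging backend.

```kotlin
import org.slf4j.LoggerFactory
import org.slf4j.MDC

private val log = LoggerFactory.getLogger("mdc-demo")

// Hypothetical request handler: everything runs on one thread.
fun handleRequest(traceId: String) {
    // Store the value in the current thread's MDC map.
    MDC.put("traceId", traceId)
    try {
        // With a pattern such as "%X{traceId} %msg%n" the backend
        // can print the traceId next to the message.
        log.info("processing request")
    } finally {
        // Clean up so the value does not leak to the next task on this thread.
        MDC.remove("traceId")
    }
}
```

The try/finally matters because servlet containers reuse threads, so a value left in MDC would show up in the logs of an unrelated request.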
In the classical thread-per-request paradigm this works nicely: all you need to do is fill the MDC context somewhere in a servlet interceptor and write logs as usual. But when it comes to Reactor, processing may be rescheduled between different threads, so thread-local storage can no longer be relied on.
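To make the rescheduling concrete, here is a small sketch (not from the original post) that records the thread name before and after a publishOn call. The function name threadsSeen is hypothetical.

```kotlin
import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers

// Returns the thread name seen at the source and the one seen after publishOn.
fun threadsSeen(): Pair<String, String> =
    Mono.fromCallable { Thread.currentThread().name }  // runs on the subscribing thread
        .publishOn(Schedulers.parallel())               // downstream moves to a worker thread
        .map { first -> first to Thread.currentThread().name }
        .block()!!
```

The two names differ, so anything stored in a thread-local (MDC included) before publishOn is not visible to the operators after it.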
Reactor provides its own Context abstraction that makes it possible to reliably propagate a context over a pipeline of reactive operators. The Context is propagated from subscriber to publisher, i.e. bottom-up over a pipeline (more information in the reference guide).
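The bottom-up direction can be sketched as follows. This assumes Reactor 3.4 or later, where contextWrite and Mono.deferContextual are the operators for writing and reading the context; the function names and the traceId key are made up for the example.

```kotlin
import reactor.core.publisher.Mono
import reactor.util.context.Context

// Reads "traceId" from the Context visible at this point of the chain.
fun readTraceId(): Mono<String> =
    Mono.deferContextual { ctx ->
        Mono.just(ctx.getOrDefault<String>("traceId", "none")!!)
    }

// contextWrite sits below the reader, so the reader sees the value.
fun visibleTraceId(): String? =
    readTraceId()
        .contextWrite(Context.of("traceId", "abc"))
        .block()

// contextWrite sits above flatMap, so it only affects Mono.just("x");
// the reader inside flatMap falls back to the default.
fun hiddenTraceId(): String? =
    Mono.just("x")
        .contextWrite(Context.of("traceId", "abc"))
        .flatMap { readTraceId() }
        .block()
```

In practice this means the context should be written as the last operator of the chain, closest to the subscriber.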
The solution
Let’s say we have a WebFlux controller that proxies a call to another microservice and stores the response in Redis before returning it to the caller.
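A controller of that shape might look roughly like the sketch below. It is an illustration only: fetchFromUpstream and storeInRedis are hypothetical stubs standing in for a WebClient call and a reactive Redis write, and the Spring annotations are left out so the snippet depends only on Reactor and SLF4J.

```kotlin
import org.slf4j.LoggerFactory
import reactor.core.publisher.Mono

private val log = LoggerFactory.getLogger("ProxyController")

// Hypothetical stand-ins for the downstream call and the Redis write.
fun fetchFromUpstream(id: String): Mono<String> = Mono.just("payload-for-$id")
fun storeInRedis(id: String, body: String): Mono<Boolean> = Mono.just(true)

// In a real controller this would be a request-mapped method that takes
// the trace id from a request header. The traceId is not used yet.
fun proxy(id: String, traceId: String): Mono<String> =
    fetchFromUpstream(id)
        .doOnNext { log.info("upstream responded for id={}", id) }
        .flatMap { body -> storeInRedis(id, body).thenReturn(body) }
        .doOnNext { log.info("response cached for id={}", id) }
```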
We want to attach the caller’s traceId to all messages our controller logs. First of all, we need to put this traceId into the context.
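One way to do this, sketched under the assumption of Reactor 3.4 or later (older versions used subscriberContext(Context) for the same purpose), is to append contextWrite at the very end of the chain. The process function is a hypothetical stand-in for the controller's pipeline; it reads the context back only to show that the value is visible upstream.

```kotlin
import reactor.core.publisher.Mono
import reactor.util.context.Context

// Hypothetical pipeline body; echoes the traceId it finds in the context.
fun process(id: String): Mono<String> =
    Mono.deferContextual { ctx ->
        Mono.just("payload-for-$id traceId=${ctx.get<String>("traceId")}")
    }

fun proxy(id: String, traceId: String): Mono<String> =
    process(id)
        // Last operator in the chain, so every operator above it sees the value.
        .contextWrite(Context.of("traceId", traceId))
```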
Next, we need to extract traceId from Reactor’s context and put it into the MDC context. Several Reactor operators give access to the context:
1. subscriberContext(). It returns Mono<Context> (deprecated since Reactor 3.4 in favor of Mono.deferContextual);
2. handle { element: T, sink: SynchronousSink<T> -> ... }. It can be used as a more flexible replacement for map and filter, but most importantly, sink contains the current context;
3. doOnEach { signal: Signal<T> -> ... }. It works similarly to doOnNext, but uses signals. A Signal represents a single event from the upstream Publisher’s side; it can be either onNext, onError, onComplete or onSubscribe. onNext signals contain elements, onError signals contain a Throwable, and all of them contain the Context;
4. materialize()/dematerialize(). These methods convert a Flux (or Mono) of elements to a Flux (or Mono) of Signals and back.
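The following sketch shows two of these operators, handle and doOnEach, reading the same context entry. It assumes Reactor 3.4 or later for contextWrite and Signal.getContextView; the function name demo and the traceId key are illustrative.

```kotlin
import reactor.core.publisher.Flux
import reactor.util.context.Context

fun demo(): List<String> {
    val seen = mutableListOf<String>()
    Flux.just(1, 2)
        // handle: the sink exposes the context next to each element.
        .handle<String> { n, sink ->
            val traceId = sink.currentContext().getOrDefault<String>("traceId", "none")
            sink.next("$n:$traceId")
        }
        // doOnEach: every signal carries the context; only onNext has an element.
        .doOnEach { signal ->
            if (signal.isOnNext) {
                seen.add("${signal.get()}@${signal.contextView.get<String>("traceId")}")
            }
        }
        .contextWrite(Context.of("traceId", "abc"))
        .blockLast()
    return seen
}
```

Both operators see the value written below them, which is what makes them suitable hooks for copying it into MDC.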
Using them, we can implement custom Flux and Mono extensions (here withLoggingContext is a standard Kotlin way of filling the MDC context).
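A minimal sketch of such extensions for Mono is given below. It is not the post's original code: the names mapLogging and doOnNextLogging are assumed from the Logging naming used in this post, and the small withTraceId helper is a hypothetical stand-in for withLoggingContext (the kotlin-logging library offers a function of that name) so that the snippet depends only on Reactor and SLF4J.

```kotlin
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import reactor.core.publisher.Mono
import reactor.util.context.Context
import reactor.util.context.ContextView

private val log = LoggerFactory.getLogger("logging-extensions")

// Hypothetical helper: puts "traceId" from the Reactor context into MDC
// while block runs, then removes it again.
inline fun <R> withTraceId(ctx: ContextView, block: () -> R): R {
    if (!ctx.hasKey("traceId")) return block()
    return MDC.putCloseable("traceId", ctx.get<String>("traceId")).use { block() }
}

// map with MDC filled: handle gives access to the context through the sink.
fun <T : Any, R : Any> Mono<T>.mapLogging(f: (T) -> R): Mono<R> =
    handle<R> { t, sink -> sink.next(withTraceId(sink.currentContext()) { f(t) }) }

// doOnNext with MDC filled: doOnEach signals carry the context.
fun <T : Any> Mono<T>.doOnNextLogging(f: (T) -> Unit): Mono<T> =
    doOnEach { signal ->
        if (signal.isOnNext) withTraceId(signal.contextView) { f(signal.get()!!) }
    }

// Usage: the traceId is written once, at the end of the chain.
fun doubled(): Int? =
    Mono.just(21)
        .mapLogging { it * 2 }
        .doOnNextLogging { log.info("value={}", it) }
        .contextWrite(Context.of("traceId", "abc"))
        .block()
```

MDC is filled only while the user-supplied lambda runs, so there is no per-signal instrumentation of operators that do not log.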
Extensions for Flux are equivalent.
Now we can rewrite our controller’s method by simply adding Logging to each handler.
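The rewritten method could then look like the sketch below. As before, fetchFromUpstream and storeInRedis are hypothetical stubs, the Spring annotations are omitted, and a compact doOnNextLogging is repeated so that the snippet compiles on its own; it assumes Reactor 3.4 or later.

```kotlin
import org.slf4j.LoggerFactory
import org.slf4j.MDC
import reactor.core.publisher.Mono
import reactor.util.context.Context

private val log = LoggerFactory.getLogger("ProxyController")

// Compact stand-in for the Logging extension: fills MDC from the context
// around the callback, using "unknown" when no traceId was written.
fun <T : Any> Mono<T>.doOnNextLogging(f: (T) -> Unit): Mono<T> =
    doOnEach { signal ->
        if (signal.isOnNext) {
            val traceId = signal.contextView.getOrDefault<String>("traceId", "unknown")
            MDC.putCloseable("traceId", traceId).use { f(signal.get()!!) }
        }
    }

// Hypothetical stand-ins for the downstream call and the Redis write.
fun fetchFromUpstream(id: String): Mono<String> = Mono.just("payload-for-$id")
fun storeInRedis(id: String, body: String): Mono<Boolean> = Mono.just(true)

fun proxy(id: String, traceId: String): Mono<String> =
    fetchFromUpstream(id)
        .doOnNextLogging { log.info("upstream responded for id={}", id) }
        .flatMap { body -> storeInRedis(id, body).thenReturn(body) }
        .doOnNextLogging { log.info("response cached for id={}", id) }
        // Written last so that both logging handlers above can read it.
        .contextWrite(Context.of("traceId", traceId))
```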
Logging versions of other operators can be implemented using the same reactive handlers.