Spring WebFlux: WebFilter, Decorator (Kotlin)

Jongho Jeon
Jongho’s Tech Blog
16 min readSep 18, 2021

본 문서에서는 WebFlux에서 삽질을 해보며 사용하게 된 WebFilter와 3가지 Decorator 클래스들을 소개한다. WebFilter는 Request 정보에 접근하고, 이 정보들을 Request Context에 저장하기 위해 활용할 수 있다. 또한 Request Authentication에도 활용할 수 있다. WebFlux를 사용한다면 WebFilter는 필수적으로 사용하게 되기 때문에, 알아보도록 하자.

How to use WebFilter in Spring WebFlux

WebFilter는 Spring WebMVC의 Filter에 대응되는 클래스라고 생각하면 된다. WebFilter는 Spring Web package에 있는 클래스이고, WebFlux를 사용할 때만 동작한다. (왜 Spring WebFlux가 아니라 Web에 있는걸까)

이번 문서에서는 코드 위주로 설명한다 (Kotlin 1.4.x 코드이다)

RequestContextWebFilter

import org.springframework.web.server.ServerWebExchange
import org.springframework.web.server.WebFilter
import org.springframework.web.server.WebFilterChain
import reactor.core.publisher.Mono
class RequestContextWebFilter(
private val requestContext: RequestContext,
private val requestContextModelFactory: RequestContextModelFactory,
private val instantGenerator: DateTimeGenerator,
) : WebFilter {
override fun filter(
exchange: ServerWebExchange,
chain: WebFilterChain
): Mono<Void> {
val request = exchange.request
requestContext.setContext(
requestContextModelFactory.create(
requestId = request.id,
requestHeaders = request.headers.toMap(),
requestMethod = request.method?.name.toString(),
requestPath = request.path.value(),
userId = request.queryParams.getFirst("userId")?.toString() ?: "null",
requestQueryParams = request.queryParams.toSingleValueMap().toMap(),
requestInstant = instantGenerator.now(),
)
)
return chain.filter(exchange)
}
}

위와 같이 WebFilter를 작성할 수 있다. WebFilter는 reactor-http-nio-$N라는 id의 thread에서 실행된다. (N은 랜덤 숫자이다)

WebFlux 코드를 작성할 때 Mono를 return하는 함수에서 Mono chain이 아니라 Mono chain 외부에서 blocking IO 함수를 작성하지 않도록 주의를 해야 한다. 위 코드도 Mono chain 외부에서 함수들을 호출하고 있지만 blocking IO 함수는 없으므로 아직은 괜찮다.

filter 함수를 호출한 thread와 filter 함수가 return한 Mono를 subscribe하는 thread는 다를 수도 있다. local 환경에서는 동일했으나, request가 몰리는 상황에서는 어떨지 추후에 확인해볼 예정이다.

BasicAuthenticationWebFilter

class BasicAuthenticationWebFilter(
private val authenticationService: AuthenticationService,
) : WebFilter {
companion object {
const val allowedAuthScheme = "basic"
private val base64Decoder = Base64.getDecoder()
}
override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> {
val request = exchange.request
val (authenticationScheme, encodedCredential) = request
.headers.getFirst(RequestHeaderKeys.authorization)
?.split(" ", limit = 2)
?: throw ResponseStatusException(HttpStatus.UNAUTHORIZED, "Authorization header not found.")
if (authenticationScheme.lowercase() != allowedAuthScheme)
throw ResponseStatusException(
HttpStatus.UNAUTHORIZED,
"$authenticationScheme is not allowed. Use $allowedAuthScheme."
)
return chain.filter(exchange)
.doOnSubscribe { authenticationService.authenticate(decodeOrThrow(encodedCredential)) }
}
private fun decodeOrThrow(s: String) =
base64Decoder
.runCatching { decode(s).decodeToString() }
.getOrElse { t ->
if (t is IllegalArgumentException)
throw ResponseStatusException(
HttpStatus.UNAUTHORIZED,
"Invalid credential. base64 decoding failed."
).initCause(t)
else
throw ResponseStatusException(
HttpStatus.INTERNAL_SERVER_ERROR,
"Server error occurred."
).initCause(t)
}
}

RFC7235 (HTTP/1.1 Authentication), RFC7617 (Basic HTTP Authentication Scheme) Spec에 따라 간단히 구현한 Authentication WebFilter이다.

코드는 WebFilter 개념 중점으로 보면 좋을 것 같다. 특별히 설명할 부분은 없다. authenticate() 등이 blocking call일 경우에만 주의하자.

WebFilter Order

이제 2개의 WebFilter가 생겼다. WebFilter를 2개 이상 사용할 경우 순서가 생길 것이다. 이 순서는 때때로 중요하다. (RequestContext→Logging과 같이 불가피하게 순서 의존적일 때)

가장 좋은 방법은 순서 독립적으로 WebFilter를 구현하는 것이고, 대안은 WebFilter의 순서를 지정하는 것이다.

WebFilter를 하나로 뭉쳐서 구현할 수도 있지만, 코드의 재사용성과 유연성이 떨어진다. (business requirement를 빠르고 반영할 수 없다느니, agile하지 않다드니, 이러쿵저러쿵…)

순서는 Spring @Order 어노테이션으로 지정할 수 있다. 이 Order는 하나의 Config 파일에서 확인할 수 있도록 작성하는 것이 좋다.

import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.core.annotation.Order
@Configuration
class WebFilterConfig {
@Order(100)
@Bean
fun requestContextWebFilter() =
RequestContextWebFilter(
requestContext(),
requestContextModelFactory(),
dateTimeGenerator(),
)
@Order(200)
@Bean
fun authenticationWebFilter(authenticationService: AuthenticationService) =
AuthenticationWebFilter(requestContext(), authenticationService)
@Bean
fun requestContext(): ThreadLocalRequestContext =
ThreadLocalRequestContext()
@Bean
fun requestContextModelFactory() =
RequestContextModelFactory()
@Bean
fun dateTimeGenerator() =
DateTimeGenerator()
}

Order는 lower is higher이다. WebFilter는 이 정도만 이해하고 있으면 충분히 활용할 수 있을 것이다.

WebFlux Decorator classes

이번에 많은 삽질을 하면서 3개의 Decorator 클래스들을 알게 되었다.

  • ServerWebExchangeDecorator
  • ServerHttpRequestDecorator
  • ServerHttpResponseDecorator

이 파트는 설명없이 코드로 첨부한다.

ServerWebExchangeDecorator example

class LoggingWebFilter(
private val requestContext: RequestContext,
private val dateTimeGenerator: DateTimeGenerator,
) : WebFilter {
companion object : InsideLoggerProvider()
override fun filter(exchange: ServerWebExchange, chain: WebFilterChain): Mono<Void> =
chain.filter(loggingDecoratedExchange(exchange, request, response))
.doOnError { t ->
val context = requestContext.getContextOrThrow()
.apply { responseTime = dateTimeGenerator.now() }
log.error(t, LoggingType.errorResponseType, context)
}
}
private fun loggingDecoratedExchange(exchange: ServerWebExchange): ServerWebExchange =
object : ServerWebExchangeDecorator(exchange) {
override fun getRequest(): ServerHttpRequest =
LoggingDecoratedRequest(exchange.request, requestContext.getContext())
override fun getResponse(): ServerHttpResponse =
LoggingDecoratedResponse(
exchange.response,
requestContext.getContext(),
dateTimeGenerator,
)
}
}

ServerHttpRequestDecorator

class LoggingDecoratedRequest(
delegate: ServerHttpRequest,
private val contextOrNull: RequestContextModel?,
) : ServerHttpRequestDecorator(delegate) {
override fun getBody(): Flux<DataBuffer> =
super.getBody().doOnNext { dataBuffer ->
val body = DataBufferUtil.readDataBuffer(dataBuffer)
contextOrNull?.requestPayload = body
logRequest()
}
}

ServerHttpResponseDecorator example

class LoggingDecoratedResponse(
delegate: ServerHttpResponse,
private val contextOrNull: RequestContextModel?,
) : ServerHttpResponseDecorator(delegate) {
companion object : InsideLoggerProvider()
override fun writeWith(body: Publisher<out DataBuffer>): Mono<Void> =
super.writeWith(
Mono.from(body).doOnNext { dataBuffer ->
contextOrNull?.responsePayload = DataBufferUtil.readDataBuffer(dataBuffer)
statusCode?.name?.let { contextOrNull?.statusCode = it }
}
)
}

DataBufferUtil

object DataBufferUtil {
fun readDataBuffer(dataBuffer: DataBuffer): String {
val baos = ByteArrayOutputStream()
return Channels.newChannel(baos)
.runCatching { write(dataBuffer.asByteBuffer().asReadOnlyBuffer()) }
.map { baos.toString() }
.onFailure { t -> if (t is IOException) t.printStackTrace() }
.getOrThrow()
// Closing a ByteArrayOutputStream has no effect.
}
}

위 코드로 WebFlux Logging을 완전히 구현할 수 있다는 의미는 아니므로, Decorator들의 example 수준으로만 참고하길 바란다. 특히 request, response body를 읽는 부분은 stackoverflow의 도움을 받았다. 링크를 저장해놓지 않았는데 나중에 찾으면 첨부해놓겠다.

WebFlux Logging Trailer

설명하려고 했던 내용은 모두 작성하였다. Logging을 구현하는 데에 있어서 여러 challenge들이 있었다.

  1. ThreadLocal to ReactorContext/CoroutineContext
  2. Passing the context while thread switching
  3. Various logging cases
  4. implementation ways (Decorators, AOP, Reactor, …)

Logging Cases

Case of API Success: general case

  • 일반적인 상황은 아래 케이스들을 만족하면서 동시에 만족되어야 한다.

Case of exception throwing by a request handler

  • WebFilter에서 Mono chain doOnError로 잡히지 않는다 (분석 예정)

Case of exception throwing by WebFilter

  • Request Decorator로 구현했더니 request body가 read되기 전에 API가 abort되므로 logging이 어려웠다
  • 작성하다보니 생각났는데, abort 시점이 Mono subscribe 전이냐 후냐에 따라서도 상황이 다를 것 같다.
  • Controller method를 pointcut으로 했을 때 catching 할 수 없는 이슈가 있다

Logging을 비교적 쉽게 보고 덤벼들었는데, 구현을 시작하기 전에 이런 case들을 먼저 고려해볼 것을 그랬다.

그리고 이런 문제도 있었다 (body를 읽는 것이 문제이고, Spring WebFlux에 버그가 있던 것도 문제이긴 했다 😂)

Spring WebFlux 에서 coRouter filter를 이용하여 request, response 로깅하기

위 버그는 아래에서 fix되었다

Release v5.3.7 · spring-projects/spring-framework

Fix Kotlin filter parameter bug in Router DSLs · Issue #26838 · spring-projects/spring-framework

Fix parameter bug of the handler that Kotlin DSL defines as a parameter for the filterFunction. by ShindongLee · Pull Request #26857 · spring-projects/spring-framework

또한, Controller method가 blocking function인지, Mono(or Flux) function인지, Kotlin coroutine의 suspending function인지에 따라서도 구현이 달라진다.

이 문제들을 해결한 WebFlux Logging을 위한 best practice를 찾아볼 예정이다.

Conclusion

WebFlux에서 사용하기 위한 WebFilter와 Decorator의 예제 코드들을 살펴보았다. use case로서 request context와 authentication을 살펴보았다. WebFlux Logging을 구현하기 위해 존재하는 challenge도 미리 살펴보았다.

조만간 WebFilter와 AOP를 사용하여 Logging을 구현해볼 예정이다. Context switching은 꽤나 골칫거리가 될 것 같다. 아마 WebFlux를 사용하는 사람들이 Logging에서 많이 삽질을 할 것 같은데, 이 글이 도움이 되길 바란다.

--

--