Webflux + JPA(Blocking call) 비동기 처리 테스트

김콜라
15 min readNov 7, 2022

--

다음 포스팅
Webflux + Non-blocking call 테스트

두 번에 걸쳐 Webflux 에서 Non-blocking I/O 를 지원하는 라이브러리를 사용해야 하는 이유를 알아보려 합니다.

이번 포스팅에서는

  1. MVC 와 Webflux 에 대해 간단히 알아보기
  2. Webflux 내에서 Blocking call 을 호출한 결과 확인
  3. Blocking call 을 Reactor, Coroutine 으로 감싼 후 호출한 결과 확인

에 대한 내용을 다룹니다.

먼저 기존의 MVC 에 대해 간단히 얘기해보겠습니다.

아래 그림처럼 톰캣은 기본적으로 Thread Per Request 모델로, 매 요청 당 스레드 하나를 할당하고, 해당 요청에 대한 작업은 이 스레드가 전부 담당합니다.

출처: https://livenow14.tistory.com/59

즉, 요청에 할당되는 스레드는 응답을 주기전까지는 스레드를 반납하지 않는데, 여기서 아래와 같은 문제가 발생합니다.

  1. 스레드는 요청에 대한 응답이 반환될때까지 스레드 풀에 반환되지 않습니다. 즉, Blocking call 로 인해 CPU 를 사용하지 않는 상태에서도 스레드를 점유하고 있어, 필요한 요청에 스레드를 할당할 수 없습니다.
  2. 스레드 내에서 Blocking 요청이 많아질수록 Context-switching 이 많아집니다.

이를 개선하기 위해 서블릿 API 3.1 부터는 Non-blocking I/O 가 지원하지만, 제공된 API 이외의 동기적으로 처리하는 모듈(Filter, Servlet)과 블로킹 방식의 API(Database / Http call) 때문에 완전한 Non-blocking 환경이라 할 수 없었습니다.

Webflux 의 동작

Spring 5 부터는 Non-blocking 으로 동작하는 서버를 작성하기 위해 서블릿을 완전히 걷어낸 Spring Webflux 를 활용할 수 있습니다.

Webflux 는 기본적으로 Non-blocking I/O 를 제공하는 Netty 를 사용하는데(정확히는 reactor-netty 입니다.), 적은 스레드를 효율적으로 활용하여 요청을 처리합니다.

기본적으로 Netty 를 활용한 Webflux 의 동작과정은 아래와 같습니다.

출처: https://dzone.com/articles/spring-webflux-eventloop-vs-thread-per-request-mod
  1. 이벤트 루프(워커 스레드) 는 request (event) 를 하나씩 읽으며 처리합니다.
  2. 워커 스레드는 요청을 처리하다가 db/api call 등 I/O 작업을 만나면 다른 이벤트루프 혹은 비동기 작업을 수행하는 곳에 작업을 던지고 스레드를 반환합니다.
  3. 반환된 워커 스레드는 다른 이벤트를 위와 동일하게 처리합니다.
  4. 도중에 비동기 작업에 대한 결과값이 반환되면 다시 워커 스레드가 받아서 마무리하고 클라이언트에게 반환됩니다.

즉, 기존 MVC 와 달리, I/O 과정에서 워커 스레드가 Blocking 되지 않고 다른 요청을 받을 수 있으므로 비교적 적은 수의 스레드로 모든 요청을 처리할 수 있습니다.

이에 대한 테스트는 다음 포스팅에서 진행합니다.

Webflux 기본 스레드 확인

워커 스레드는 코어 갯수 만큼 생성됩니다 .실제로 Webflux 서버를 실행하면 reactor-http-nio 로 시작하는 스레드 10개가 생성된걸 확인할 수 있습니다.

밑에서는 기본 스레드를 워커 스레드라 부릅니다.

Webflux default thread 확인.

현재 CPU 의 코어 개수는 터미널에서 sysctl -n hw.ncpu 명령으로 확인할 수 있습니다. (https://www.baeldung.com/spring-webflux-concurrency#1-reactor-netty)

이제부턴 실제로 Webflux 의 스레드가 Blocking call 을 만나면 어떻게 되는지 확인해보겠습니다. 테스트 환경은 다음과 같습니다.

  • Kotlin + Webflux + JPA
  • JPA 요청은 SLEEP(1) 로 1초동안 처리되도록 함
  • Reactor, Coroutine

Blocking call

JPA 는 기본적으로 JDBC 구현체를 이용하여 데이터베이스에 접근하므로 Blocking 방식으로 동작합니다.

참고: https://gmlwjd9405.github.io/2018/12/25/difference-jdbc-jpa-mybatis.html

JDBC 에 대한 내용은 아래에 잘 정리되어있습니다.

Non-Blocking 방식의 서버 프레임워크는 적은 스레드로 많은 요청을 처리해야합니다. 만약 서버에 JDBC 와 같은 Blocking call 만 제공하는 라이브러리가 추가되면, 스레드를 Blocking 하게 되고 전체 처리량은 낮아집니다.

아래는 요청을 처리하는데 1초정도 걸리는 API 에 요청 100개를 보낸 결과입니다.

코드

/**
* 해당 코드는 워커 스레드를 블록합니다.
* @return
*/
@GetMapping("block")
fun getOneWithBlock(): FileResponse {
logger.info("Do Worker Thread.")
return FileResponse.createFromFile(fileRepository.findOneAfterASecond())
}

결과

1초 요청 100개 처리에 약 10초 정도 소요.

100개 요청을 처리하는데 약 10초 정도 소요되고, 1초에 10개의 요청만 처리되는걸 볼 수 있습니다.

JPA Repository api 에서 Blocking 되어 다른 요청을 받아서 처리하지 못하기 때문입니다.

이를 개선하기 위해 Blocking api 를 비동기 처리할 수 있는데, 여기서는 아래 두 가지 방법을 활용해보겠습니다.

  1. Reactor
  2. coroutine

Reactor

Reactive streams 는 비동기 스트림 처리의 표준입니다.

Webflux 에 사용되는 Reactor 는 이러한 Reactive streams 를 구현한 구현체 중 하나입니다.

이를 이용하면 Blocking call 을 비동기로 처리할 수 있습니다.

아래 코드로 요청 101개를 보낸 결과입니다.

코드

/**
* boundedElastic 대신 사용할 스케줄러입니다.
* @return
*/
@Bean
fun jdbcScheduler(): Scheduler {
return Schedulers.fromExecutor(Executors.newFixedThreadPool(300, object : CustomizableThreadFactory() {
override fun getThreadNamePrefix(): String {
return "Custom-"
}
}))
}

...
/**
*
* @return
*/
@GetMapping("reactor")
fun getOneWithReactor(): Mono<FileResponse> {
logger.info("Do Worker Thread.")
return Mono.fromCallable {
logger.info("Do other Thread.")
FileResponse.createFromFile(fileRepository.findOneAfterASecond())
}
.subscribeOn(jdbcScheduler)
}

boundedElastic 은 고정된 스레드 풀을 유지합니다. 스레드 풀을 따로 생성하여 진행합니다.

결과

비동기 처리를 위한 스레드 생성
1초 요청 101개 처리에 약 1초 정도 소요.

이번에는 워커 스레드를 블록하지 않아 101개의 요청이 거의 동시에 처리되었습니다.

Coroutine

코루틴은 일종의 경량 스레드로, 하나의 스레드 내에서 작업(루틴)을 나누어 처리하면서 동시성을 제공해줍니다. (참고2)

코루틴의 withContext 는 CoroutineContext 를 만들어서 새로운 코루틴을 실행해주는데, 이를 이용하면 Blocking call 을 비동기처리할 수 있습니다. (참고3)

아래는 동시 요청 64개를 보낸 결과입니다.

코드

/**
*
* @return
*/
suspend fun findAllWithCoroutine(): List<FileResponse> {
logger.info("Do Worker Thread.")
return withContext(Dispatchers.IO) {
logger.info("Do other Thread.")
FileResponse.createFromFile(fileRepository.findOneAfterASecond())
}
}

결과

IO Dispatcher 와 Default Dispatcher 가 스레드를 공유. 참고: https://sandn.tistory.com/110
1초 요청 64개 처리에 약 1초 정도 소요.

Dispatchers.IO 와 Dispatchers.Default 는 스레드 풀에 고정된 스레드 수를 유지합니다. 실행 시, 총 74개의 스레드를 볼 수 있고, 그 중 64개가 Dispatchers.IO 작업을 수행합니다.

만약 65개의 요청을 보내게 되면 비동기 처리할 스레드가 부족하여 Blocking 됩니다. 즉, 총 처리시간은 2초 정도 소요됩니다.

마무리

Webflux 내에 Blocking call 이 있어도 비동기처리하면 정상적으로 동작하는걸 확인할 수 있었습니다.

다만 위 솔루션에는 한 가지 문제점이 있는데요,

비동기처리를 위해 별도의 스레드를 이용해야 하고, 해당 스레드는 Blocking call 이 종료될때까지 Block 되는 것을 볼 수 있었습니다.

이를 완벽히 해결하기 위해선, Non-blocking IO 를 지원하는 라이브러리를 이용해야 합니다.

예를 들어, Spring 4.0 에서 나온 AsyncRestTemplate 은 기본적으로 스레드 하나를 할당하여 http call 을 비동기처리하지만, AsyncRestTemplate 의 AsyncClientHttpRequestFactory 를 Netty 로 설정하면 Non-blocking 으로 동작합니다.

AsyncRestTemplate 는 deprecated 될 예정입니다. 대안으로 나온 Webclient 역시 Netty 를 이용합니다.

마찬가지로, non-blocking call 을 지원하는 R2DBC driver 를 살펴보면 Netty 기반이라는 것을 확인할 수 있습니다.

다음에는 Non-blocking call 에 대해 테스트해보고, 실제로 이러한 부분이 개선되는지 확인해보겠습니다.

Reference

참고1. 서블릿.

서블릿은 서블릿 인터페이스를 구현한 요청을 받고 그에 대한 응답을 반환하는 객체라 생각하고 작성합니다.

톰캣은 서블릿을 관리해주는 서블릿 컨테이너로, 요청을 받고 응답을 보낼 수 있게 웹 서버와 소캣을 만들어서 통신을 해줍니다.

참고2. 동시성.

동시성(Concurrency) 은 동시에 실행되는것 처럼 보일때를 의미합니다.
병렬성(Parallelism) 은 멀티스레드처럼 실제로 동시에 실행되는 것을 의미합니다.

참고3. 코루틴.

CoroutineContext 는 코루틴이 실행되는 환경으로, Dispatcher 는 이의 일부입니다.

Dispatcher 는 코루틴이 어떤 스레드 위에서 실행되게 할 지를 명시합니다. Dispatcher 는 아래와 같이 정의되어 있습니다. (참고)

  1. Dispatchers.Default : CPU 사용량이 많은 작업에 사용합니다. 주 스레드에서 작업하기에는 너무 긴 작업 들에게 알맞습니다.
  2. Dispatchers.IO : 네트워크, 디스크 사용 할때 사용합니다. 파일 읽고, 쓰고, 소켓을 읽고, 쓰고 작업을 멈추는것에 최적화되어 있습니다.
  3. Dispatchers.Main : 메인스레드입니다.

코드를 기본 스레드 외부에서 실행하려면 Default 또는 IO Dispatcher에서 작업을 실행하도록 CoroutineContext 에 명시해야합니다.

suspend 키워드를 메서드 앞에 사용하면 스레드가 놀지 않고 Dispatcher 에 의해 스레드가 다른일에 투입됩니다. 같은 시간 실행되어야할 다른 중요한 함수가 스레드를 사용하게 됩니다.

참고4. 자바에서 Non-blocking 작성.
https://parkcheolu.tistory.com/33

참고5. 서블릿.
https://jusungpark.tistory.com/15

참고6.
https://github.com/chang-chao/spring-webflux-reactive-jdbc-sample

참고7. Future.

ExecutorService 의 submit 메서드는 Future 를 반환합니다. 해당 작업은 비동기 작업이라서 실행 즉시 결과를 반환하는데, 그 당시에는 어떠한 결과를 갖는지 알 수 없습니다. 그래서 자바에서는 비동기 처리 결과를 표현하기 위해 Future 객체를 사용합니다.

<T> Future<T> submit(Callable<T> task);

하지만 실제 로직에는 Future 가 아닌, 비동기 작업이 완료된 이후의 값을 이용해야 하는데요, Future 는 get 이라는 Blocking 메서드로 꺼낼 수 밖에 없습니다.

이를 개선하기 위해 콜백 형식으로 Blocking 없이 결과값을 얻을 수 있는 ListenableFuture, CompletableFuture 를 이용할 수있습니다.

DeferredResult 가 반환된 이후에 백그라운드에서 non-blocking 으로 동작하기 때문에 스레드를 많이 사용하지 않습니다.

fun listenableFutureTest(): DeferredResult<String> {
val ast = AsyncRestTemplate()
val result = DeferredResult<String>()

res.addCallback({ s ->
val res2 = ast.getForEntity("/health", String::class.java)

res2.addCallback({ s2 ->
val res3 = asyncService.work("hello")

res3.addCallback({ s3 ->
result.setResult(s!!.body + s2!!.body + s3!!)
}, { e -> })

}, { e ->

})
}, { e ->


return result
}
fun completableFutureTest(): DeferredResult<String> {
val ast = AsyncRestTemplate(Netty4ClientHttpRequestFactory(NioEventLoopGroup(1)))
val result = DeferredResult<String>()

ListenableFuture.toCompletableFuture(ast.getForEntity("health", String::class.java))
.thenCompose {
ListenableFuture.toCompletableFuture(ast.getForEntity("/health", String::class.java))
}
.thenAccept {
result.setResult(it)
}

return result
}

즉, 자바에서도 비동기처리를 쉽게 할 수 있도록 지원하고 있습니다.

Unlisted

--

--