자바병렬프로그래밍 6. 작업 실행 Task Execution

Eugene Lim
18 min readMay 5, 2022

--

이번 챕터에서는 간단한 웹서버를 만들어보면서 Executor를 이용해 어떻게 병렬처리를 하면 좋은지 설명하고 있다.

이전 챕터 보기

2. 스레드 안정성
3. 객체 공유
4. 객체 구성
5. 구성 단위 Build Blocks

6.1 스레드에서 작업 실행

서버애플리케이션을 만들 때, 가장 큰 목표는 부하가 걸리지 않은 상황에서는 빠른 응답 속도, 가능한 많은 사용자 task 처리이다.

6.1.1 작업을 순차적으로 실행

SingleThreadWebServer는 가장 기본적인 웹서버의 틀을 보여준다. handleRequest() 여기서 데이터를 처리하고, client socket까지 끊어주는 역할을 한다고 가정하자.
그리고 한 번에 하나의 요청만 처리할 수 있다. 소켓 연결을 하고 데이터를 주고 받는 부분을 IO자원라고 하고, 데이터를 받아서 가공하고 처리하는 과정을 CPU자원이라고 한다면, IO를 처리할 때는 CPU가 대기하고, CPU가 처리할 때는 IO가 대기해야한다.

6.1.2 작업마다 스레드를 직접 생성

이제는 main-thread는 socket.accept()로 요청이 들어오길 기다리고, 요청이 들어오면 새로운 스레드를 만들어서 handleRequest()를 처리하도록 한다.

  • 여러개의 요청을 동시에 처리할 수 있다
  • 만약 handleRequest()에서 공유자원이 있다면 동기화 작업을 해줘야 한다

6.1.3 스레드를 많이 생성할 때 문제점

하지만 ThreadPerTaskWebServer에는 문제점이 있다.

  • 스레드를 생성하고 제거하는 작업에 많은 자원이 소모된다.
    (스레드를 생성하는 과정에 일정량의 시간이 필요하다.)
  • 실행 중인 스레드는 시스템의 자원(메모리)를 소모한다.
    (스레드는 메모리캐시를 사용하므로 idle 상태에 있다 하더라도 메모리를 점유하게 된다.)
  • 모든 시스템에는 생성할 수 있는 스레드 개수가 제한되어 있다 → 결국 많이 만들면 OutOfMemery

6.2 Executor

이전 챕터에서 제한된 큐 bounded queue를 사용해 부하가 크게 걸리는 애플리케이션에서 메모리를 모두 소모해버리지 않도록 통제하는 방법을 살펴봤다.

이 말은 queue를 사용해서 producer와 consumer를 컨트롤 했었다.
즉, 스레드풀 thread pool 도 thread를 동일한 방식으로 컨트롤 해준다(왜냐하면 스레드풀 내부에서도 queue를 사용한다). concurrent 패키지에 Executor를 사용하여 스레드풀이 만들어져있다.

public interface Executor {
void execute(Runnable command);
}
  • Executor interface는 작업 등록 task submission과 작업 실행 task execution을 분리했고, 각 작업은 runnable이 된다.
  • 각 작업의 라이프사이클도 관리
  • Producer-Consumer Pattern
    (작업 등록은 프로듀서가 되고, 작업 실행은 컨슈머가 하는 모양이 된다)

6.2.1 Executor를 사용한 웹서버

ThreadPerTaskWebServer를 Executor를 이용하여 수정해보자.

100개의 스레드를 만드는 Executor를 사용했고,
(Executors.newFixedThreadPool()은 나중에 설명한다)
작업을 등록하는 부분과 실행하는 부분이 분리되어 있다.
그리고 Thread.start()를 단지 executor.execute()로 대체되었다.

ThreadPerTaskWebServer와 다른 점은 요청 때마다 매번 thread를 만들지 않는다. 그리고, 만약 100개의 thread가 현재 작업 중이라면 새로운 요청은 queue 담겨서 실행되지 않고 대기를 할 것이다.

만약 요청마다 새로운 thread를 만들고 싶다면 newFixedThreadPool 부분을 수정하여 쉽게 Executor를 수정할 수 있다.

만약 SingleThreadWebServer처럼 새로운 스레드를 만들지 않게 하고 싶다면,
Executor exec = WithinThreadExecutor()로 대체하여 작업할 수 있다.

6.2.2 실행정책 Execution Policies

Executor를 사용하면, 아래 정책들에 따라 설정을 수정할 수 있다.

  • 작업을 어느 스레드에서 실행할 것인가
  • 작업을 순서대로 할 것인가?(FIFO, LIFO ..)
  • 동시에 몇 개의 작업을 병렬로 실행할 것인가
  • 최대 몇 개까지 작업큐에 대기시킬 것인가
  • 시스템 부하가 심할 때, 어떤 작업을 희생할 것이며 이것을 어떻게 알릴 수 있을까
  • 작업 실행 전이나 후에 어떤 동작이 있어야 하나

6.2.3 스레드 풀 Thread Pools

스레드풀의 메인 컨셉 main concept은 스레드로 처리할 작업 task를 쌓아둬야 하기 때문에 작업 큐 task queue 와 굉장히 밀접한 관련이 있다.

  1. 먼저 큐에서 실행할 작업은 가져오고
  2. 작업을 실행하고
  3. 다음 실행할 작업이 나타날 때까지 대기한다

스레드 풀을 사용하면, 다음과 같은 장점이 있다.

  • 이전에 사용했던 스레드를 재사용하기 때문에, 스레드를 계속 생성할 필요가 없다
  • 작업이 생겼을 때, 스레드가 이미 만들어져있기 때문에 시간 딜레이가 발생하지 않는다.
  • 스레드풀의 크기를 적절히 하면, 프로세서가 쉬지 않고 동작할 수 있고, 메모리를 효율적으로 사용할 수 있다.

일반적으로 사용하는 스레드풀의 종류를 미리 만들어놓았다.(Executors)

  • newFixedThreaddPool
    생성할 수 있는 스레드의 최대 개수가 제한되어 있고, 제한된 개수까지 스레드를 생성하면 더이상 늘어나지 않는다. (예외 상황으로 스레드가 종료되었다면, 다시 하나를 만든다)
  • newCachedThreadPool
    현재 갖고 있는 스레드 수보다 처리할 작업 수가 많아지면 스레드를 새로 생성한다.스레드수에 제한이 없다
  • newSingleThreadExecutor
    단일 스레드 Executor. 만약 작업 중에 exception이 발생해서 종료되면, 새로운 스레드를 만든다.
    등록된 큐에 따라(FIFO,LIFO 등) 순차적으로 실행한다.
  • newScheduledThreadPool
    일정 시간 후에 실행하거나 주기적인 작업을 할 때
    Executor.Timer 클래스와 유사하다(나중에 설명)

ThreadPoolExecutor를 설정하는 방법은 Chapter 8에서 설명한다.

6.2.4 Executor 동작 주기 Lifecycle

Executor를 실행하는 방법은 위에서 보았고, 종료하는 방법은 다음과 같다.

  • Executor는 모든 스레드가 종료되어야 종료되고, Executor가 종료되지 않으면 JVM에 올라간 프로세스도 종료되지 않는다
  • Executor는 비동기로 실행되기 때문에, 작업이 언제 끝나는지도 알기 힘들다
  1. 안전한 종료 graceful shutdown
    작업을 새로 등록하지 못하도록 막고, 모든 작업이 끝나기를 기다리는
  2. 강제적 종료 abrupt shutdown
    컴퓨터 플러그가 빠져서 전원이 꺼지는 경우

이런 라이프사이클에 필요한 ExecutorService의 인터페이스가 있다.

우리가 최종적으로 사용할 ThreadPoolExecutor hierarchy

ExecutorService의 내부적으로 갖고 있는 lifecycle은 크게 3가지

  • 실행 중 running
  • 종료 중 shutting down
  • 종료 terminated

-shutdown() 메소드를 호출하면, 새로운 작업을 받지 않고, 이전 등록되었던 작업까지 실행 한 후 마친다.

-shutdownNow() 를 호출하면, 강제 종료 절차를 진행하는데, 현재 진행 중인 작업도 취소시키고 실행되지 않고 대기중이던 작업을 List<Runnable>로 반환해준다.

  • RejectedExecutionHandler
    ThreadPoolExecutor는 이미 종료 절차가 시작되거나 종료된 시점에 새로운 작업을 등록하면 rejected execution handler를 통해 오류처리를 한다 → 8.3.3 에 설명
    RejectedExecutionHandler의 설정에 따라 다르겠지만 처리 방법은
    1. 등록하려던 작업을 그냥 무시할 수도 있고
    2. RejectedExecutionException을 발생시킬 수도 있다

ExecutorService가 종료될때까지 기다리고자 한다면, awaitTermination()을 호출하면 된다.

웹서버에 종료기능을 추가한 LifecycleWebServer

  1. 기존에 while(true) → while(exec.isShutdown()) 으로 바꿈
  2. 그리고 while 로 들어왔는데, exec.execute()를 실행할 때, 이미 스레드풀이 종료상태로 진입해서, RejectedExecutionException이 나온다면 try-catch로 해당 task가 실행이 안됐음을 알려줄 수 있다.
  3. stop() 을 추가하여, shutdown()을 호출할 수 있게 해준다.
  4. 만약 request에 웹서버를 스탑시키는 Request 프로토콜이 있다면, 체크해서 stop() 시킬 수도 있다.

6.2.5 지연/주기적 작업 Delayed and Periodic Tasks

자바에 포함되어 있는 Timer 클래스는 단점이 있다.

  • 스레드가 하나여서, 이전 작업이 너무 오래걸리면 다음 시각에 예정된 작업이 밀린다.
  • Timer 클래스의 절대시각 기능을 사용하면, 시스템 하드웨어 시각을 변경시면 Timer에 스케줄된 작업도 함께 변경된다.
  • Timer 스레드는 예상치 못한 Exception이 발생했을 때, 스레드 자체가 멈춰버릴 수도 있다.

그러므로, ScheduledThreadPoolExecutor로 교체해서 사용하자.

  • 스레드를 여러개 사용하므로 작업이 밀릴 걱정을 하지 않아도 되고
  • 절대시각을 지원하지 않는다
  • 예상치 못한 Exception이 발생해서 Thread가 종료된다면, 새로운 스레드를 만들어서 예정된 작업을 실행해준다.

OutOfTime 클래스에서 ThrowTask() 에서 예상치 못한 에러가 발생한다면 두번째 ThrowTask는 아예 실행이 안된다.

6.3 병렬로 처리할 만한 작업 Finding Exploitable Parallelism

이번 챕터에서는 Executor를 사용하면서 HTML을 받아와서 이미지 파일도 같이 렌더링할 수 있는 웹서버를 만들어본다.

6.3.1 순차적 페이지 렌더링 Sequential Page Renderer

우리가 웹브라우저를 만든다고 생각해보자, 처음에는 이렇게 간단하게 만들어 볼 수 있다.

먼저 텍스트를 렌더링 하고, 그 사이사이에 들어있는 이미지를 네모박스로 표시한 다음, 이미지를 차례대로 다운로드 받아서 표시하는 방법도 있다. SingleThreadRenderer는 이 모든 작업을 하나의 스레드에서 실행하고 있다.

여기서 imageInfo.downloadImage() 는 대부분 IO 작업이고 시간도 상당히 걸릴 것이다. 그리고 CPU는 대기하게 된다. 이 부분을 동시에 실행한다면 CPU 활용도 꽤 높고, 응답시간도 개선될 수 있을 것이다.

6.3.2 결과가 나올 때까지 대기: Callable and Future

우리가 많이 써왔던 Runnable은 run() 후에 결과값을 리턴해줄 수도 없고, 예외가 발생할 수 있다고 throws 를 명시할 수도 없다.
(결과를 만들어내면 다른 곳에 저장해야하고, 오류가 발생하면 로그 파일에 기록해야만 한다.)

시간이 걸리는 작업 후에 결과를 받아와야 할 때는 (Runnable을 대체하여) Callable을 사용하는게 좋다. 이런 작업들이 ExecutorService 에서 실행된다면 다음 상태를 가질 수 있다.

  • 생성 created
  • 등록 submission
  • 실행 started
  • 종료 completed

그리고 작업은 시간이 걸리기 때문에 중간에 취소할 수 있는 기능이 있어야 한다.

  • 등록은 되었지만(submitted) 실행이 되지 않았을 때 → 취소
  • 이미 시작되었을 때(started) → 취소

Future는 특정 작업이 정상적으로 완료했는지, 취소됐는지 정보를 갖고 있다.

public interface Callable<V> {
V call() throws Exception;
}

public interface Future<V> {
boolean cancel(boolean mayInterruptIfRunning);
boolean isCancelled();
boolean isDone();
V get() throws InterruptedException, ExecutionException,
CancellationException;
V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException,
CancellationException, TimeoutException;
}

즉, 작업 task를 ExecutorService에 넣을 때, Callable과 Future의 조합으로 구현할 수 있다.

인터페이스를 보면, Callable은 call()을 실행하면 결과값 V를 주고 exception도 명시할 수 있다(Runnable과 다르게).

Future는

  • complete 혹은 cancel로 상태가 바뀌었을 때 되돌릴 수 없다
  • get() 은 상황에 따라 동작이 다르다
    1. 완료된 상태라면, 즉시 결과값을 준다.
    2. 작업을 시작하기 전이라면, 작업이 완료될 때까지 기다리는 blocking method
    3. 작업이 모두 끝난 상태에서 exception이 발생했었다면, ExecutionException에 원인을 담아서 준다.
    4. 작업이 중간에 취소되었다면 CancellationException이 발생한다

Future를 만들려면,

  • ExecutorService.submit()을 호출하면 Future로 리턴 받는다.
    (submit에는 callable, runnable 둘다 들어갈 수 있다)
  • 직접 Runnable 이나 Callable을 FutureTask에 넣어서 만들 수도 있다.
    (AbstractExecutorService의 newTaskFor()를 override 해서 callable, runnable을 task로 넣었을 때 FutureTask를 컨트롤 할 수도 있다)
FutureTask의 hierarchy

6.3.3 Future를 사용한 페이지 렌더링

SingleThreadRendere를 병렬로 진행될 수 있는 태스크를 나눠보자

  1. HTML 중간에 이미지를 넣는 부분이고
  2. 이미지 파일을 다운로드 받는 작업이다

FutureRenderer는 Future와 Callable을 어떻게 사용하는지 보여준다. 다운로드한 이미지 정보리스트를 결과로 받는 Callable을 생성하였고, executor.submit(task) 로 callable을 실행하였다.
그리고 renderText()는 같은 스레드(예를 들어, main에서 호출되었으면 main)에서 표시하고 그 후에 future.get()으로 이미지 리스트를 받으려고 대기한다.

  1. 이미 다운로드가 끝났으면 바로 List<ImageData> 를 반환한다.
  2. 만약 다운로드 중이라면 future.get()에서 대기하고, 다운로드가 끝나면 List<ImageData>를 반환한다.
  3. 다운로드 중에(future.get()에서) InterruptedExcetpion이 발생하면 현재 스레드에도 그 사실을 알리고 future.cancel()을 요청한다.
  4. InterruptedException을 제외한 나머지 에러는 ExcecutionException의 cause에 에러가 담겨온다.(이전 챕터 5.2.2 FutureTask를 다시 보자)

여기서 HTML 이 먼저 렌더링되고, 이미지가 다운로드 되는대로 화면에 나타날 것이다. 하지만 이 코드의 단점은 이미지가 10개가 포함되어 있다면 10개를 전부 다운로드 해야 이미지들이 보인다는 것이다.

결국, FutureRenderer는 텍스트를 렌더링하는 작업은 훨씬 빠르게 끝나지만, 이미지를 다운로드 하는 작업이 제일 오래 걸리므로 성능의 이점이 크게 없다.

6.3.5 CompletionService: Executor와 BlockingQueue의 연합

결국 이미지마다 다운로드받는 Task를 exec.submit()을 하고, 여기서 나온 future를 List<Future>에 추가한다고 생각해보자.

List<Future> futures = ArrayList<Future>();
while (isAllDone()) {
for (index = 0; index++; index < futures.size) {
futures[index].get(timeout = 0)
}
}

여기서 futures를 계속 pooling 하면서 futures[index].get(timeout = 0)을 하면 이미지를 다운로드 받는 즉시 렌더링 할 수 있을 것이다. 이런 기능을 제공해주는게 Completionservice.

❗️또 다른 방식은 task가 끝나면 작업할 수 있는 listener를 등록하는 것도 하나의 방법이다

public interface CompletionService<V> {     
Future<V> submit(Callable<V> task);
Future<V> submit(Runnable task, V result);
Future<V> take() throws InterruptedException;
Future<V> poll();
Future<V> poll(long timeout, TimeUnit unit) throws InterruptedException;
}

CompletionService는 Executor와 BlockingQueue의 기능을 하나로 모은 인터페이스이다.
필요한 Callable을 실행시킬 수 있고, poll()을 이용해 완료된 작업들을 반환받을 수 있다.
ExecutorCompletionService를 보면 좀더 이해하기 쉽다.

ExecutorCompletionSerivce도 똑같이 submit(callable)을 호출하여 작업을 실행시킬 수 있다. 여기서 executor.execute() 를 할 때, QueueingFuture로 감싸서 executor에게 실행을 넘긴다.

QueueingFuture는 실행할 task(runnableFuture)와 BlockingQueue를 받아오게 된다. 그리고 task는 작업이 끝나면 done() 이 호출되서 completeQueue에 추가가 되는 것이다. 그리고 poll() 혹은 take()를 호출하면 작업이 끝나는 순서대로 바로바로 결과를 넘겨줄 것이다.

QueueingFuture는 FutureTask를 상속받고, 또 생성자에서 futureTask를 인자로 받는다. 그리고 대부분의 기능은 인자로 받은 futureTask가 역할을 담당하고, 그 중에 done()만 override를 받아서 추가작업을 추가하였다. 이런 기법은 decorator pattern이라고 했으며, Chapter 4.2의 synchronizedXXX를 볼 때도 이 패턴에 대해 공부하였다.

6.3.6 CompletionService를 활용한 페이지 렌더링

이제 이미지별로 다운로드를 할 수 있도록 수정해보자

CompletionService<ImageData>로 생성자로 들어온 executor를 넣어줬다. 하지만, 여기서는 연관되는 작업이 없어서, newCachedThreadPool 로 새로 만들어서 ExecutorCompletionService를 만들 수도 있다.
이미지를 얻어와야 할 정보를 loop을 돌면서 completionService.submit() 을 하고 있다.
그리고 info 리스트만큼 loop을 돌면서 take()가 나오기를 기다리고 있고, 실제로 값이 나오면 바로 이미지 lendering을 하고 있다.

6.3.7 작업 실행 시간 제한

타임아웃을 걸어, 시간제한을 할 수 있다.
Future.get() 에 시간제한을 걸면, TimeoutException을 던지면서 실행을 멈춘다.

만약, Ad를 다른 서버에서 가져온다면, Ad를 불러오는 작업은 executor에 등록해두고, 페이지를 먼저 rendering 하는 것이 중요하다.
그 후에, f.get() 남은 시간을 넘겨서 타임아웃을 걸 수 있다.
그리고 TimeoutException이 발생하면 기본 광고 DEFAULT_AD를 넘겨주고, f.cancel(true)을 실행하여 취소시킨다.

6.3.8 여행 예약 포털

여기서는 입찰정보로 표현했지만, 여러 항공사에서 비행기 가격을 가져오는 작업을 수행해야 한다고 생각해보자. 여러 서버에서 비행기 가격을 가져오는 작업 n개를 스레드 풀에 등록하고, 등록한 작업마다 future 객체를 확보하고, 타임아웃을 지정한 get()으로 정보를 가져오도록 하려고 한다.

  • TravelInfo → 어디서 어디로, 며칠에 이동할지에 대한 정보
  • TravelCompany → 항공사
  • 항공사 리스트로 travelInfo를 넘겨서 비행기 가격 TravelQuote를 얻어와야 한다.

invokeAll()에 List<QuoteTask> → Callable list를 넘겨주고, 타임아웃을 동시에 적용할 수 있다.
invokeAll()은 모든 작업이 완료되거나 인터럽트가 걸리거나, 지정된 시간을 지날 때까지 대기하다가 리턴된다.

마지막 futures loop을 돌 때는 이미 모든 작업들이 성공 혹은 exception이 결정이 난 상태이므로 적절한 로직을 추가해준다.

--

--