7. 중단 및 종료 Cancellation and Shutdown(1)

Eugene Lim
12 min readMay 29, 2022

--

사실 개발을 할 때, 기능 구현에 집중하다보면 exception이나 error 처리를 간과하고 넘어가기 쉽다. 하지만 꼭 필요한 기능이며, 고급개발자가 되려면 역시 깔끔한 에러처리일 것이다. 이번 장에서는 자바와 멀티스레딩 에러처리에 관해 이야기 한다.

자바에는 스레드가 작업을 실행하고 있을 때 강제로 멈추도록 하는 방법이 없다.

Thread 클래스를 보면 stop(), suspend() 함수들이 존재하고 실제로, 이런 기능을 제공하려고 시도했던 기능이다. 하지만 문제가 많다는 사실을 깨달았고, 사용하지 말아야 한다.

대신 인터럽트 interrupt라는 방법을 사용할 수 있게 되어 있는데, 인터럽트는 특정 스레드에게 작업을 멈춰 달라고 요청하는 형태이다.

스레드로 동작하는 태스크 task를 작성할 때, 멈춰달라는 요청을 받으면 진행 중이던 작업을 적절하게 정리하는 코드를 추가해야한다.

스레드를 종료하는 문제를 제대로 구현하려면 작업이나 서비스, 애플리케이션 설계가 굉장히 복잡해질 수 있다. 하지만 이런 부분을 무시해 버리는 경우가 많다. 오류가 발생하는 경우, 종료하는 경우, 작업을 취소하는 경우에 적절하게 대응하는 프로그램은 품질의 차이가 크다.

7.1 작업 중단 Task Cancellation

작업을 취소해야할 케이스

  • 사용자가 취소버튼을 클릭했을 때
  • 원하는 결과를 얻어서 나머지 실행중이던 작업을 취소해야할 때
  • 웹크롤러의 경우 특정 케이스에서 오류가 났을 때, 모든 작업을 취소해야할 때 (예를 들어, 데이터 저장소가 꽉 찼을 때)
  • 애플리케이션 또는 서비스의 종료, 현재 큐에 대기하던 태스크를 마무리해야할 때

작업을 중단할 수 있는 방법 중 하나는 “취소 요청이 들어왔다”는 플래그를 이용하는 것이다.
플래그를 두고, 실행 중인 작업은 이 플래그를 주기적으로 확인한다

using a volatile field to hold cancellation state

PrimeGenerator는 작업 task를 실행할 때마다 cancelled flag를 volatile로 선언하고 주기적으로 확인하고 있다.

PrimeGenerator를 호출해보자

aSecondOfPrime()을 보면, 1초후에 cancel() 함수를 호출하지만, PrimeGenerator에서 반드시 1초 후에 스레드가 종료되지는 않는다. cancelled = true로 세팅되기 전에 이미 작업이 시작되었고, nextProbablePrime() 계산이 오래 걸린다면 그 다음 loop 에 cancelled를 확인할 수도 있다.

그리고 generator.cancel()이 try-catch구문의 finally에서 실행되므로, sleep() 함수가 실패하더라도 반드시 generator.cancel()은 호출된다.

7.1.1 인터럽트

이전 예제에서 PrimeGenerator는 cancelled=true로 세팅되어도 이미 들어간 작업이 걸리는 시간 때문에 바로 종료되지 않는다고 설명했다. 만약 그 작업이 엄청 길다면 문제가 될 수도 있다.

Unreliable Cancellation

BlockingQueue를 갖고 있는 BrokenPrimeProducer를 보자. 만약 queue가 시작하자마자 nextProbablePrime() 계산이 consume() 보다 훨씬 빨라서 다 찼고, 현재 스레드는 더 넣을 공간을 기다리는 put() 에서 대기하게 된다. 하지만 producer.cancel()을 호출해도, queue에 자리가 나지 않는다면 BrokenPrimeProducer 스레드는 put()에서 계속 대기하므로 cancelled flag를 볼 수 없는 상황이 될 수도 있다.
queue.put() 와 같은 블로킹 함수를 쓰려면 꼭 InterruptedException를 핸들링해줘야하는데 바로 이 이유 때문이다.(5장에서 배웠다)

모든 스레드는 인터럽트 상태를 boolean으로 갖고 있다. 인터럽트에 걸리면 인터럽트 상태값이 true로 설정된다.

public class Thread {
public void interrupt() { ... }
public boolean isInterrupted() { ... }
public static boolean interrupted() { ... }
...
}

해당 스래드가 대기 중일 때(sleep()이나 Object.wait(), 혹은 put()에서 대기할 때)으로 기다리고 있다면) 인터럽트가 걸리면 바로 리턴된다. (리턴된다는 의미는 디버깅으로 봤을 때, sleep()에 걸려있던 실행라인이 바로 다음 라인으로 떨어진다) 그리고 InterruptedException을 던진다.

특정 스레드에 interrupt()를 호출한다 해도 해당 스레드가 처리하던 작업을 멈추는 것이 아니다. 단지 해당 스레드에게 인터럽트 요청이 있었다는 메시지를 전달할 뿐이다.

책에서는 설명하고 있지 않지만, interrupt()를 호출한다고 스레드가 바로 중지하는 것이 아니다. 단지, blocking method를 실행중일 때는 throw를 하므로 try-catch 구문으로 나오고, 만약 blocking method가 아닌 곳을 실행 중이라면 스레드의 interrupted=true로 변경되기만 한다. 이 때 isInterrupted()를 호출하면 true를 받을 수 있다.

인터럽트를 잘 대응하는 메소드를 만드려면,

  • 인터럽트가 걸리는 상황을 정확하게 기록해뒀다가
  • 자신을 호출한 메소드에게 인터럽트 상태에서 대응할 수 있도록 정보를 준다
  • static interrupted() 메소드는 현재 스레드의 인터럽트 상황을 초기화한다.

아까 만들었던 BrokenPrimeProducer를 수정했다.

  1. cancelled flag를 사용하지 않고 Thread.interrupt()를 직접 이용하였다. 만약 이 스레드가 put()에서 대기한다면 interrupt() 를 통해 바로 catch로 빠져나올 것이다.
  2. while() 에서 Thread.currentThread().isInterrupted() 를 확인하고 있다. 현재 스레드가 put() 이외의 코드에서 interrupt()를 받았다면 catch로 떨어지지 않기 때문에, 작업이 들어가기 전에 확인해주는 것이 좋다

7.1.2 인터럽트 정책 Interruption Policies

스레드도 인터럽트를 받았을 때, 어떻게 대응할지를 생각해야한다.

  1. 스레드 수준 혹은 서비스 수준에서 작업 중단 기능을 만들어야 한다. 스레드가 인터럽트를 받았을 때 자원을 어떻게 정리하고, 현재 자원을 정리중이다 등을 인터럽트를 요청한 스레드에게 알릴 수도 있다.
  2. 작업 task와 스레드가 인터럽트 상황에서 어떻게 동작하는게 좋은지를 구분해야한다. 우리는 대부분 task를 스레드풀에 위임하게 되는데, 만약 task가 interrupt()를 받았다면, 현재 작업하고 있는 스레드는 이 정보를 잘 보관했다가 task를 요청한 곳에 알려야 한다. 블로킹 메소드에서 InterruptedException을 던지도록 되어 있는 이유가 이것 때문이다.
  3. 인터럽트 요청을 받았을 때, 현재 진행중이던 작업을 완료하는 코드도 추가할 수 있다. 비정상적으로 종료해서 데이터가 깨지거나 날아가는 상황은 방지해야한다.
  4. Thread.currentThread().interrupt() 작업 task에서 interrupt를 받았을 때, 스레드의 인터럽트 정책을 존중해줘야 하기 때문에, 꼭 현재 실행하고 있는 스레드의 interrupt 상황을 유지해줘야 한다. (사실, 이미 thread.interrupt()를 해서 isInterrupted=true로 세팅될 줄 알았는데, exception만 발생한다.
interrupt() 를 써보자

여기서 다음과 같은 결과를 볼 수 있다.

--- t.status : RUNNABLE -> //Thread 상태 
--- before : false -> //TaskRunnable에서 InterruptedException으로 빠졌지만 현재 스레드는 false
--- after : true -> //명시적으로 interrupt()를 호출해줬다.
--- t.status : TERMINATED -> // 스레드 상태가 끝났다

7.1.3 인터럽트에 대한 대응

InterruptedException을 처리할 수 있는 방법

  1. exception을 호출한 상위 메소드로 전달한다. 이렇게 하면 상위 메소드가 blocking method가 된다.
  2. 호출한 상위 메소드가 직접 처리할 수 있도록 인터럽트 상태를 유지한다.
BlockingQueue<Task> queue; ... 
public Task getNextTask() throws InterruptedException {
return queue.take();
}

getNextTask()도 blocking method가 되었다.

하지만 InterruptedException을 상위 메소드로 전파할 수 없거나(예를 들어, runnable로 구현했을 때) 직접 처리하려고 할 때, TaskRunnable처럼 interrupt() 메소드를 다시 호출하는 것이다.

만약 interrupt가 왔을 때, 현재 진행하는 작업을 완료하고 싶다면

getNextTask()는 InterruptedException을 체크해뒀다가 함수가 끝나기 전에 interrupt 상황을 알리고 있다.

7.1.4 예제: 시간 지정 실행

timedRun()은 ScheduleExecutorService를 사용해 주어진 시간만큼 작업을 하고, 해당 시간이 되면 taskThread에 interrupt()를 한다. 하지만! 스레드에 인터럽트를 걸 때 이 스레드의 인터럽트 정책을 알 수가 없다.
만약 r.run()이 timeout보다 일찍 끝나서 taskThread는 다른 작업을 처리하고 있을 때 interrupt를 받을 수도 있기 때문이다.

특정 시간동안 기다리는 작업은 cancelExec.schedule()로 실행한 것은 이전 코드와 같지만 여기서는 RethrowableTask를 실행시킨다음에 main thread(혹은 timedRun()을 실행시킨)에서 taskThread가 끝나기를 join()을 통해 기다려준다.

❗️하지만 여기서도 Runnable r이 정상적으로 끝난 것인지, join(timeout) 때문에 타임아웃으로 끝난 것인지 알 수가 없다.

7.1.5 Future를 사용해 작업 중단

다시 Future를 이용해 timedRun() 함수를 구현해보자.
ExecutorService.submit()을 실행하면 Future를 리턴 받는다.

<T> Future<T> submit(Callable<T> task) 
Future<?> submit(Runnable task)
<T> Future<T> submit(Runnable task, T result)

Future.cancel(bool mayInterruptIfRunning)

  • true — 어느 스레드에서 실행되건 해당 스레드에 interrupt가 걸린다
  • false — 아직 실행시키지 않았다면, 실행시키지 말아라. 즉, 인터럽트 대응이 되지 않도록 만들어진 task는 false를 넘겨주는 것이 좋다

그리고, thread-pool에 있는 thread에 직접 인터럽트를 걸면 안된다. 인터럽트가 걸리는 시점에 어떤 작업을 실행하고 있을지는 알 수 없다. 즉, thread-pool에 submit()으로 받은 future에 cancel()함수를 호출하여 적절히 인터럽트를 걸어줘야 한다.

Future를 이용하여, 이전 코드들 보다 훨씬 깔끔하게 구현할 수 있다.

Future.get() 에서 InterruptedException, TimeoutException이 발생했을 때 작업결과가 필요없다면, Future.cancel()을 호출해서 작업을 중단시키자

7.1.6 인터럽트에 응답하지 않는 블로킹 작업 다루기

소켓 I/O를 실행하는 스레드에 인터럽트를 호출했을 때, 락을 기다린다던가 스레드가 대기 상태라면 InterruptedException에 반응하지만, 소켓 연결을 기다리는 등의 작업 중에는 isInterrupted=true로 설정되는 것 밖에는 효과가 없다.

  • java.io 동기적 소캣 I/O — 소켓에서 데이터를 읽어오거나 쓰는 부분.
    InputStream.read()와 OutputStream.write()는 인터럽트에 반응하지 않는다.
    연결된 소켓을 직접 닫으면 read나 write 메소드가 중단되면서 SocketException이 발생한다.
  • java.nio 동기적 I/O — InterruptibleChannel에서 대기하고 있는 스레드에 인터럽트를 걸면 ClosedByInterruptedexception이 발생한다.
  • Selector를 이용한 비동기적 I/O — 스레드가 Selector 클래스(java.nio.Channels) 의 select()에서 대기중이라면 close() 호출하면 ClosedSelectorException이 발생한다.
  • 락 확보 — 스레드가 암묵적 락을 확보하기 위해 대기 상태에 들어가 있으면, 락이 확보될 것이라는 보장을 하지 못 하고 인터럽트에도 반응하지 않는다. → 나중에 Lock 인터페이스를 구현한 락 클래스로 대체할 수 있다. lockInterruptibly() 메소드를 사용해서 락 확보에 대기하면서 인터럽트에도 반응할 수 있다.

ReaderThread는 소켓을 받아서 소켓으로 들어오는 내용을 계속 읽어들인다. 그리고 ReaderThread를 종료할 수 있게 interrupt()을 override 해서 socket.close()를 호출해주면 결국 read()함수에서 IOException이 발생하고, ReadThread는 자연스럽게 종료된다.

7.1.7 newTaskFor 메소드로 비표준적인 중단 방법 처리

ThreadPoolExecutor에서 submit()을 호출하면 Future를 주는 것은 이전 챕터에서 공부했다. 여기 새로 추가된 함수는 newTaskFor(Callable)을 호출한다면 RunnableFuture를 리턴해준다. 이렇게 CallableTask<T>를 상속받아서 cancel()을 오버라이드하면 작업 중단 과정을 변경할 수 있다.

CancellingExecutor는 ThreadPoolExecutor를 상속받고, newTaskFor()를 override 했다. 우리가 정의한 CancellableTask가 들어오면 기존에 있던 newTask()를 호출해준다. 그 다음 cancel()에서 소캣을 닫아주는 코드를 넣으면 태스크가 종료되는 시점에 소켓도 닫히게 된다.

--

--