Ray 로 내 파이썬 코드 10배 빠르게 만들기

t.k.woo
네이버 쇼핑 개발 블로그
25 min readJan 13, 2021

Ray 를 이용한 python 병렬처리 방법과 딥러닝 모델 추론에 응용하는 방법 대해 알아봅니다.

목차

  • Ray 소개와 주요 컨셉
  • Task 사용 방법과 팁 — ray.put(), ray.wait()
  • Actor 로 만드는 pytorch model inference

Ray

Ray 는 parallel and distributed application 제작을 위한 universal API 입니다.
Ray provides a simple, universal API for building distributed applications.

  • 최근 분산 학습, 분산 AutoML(Tune) 등 AI 학습과 관련된 엔지니어링으로 많이 소개 되지만 이 글에서는 조금 더 일반적인 병렬처리 관점에서 설명 드립니다.
  • Ray 의 전반적인 기본 개념은 여기 를 참고해주세요.

주요 컨셉

Ray 는 dynamic task graph computation model 로 구현되어 있습니다. 이 모델은 크게 TaskActor라는 2가지 추상화 타입으로 제공됩니다. state 의 유무가 가장 큰 차이를 나타냅니다.

  • Task 는 stateless worker 의 remote 함수 실행을 의미합니다.
    multiprocessing 모듈과 비슷한 역할을 수행합니다. (실제로는 다르지만..)
    ‘함수’ 의 역할과 대응됩니다.
  • Actor 는 state 를 저장하는 계산을 의미합니다. (stateful actor)
    ‘클래스’ 의 역할과 대응 됩니다.
Ray 의 Task 와 Actor 모델

Task 와 Actor 모두 ‘worker’ 로 ray 에서는 remote 함수를 통해 실행할 수 있습니다. remote 함수는 파이썬 객체 대신 future 를 리턴하며 이 값은 다른 remote 함수의 인자로 사용할 수 있습니다.

Worker 를 자세히 이해하기 위해서는 Ray 의 아키텍처를 이해하면 좋습니다.

크게는 system layer 와 app layer, 2개의 파트로 구성됩니다. Application layer 는 API 와 계산 모델을 구현하고 system layer 는 backend 로써 task 의 스케쥴링과 데이터 관리를 수행합니다.

Task 실행 시 아키텍쳐에서 동작 흐름을 살펴보면 driver(user program) 에서 작성된 ray 함수와 GCS 의 table 에 등록하고 스케쥴러를 작성합니다. 작성된 스케쥴을 system layer 에서 global scheduler 로 관리되며 객체에 대한 정보도 GCS table 에 등록됩니다. 실제 동작할 worker 에서는 global 스케쥴러와 GCS 에 등록된 execution / object 를 받아와서 작업을 수행하게 됩니다.

더 자세한 정보는 논문을 참고해주세요.

Task 사용 방법과 팁

Task 의 사용 방법과 multiprocessing 대비 이점

3000x3000 이미지에 4x4 kernel 을 convolution 하는 프로그램을 만들어 봅시다.

먼저 serial program 으로 구현해보겠습니다. 4x4 kernel 48 개를 3000x3000 이미지에 convolution 합니다.

같은 프로그램을 ray 로 병렬화 합니다. 48 개의 core 가 있다고 가정합니다.

convolution 하는 함수 f 를 만들고 해당 함수를 ray.remote 데코레이터로 지정합니다. 이제 f 는 ray 의 task 가 되었습니다. task 를 사용할 때는 f.remote() 를 통해 사용할 수 있습니다. remote 의 리턴값은 python object 가 아닌 reference 가 출력됩니다. python object 로 받아오기 위해 ray.get() 을 사용해야합니다.

.remote() 의 사용만으로 기존 코드를 병렬화 시킬 수 있는 것을 기억해주세요! 아주 편리하게 사용할 수 있습니다.

multiprocessing 기반 코드는 어떨까요?

pool 로 process 를 만들고 .map() 을 통해 각 프로세스에 convolution 을 적용하도록 만들 수 있습니다.

위 3가지 방법의 실행시간을 확인해보겠습니다.

+--------------+--------+-----------------+--------+
| process time | serial | multiprocessing | ray |
+--------------+--------+-----------------+--------+
| cv2.filter2D | 4.569s | 17.393s | 0.522s |
+--------------+--------+-----------------+--------+

serial 에 비해 ray 가 약 9배 정도 빠른 것이 확인되었습니다. 반면 multiprocessing 은 오히려 4배 정도 느려진 것을 볼 수 있었습니다.

분명 multiprocessing 으로 병렬처리를 하는데 왜 느려질까요?

multiprocessing 은 큰 객체를 직렬화하기 위해 pickle 을 사용합니다. 각 프로세스마다 고유한 데이터 복사본을 만들어야하기 때문에 3000x3000 이미지의 복사가 일어났고 이 데이터를 직렬화 / 해제하기 위해 오버헤드가 추가되었기 때문입니다. 즉, 작업할 데이터가 매우 크면서 병렬화하는 작업의 크기가 작은 경우 serial program 보다 느려질 수 있습니다.

Ray 도 매우 작은 task 의 경우 병렬화 작업을 위한 오버헤드가 존재하기 때문에 적합하지 않습니다. (convolution 예제에서 48 core 로 병렬화를 했지만 48배 만큼 빨라지지 않은 이유와 관련이 있습니다.)

하지만 Ray 는 task latency 를 최소화 하기 위해 값을 복사하거나 역직렬화하지 않고 shared memory 가 지원하는 배열을 만듭니다. 이는 system layer 의 In-memory distributed storage 구현 덕분이며 내부적으로 Apache arrow 의 plasma object store 기반으로 최적화 되어 있는 것을 확인할 수 있습니다.

Ray 사용 팁

Ray.put()

앞서 설명드린 것처럼 Ray 는 in-memory object storage 를 가지고 있습니다. 이 메모리를 shared memory 로 사용할 수 있기 때문에 큰 객체를 shared memory 에 업로드 해두면 worker 나 actor 에서 빠르게 접근할 수 있습니다.

### 일반적인 경우large_image = np.zeros((3000, 3000))
for i in range(num_cpus):
result_ref = f.remote(large_image, filter[i])

위 코드에서는 large_image가 num_cpu 만큼 프로세스에 할당되기 때문에 오버헤드가 발생합니다.

### ray.put() 을 사용한 data id 공유large_image = np.zeros((3000, 3000))
image_ref = ray.put(large_image)
for i in range(num_cpus):
result_ref = f.remote(image_ref, filter[i])

ray.put() 을 이용해서 ray 를 object store 에 업로드한 후 referece id 정보를 task 에 넘겨줄 수 있습니다. task 는 shared memory 에 저장된 image 를 이용해서 computational graph 를 만들게 됩니다.

Ray.wait()

ray.get() 을 사용하면 모든 worker 가 작업을 종료할때 까지 기다려야합니다. 할당되는 작업의 크기가 차이가 나는 경우 아래 그림처럼 마지막에 종료되는 worker 4 기준으로 작업이 마무리 되기 때문에 비효율이 발생하게 됩니다.

[그림 출처] https://rise.cs.berkeley.edu/blog/ray-tips-for-first-time-users/

Task 의 작업시간이 차이가 나는 경우 ray.wait() 를 활용하면 좋습니다. 그림의 (b) 와 같이 사용할 수 있기 때문에 전체 실행시간을 줄일 수 있을 뿐 아니라 작업이 끝나지 않는 예외 상황에도 유연하게 대처하게 만들 수 있습니다.

실제 사용 사례를 간단하게 만들어 보겠습니다.

Q. 15만개의 문서에서 미리 지정한 100개의 키워드가 들어가 있는지 체크를 하고 싶어졌습니다.

문서는 NSMC 를 사용하고 키워드는 임의의 단어 100개를 골랐습니다.

['여성', '케이스', '남성', '세트', ... '귀걸이', '핑크', '램프', '야외']

문서마다 해당 단어가 들어가는지 체크하는 배열을 만들어 보겠습니다.

이 문제는 간단한 코드로 해결할 수 있지만 검색해야 하는 corpus 의 크기가 매우 방대하기 때문에 병렬처리가 꼭 필요합니다. (10억개의 문서, 1000만개의 키워드라면..?)

먼저 데이터를 가지고 옵니다.

wget https://raw.githubusercontent.com/e9t/nsmc/master/ratings_train.txt
wget https://gist.githubusercontent.com/tkwoo/e56620864b02c60ff9b77bcac1e7997a/raw/54bec580b4ae62755ec33c09c39b07073ef693f3/word_list.txt

serial 로 구현하는 방법입니다.

다음은 ray.get() 을 이용한 구현 방법입니다.

ray.get() 을 ray.wait() 로 변경합니다.

ray.wait() 는 작업이 끝난 ref id 와 작업이 끝나지 않은 ref ids 를 출력합니다. loop 를 통해서 작업이 끝나지 않은 ref list 를 반복수행하면 됩니다.

실행시간은 아래와 같습니다.

+----------------+------------+------------+------------+
| process time | serial | ray.get | ray.wait |
+----------------+------------+------------+------------+
| is kwd in nsmc | 5013.975ms | 2011.628ms | 1665.469ms |
+----------------+------------+------------+------------+

ray 를 사용해서 병렬처리 하면 작업속도를 2.5배 향상 시킬 수 있고 get() 대신 wait() 를 사용하는 방법만 사용하더라도 20% 성능 향상을 기대할 수 있습니다.

Actor 로 만드는 Pytorch model inference 병렬화

시나리오 및 조건

  1. query data 는 list 형태의 pool (또는 queue)에 쌓인다.
  2. query pool 에 데이터가 쌓이는 속도가 단일 1번 gpu 처리량 보다 크다.
  3. 특정 임계치가 넘어갈 때 2번 gpu 에서 병렬로 처리하고 싶다.
  4. query pool 의 크기가 줄어들면 2번 gpu memory 를 release 한다.

Faster RCNN inference 병렬화 하기

torchvision 에서 제공하는 faster rcnn 모델을 위 시나리오에 맞춰 제작하겠습니다.

필요한 라이브러리를 불러옵니다. multi gpu 사용 확인을 위해 gpu 2개로 제한한 후 테스트 합니다.

import os
import cv2
import numpy as np
import matplotlib.pyplot as plt
from collections import defaultdict

import torch
import torchvision
import ray

os.environ['CUDA_VISIBLE_DEVICES'] = '5,6'
ray.init(num_cpus=8, num_gpus=2)

ray 를 초기화 할때 사용할 gpu 개수를 미리 지정합니다.

추론 속도를 최적화하기 위해서 model weight 를 gpu 에 캐시해두어야 합니다. 즉, stateful 한 정보가 저장되고 이 상태를 이용할 필요가 생기는데 해당 기능을 Ray Actor 가 제공합니다.

ray 에 대한 생각을 잠시 잊고 추론 클래스만 디자인 한다고 생각해보겠습니다.

class FasterrcnnInference(object):
def __init__(self, batch_size=16, target_image_shape=(400, 300, 3)):
print(torch.cuda.is_available())

self.use_cuda = False
self.batch_size = batch_size
self.target_image_shape = target_image_shape
self.confidence_value = 0.8

self.model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
if torch.cuda.is_available():
self.use_cuda = True
self.model = self.model.cuda()
self.model.eval()

...(생략)

Inference 객체는 생성될 때 model 을 state 로 가지게 됩니다. 추론 과정에서는 객체의 멤버로 캐시된 모델과 weight 를 사용해서 추론할 수 있습니다.

Task 와 비슷하게 state model 을 ray actor 로 바꾸는 것은 매우 간단합니다. 아래 예시를 보겠습니다.

@ray.remote(num_gpus=1)
class GPUActor(object):
def __init__(self, batch_size=16, target_image_shape=(400, 300, 3)):
print(torch.cuda.is_available())

self.use_cuda = False
self.batch_size = batch_size
self.target_image_shape = target_image_shape
self.confidence_value = 0.8

self.model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
if torch.cuda.is_available():
self.use_cuda = True
self.model = self.model.cuda()
self.model.eval()

...(생략)

@ray.remote(num_gpus=1) 을 추가한 것 만으로 actor 로 만들 수 있습니다.

클래스의 나머지 함수도 작성하면 아래와 같이 만들 수 있습니다.

@ray.remote(num_gpus=1)
class GPUActor(object):
def __init__(self, batch_size=16, target_image_shape=(400, 300, 3)):
print(torch.cuda.is_available())

self.use_cuda = False
self.batch_size = batch_size
self.target_image_shape = target_image_shape
self.confidence_value = 0.8

self.model = torchvision.models.detection.fasterrcnn_resnet50_fpn(pretrained=True)
if torch.cuda.is_available():
self.use_cuda = True
self.model = self.model.cuda()
self.model.eval()
def _read_img_with_preprocess(self, f):
img = cv2.imread(f, 1)
img = cv2.resize(img, self.target_image_shape[:2])
img = cv2.cvtColor(img, cv2.COLOR_BGR2RGB)
img = img.astype(np.float32) / 255.

return img

def preprocess(self, list_filepath):
if len(list_filepath) > self.batch_size:
np_input = np.zeros((self.batch_size, 300, 400, 3), np.float32)
else:
np_input = np.zeros((len(list_filepath), 300, 400, 3), np.float32)

for idx in range(np_input.shape[0]):
np_img = self._read_img_with_preprocess(list_filepath[idx])
np_input[idx] = np_img

return torch.from_numpy(np_input.transpose(0,3,1,2))

def predict(self, pt_input):
x = pt_input
if self.use_cuda:
x = x.cuda()
pred = self.model(x)

list_pred_valid = []
for i in range(x.size()[0]):
bbox = pred[i]['boxes'].cpu().detach().numpy()
labels = pred[i]['labels'].cpu().detach().numpy()
scores = pred[i]['scores'].cpu().detach().numpy()

dict_result = defaultdict(list)
for j in range(len(bbox)):
if scores[j] < self.confidence_value: continue
dict_result['boxes'] += [bbox[j]]
dict_result['labels'] += [labels[j]]
dict_result['scores'] += [scores[j]]
list_pred_valid.append(dict_result)
#print (dict_result)
return list_pred_valid

def release_cuda_memory(self):
# [중요] pytorch에서 점유하고 있던 해당 gpu 메모리(캐시된 메모리)를
# 해제하는 기능을 아래 방법으로 제공함
torch.cuda.empty_cache()
return 1

def print_gpu_ids(self):
# gpu id 확인용 테스트 코드
return "This actor is allowed to use GPUs {}.".format(ray.get_gpu_ids())
  • Actor 에는 상태값 저장이 가능하므로 생성자에서 model을 load 해둡니다. (실제 작업에서 모델 자체의 크기가 매우 큰 경우라면 모델 생성 method 를 새로 관리하는 함수가 필요할 수 있습니다.)
  • 모델 설명 : faster-rcnn 으로 object detection 을 수행하는 모델입니다.
    (테스트를 위해 일부러 약간 크고 복잡한 모델로 지정)
  • preprocess 도 ray 단에서 병렬화 수행 가능하도록 actor method 로 제작합니다.
    preprocess 는 아래 기능을 제공합니다
    *batch size 개의 파일명 리스트 로 부터 파일 읽기
    * image channel 변경 & normalize
    * numpy batch 생성 & torch.tensor 로 변환
  • preprocess 에서 생성된 torch.tensor 를 predict() 함수 입력으로 사용합니다.
    * predict 에서 confidence value 이상의 값만 출력하는 후처리를 포함하고 있습니다.

Object detection actor 사용하기

  1. ray 에 할당된 gpu 수 만큼 actor 를 정의합니다.
actors = [GPUActor.remote() for _ in range(2)]

actors 는 실제 class object 가 아닌 ray actor 에 대한 id 값이라는 기억하면 좋겠습니다. 데이터는 ray GCS table 에 존재합니다.

actors 생성과 동시에 2개의 actor 가 state 로 각각의 모델을 가지고 있기 때문에 모델 크기만큼 gpu 메모리를 점유합니다.

테스트를 위해 gpu 가 잘 할당 되었는지 확인해봅시다.

print(ray.get([act.print_gpu_ids.remote() for act in actors]))
  • 출력결과
["This actor is allowed to use GPUs ['6'].", "This actor is allowed to use GPUs ['5']."]

actor 마다 gpu 1개씩 잘 할당된 것을 볼 수 있습니다.

2. pool 에 32개의 test 이미지가 쌓였습니다.
* actor 1개당 16개의 batch 를 처리할 수 있다고 가정합니다.

list_filepath = ['./test.jpg'] * 32
batchsize = 16

3. 2개 actor 를 모두 사용해야하는 양이라고 가정하고 각 actor 에서 처리하는 코드를 수행합니다.

preprocessed_ids = [actor.preprocess.remote(list_filepath[batchsize*i:batchsize*(i+1)]) for i, actor in enumerate(actors)]
pred_ids = [actor.predict.remote(preprocessed_id) for preprocessed_id, actor in zip(preprocessed_ids, actors)]

Computational graph 가 그려지고 actor 가 동작시켰기 때문에 입력크기 만큼 gpu 메모리 점유가 일어납니다.

파이썬 객체로 가지고 와서 결과를 확인해봅니다.

pred = ray.get(pred_ids)

print(len(pred)) # 2
print(len(pred[0])) # 16
print(len(pred[0][0]) # 3
print(pred[0][0])
'''
defaultdict(<class 'list'>, {'boxes': [array([ 33.787403, 85.52091 , 355.84128 , 277.0531 ], dtype=float32), array([ 46.00315 , 1.179451, 126.538864, 104.60486 ], dtype=float32), array([236.96844 , 48.664734, 386.70996 , 158.20596 ], dtype=float32), array([263.8288 , 34.89019 , 286.16147 , 57.371853], dtype=float32), array([234.1952 , 54.04161, 390.54178, 78.19034], dtype=float32)], 'labels': [2, 64, 67, 62, 67], 'scores': [0.9975768, 0.9889059, 0.8583329, 0.83268917, 0.80249286]})
'''

잘 수행된 것을 볼 수 있습니다.

4. gpu 메모리 해제하기

pytorch 에서는 사용한 gpu 메모리를 cache 해 두고 있기 때문에 사용하지 않을 때는 cache 메모리를 해제해야 효율적으로 운용할 수 있습니다. prediction 후 현재까지는 gpu 메모리가 점유되어 있는 상태입니다.

32개 pool 을 모두 처리하고 actor 전체를 사용할 필요가 없어지면 actor 중 일부의 cache memory 를 release 합니다.

ray.get(actors[1].release_cuda_memory.remote())
# 1 출력
  • model weight 를 제외한 현재 사용하고 있지 않은 입력 크기만큼 메모리가 해제 됩니다. actor 설계시 actor 순서와 gpu id 를 잘 매칭 시켜두면 원하는 gpu 만 해제할 수 있습니다. (지금은 안되어 있어서 actor1 과 actor2 선택이 명시적으로 드러나 있지 않습니다.)

5. pool 에 다시 대량의 데이터가 쌓인다면?

i) pool size 가 16개 수준이라면 actors[1] 만을 이용해서 3번을 돌리면 됩니다.
ii) 32개가 넘는 수준이라면 actors 전체를 이용하면 됩니다.

list_filepath = ['./test.jpg'] * 32
batchsize = 16

preprocessed_ids = [actor.preprocess.remote(list_filepath[batchsize*i:batchsize*(i+1)]) for i, actor in enumerate(actors)]
pred_ids = [actor.predict.remote(preprocessed_id) for preprocessed_id, actor in zip(preprocessed_ids, actors)]

pred = ray.get(pred_ids)

Ray actor 의 lifetime

모델 관리를 위해 actor 를 사용하기 때문에 actor 의 lifetime 에 대해 숙지할 필요가 있습니다. 일반적으로 lifetime 을 지정하지 않는 경우 driver 가 실행중인 동안은 actor lifetime 이 유지됩니다.

job 과 별개로 분리하여 actor를 관리하고 싶다면,

actor1 = Actor.options(name="CounterActor", lifetime="detached").remote()

처럼 lifetime 을 명시적으로 지정해서 job exit 후에도 actor 를 영구 보존할 수 있고

actor1 = ray.get_actor("CounterActor")
print(ray.get(actor1.get_counter.remote()))

get_actor() 를 통해 보존된 actor 를 다시 driver 로 가지고 올 수도 있습니다.

일반적으로 job 종료와 함께 actor 도 종료되지만
수동으로 조절해야할 경우,

  1. actor handle (actor_id) 가 범위를 벗어나는 경우
    actor = Actor.remote() 후 actor = 'others'
  2. 강제종료가 필요한 경우 (일반적으로 garbage collected 되지만..)
    ray.actor.exit_actor() -> ray.get() 이 RayActorError 를 일으킨 경우
    ray.kill(actor_id)

를 사용해서 actor 를 종료할 수 있습니다. 예외로 인한 강제종료시 cached gpu memory 를 해제한 후 actor 종료를 수행하는 것이 좋습니다.

마치며

지금까지 ray 를 사용한 전처리 방법과 모델 추론 방법에 대해 살펴보았습니다.

Ray 는 multiprocessing 과 다르게 데코레이터와 remote() 의 조합만으로 serial 코드의 구조를 유사하게 사용할 수 있는 장점이 있어 유지보수가 편한 것을 확인할 수 있었습니다.

apache arrow, plasma object store 를 사용할 수 있어 성능적으로도 이점이 있고 multiprocessing 에서 구현하기 힘든 state 를 쉽게 다룰 수 있어 gpu 모델 추론을 간단하게 만들 수 있는 것도 큰 장점이라고 생각합니다.

딥러닝 추론 모델을 병렬화하기 위해서는 batch_size * num_ray_gpu 만큼 queue를 분배하는 컨트롤러의 구현할 필요가 있습니다. 데이터만 적절히 분배된다면 각 actor 에 할당된 gpu 에서 추론과 메모리 관리까지 쉽게 다룰 수 있는 방법을 다뤄보았습니다.

Actor 를 관리하는 actor 를 만들 수도 있기 때문에 ray 의 응용분야는 더 넓다고 생각합니다. 병렬화로 고통 받고 있다면 ray 를 적용해보는 것을 적극 추천 드립니다.

참고자료

--

--