Ray: 확장 가능한 고성능 분산/병렬 Machine Learning 프레임워크

Riiid Teamblog
Riiid Teamblog KR
Published in
18 min readDec 17, 2020

By 김완수

김완수 님은 Riiid의 ML Engineer로 AIOps Squad에서 AI infrastructure(GPU Cluster) 업무를 담당하고 있습니다.

확장가능한 애플리케이션과 시스템은 현대 소프트웨어 엔지니어링에서 중요한 과제이며, 그것은 머신러닝 엔지니어링에도 당연히 적용되는 말입니다. 또한, 모델이 커지고 확장 가능한 애플리케이션을 클라우드 서비스를 통해서 구현 가능하게 되면서, 하나의 거대한 모델을 다수의 노드에 분산시켜 단일 노드의 리소스 한계를 극복하려고 하거나, 병렬적인 실행으로 빠른 연구 주기를 만들어야하는 필요는 머신러닝 엔지니어링 분야에서 중요한 이슈로 자리잡고 있습니다.

Ray는 머신러닝 엔지니어링에서 필요한 확장가능한 시스템을 구축할 수 있도록 하며, 효과적이고 단순하게 분산/병렬 컴퓨팅을 수행할 수 있도록 만든 매우 강력한 프레임워크입니다. 이러한 강점으로 아직 정확한 체계가 잡히지 못한 머신러닝 엔지니어링이라는 분야의 초석이 되기에 부족함이 없는 강력한 도구입니다.

Riiid!AIOps Squad에서는 Ray를 Research GPU Cluster, Scalable AutoML System과 같은 크고 넓은 영역에 적극적으로 활용하고 있습니다.

Introduction

기본적으로 Ray는 분산/병렬 애플리케이션을 쉽게 구축하기 위한 프레임워크로 설계 되었습니다. Ray를 사용하는 개발자들은 기존의 코드 변경을 거의 하지 않고 코드를 쉽게 분산/병렬화 시킬 수 있습니다.

기존에 여러분들은 특정한 작업을 병렬 컴퓨팅을 이용하여 최적화를 시도할 때 Python STL인 multiprocessing 라이브러리를 사용해본 경험이 있을겁니다. 기존의 순차적인 코드를 multiprocessing으로 바꾸기 위해서는 병렬 컴퓨팅에 익숙하지 않으신 분이라면 생소할만한 Pool과 같은 개념들로 기존의 코드를 변경해야 했습니다.

Ray는 이런 분산/병렬 처리를 위한 특정한 코드 형태로 기존의 순차적 코드를 바꿀 필요가 없으며, 기존의 순차 코드 베이스에서 몇줄의 코드만 추가하여 병렬 처리를 정말 쉽게 수행할 수 있습니다. 간단한 예시를 살펴보겠습니다. 아래 코드는 간단히 List 데이터의 요소들에 10을 곱하는 간단한 예시입니다. 순차적인 코드를 multiprocessingPool을 사용했을 때는 기존 반복 구조를 map을 이용하는 형태로 변경해야합니다. 즉, 기존의 코드의 구조를 변경하는 것이 불가피합니다. 반면, Ray를 사용했을 경우에는 처리할 함수에 @ray.remote라는 Decorator를 추가하고 함수를 호출할 때 .remote()라는 메소드를 통해 호출하는 부분이 추가되었습니다.

즉, Ray를 사용하면 순차적인 코드를 짜듯이 병렬 컴퓨팅을 쉽게 수행할 수 있습니다.

Ray는 위 예시와 같은 Syntax Sugar 뿐 아니라 벤치마크를 보면, 대부분의 경우에서 Core 수가 증가함에 따라서 압도적으로 성능이 향상하는 것을 볼 수 있습니다. 이 성능 차이는 프로세스간의 객체 전달을 위한 과정에서 발생합니다. multiprocessing은 큰 데이터를 다른 프로세스에 전달할 때 pickle을 사용하여 직렬화 한 뒤 전달합니다. 하지만, 이 접근 방식을 사용하기 위해서는 모든 프로세스가 데이터에 대한 복사본을 만들어야하며, 큰 메모리를 할당하고 역직렬화에서 발생하는 큰 오버헤드를 가질 수 밖에 없습니다. Ray는 이 문제를 해결하기 직렬화 오버헤드가 적은 Apache Arrow를 사용하여 Zero-Copy 직렬화를 수행합니다. 또한, 직렬화된 데이터를 In-Memory Object Store인 Plasma를 이용해 직렬화된 데이터를 빠르게 공유할 수 있습니다.

History

저는 Ray 덕후로서 Ray의 멋진 역사에 대해서 이야기하지 않고 넘어갈 수 없습니다. Ray는 UC Berkeley의 RISELab에서 개발된 프로젝트입니다. RISELab은 개인적으로 21세기 가장 위대한 Computer Scientist라고 생각하는 Apache Spark의 창시자인 Ion Stoica 교수의 연구실입니다. 이 연구실에서 2018년에 Ray: A distributed framework for emerging AI applications이라는 백서를 출판하였고, 이 백서는 이 포스트에서 소개드릴 Ray의 시초가 되었습니다.

이 백서가 출판된 이후, Ray Core를 기반으로 2020년 최고의 하이퍼파라미터 최적화 라이브러리라고 할 수 있는 Tune과 Scalable Ray를 위한 Ray Cluster, 강력한 High-Level 강화학습 라이브러리인 RLlib, 여러 Major Deep Learning Framework에 쉽게 확장 가능한 분산 딥러닝 라이브러리인 RaySGD 등, 다양하고 강력한 머신러닝 엔지니어링을 위한 도구들이 통합되고 있습니다.

이후, OpenSource 생태계에서 빠르게 성장하던 Ray의 핵심 개발자들은 UC Berkeley의 박사 과정 학생인 Robert Nishihara를 중심으로 하여 Ion Stoica 교수와 Michael I. Jordan 교수가 Co-Founder로 참여한 Anyscale이라는 회사를 설립하여 빠르게 성장하고 있습니다.

Why Ray?

위에서도 언급했었지만 머신러닝 엔지니어링에서 쉽게 확장할 수 있고, 쉽게 분산시킬 수 있고, 쉽게 병렬처리 할 수 있는 시스템은 매우 중요한 요소입니다. 즉, 어떤 형태의 애플리케이션과 클러스터 시스템이던지 High-Level에서 쉽게 분산시키고 병렬적으로 실행시킬 수 있는 시스템을 구축할 수 있다는 것은 매우 큰 강점입니다. Ray는 이 모든 것을 Pythonic을 유지시키며 가능하게 합니다.

Robert Nishihara의 Modern Parallel and Distributed Python: A Quick Tutorial on Ray를 인용하자면 분산/병렬 시스템에는 아래와 같은 요구사항들을 필요로하며 Ray는 이 요구사항들을 강력하고 쉬운 방법으로 풀 수 있도록 도와줍니다.

  1. 다수의 컴퓨터에서 동일한 코드로 실행시킬 수 있어야 한다.
  2. Stateful하고 통신 가능한 Microservice 및 Actor를 구축할 수 있어야 한다.
  3. 기계 고장 및 시스템 고장을 훌륭하게 처리할 수 있어야 한다.
  4. 거대한 데이터와 수치 데이터를 효율적으로 처리할 수 있어야 한다.

사실 위의 요구사항들은 Ray Core라는 Ray의 분산/병렬 시스템에 대한 장점들이며, Ray를 그냥 단순히 조금 더 편한 Multi-Processing 라이브러리가 아닌 강력한 차세대 머신러닝 엔지니어링의 De-Facto가 될 수 있다고 생각하게 만드는 진짜 이유는 Major Machine Learning Framework들과 강력하게 통합된 Ray Echosystem들 입니다.

Ray Echosystem에는 정말 강력한 다양한 도구들이 통합되어 있으나, 이 포스팅에서는 RaySGD와 Tune에 대해서만 소개하도록 하겠습니다.

Ray Core

Ray Core의 백서에서는 Ray의 목표를 분산/병렬 컴퓨팅을 위한 General API를 만드는 것을 목표로 한다고 말하며, 이 목표를 달성하기 위해서 Ray는 간단하지만 일반적인 추상화 수준의 API를 제공 해야한다고 말합니다. 즉, Ray를 사용하는 개발자들은 기존에 Python 코드를 짜듯이 로직을 구현하면, 실제 분산/병렬 컴퓨팅을 위한 실제 복잡한 물리적인 이슈들은 Ray가 맡아서 처리합니다.

이 때문에 Ray를 사용하는 개발자들은 분산/병렬 시스템에 대해서 깊게 공부할 필요가 없고, 기존의 Python Base의 시스템에도 쉽게 적용 가능 합니다.

Ray Core를 이해하기 위해서 빼놓고 이야기 할 수 없는 핵심 요소 3가지가 있습니다.

Task — 호출자와 다른 프로세스에서 실행되는 Single Function Call입니다. Task는 Stateless한 Function 혹은 Stateful한 Class의 Method일 수 있습니다. Task는 호출자와는 비동기적으로 실행되며, Ray에서 Task를 만들기 위해서는 Remote Function이라고 불리는 함수 혹은 메소드에서 .remote()를 통해서 호출 할 수 있으며, .remote()는 즉시 ObjectRef라는 값을 반환합니다.

Object — Object는 Task에서 반환되는 실제 결과값이거나 ray.put()을 통해서 생성되는 값입니다. Object는 Ray 시스템의 Object Store에 저장되며 불변 데이터라는 특징이 있습니다. 그리고 Ray를 사용하는 개발자들은 이 Object를 Task 혹은 ray.put()이 반환하는 ObjectRef를 통해서 참조할 수 있습니다.

Actor — Actor는 Stateful한 워커 프로세스입니다. Ray에서는 Python Class를 @ray.remote()를 통해서 Actor Class로 만들 수 있으며, 이 Actor Class의 메소드 호출은 Stateful Task가 됩니다.

위 핵심 요소들을 쉽게 이해하기 위해서 간략한 설명을 붙이자면, Python Class가 있는데 그 Class를 @ray.remote를 통해서 Actor를 만들 수 있고, 그 Actor Class의 메소드 호출을 Stateful Task(Actor의 Method가 아닌 Remote Function Call은 Stateless Task)라고 불리며 Task는 실제 결과인 Object의 위치를 가리키는 ObjectRef를 즉시 반환합니다.

위 개념들이 적용된 간단한 Example을 보겠습니다.

우선, Ray를 시작하기 위해서는 import ray로 Ray를 Import하고 ray.init()을 통해 Ray Cluster를 실행합니다. ray.init()에 추가적인 인자를 전달하지 않을 경우 Cluster는 모든 CPU Core를 활용합니다.

위에서 소개된 핵심 요소중 하나인 Task는 Stateful Task와 Stateless Task로 구분됩니다. Stateful Task는 Class의 메소드이며 Stateless Task는 일반적인 Single Function이라고 생각하시면 됩니다. Ray에서는 Task를 만드는 Function/Method의 구현체를 Remote Function이라고 부릅니다. Ray에서는 이러한 Remote Function을 단순히 @ray.remote라는 Decorator로 쉽게 만들 수 있습니다. 또한 Stateful Task를 위한 Actor도 동일한 방법으로 만들 수 있습니다.

우리는 이 Remote Function을 호출시키기 위해서 아래와 같이 함수 호출 시 .remote()를 붙여야 합니다. 그러면 Task가 실행되며 .remote()는 즉시 ObjectRef를 반환합니다. ObjectRef를 통해서 실제 Object를 얻기 위해서는 ray.get()을 사용할 수 있습니다. ray.get()ObjectRef를 인자로 받아서 Object Store에 존재하는 실제 Object를 반환합니다.

각각의 Task는 모두 비동기적으로 실행되며 기본적으로 Local System의 CPU Core 수 만큼 동작하고 있는 워커에게 자동으로 할당되고 호출자와는 다른 프로세스에서 동작하기 때문에 기본적으로 Multi-Processing을 구현할 수 있으며, 아래와 같은 코드로 간단하게 병렬화 시킬 수 있습니다.

Tune

2020년 현재 많은 HyperParameter Optimization Library가 존재합니다. 제가 Maintainer로 활동하고 있는 HyperOpt를 포함해 Optuna, NerverGrad와 같은 다양한 도구들이 있으나 가장 강력한 HPO 라이브러리는 Tune이라고 확신합니다. 어떻게 보면 HyperOpt, Optuna, NeverGrad와 같은 도구들과 Tune과의 비교는 조금 어려울 수 있습니다. 그 이유는 HyperOpt와 Optuna와 같은 라이브러리들은 비교적 Low-Level HPO 알고리즘의 구현체라고 볼 수 있으며, Tune은 High-Level의 HPO 시스템이라고 볼 수 있습니다. 실제로도 Tune에서 실제 HPO 알고리즘이 직접 개발된 것은 많지 않으며, 많은 알고리즘들을 HyperOpt 혹은 Optuna의 알고리즘들을 내부적으로 Import하여 사용하고 있습니다.

즉, 기존에 산발되어 있던 HPO 알고리즘들을 통합하여 다양한 최적화 알고리즘과 스케쥴러들을 Ray Core위에서 동작시켜 분산/병렬화가 가능한 HPO 시스템을 쉽게 구축할 수 있도록 만든 것입니다. 또한 TensorFlow, PyTorch와 같은 Major Framework 뿐 아니라 PyTorch Lightning과 같은 Third Party Framework에도 쉽게 통합이 가능합니다. 다양한 Framework에 대한 모든 설명은 모두 다루기 어려우므로, 이 포스팅에서는 공식 Tutorial에 포함된 예제를 통해 살펴 보겠습니다.

우선, from ray import tune을 하고 최적화할 목적 함수를 구현합니다. 실제로 ML Task에 적용하기 위해서는 이런 방정식이 아닌 ML Model에 목적함수가 될 것입니다.

Tune을 실행시키기 위해서는 Tune의 Trainable 객체 혹은 순수 Python 함수가 필요합니다. 여기서는 Trainable 객체 대신 순수 파이썬 함수을 Trainable로 사용하겠습니다. 여기서 Python Function은 config라는 인자를 받으며, 이 인자에는 우리가 최적화할 HyperParameter들이 포함된 dictionary입니다. 또한, 우리는 최적화 알고리즘을 동작시키거나 스케쥴러로 학습을 끝내는 동작을 시키기 위해서 정확도와 같은 핵심 성능을 지속적으로 Tune에게 보고해줘야합니다. 이를 위해서 Epoch 혹은 Step과 같은 특정 주기마다 tune.report()에 성능을 보고합니다.

마지막으로, 최적의 HyperParameter 집합을 탐색하기 위해서 tune.run()을 통해 탐색을 시작합니다. 여기서, tune.run()config 인자에는 우리가 탐색할 HyperParameter의 범위가 지정된 Search Space를 입력해서 전달합니다. 해당 예제에서는 어떤 알고리즘 혹은 스케쥴러도 제공하지 않았으므로 Random Search를 수행합니다. 다만, Tune에는 TPE, PBT 같은 강력한 Baysian Optimizer나 HyperBand와 같은 스케쥴러도 제공하므로 더 효율적으로 HyperParameter 탐색을하기 위해서 여러 전략을 선택할 수 있습니다.

tune.run()을 실행하면 training_function 들은 Trial이라는 객체로써 자동으로 할당된 워커에서 동작하게 됩니다. Tune을 강력하게 만드는 무기중 하나는 Ray Core의 능력을 적극 활용한다는 점으로써 활용 가능한 리소스에서 Trial을 실행시킵니다. 이런 리소스 할당 관리는 TrialScheduler라는 객체가 책임지고 수행합니다.

만약, 우리가 여러대의 서버를 사용하고 있다면 우리는 단순히 모든 서버를 묶은 Cluster를 만들고 위 코드로 Search를 수행하면 모든 Server에서 병렬적으로 동시에 다양한 HyperParameter 탐색을 수행하게 될 것입니다.

RaySGD

RaySGD는 Distributed Deep Learning을 위한 Ray 기반의 Library입니다. 이 또한 TensorFlow, PyTorch와 같은 Framework에 쉽게 통합 가능하며, Multi GPU 뿐만이 아닌 Multi Node 학습도 코드 변경 없이 수행할 수 있습니다.

또한, Ray Echosystem에 포함된 Tune 혹은 RLlib, RayServe와도 쉽게 통합되며 DDP 혹은 DP를 쉽게 사용하여 더 큰 모델을 다수의 System에서 쉽게 동작시킬 수 있습니다. 이 포스트에서는 PyTorch를 기준으로 RaySGD를 설명하겠습니다.

RaySGD를 통해 모델을 학습시키기 위해서는 TrainingOperator라는 Abstract Class를 정의해야합니다. Abstract Class는 Train과 Validation 과정에 필요한 구성들을 포함하고 있으며, PyTorch 모델을 등록하고 Train / Validation Loop와 Initialize를 정의할 수 있습니다. TrainingOperator의 self.register()self.register_data()를 통해서 사용할 Model과 Optimizer, Loss Object, DataLoader를 등록 합니다. 이 TrainingOperator는 실제 학습을 담당하는 객체 아닌 학습에 필요한 로직과 데이터를 관리하는 역할을 수행하는 Wrapper입니다.

정의된 TrainingOperator를 기반으로 실제 학습을 수행하는 것은 TorchTrainer이며 TorchTrainer는 Ray Cluster위에 등록된 리소스를 활용하여 학습을 수행합니다. 기본적으로 다수의 GPU가 CUDA_VISIBLE_DEVICES에 등록되어 있을 경우 DDP를 기본으로 수행하며, wrap_ddp 인자를 통해서 원하지 않을 때는 DPP 활성화를 끌 수 있습니다. 즉, DDP를 위해서 어떤 추가적인 개발 작업을 필요로하지 않으며 동일한 CodeBase을 이용하여 매우 쉽게 Multi-GPU 더 나아가 Multi-Node 학습을 수행할 수 있습니다. 즉, 가장 앞에서 언급된 Ray의 핵심 목표중 하나인 다수의 컴퓨터에서 동일한 코드로 실행시킬 수 있어야 한다라는 필요 사항에 걸맞는 기능이라 볼 수 있습니다.

만약, 여러분이 RaySGD로 만들어진 Model을 Tune을 이용해 HyperParameter 검색을 수행하고 싶으시다면 .as_trainable() 메소드를 통해서 쉽게 tune.run()에서 필요로하는 인자인 Trainable 객체를 생성할 수 있으며 만약 기존 PyTorch Lightning을 기반으로 개발을 하고 계셨다면 TrainingOperator.from_ptl(MyLightningModule)을 통해 TrainingOperator로 변환할 수 있습니다.

Conclusion

소개드린 Ray Core, Tune, RaySGD 뿐만 아니라 Ray Echosystem에는 다양한 ML Task를 위한 도구가 포함되어 있습니다. Ray는 OpenSource 커뮤니티에서 굉장히 빠른 속도로 성장하고 있으며, 머신러닝 엔지니어라면 생각할만한 모든 것을 할 수 있는 강력한 프레임워크입니다. 이 강력한 도구에 대해서 더 알아보고 더 깊게 알아보고 싶으시다면 Ray 공식 문서와 백서를 참고하시면 많은 도움이 될 것입니다.

Reference

[1] https://docs.ray.io

[2] https://towardsdatascience.com/modern-parallel-and-distributed-python-a-quick-tutorial-on-ray-99f8d70369b8

[3] https://docs.ray.io/en/master/whitepaper.html

[4] https://docs.ray.io/en/master/raysgd/raysgd.html#sgd-index

[5] https://docs.ray.io/en/master/tune/index.html

[6] https://towardsdatascience.com/10x-faster-parallel-python-without-python-multiprocessing-e5017c93cce1#:~:text=Ray is designed for scalability,has preliminary support for Java.

--

--

Riiid Teamblog
Riiid Teamblog KR

교육 현장에서 실제 학습 효과를 입증하고 그 영향력을 확대하고 있는 뤼이드의 AI 기술 연구, 엔지니어링, 이를 가장 효율적으로 비즈니스화 하는 AIOps 및 개발 문화 등에 대한 실질적인 이야기를 나눕니다.