Spark Internal Part 2. Spark의 메모리 관리(2)

Unified Memory Management in Spark 1.6(1)

이용환
16 min readOct 7, 2018

이 글은 Andrew Or 와 Josh Rosen 두 분께서 Apache Spark Jira [SPARK-10000] Consolidate storage and execution memory management 에 첨부한 문서를 번역한 글입니다.

해당 문서는 Spark 1.5 이하 버전에서의 메모리 관리 방식과 단점에 대해 설명한 후(Existing memory management), 이를 해결하기 위해 도입할 예정인 Unified Memory Manager의 설계(Design)와 구현(Implementation)으로 전개됩니다.

문서 자체가 10 페이지가 넘기 때문에 하나의 글로 작성하기 보단 Spark 1.5 이하 버전과 이후 버전을 나누어 번역하려 합니다.

이 글에서는 Existing memory management(1.5버전 이하에서 사용되는) 내용에 대해 서술하고, Unrolling이라는 과정에 대해 코드 분석을 진행해 보았습니다.

Unrolling에 대해 다룬 글이 별로 없고, 몇몇 글에서 언급한 내용과 문서의 내용이 일치하지 않아 최대한 코드로만 접근해보았습니다.

Overview

Spark에서의 메모리 사용은 크게 2개(Execution, Storage)로 나눌 수 있습니다.

Execution 메모리는 Shuffle, Join, Sort, Aggregation에서 사용되는 메모리를 지칭하며, Storage 메모리는 Caching이나 클러스터 전반에 내부 데이터를 전파하는데에 사용되는 메모리를 지칭합니다.

Spark 1.5를 포함한 하위 버전에서는 위 2개의 메모리 공간이 정적으로 구성되었기 때문에, 서로의 공간에 할당된 메모리 공간을 가져와 사용할 수 없었습니다. 이러한 엄격한 분리로 인해 몇몇 제약사항들이 발생했습니다.

  • 모든 Workload에 사용할만한 적절한 기본값이 없다.
  • Memory Fraction을 튜닝할 때 사용자가 내부 구조를 잘 알고 있어야 한다.
  • Cache를 사용하지 않는 프로그램의 경우 사용가능한 메모리 중 매우 적은 영역만을 사용한다.

이 글은 기존의 메모리 영역을 통합하여 이러한 제약사항을 없애는 것이 목표입니다.

최종 결과물은 성능 개선과 사용자가 최적의 메모리 사용량을 튜닝하는 필요성을 없애는 것입니다. 또한, 메모리 할당을 더이상 어플리케이션 단위로 고정 설정하지 않기 때문에 단일 어플리케이션이 과도한 Spilling 없이도 여러 종류의 Workload를 지원할 수 있어야 합니다.

Existing memory management

Spark의 기존 메모리 관리는 정적인 메모리 분할(Memory fraction)을 통해 구조화 되어 있습니다.

메모리 공간은 3개의 영역으로 분리되어 있습니다. 각 영역의 크기는 JVM Heap 크기를 Spark Configuration에 설정된 고정 비율로 나누어 정해집니다.

  • Execution: 이 영역은 Shuffle, Join, Sort, Aggregation 등을 수행할 때의 중간 데이터를 버퍼링하는데에 사용됩니다. 이 영역의 크기는 spark.shuffle.memoryFraction(기본값: 0.2)를 통해 설정됩니다.
  • Storage: 이 영역은 주로 추후에 다시 사용하기 위한 데이터 블록들을 Caching하기 위한 용도로 사용되며, Torrent Broadcast(?)나 큰 사이즈의 Task 결과를 전송하기 위해서도 사용됩니다. 이 영역의 크기는 spark.storage.memoryFraction(기본값: 0.6)을 통해 설정됩니다.
  • Other: 나머지 메모리 공간은 주로 사용자 코드에서 할당되는 데이터나 Spark에서 내부적으로 사용하는 메타데이터를 저장하기 위해 사용됩니다. 이 영역은 관리되지 않는 공간이기 때문에 더이상 언급하지 않을 것이며, 기본값은 0.2입니다.

각 영역 메모리에 상주된 데이터들은 자신이 위치한 메모리 영역이 가득 찬다면 Disk로 Spill됩니다. Storage 영역의 경우 Cache된 데이터는 전부 Drop되게 됩니다. 모든 경우에서 데이터 Drop이 발생하면 I/O 증가 혹은 Recomputation으로 인한 성능 저하가 나타나게 됩니다.

Memory fraction breakdown

잠재적인 OOM(Out of memory error)을 피하기 위해 Spark의 메모리 관리는 예상치 못한 큰 Item이 데이터 내에 들어오는 것과 같은 상황에 ‘매우’ 유의합니다. 이러한 이유때문에 각 영역에서는 Safety Fraction영역을 Data Skew를 위한 추가 버퍼로 제공합니다.

-> 내가 알고 있었던 Data Skew는 데이터가 특정 노드로 편중되는 현상이고 단순 번역을 해 보아도 동일한 뜻으로 해석되는데, 구글 번역에 넣고 돌려보면 위 문장에서의 Data Skew는 ‘데이터 분출’이라고 해석된다. 사실 ‘데이터 분출’이 더 맥락에 맞긴한데 정확히 해석되지 않기 때문에 원문 표현을 그대로 가져다 쓴다.

이는 Spill이 발생하는 기본 제한이 전체 Heap 공간 중 16%(spark.shuffle.memoryFraction * spark.shuffle.safetyFraction)밖에 되지 않는다는 것을 의미합니다. 어떠한 데이터도 Cache하지 않는 어플리케이션들은 Heap 공간을 낭비하게 됩니다. 이 과정에서 작업들은 중간 데이터들을 불필요하게 Disk에 Spill한 후 즉시 읽어들입니다. Unroll Fraction에 대해서는 다른 Section에서 언급할 예정입니다.

Execution memory management

Execution 메모리는 JVM에서 실행되는 Active Task 작업들이 나누어 가집니다. 위의 영역들에 정적으로 메모리가 할당되는 것과 달리 Task들에게 메모리를 할당하는 것은 동적입니다.

Spark은 각 Slot에 고정된 Chunk를 할당하는 방법을 사용하지 않습니다. Spark은 동일한 JVM에서 동작하고 있는 Active Task가 없는 경우 하나의 Task에 사용가능한 모든 Execution 메모리를 할당하기도 합니다.

이러한 동작을 할 수 있도록 몇몇 메모리 매니저들이 존재합니다.

  • ShuffleMemoryManager: Global Accounting과 Policy Enforcement를 담당합니다. ShuffleMemoryManager는 얼마나 많은 메모리를 Task에게 할당할지를 결정하는 중앙 중재자입니다. JVM 당 1개씩 존재합니다.
  • TaskMemoryManager: Task 별로 메모리 할당과 Bookkeeping을 담당합니다. TaskMemoryManager는 On-Heap Block을 추적하기 위한 Page Table과 Task가 종료될 때 모든 Page가 해제되지 않는 경우 Exception을 발생시켜 Memory Leak을 추적하는 기능이 구현되어 있습니다. 내부적으로 TaskMemoryManager는 ExecutorMemoryManager를 사용하여 실제 Allocation과 Free를 수행합니다.
  • ExecutorMemoryManager: ExecutorMemoryManager는 On-Heap과 Off-Heap 메모리 할당을 담당합니다. 또한 ExecutorMemoryManager에는 Task 간에 Free된 Page 재사용을 허용하기 위한 Weak Reference Pool이 구현되어 있습니다.

이러한 메모리 매니저들은 아래와 같이 상호작용합니다.

  1. Task가 메모리에 큰 공간을 할당받기를 원한다면 ShuffleMemoryManager에게 X byte 할당을 요청합니다.
  2. ShuffleMemoryManager가 요청을 허용했다면, Task는 TaskMemoryManager에게 X byte 할당을 요청합니다.
  3. TaskMemoryManager는 요청을 수신한 후 Page Table을 업데이트합니다.
  4. ExecutorMemoryManager에게 X byte 할당을 요청합니다.
  5. ExecutorMemoryManager는 Task가 사용할 수 있도록 실제로 X byte를 할당합니다.

Memory allocation policy

각 Task는 ShuffleMemoryManager로부터 Execution 메모리의 1/N(N은 Active Task의 갯수)를 얻을 수 있습니다. 만일 할당 요청의 일부만이 받아들여졌다면, Task는 메모리에 상주해있는 데이터를 Disk로 Spill합니다.

과도한 Spill을 피하기 위해 Task는 전체 메모리의 1/(2N)을 획득할 수 있을 때까지 Spill을 수행하지 않습니다. 만일 1/(2N)을 얻을 수 없을 정도로 메모리가 부족한 경우, 메모리 할당 요청은 다른 Task가 메모리의 데이터를 Spill하여 남는 공간을 공유하기 전까지 Block 됩니다.

예시

  • Executor가 시작된 후 Task A가 먼저 실행됨
  • Task A는 다른 Task들이 존재하지 않기 때문에 전체 Execution 메모리를 할당받음
  • 두번째 Task인 Task B가 실행됨. N이 2가 되었기 때문에, B는 Execution 메모리의 1/4(=1/2N)을 획득하기 전까지 Block 됨
  • Task A가 메모리에 상주하고 있던 데이터를 Spill 하여 Execution 메모리의 1/4가 확보되는 경우 Task B가 실행되고, 이후에는 두 작업 모두 Spill이 가능해짐

Note

Task A는 메모리 매니저로부터 메모리 할당을 받는데에 실패할 때까지(더이상 할당해줄 메모리가 없을 때까지) Spill을 수행하지 않습니다. 그동안에는 새로운 Task들은 이미 동작 중인 Task들이 모든 메모리를 점유하고 있기 때문에 Starvation 상태에 빠지게 됩니다. 이것은 Spilling mechanism을 강제하는 방식으로 해결할 수 있지만 이 문서의 범위에서 벗어나 있습니다.

Storage memory management

Storage 영역은 BlockManager에 의해 관리된다. 주 사용 목적은 RDD Partition을 Caching하는 것이지만, Torrent broadcast나 Driver로 대규모 작업 결과를 보내는데에도 사용된다.

Storage Level

각 Block은 해당 Block이 Memory, Disk, Off-Heap 중 어디에 저장될지 명시하는 Storage Level에 연관되어 있다. Block은 Memory가 부족할 때 Memory에서 Disk로 Evict되어 Memory와 Disk 모두에 존재하는 경우도 있다.

Storage Level은 또한 Block이 Serialized 된 형태로 저장되는지 아닌지에 대해서도 명시합니다. MEMORY_AND_DISK_SER Storage Level은 특히 주목해야합니다. MEMORY_AND_DISK_SER Storage Level에서는 Block이 이미 Serialized 된 ByteArray 상태로 Memory에 존재하기 때문에 Disk로 Evict할 때 Serialize하지 않아도 되서 Evict 비용이 저렴합니다.

Eviction policy

현재 Eviction policy는 LRU(Least Recently Used) 알고리즘이 적용되어 있습니다. 여기에는 2가지 예외가 있습니다.

  • RDD의 Block을 Cache하기 위해서 해당 RDD를 Evict하지 않습니다.
  • Unrolling이 실패할 경우, 해당 블록은 바로 Evict 됩니다.

Unrolling

BlockManager는 Iterator 형태로 Block 데이터를 수신한 후, Block이 메모리에 상주해야하는 경우에는 Iterator를 Array 형태로 Unroll 합니다.

그러나 전체 Iterator가 메모리에 들어갈 수 없을 경우가 있기 때문에(메모리 공간이 데이터 크기보다 작을 경우), BlockManager는 OOM(Out of memory error)를 피하기 위해 Array로 Unroll하기 위한 공간이 적당한지 체크하며 점진적으로 Unroll을 수행합니다.

Unrolling을 위해 사용되는 메모리는 Storage 공간입니다. Block이 존재하지 않는 경우에 Unrolling은 Storage 공간을 모두 사용할 수 있습니다. 반면에 Unrolling은 메모리 공간이 부족한 경우 spark.storage.unrollFraction 값에 의해 20%까지 줄어들 수 있습니다.

Unrolling에 대하여

Unrolling이라는 과정이 있다는 것을 이번에 처음 알게 되었다. 출판된 서적에서는 본적이 없었던 것 같고, 해외 블로그에 작성된 글에서는 간간히 등장했다. 결국은 코드에서 Unroll 과정을 찾아보기로 했고, Spark Internal Part 1. RDD의 내부동작 에서 RDD가 연산되는 과정을 따라갔었는데, 해당 코드 이후에 Unroll 과정이 등장하는 것을 확인하였다. 다시 한번 해당 코드부터 추적을 시작하여 Unrolling이 어떤 과정인지 알아보도록 한다.

RDD.scala의 getOrCompute 함수

위 코드를 보면 332 Line에서 RDD를 구성하는 Partition의 Block ID를 가져오고, 335 Line에서 해당 Block을 연산하기 위해 BlockManager의 getOrCompute 함수를 호출한다.

BlockManager.gerOrElseUpdate 함수

getOrElseUpdate 의 819 Line에서는 우리가 연산할 RDD Block을 Local 혹은 Remote에서 가져온다.

826 Line에서는 doPutIterator 함수를 호출하여 Block에 대해 우리가 map, reduce 를 통해 전달한 함수 f를 적용하게 된다.

BlockManager.doPutIterator 함수

doPutIterator 함수에서는 StorageLevel에 따라 MemoryStore, DiskStore 객체를 통해 Unrolling을 수행하게 된다.

1109 Line을 보면 MemoryStore 객체의 putIteratorAsValues 함수를 호출하고, 이 함수는 다시 MemoryStore 객체의 putIterator 함수를 호출한다.

MemoryStore.putIterator 함수 정의

putIterator 함수의 167 Line 주석을 보면 Iterator가 메모리보다 큰 경우에 OOM이 발생할 수 있기 때문에 충분한 메모리 공간을 확보하며 Iterator를 점진적으로 Unroll한다고 쓰여 있다.

그렇다면 실제 Unroll 이 어떻게 수행되는지 코드를 통해 확인해보자.

MemoryStore.putIterator 함수의 194 ~ 217 Line 초기화 부분

putIterator 함수의 194 ~ 217 Line은 Unrolling을 하기 위한 설정들을 로드한다.

198 Line의 initialMemoryThreshold은 Unrolling에 사용되는 메모리의 초기값인데, 이 값은 아래와 같이 spark.storage.unrollMemoryThreshold 속성값을 사용하며, 초기 값은 1024*1024 이다.

200 Line의 memoryCheckPeriod는 Iterator를 돌며 몇개의 요소를 로드했을 때마다 남은 메모리 공간을 체크할 것인지에 대한 값이다.

spark.storage.unrollMemoryCheckPeriod를 사용하며 기본 값은 16이다. 즉, 16개의 요소를 로드할 때마다 남은 메모리 공간을 체크한다.

memoryGrowthFactor는 메모리 할당 요청을 할 때마다 몇배씩 증가하여 요청할 것인가에 대한 값이다.

209 Line에서는 reserveUnrollMemoryForThisTask 함수를 호출하여 Unrolling에 필요한 메모리 공간을 할당받는다.

551 Line에서와 같이 MemoryManager 객체의 acquireUnrollMemory를 호출하여 필요한 메모리를 할당받으며 기본적으로 1.6 이상부터는 UnifiedMemoryManager를 사용하고, LegacyMode를 사용할 경우에만 StaticMemoryManager를 사용한다.

220 Line부터 기존 Block에 f를 적용한 결과(values)를 저장하는 과정을 보여준다.

부모 RDD Block에 f를 적용한 결과인 values의 크기가 현재 사용가능한 메모리 공간보다 작은 경우에는 문제가 없겠지만, 만일 큰 경우에는 Iterator의 모든 요소들을 메모리로 올리는 과정에서 OOM이 발생할 것이다.

따라서 Iteration 과정 중 일정 주기마다 메모리를 체크하여, 할당받은 메모리 공간을 초과할 경우 추가적으로 메모리를 할당받는 과정을 거친다.

222 Line을 보면 memoryCheckPeriod마다 메모리 체크를 수행하는 것을 확인할 수 있고, 225 Line에서는 연산 결과를 저장하는 ValuesHolder(Vector) 객체의 크기와 할당받은 메모리 크기를 비교하여 메모리가 부족한 경우 228 Line에서와 같이 메모리를 재할당받은 후 Iteration을 진행해나가는 것을 볼 수 있다.

3년 전 Spark 1.6 버전에서 RDD를 사용할 때만해도 OOM이 두려웠는데, 요즘은 Spark SQL을 사용하고 GroupBy 관련 코드보다는 Join 위주로 작성하는 경우가 대부분이라서 메모리 관련 오류를 겪어본 적이 없었던 것 같다.

이 글을 번역하면서 메모리 관리 자체보다는 다른 개발자들이 문제 상황에 직면해서 이를 어떻게 풀어나가는지에 대해 더 많이 공감하게 되었던 것 같다.

특히 Unrolling에 대해 코드분석을 진행하는 과정에서 Iterator 패턴 사용과 메모리 관련 코드 설계를 얼마나 치밀하게 했는지 느낄 수 있었고, 향후에 이러한 접근방식을 내 프로젝트에도 적용할 수 있을 것이라 생각한다.

--

--