Spark Internal Part 2. Spark의 메모리 관리(1)

이용환
17 min readSep 19, 2018

--

이번에는 Spark의 메모리 관리를 공부하며 글을 작성해 보았습니다.

Spark 사용 시 여러가지 문제를 겪을 수 있는데, 그 중 가장 많이 겪게 되는 부분이 메모리 관련(주로 GroupBy 계열 Transformation을 사용할 때 발생하는 Out Of Memory Exception)문제입니다.

Spark 공식 문서를 보면 메모리 구조나 옵션 등에 대해 상세히 나와 있지만, 사용하다보면 옵션을 잘 적용했다고 해도 종종 오류가 발생하는 경우가 있습니다. 모든 내용이 Spark 문서에 기재되어 있지 않을 뿐더러(물론 Databricks에서는 많은 강의를 통해 내부 구조에 대해 설명하고 있긴 합니다), 써드파티 라이브러리(Parquet 등)에서의 문제(아래에서 기술할 예정입니다)도 있기 때문입니다.

물론 이 글에서도 전체 내용을 다루지는 못하며(워낙 Spark 코드가 방대하고 현재도 변화하고 있기 때문에), 일부 내용의 경우 잘못 이해하고 작성했을 수도 있습니다.

만일 글 내용 중 틀렸거나 부족한부분이 있다면 피드백 부탁드립니다. 글을 쓰고 공개적인 장소에 공유하는 이유는 이 글을 읽는 다른 분들께 약간이나마 도움을 드릴 수 있고, 저도 피드백을 받아 더 성장할 수 있다고 생각하기 때문입니다.

그럼, 시작하겠습니다.(아래부터는 말투가 바뀌어도 이해해주시기 바랍니다)

Spark의 메모리 관리를 알아보기 전에, JVM Object Memory Layout, Garbage Collection, Java NIO, Netty Library 등에 대한 이해가 필요하다.

첫 두 용어(JVM Object Memory Layout, Garbage Collection)는 Spark의 메모리 구조나 관리가 왜 현재와 같이 설계되었는지에 대한 원인에 가까우며, 나머지 두 용어(Java NIO, Netty Library)의 경우 이러한 원인의 해결책에 가깝다고 할 수 있다.

물론 이러한 주제를 모른다고 해서 Spark의 메모리 구조를 파악하는데 큰 어려움이 있는 것은 아니며, 대부분의 Spark 사용자들은 많은 메모리 관련 오류를 겪으며 어렴풋이 메모리 구조에 대한 이해가 생겼을 수도 있다고 생각한다.

하지만 이 내용들을 이해하고 넘어간다면, 좀 더 확실히 오류의 원인을 파악할 수 있고 빠르게 문제를 해결해 나갈 수 있다고 믿는다.

JVM Object Memory Layout

이 내용은 IBM developerWorks의 From Java code to Java Heap을 참고하여 작성하였습니다.

32비트 OS에서 java 명령어를 사용하여 Java Application을 실행 시켰을 때, 점유하는 메모리 공간을 크게 2가지 영역으로 나누어 볼 수 있다.

  • Native Heap 영역(JVM 자체는 Native Heap 영역의 일부를 사용한다)
  • Java Heap 영역

우리가 ‘일반적인’ 자바 프로그램을 작성할 떄 사용하는 부분은 Java Heap 영역이다.

Java에는 원시 타입(Primitive Type | int, boolean 등 기본형)과 참조 타입(Reference Type | Integer, Boolean, 사용자 정의 클래스 등)의 2가지 타입을 제공한다.

기본적으로 참조 타입은 원시 타입에 비해 많은 메모리를 사용한다. 참조 타입은 값에 대한 메모리 공간에 객체메타정보에 대한 메모리 공간도 추가적으로 사용하기 때문이다.

위 글에서는 int와 Integer를 기준으로 비교가 이루어졌는데, Integer의 경우 int보다 약 4배 정도의 메모리 공간을 소모한다고 기술되어 있다.

JVM 벤더에 따라 다를 수 있겠지만 Object 메타데이터는 보통 3가지로 이루어져 있다고 한다.

  • Class 포인터: 객체가 자신이 속한 클래스를 가리키는 포인터. Integer 객체의 경우 java.lang.Integer 클래스를 가리키고 있을 것이다(설명은 나와 있지 않지만, ClassLoader에 의해 PermGen 영역에 로드된 Class 객체를 가리킬 것이라 생각한다)
  • Flags: 객체 상태에 대한 정보(객체의 hashcode나 이 객체가 Array인지 아닌지 등)
  • Lock: 동기화(Synchronization)에 사용되는 정보(Object 기반 Lock을 건다면 사용될 듯 하다)
Integer 타입 객체의 Memory 구조

위 그림을 보면 Integer의 실제 값을 저장하는 공간을 32bit(4byte)인데 메타데이터가 96bit(12byte)를 사용하고 있는 것을 알 수 있다.

Array[int] 타입 객체의 Memory 구조

Array 객체(여기서는 primitive를 Element로 가진 Array로 설명했다)의 경우 Array Length를 알리는 Size 필드가 추가된다.

String 타입 객체의 Memory 구조

String과 같은 객체의 경우 내부적으로 Array[char]를 사용하기 때문에 위와 같이 2개의 메모리 구조를 사용하게 된다.

JVM 메모리 구조를 다루는 글은 아니기 때문에(후에 좀 더 깊이 공부하고싶은 내용이긴 하다) 이 정도까지의 내용을 기술하도록 하겠다.

여기까지 읽으신 분께서는 ‘생각보다 참조 타입이 메모리를 많이 사용하는구나’ 라고 생각하셨을 것이다. 더 나아가, ‘Spark과 같은 대용량 데이터 처리 프레임워크에서는 이러한 JVM Object의 Overhead를 줄이는 방향을 연구하겠구나’ 라는 생각도 하셨을 것이라 생각한다.

Garbage Collection

사실 Java의 Garbage Collection은 너무나 많은 국내/외 문서나 블로그에 잘 정리되어 있어, 이 글에 작성하는 것보다 그 글들을 보는 것이 더 효율적일 것이다.(Naver D2의 Java Garbage Collection이 잘 정리되어 있다고 생각하여 공유드린다)

이 내용은 Databricks에서 2015년 5월 28일에 작성한 Tuning Java Garbage Collection for Apache Spark Applications의 내용과 내 생각(은 이텔릭체로 표기하겠습니다)을 바탕으로 작성하였다.

Garbage Collection은 ‘weak generational hypothesis’에 기반하여 만들어졌으며, ‘weak generational hypothesis’의 내용은 아래와 같다.

  • Most of the objects become unused quickly
  • The ones that do not usually survive for a (very) long time

대부분의 객체는 생성된 후 얼마가지 않아 사용되지 않으며, 살아남는 객체의 경우 매우 시간을 살아남는다는 것이다.

위의 객체를 RDD에 대응하여 생각해본다면 어떨까?

RDD[T]에 대해 map(f)를 호출하여 RDD[T]를 만든다고 생각해보자. 특정 Executor에 RDD의 파티션들이 할당되고, Executor는 연산을 위해 메모리에 파티션 데이터를 로드하고, 이 데이터들을 우리가 넘긴 함수 f(T->U)에 적용하여 새로운 RDD의 파티션을 만들어 낼 것이다.

함수 f에서 T에 해당하는 이전 데이터들은 U로 변환되고 나면 더이상 참조되어지지 않을 것이다. T에 해당하는 데이터들은 Minor GC가 발생했을 때 Survive하지 못하고 없어질 것이다.(여기까지는 ‘stop the world’가 발생하지 않기 때문에 큰 성능 이슈는 발생하지 않는다)

만일 만들어진 RDD들에 대해 cache를 호출한다면 어떻게 될까?

Spark의 Default Storage Level은 MEMORY_ONLY이기 때문에 cache를 호출하고, 다른 RDD들을 만들어나갈 때마다 cache된 RDD들은 Eden -> Survive -> Old로 이동해나갈 것이다.

결과적으로 메모리 공간이 부족해지면 Full GC가 발생하고, 이에 우리의 Executor JVM은 ‘stop the world’에 직면하여 작업이 지연될 것이다.

JVM Object Memory Layout 특성에 따라 하나의 값을 표현하는데 많은 메모리가 소요되고, 이러한 메모리 소요에 의해 부족해진 공간을 확보하기 위해 Garbage Collection이 발생한다.

대용량 데이터를 처리하는 프로그램에서는 이러한 특징들이 처리 속도를 저하시켰을 것이고, Spark 개발진 또한 이러한 문제를 해결하기 위해 고민했을 것이다. 현재 Spark에 적용되어 있는 다양한 기술(Project Tungsten, Netty)들은 이 고민에 대한 답이라 생각할 수 있을 것이다.

지금부터는 Spark 개발진이 이러한 문제를 해결하기 위하여 적용한 Project Tungsten과 Netty에 대해 알아보려 한다.

Project Tungsten

Project Tungsten은 Spark의 실행 엔진 개선(CPU와 메모리 위주)을 위해 2014년에 실행되었던 Umbrella Project의 코드네임이다.

위의 Project Tungsten 링크로 들어가보면, 7가지 주요 특징에 대해 서술해 놓았는데, 이 중 Software Level에 가까운 3개를 추려보자면 아래와 같다.(사실 Code Generation 또한 Hardware Level에 가깝다고 생각하지만 위 페이지를 들어가 나머지 3개를 보면 왜 Code Generation이 Software Level에 가깝다고 말한지 이해할 것이다)

  • Memory Management and Binary Processing: 메모리를 좀 더 명시적으로 관리하며 JVM 객체와 Garbage Collection의 Overhead를 없애는 것
  • Cache-aware computation: 메모리 계층 구조(L1, L2, Memory, Disk 순으로 느려지니까)를 활용하는 알고리즘과 데이터 구조 도입
  • Code generation: 모던 컴파일러와 CPU를 활용할 수 있는 코드 생성

위 내용을 좀 더 자세히 설명해놓은 자료는 2015년 4월 28일에 올라온

이며, 이를 통한 Spark SQL의 성능 개선은

페이지에 서술되어 있다.

첫번째로 링크한 페이지의 내용 중 1. Memory Management and Binary Processing 을 정리해보았다.

처음 내용의 대부분은 이 글의 위에서 기재한 JVM Object Memory Layout과 Garbage Collection에서 오는 성능 저하에 대한 내용을 다루고 있다.

여기서부터 글 내용에 나오는 부분에 대해서는 일반적인 글씨체, 제 생각이 추가된 부분은 이텔릭체로 작성하겠습니다.

Spark의 경우 데이터가 어떤 방식으로 구성(Job, Stage, Task)되어 연산되는지, 데이터의 Life Cycle은 어떻게 되는지에 대한 정보를 가지고 있기 때문에 JVM Garbage Collector보다 더 효율적으로 메모리를 관리할 수 있다.

범용적인 목적의 Application을 가정하고 만들어진 ‘weak generational hypothesis’이 Spark에는 잘 적용되지 않는 모델이라고 생각하는 것 같다.

JVM 객체와 Garbage Collection의 오버헤드를 없애기 위해 JVM Object가 아니라 Binary Data로 직접 연산할 수 있도록 Spark 연산자를 변환하는 메모리 관리자(현재 Spark Code에서 확인할 수 있는 Unified Memory Manager를 가리키는 것 같다)를 도입했다.

사실 이 부분이 가장 애매하게 해석했던 문장인 것 같습니다. 제가 코드를 확인했을 때는(제가 못보거나 잘못 봤을 수도 있겠지만) 연산자를 바꾸는 동작은 확인하지 못했었으니까요. 이 부분에 대해서는 추후 소스 코드 분석 글에서 더 자세히 다루도록 하겠습니다.

이 기능은 C 스타일의 메모리 접근(malloc, dealloc 등)을 허용하는 sun.misc.UnSafe 기반으로 구현되었습니다.

대부분의 Java Class들은 구글에 검색해보면 Oracle JavaDoc이 나오는데, sun.misc.UnSafe의 경우에는 문서가 나오지 않는다. OpenJDK7 JavaDoc을 Hosting하는 http://www.docjar.com/html/api/sun/misc/Unsafe.java.html 페이지에서 겨우 발견할 수 있었다. 클래스 주석을 보면 다음과 같이 적혀 있다.

sun.misc.UnSafe 클래스 주석

Low-Level의 Unsafe한 기능을 수행하는 함수들을 모아놓은 클래스이며, 이 클래스와 클래스에 속한 모든 methods는 Public 접근자이지만 Trusted Code에서만 이 객체를 얻을 수 있는 제한이 있습니다.

그렇다면 JVM Object를 사용하는 것과 Unsafe를 사용하는 것이 얼마나 차이가 있을까? 직접 테스트를 수행해 보았다.

  1. JVM Object를 사용하는 버전(Java로 작성)

출력 결과

Total Memory: 128974848
Max Memory: 1908932608
Free Memory: 127611672

Elapsed: 30192
Total Memory: 1547698176
Max Memory: 1908932608
Free Memory: 547102752

2. Unsafe를 사용하는 버전(Scala로 작성, 왜인지 모르겠지만 Java 환경에서 sun.misc.Unsafe가 import되지 않음…)

출력 결과

Total Memory: 128974848
Max Memory: 1908932608
Free Memory: 127611672
Elapsed: 200
Total Memory: 128974848
Max Memory: 1908932608
Free Memory: 126929656

눈여겨 보아야 할 점은 2가지이다.

  1. Elapsed의 차이

Unsafe를 사용할 경우 200ms, Java Object를 사용했을 경우 30192ms가 소요되었다.(맥북에어 2014 기준)

언어의 차이를 배제하고도 거의 15배 정도의 차이가 발생하는 것을 확인할 수 있다.

2. Memory 사용량의 변화

위 두 코드 모두 Xmx, Xms 등의 Heap 영역 설정을 주지 않고 실행하였으며, Java의 경우 힙 사용량이 거의 0MB에서 800MB 정도로 늘어난 것을 확인할 수 있고, Unsafe를 사용한 Scala의 경우 힙 공간에 변동이 거의 없는 것을 확인할 수 있다. 이는 Unsafe가 위의 JVM Object Memory Layout에서 언급했던 Native Heap 영역을 사용하기 때문이다. Native Heap 영역의 경우 Garbage Collection 의 영향을 받지 않기 때문에 ‘stop the world’ 또한 발생하지 않는다.

이렇게 sun.misc.Unsafe를 사용하게 되면 JVM 에서 연산하는 것보다 속도/메모리 관리 면에서 더 많은 이점을 가지게 된다.

Unsafe에 대한 더 많은 예제는 https://www.baeldung.com/java-unsafe 를 참고하기 바란다.

다시 Spark의 Tungsten 글로 돌아와서, Spark에서 java.util.HashMap을 UnSafe를 이용한 Binary Map으로 변경하여 성능 최적화 한 부분을 보여준다.

Project Tungsten의 HashMap 최적화(제가 그린 그림 아니에요 ㅠ)

Java HashMap(LinkedHashMap인 것으로 추정된다)을 도식화 해놓았는데, 사실 좀 더 자세히 그렸다면 HashMap 자체의 Object 메타데이터, 각 Entry의 Object 메타데이터, Key의 Object 메타데이터, Value의 Object 메타데이터 등이 포함되어 매우 많은 메모리를 소모하는 것을 예상할 수 있다.

이에 반해 Spark Binary Map의 경우 선형적 메모리 공간에 Key,Value를 연속적으로 할당하였으며(mmap과 비슷한…), Java Object가 아니기 때문에 메타데이터 공간도 절약되고 Garbage Collection의 영향도 안 받게 된다.

따라서 아래와 같은 그래프에서 극적인 성능 개선이 이루어지는 것이다.

개인적으로 위의 도식과 비교 방식이 좀 아쉽다고 생각한다. 일단 HashMap의 경우 Linked List기반의 Linked HashMap이고 Binary Map의 경우 Array에 더 가까운 모습이다. 물론 이를 제외하고라도 성능 차이가 극명한 것은 눈에 보이긴 하지만…

이 기능은 Spark 1.4 부터 적용되어 있고 Spark SQL의 엔진에서도 사용하는 것으로 알고 있다.

Spark의 경우 UnifiedMemoryManager를 통해 Heap, Off-Heap 메모리의 비율이나 사용량을 측정/조절한다.

여기까지는 좋은데, Third Party Library에서 사용하는 Off-Heap 공간이 문제다. Netty, Parquet 등의 Third Party Library에서도 성능 향상을 위해 Unsafe를 사용하고 있는데, 이러한 Library들에서 사용하는 메모리 공간에 대해서는 Spark에서 관여하지 않는 듯 하다.

가끔 Parquet 포맷의 파일을 사용할 때 “java.lang.OutOfMemoryError: Direct buffer memory”와 같은 오류 메시지를 볼 수 있는데, Parquet Library에서 허용하는 Direct Memory 이상의 메모리를 사용했기 때문에 발생하게 된다.(https://issues.apache.org/jira/browse/SPARK-4073)

여기까지 Spark Tungsten Project에 관련된 내용을 정리하였다. Part 1에 비해 약간은 코드가 적고 이론적인 설명이 많다.

원래 Spark의 BlockManager, UnifiedMemoryManager, NettyUtils, PooledByteBufAllocator에 대한 코드 분석과 Java NIO, Netty를 연계한 설명도 이 글에 포함할 계획이었다.

하지만 워낙 양이 많아 하나의 글로 풀어 내기에는 방대할 것 같아 여기까지의 내용을 1편으로 정리하고 이어지는 2편에서 다룰 예정이다.

글을 읽어주셔서 감사합니다.

혹시 글 내용 중 틀렸거나, 보충 설명이 필요한 부분에 대해서는 코멘트 부탁드립니다.

Tungsten 관련 내용이 https://issues.apache.org/jira/browse/SPARK-10000 티켓의 Attactchment에 포함되어 있는데 읽어 보시는 것을 추천드립니다.

감사합니다

--

--