Spark Internal Part 1. RDD의 내부 동작

이용환
12 min readSep 16, 2018

--

2016년 1월에 Apache Spark을 접하고 벌써 2년 6개월 정도가 지났다. 물론 계속 Spark을 공부하거나 사용하지는 않았고 프로젝트에서 필요할 때마다 2~3개월 정도씩 사용했었던 것 같다.

최근에 사내에서 Pig로 작성된 일부 ETL 작업을 Spark으로 포팅하는 작업을 진행하고 있는데, 문득 ‘Spark 내부에서 RDD는 어떻게 동작할까?’라는 궁금증이 들었다.

물론 Spark에서는 High Level RDD API를 제공하고 있기 때문에, 굳이 개발자가 내부 구현을 알지는 않아도 되지만 내부적으로 어떻게 동작하는지 알면 더 잘 사용할 수 있지 않을까?라는 생각에 Spark 코드를 조금 살펴보기 시작했다.

물론 워낙 규모가 크고 분석할 시간도 충분하지는 않았기 때문에 중간 중간 건너 뛴 부분도 있지만, 이 글을 읽고 나면 대략적으로 RDD가 어떻게 연산되는지 알 수 있을 것이라 생각한다.

이 글에서는 Spark 소스코드를 따라가며 RDD의 동작을 알아볼 예정이다.

우리가 사용하는 RDD는 Spark Core 내의 org.apache.spark.rdd.RDD.scala에 정의되어 있으며, 주석에 상세하게 RDD에 대한 설명이 적혀 있다.

RDD의 5가지 특성

내부적으로 RDD는 5가지 주요 특성으로 표현할 수 있다.

  • 파티션 목록
  • 각 Split을 연산(계산)하는데에 사용되는 함수
  • 의존하는 다른 RDD 목록
  • (Optionally) Key-Value RDD를 위한 파티셔너
  • (Optionally) 각 Split이 연산되는데에 최적의 노드 목록

이 중 1,2,3번째 특징에 대해 정리해보고자 한다.

A list of partitions(파티션 목록)

RDD는 여러 개의 파티션으로 이루어져 있고, 하나의 파티션은 동일한 타입의 여러 개의 객체들로 이루어져 있다.

RDD.scala의 partitions 함수

RDD의 partitions 함수를 보면, 내부적으로 다시 getPartition 함수를 이용하여 Partition 배열 객체를 반환하며

RDD.scala의 getPartitions 함수

getPartitions 함수의 경우 추상 클래스인 RDD를 상속받은 하위 RDD 클래스를 반환하는 것을 볼 수 있다. 하위 클래스인 HadoopRDD에서는 getPartitions 함수를 어떻게 구현하였는지 보자.

HadoopRDD.scala의 getPartitions 함수

204 Line을 보면 getInputFormat(jobConf).getSplits(jobConf, minPartitions) 함수를 호출하는 것을 볼 수 있다.

HadoopRDD.scala의 getInputFormat 함수

getInputFormat에서는 Hadoop의 InputFormat 클래스를 반환하는데, 이 클래스의 getSplits 함수의 설명은 다음과 같다.

Hadoop InputFormat 클래스의 getSplits 함수
  • Job에 사용되는 입력 파일의 논리적 Split을 반환한다.
  • 각 InputSplit은 처리를 위해 각 Mapper에 Assign 된다.

주의해야 할 것은 Note인데, 여기서의 Split은 논리적인 Split이며 실제 물리적인 Chunk를 가진 Split이 아니라고 적혀 있다. 즉, 물리적 Split을 추상화 한 논리적 Split으로써, 물리적 Split의 위치나 Offet 정도만을 가지고 있다는 뜻이다.

즉, Spark의 HadoopRDD를 구성하는 Partition은 Hadoop의 Split과 대응되며, 실제 File Chunk를 가지고 있는 것이 아닌 논리적인 정보만을 가지고 있다고 생각하면 된다.(다른 RDD도 파일을 열어보면 알겠지만, 데이터에 대한 논리적인 정보만을 가지고 있다)

A function for computing each split(각 Split을 계산하는 함수)

이 부분을 이해하기 위해서는 Job, Stage, TaskSet, Task에 대한 이해가 필요하고, 아래 영상에 잘 설명되어 있다.

Deep Dive into the Apache Spark Scheduler Xingbo Jiang(Databricks)

우리가 자주 호출하는 map 함수를 보면, 아래와 같이 구현되어 있다.

RDD.scala의 map 함수

Spark과 Scala를 동시에 공부할 때는 몇몇 용어에서 혼란이 올 수 있다.

  • Spark RDD의 map, reduce와 Scala의 Collection에서 제공하는 map, reduce
  • Spark의 Lazy Evaluation과 Scala의 Lazy Evaluation

물론 개념 자체로만 보면 거의 유사하지만, 내부 구현은 완전히 다르다.

위 map 함수에서는 Spark의 Lineage(리니지m 많이 사랑해주세요…), Lazy Evaluation, Clouser 처리에 대한 내용이 모두 담겨 있다.

  1. CleanF 함수를 이용하여 주어진 함수 내의 Clouser 변수를 제거한다. 분산 환경에서 주어진 함수를 처리하는 주체는 Driver가 아닌 Executor이고, 이 과정에서 함수 밖에 있는 Clouser 변수에 접근할 수 없기 때문에, CleanF와 같은 함수를 이용하여 Clouser 변수를 미리 치환하는 것으로 보인다.
  2. map 함수를 수행하는 시점에서 실제로 값을 계산하지 않고, 주어진 정보를 기반으로 초기화한 RDD(여기서는 MapPartitionsRDD)를 반환한다. Scala 언어에서의 Lazy Evaluation과는 완전히 다른 개념이다. 실제 연산은 Action 계열 연산에서 Spark Context의 runJob 함수 수행 후, 각 Executor에서 실행되게 된다.
  3. MapPartitionsRDD 초기화 인자를 보면 현재 RDD를 전달하게 되어 있다. 이 부분은 생성자에서 Dependency 정보를 생성하는데 활용되는데, 아래에서 다시 기술하도록 하겠다.

여기까지가 우리가 작성한 코드에서 map 함수를 호출했을 때 Driver Application에서 발생하는 과정이다. 이후 Action 계열의 함수를 호출한 후, DAG를 생성하고 Executor까지 넘어가는 부분에 대해서는 너무 복잡해서 분석을 포기했고, Executor에서 어떻게 주어진 Task를 처리하는지에 대한 부분부터 다시 추적을 시작하게 되었다.

Executor는 Driver에서 전달한 Task들을 수행한다. Driver로부터 Serialize된 실행 정보를 전달받아 Task 객체를 초기화한 후, run 함수에서 Task의 run 함수를 실행하게 된다.

Executor.scala의 run() 함수에서 task.run을 실행

Spark에서 Task란 무엇일까? 위에 첨부한 영상에서 잘 설명해주고 있으며, Spark의 Task.scala 코드를 보면 다음과 같이 정의되어 있다.

Task.scala

Task는 Execution의 단위이며, Spark에는 2가지 종류의 Task가 있다.

  • ShuffleMapTask
  • ResultTask

Spark Job은 1개 이상의 Stage로 구성되어 있다. Job을 구성하는 가장 최종 Stage는 여러 개의 ResultTask들로 이루어져 있고, 그 이전 Stage들은 ShuffleMapTasks로 구성되어 있다.

ResultTask는 Task를 실행시킨 후 Task의 결과를 Driver Application으로 전송한다. ShuffleMapTask는 Task를 실행 시킨 후 Task의 결과를 여러 개의 Bucket으로 나눈다(다음 Task에서 사용하기 위해서).

즉, ResultTask는 우리가 호출한 Action에 해당하며 ShuffleMapTask는 map, reduce 등의 함수에 해당한다고 생각할 수 있다.

다시 코드 내용으로 돌아가서, Executor에서 호출한 task.run 함수의 task는 Task.scala에 정의된 Task 클래스이다.

Task.scala의 run() 함수에서는 runTask를 호출한다.

우리가 추적하고자 하는 map 함수는 ShuffleMapTask에 해당하므로, ShuffleMapTask의 runTask 함수 내부를 확인해보자.

ShuffleMapTask.scala의 runTask 함수

99 Line을 보면 드디어 우리가 기다리던 rdd가 나오는 것을 볼 수 있다. 해당 라인은 writer 객체의 write 함수를 호출하여 rdd.iterator의 결과를 어딘가(메모리가 될 수도, 디스크가 될 수도 있을 것이다)에 쓰는 과정을 나타낸다.

RDD.scala의 iterator 함수

RDD의 iterator 함수는 StorageLevel이 NONE이 아닐 경우 getOrCompute 함수를 호출하며,

RDD.scala의 getOrCompute 함수

getOrCompute 함수에서는 BlockManager를 통해 연산에 필요한 Block(Split)을 가져오고 337 Line의 computeOrReadCheckpoint 함수를 호출한다.(334 Line의 주석을 보면, 이 함수는 Executor에서 호출된다는 것을 알 수 있다)

RDD.scala의 computeOrReadCheckpoint 함수

computeOrReadCheckpoint 함수를 보면 324 Line에서 compute 함수를 호출하게 되고,

RDD.scala의 compute 함수

compute 함수에서는 드디어 우리가 map 함수에 넘겼던 함수를 Parent RDD에 적용하여 새로운 우리가 원하는 새로운 RDD를 만드는 것을 볼 수 있다.

정~말 길고 지루한 과정이었지만 여기까지가 우리가 쉽게 사용하는 map 함수가 호출되는 과정이었다.

A list of dependencies on other RDDs.

위의 map 함수를 설명하는 과정에서 Dependency에 대한 내용이 나오는데, RDD의 3번째 특징인 ‘A list of dependencies on other RDDs.’에 연관된 내용이었다.

Spark의 Dependency 클래스에는 NarrowDependency, OneToOneDependency, ShuffleDependency 등 다양한 Dependency 클래스가 정의되어 있다.

시중에 나와 있는 많은 서적에서 map 함수는 NarrowDependency를 가지는 함수라고 설명되어 있고, Spark 내부적으로는 NarrowDependency를 상속받은 OneToOneDependency를 가지고 있다.

Dependency.scala의 OneToOneDependency 클래스

왜 굳이 NarrowDependency와 OneToOneDependency로 나누어 놓은 것일까?

map 함수의 경우 부모 Partition의 RDD와 자식 Partition의 RDD가 1:1 대응 관계를 이룬다.

그런데 생각해보면 우리가 파티션 수를 줄이기 위한 Coalesce 함수(Shuffle 변수를 false로 두었을 때) 또한 NarrowDependency를 가진다. 이 경우에는 n개의 부모 Partition이 1개의 자식 Partition과 대응되는 관계를 가지기 때문에 굳이 OneToOneDependency를 만든 것으로 나와 있다.

우리가 map 함수를 호출했을 때를 생각해보면, MapPartitionsRDD 객체를 반환하도록 되어 있다. MapPartitionsRDD의 생성자를 보면 부모 클래스인 RDD클래스의 인수로 부모 RDD 객체를 넘겨 초기화하는 것을 볼 수 있다.

MapPartitionsRDD 클래스의 생성자

RDD 클래스의 보조 생성자를 보면 아래와 같이 정의되어 있다.

RDD.scala의 보조 생성자

Parent RDD에 대해 1:1 Dependency를 가지는 RDD를 위한 생성자.

Spark에서는 위와 같은 방식으로 RDD간의 관계를 Dependency라는 객체를 통해 저장하고 관리하고 있고, 우리는 이를 Lineage라고 부르는 것이다.

이 내용은 8월 30일 NCSOFT 데이터수집기술팀 기술공유 시간에 공유했던 내용입니다.

Spark을 잘 활용하는 법에 대해서는 많이 소개되고 있지만, Spark이 내부적으로 어떻게 동작하는 지에 대한 글은 생각보다 많지 않은 것 같습니다.

이 글을 쓰게 된 이유는 제 실력이나 지식을 뽐내고자 한 목적이 아닌, 저와 같은 생각(내부 동작을 더 알고 싶어하는)을 가진 다른 분들이 좀 더 쉽게 접할 수 있으면 좋겠다는 생각에서 적게 된 것입니다.

읽다 보면 틀린 부분이 있을 수도 있고 부족한 부분이 있을 수도 있는데, 피드백 주시면 적극적으로 반영하도록 하겠습니다.

읽어주셔서 감사합니다.

--

--