Spark Internal Part 3. Spark SQL’s Catalyst Optimizer

with Deep Dive into Spark SQL’S Catalyst Optimizer

이용환
16 min readOct 14, 2018

처음 Spark을 접했을 때가 벌써 3년 전이고, 그 당시에는 RDD를 이용하여 프로그래밍 하는 것이 더 편했다.

그런데 최근 회사에서 PIG Script로 작성된 ETL 프로그램을 Spark으로 옮기다보니 SQL을 통해 처리할 수 있는 로직에 대해서는 Spark SQL을 사용하는 것이 RDD를 사용하는 것보다 더 편하다는 것을 알게 되었다.

그 이유는 아래와 같다.

  1. 사용하는 데이터 포맷이 Parquet이고, SQL만으로 처리할 수 있는 경우 Schema에 매핑되는 클래스를 정의할 필요가 없다.
  2. RDD의 경우 개발자가 최적화를 해야하지만, Spark SQL에서는 Catalyst Optimizer가 최적화를 대신해준다(물론 일부 Configuration은 사용자가 상황에 맞게 바꾸어주어야 한다).
  3. Dataframe(a.k.a Dataset[Row])은 Untyped Data인 Row를 사용하기 때문에 연산에 제한이 있었지만 Dataset은 Typed Data로 변환하여 처리하기 때문에 Dataframe보다는 좀 더 복잡한 연산이 가능하다.

그래도 아직 부족한 부분이 있다면 GroupByKey(Dataset에서는 (key, Iterable[DataType])형태로 처리되지 않음)인데, 이 부분 또한 Dataset <-> RDD 간 변환이 자유롭기 때문에 어느정도는 커버 가능하다고 본다.

이 글에서는 Databricks Blog와 Spark Summit에서 발표된 Deep Dive into Spark SQL’S Catalyst Optimizer 번역 및 요약과 약간의 코드 분석을 진행할 예정이다.

Deep Dive into Spark SQL’s Catalyst Optimizer

이 내용은 Databricks Blog의 Deep Dive into Spark SQL’S Catalyst Optimizer를 요약/정리한 것입니다.

Spark SQL은 Spark을 구성하는 요소 중 가장 최신의 기술적으로 발전 된 요소 중 하나입니다.

Spark SQL을 구현하기 위해 Scala Functional Programming 구조를 기반으로 확장 가능한 Optimizer인 Catalyst를 구현하였습니다. Catalyst의 확장 가능한 설계는 두가지 목적을 가지고 있습니다.

  1. Spark SQL에 새로운 최적화 요소나 기술 추가를 쉽게 할 수 있게 하는 것
  2. 외부 개발자들이 Optimizer를 확장시키는 것을 가능하게 하는 것

Trees

Catalyst를 구성하는 주요 데이터 타입은 Node Object로 구성된 Tree이다.

Node 타입들은 다음과 같은 속성을 가진다.

  • TreeNode 클래스를 상속받는다.
  • 0개 이상의 자식을 가질 수 있다.
  • 변경 가능하지 않다(immutable).
  • transformation 함수를 통해서 만들어진다.

아래와 같이 3개의 노드 클래스가 존재한다고 생각해보자.

  • Literal(value: Int): 상수 값을 표현하는 Node
  • Attribute(name: String): Input Row에 대한 Attribute(ex. “x”)
  • Add(left: TreeNode, right: TreeNode): 두 Node 표현의 합

이 클래스들로 x+(1+2)를 표현하면 아래와 같이 표현이 가능하고,

Add(Attribute(x), Add(Literal(1), Literal(2)))

트리 형태로 그려보면 아래와 같이 그릴 수 있다.

그림1. Tree 형태로 표현한 Expression(출처: https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)

위의 Tree 예제가 실제로 Spark SQL에서 어떻게 쓰이는지에 대해서 감이 잘 오지 않았는데, A Deep Dive into Spark SQL’s Catalyst Optimizer with Yin Haui 영상과 Spark SQL 코드를 참고하여 확인하였다.

일단 위 영상에서 Expression, Attribute, Query Plan 등의 용어 설명이 나온다.

Expression

입력 값에 의해 계산되는 새로운 값을 나타낸다. 위의 쿼리에서의 Expression의 예는 아래와 같다.

  • 1 + 2 + t1.value
  • t1.id = t2.id (boolean 타입 값이 생성된다)

Attribute

Dataset의 컬럼 혹은 데이터 연산에 의해 새롭게 생성된 컬럼을 의미한다. 위의 쿼리에서의 Attribute의 예는 아래와 같다.

  • t1.id (Dataset의 컬럼)
  • v (1 + 2 + t1.value에 의해 생성된 컬럼)

Query Plan

Query Plan(출처: A Deep Dive into Spark SQL’s Catalyst Optimizer with Yin Huai)

Input Dataset에 적용하여 새로운 Dataset을 생성해내는 Aggregate, Join, Filter와 같은 연산을 의미한다.

위와 같은 개념을 숙지하고 코드를 보았는데 Spark SQL 관련 코드는 RDD쪽 코드보다 상대적으로 상속 계층도 엄청 많고 타입이 너무 많아서 일부 내용밖에 파악하지를 못했다.

대략적으로 상속 관계를 파악해보면 다음과 같다.

정말 상속 계층이 엄청나게 복잡하다. 그래도 다이어그램을 그리면서 Expression과 QueryPlan의 정확한 차이점을 파악할 수 있었다.

Expression 중 위에서 등장했던 Add의 코드를 보면 아래와 같다.

symbol, decimalMethod 등 해당 Expression을 표현하는 문자열 정보와 실제 연산을 수행하는 nullSafeEval 함수가 정의되어 있다.

nullSafeEval에서 호출하는 numeric.plus의 정의를 따라가보면 아래와 같은 코드가 등장한다.

실제 코드가 들어가야 하는 부분이 비어있는걸 보면, 위의 implicit을 이용하여 해당 구문을 처리하는 것 같이 보인다.

결론적으로 위에서 Tree는 Logical Planning 단계에서는 LogicalPlan과 Expression으로 구성되는 것을 확인할 수 있었다.

Physical Planning 단계에서는 다른 방식으로 구현이 되었는지 TreeNode를 사용하지 않고 있었다.

Rules

새로운 Tree는 Tree를 다른 Tree로 변경(transformation)하는 Rule을 이용하여 생성할 수 있다.

Rule을 통해 입력으로 들어온 Tree 전체를 변환할 수도 있지만, 특정 구조를 가진 Sub Tree를 찾아 변경하는 Pattern Matching Set을 적용하는 방식이 일반적이다.

Catalyst에서 Tree는 하위 모든 노드에 재귀적으로 Pattern Matching 함수를 수행하는 transform 함수를 제공한다.

우리는 아래와 같이 두 개의 상수를 더하는 Add Operation을 하나의 Literal로 Fold(접는)하는 Rule을 구현할 수 있다.

그림2. 두 개의 상수를 더하는 Add Operation을 Folding하는 Rule(출처: https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)
위 Fold를 그림으로 표현해 보았다

Transform에서 사용되는 Pattern Matching 표현식은 입력 가능한 모든 Tree의 Subset과 일치하는 부분 함수여야 한다.

-> 위에서 내가 작성한 코드를 예로 들자면, Add Operation의 left와 right에 올 수 있는 Node의 경우의 수는 48~51번째 줄에 정의되었다. 즉, Add Operation의 apply 함수 내의 Pattern Matching 표현식은 입력 가능한 모든 Tree의 Subset과 일치한다고 표현할 수 있다.

Catalyst는 이러한 Rule을 Tree에 수행한 뒤, Rule을 적용할 수 없는 Tree의 경우 해당 Tree를 건너 뛰거나 더 하위 Tree에 대해 Rule을 수행(하위 Tree는 Rule이 적용될 수도 있으므로)한다. 따라서 Catalyst의 Rule은 Optimization이 필요한 Tree에 대해서만 적용되고 그렇지 않는 Tree에 대해서는 적용되지 않는다.

Rule은 동일한 Transform 호출 내에서 여러 개의 패턴과 일치할 수 있기 때문에, 여러 Transform을 호출하지 않고 한번에 처리할 수 있도록 패턴을 정확히 구현하는 것이 중요하다.

그림3. Add 표현은 위와 같이 세부적으로 나뉠 수 있다(출처: https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)

위의 그림3이 이해가 잘 되지 않았었는데, 워낙 간략한 Operation이라서 그랬던 것 같다.

만일 Literal Add 연산이 엄청 복잡하거나 Recursive 했다면 Literal(0)인 부분에 대해서는 left나 right를 그대로 반환하여 계산을 줄이는 것이 더 좋은 방법일 수 있기 때문에, 위와 같이 세분화하는 것이 좋다고 서술한 것 같다.

Tree를 완전히 Transform 하기 위해서는 Rule이 여러 번 적용되어야 할 수 있다. Catalyst는 Rule을 Batch라는 단계로 묶고, 각 Batch를 Tree가 Rule을 적용해도 변경되지 않은 지점인 Fixed Point까지 반복해서 실행한다.

위 부분이 정확히 어떤 의미인지 파악하기가 어려워서 코드를 확인해보았다.

Rule을 Batch로 묶어 실행시켜주는 클래스는 RuleExecutor(Abstract)이며, 실제로 Batch를 실행시키는 코드는 아래와 같다.

RuleExecutor.scala

48 Line의 apply 함수를 통해 Batch를 실행시키며, 51 Line에서와 같이 Batch 객체에 대해 foreach문을 수행하여 Rule을 실행한다.

59Line에서 실제 Rule을 수행하고, 72 Line에서는 Iteration을 1 증가시킨다.

73 Line에서 maxIteration을 초과하여 수행하였는지(Fixed Point) 확인하고, 초과하지 않았을 경우에도 81 Line에서처럼 이전 Plan과 Rule을 통과한 Plan이 동일할 경우 더이상 해당 Rule을 적용하지 않는 것을 확인할 수 있었다.

Using Catalyst in Spark SQL

Catalyst에서는 4개 부분으로 나누어 Tree에 Transformation을 수행한다.

Catalyst에서 SQL Query를 분석하여 RDD Code까지 Generation하는 과정(출처: https://databricks.com/blog/2015/04/13/deep-dive-into-spark-sqls-catalyst-optimizer.html)
  1. Analysis
  2. Logical Plan Optimization
  3. Physical Planning
  4. Code Generation

Analysis

Spark SQL은 SQL Parser에서 반환한 Abstract Syntax Tree(AST) 혹은 Dataframe 객체의 Relation을 계산(연산)하는 것으로부터 시작된다.

두 경우 모두 Relation이 분석되지 않은 Attribute 참조나 Relation을 포함하고 있다: 예를 들어 “SELECT col FROM sales” 쿼리에서

  • col의 타입이 무엇인지
  • col이라는 컬럼 이름이 Valid한지

에 대한 정보를 sales 테이블을 확인하기 전까지는 알 수 없다.

Spark SQL은 Catalyst Rule과 Catalog object(Data source의 모든 Table을 Tracking하는 객체)을 이용하여 이러한 Attribute를 분석한다.

Catalog라는 용어가 나오는데, Hive와 같은 External Datasource에 존재하는 Table이나 SparkSession에 등록된 Global, Temporary View 등을 관리하는 객체라고 생각하면 된다.

SessionCatalog.scala

위 코드는 SessionCatalog의 선언부이며, 주석에서와 같이 Hive Metastore를 Proxy로 제공하거나, Spark Session에 속한 View들을 관리한다고 나와 있다.

Logical Optimizations

Logical Optimization 단계에서는 Logical Plan에 Rule 기반 Optimization을 적용한다. Rule Based Optimization은 Constant Folding, Predicate Pushdown, Projection Pruning, Null Propagation, Boolean Expression Simplification 등의 Rule 들을 적용한다.

위에서 언급한 Optimization이 무엇이고, 대략적으로 어떻게 구현되어 있는지 확인해보았다.

Constant Folding

컴파일러 최적화에서도 사용하는 기법인데, 상수 표현식을 Runtime Time에 계산하지 않고 Compile Time에 미리 계산해버리는 방법이다.

예를 들어 아래와 같은 쿼리가 있을 때,

B의 경우 1+2는 Runtime에 모든 Row에 대해 Evaluation 하는 것보다 Compile Time(Optimization 단계)에서 미리 3으로 계산해버리는 것이 처리 속도를 높일 수 있다.

expressions.scala 내의 ConstantFolding

위 코드는 Spark SQL에 포함되어 있는 ConstantFolding 코드이며, 인자로 들어온 Logical Plan이 Literal일 경우 그대로 반환하고, Foldable한 Plan일 경우 Evaluation 후 Literal로 만들어 반환한다.

Predicate Pushdown

일반적인 RDBMS에서도 사용하는 기법이다. 쿼리 밖에 있는 조건절을 쿼리 안쪽으로 넣는 방법이다.

이전 글에서도 설명한 예시인데, 아래와 같은 쿼리가 있을 때

Sub Query 밖에 있는 Where 절을 Sub Query 안쪽으로 밀어넣게 되면 불필요한 deptno에 대한 연산이 줄어들게 된다.

출처: https://www.youtube.com/watch?v=RmUn5vHlevc&t=994s

위에서 언급한 영상의 내용 중 Predicate Pushdown에 대한 그림이 있어 첨부하였다.

위 그림을 보면 좌측의 경우 t1 테이블과 t2 테이블을 Join한 후 Filter를 수행한다. 어차피 t2.id가 50000 이하인 값에 대해서는 Join이후에 걸러지므로 Join이전에 Filter를 적용해버리는 방식이다.

Projection Pruning

출처: https://www.youtube.com/watch?v=RmUn5vHlevc&t=994s

연산에 필요한 컬럼만을 가져오는 기법이다. Pruning이 적용되지 않았다면 좌측 최상단 Project 과정까지 올라가면서 t1, t2 테이블의 모든 컬럼들을 가지고 가겠지만, Pruning을 적용하게 되면 Scan 이후 필요한 컬럼만을 Project하므로 성능이 개선된다.

Physical Planning

Physical Planning 단계에서는 Logical Plan을 이용하여 1개 이상의 Physical Plan을 만들어낸다.

Cost Based Optimization이나 Spark Operation 관련 Optimization을 진행한다.

Code Generation

만들어진 Plan을 각 장비에서 실행시킬 수 있도록 Java Byte Code로 변환한다.

마치며

사실 Physical Planning이나 Code Generation 부분은 조금 보다가 말았다.

이 글을 쓰려고 자료를 보고 공부하는데만 해도 이틀정도가 걸려서 더 보다가는 내 할일을 못할 것 같았다.

그래도 Spark SQL이 내부적으로 어떤 방식으로 동작하는지는 알 수 있었고, 예제를 직접 코딩해보아 좋았던 시간이었다.

예제는 아래와 같이 구현하였으며, 구현하면서 들었던 생각은 코딩 자체보다 Expression Tree의 규칙을 정하고 클래스를 설계(위의 클래스 다이어그램을 보면 엄청나게 정교하게 설계되어있다)하는게 더 어렵다는 생각이 들었다.

이 코드를 작성하면서 Pattern Matching이나 Case Class에 대해 더 잘 공부할 수 있었다. 혹시 Scala 공부 중인 분이면 예제 코드를 구현해보는 것도 나쁘지 않은 생각일 것 같다.(https://medium.com/@leeyh0216/scala%EB%A1%9C-dsl-%ED%9D%89%EB%82%B4%EB%82%B4%EB%B3%B4%EA%B8%B0-561c22869a62?source=your_stories_page--------------------------- 글도 참고 바랍니다)

--

--