Parquet Min/Max Statistics를 이용한 Spark/Impala의 Predicate Pushdown 최적화

조은현
네이버 쇼핑 개발 블로그
11 min readSep 12, 2021

요약

저희 쇼핑검색데이터개발팀에서는 Spark/Impala를 통해 Parquet 파일을 읽는 배치 작업이 많습니다. 그 중 특정 파일은 항상 특정 Column으로 필터링(predicate)하여 사용하였습니다.

하지만 필요한 필터링된 데이터 양은 적은데도, 데이터를 읽을 때마다 전체 데이터를 모두 읽어와서 I/O Overhead가 너무 크다는 문제가 있었습니다. 이는 Parquet에는 빠른 검색을 할 수 있는 Index가 따로 없기 때문입니다.

이러한 문제를 저희는 Parquet의 특징을 이용한 Predicate Pushdown 최적화로 해결했습니다. 이는 바로 Parquet을 특정 Column으로 정렬해서 저장하는 것입니다. 이 글을 통해 어떤 식으로 최적화가 이루어졌는지, 어떻게 그게 I/O Overhead를 줄이는 게 가능한 건지 보도록 하겠습니다.

Parquet

Parquet은 Apache Hadoop 에코시스템에서 빅데이터를 저장하기 위해서 만들어진 데이터 저장 포맷입니다. 가장 큰 특징은 Column Format 기반이며, 스키마를 가지고 있다는 것입니다.

Row Format VS Column Format 저장 방식

그림에서 보시는 바와 같이, 일반적으로 Row Format으로 저장되는 것과 다르게 Parquet은 Column Format이기에 Parquet 파일을 읽을 때 특정 Column만 필요하다면 손쉽게 특정 Column만 읽어올 수가 있습니다.

또한 Column끼리 저장되어 있다는 것은, 같은 데이터 타입끼리(String은 String, Int는 Int끼리) 저장할 수 있다는 뜻입니다. 이로 인해 데이터 타입에 맞는 인코딩을 사용할 수 있게 되어 압축률이 좋아집니다. 이러한 장점들로 인해 Parquet은 빅데이터 저장에 활발히 사용되고 있습니다.

Predicate Pushdown

이런 Parquet의 또다른 특징 중 하나는, 메타데이터에 Column의 통계 정보를 가지고 있다는 것입니다. 그 중, 이 글에서 중점적으로 살펴볼 것은 Row Group별로 컬럼의 최소값과 최대값을 저장해두는 부분입니다.

> parquet-tools meta part-00001-...snappy.parquet...row group 1: RC:34971 TS:589487 OFFSET:4
--------------------------------------------------------------------mallSeq: BINARY SNAPPY ... ST:[min: 1, max: 500, num_nulls: 0]
...

Spark와 Impala는 Parquet 파일을 읽을 때 이러한 통계 정보를 이용해서 Predicate Pushdown을 제공합니다. 이 Parquet 파일을 특정 Column으로 필터링할 때, 통계 정보를 사용해서 해당 파일에 원하는 데이터가 있는지 없는지를 판별하는 것입니다.

예를 들면 지금 이 part-00001-…parquet 파일은 mallSeq라는 이름의 String 컬럼이 있고, 최소값은 1, 최대값은 500이라는 것을 알 수 있습니다. 그럼 이 데이터를 (mallSeq = ‘501’)라는 조건으로 읽는다면 이 Parquet 파일에는 원하는 데이터가 없음을 알기에, 이 파일은 전혀 읽을 필요가 없어집니다.

이는 하나의 예시입니다. Parquet은 내부적으로 이러한 메타데이터가 많고 더 세부적인 구조로 나누어져 있어서, 실제로는 Predicate Pushdown이 일어나면 파일 단위로 읽거나 읽지 않는 것이 아니고, 파일의 일부만 읽어오는 경우가 많습니다.

문제

앞에서 말했던, 특정 Parquet 데이터 중 하나를 예시로 들어보겠습니다. 저희 팀이 사용하는 ‘report_primitive’라는 Parquet 데이터는, 대부분의 경우 ‘mallSeq’라는 Column으로 필터링을 해서 사용했습니다.

--- Query
SELECT * FROM report_primitive WHERE mallSeq = ‘24’;

이 Parquet 데이터는 Row 수가 약 20억개로 매우 큰 반면에, mallSeq로 필터링한 우리가 직접 필요한 데이터 수는 훨씬 적었습니다. 그럼에도 불구하고 사용할 때 항상 전체 데이터를 모두 읽어왔습니다.

Details For Query (Spark UI에서 확인)
--- Impala Profile로 확인한 쿼리 실행 결과
HDFS_SCAN_NODE
- RowsRead: 22.99M (22991127)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 22.58M (22577492)
- RowsReturned: 1 (1)
...
HDFS_SCAN_NODE
- RowsRead: 22.05M (22054044)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 22.73M (22734021)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 22.15M (22154184)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 22.73M (22734021)
- RowsReturned: 0 (0)
...

보시면 Spark에서도 약 20억개의 Row를 모두 읽었고 Impala에서도 90개의 Node가 각 2천만개 정도씩 모든 Row를 읽었음을 알 수 있습니다. 우리는 이 문제를 Predicate Pushdown을 최적화하여 해결해보기로 했습니다.

Predicate Pushdown 최적화

row group 106:   RC:816742 TS:156769360 OFFSET:2818572288
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 1, max: 999995, num_nulls: 0]
...
row group 107: RC:38582 TS:7525102 OFFSET:2929261817
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 1, max: 999934, num_nulls: 0]
...
row group 108: RC:30219 TS:6009142 OFFSET:2934799976
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 1, max: 999983, num_nulls: 0]

Parquet의 메타데이터를 살펴보았을 때, mallSeq Column의 min/max를 보시면 각 row group마다 비슷하여 데이터가 무작위로 퍼져 있음을 알 수 있었습니다. 이 데이터를 mallSeq 기준으로 정렬하여 저장하면 어떻게 될까요?

row group 131:   RC:15127 TS:533495 OFFSET:393457554
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 967336, max: 96734, num_nulls: 0]
...
row group 132: RC:12941 TS:477921 OFFSET:393795779
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 96734, max: 967458, num_nulls: 0]
...
row group 133: RC:12941 TS:1033830 OFFSET:394108622
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 967458, max: 967465, num_nulls: 0]

mallSeq를 기준으로 데이터가 분리되어 있는 것을 확인해 볼 수 있습니다. 이 데이터를 다시 Spark/Impala로 읽어 보겠습니다.

Details For Query
HDFS_SCAN_NODE
- RowsRead: 68.33K (68335)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 3.04M (3040100)
- RowsReturned: 2 (2)
...
HDFS_SCAN_NODE
- RowsRead: 3.11M (3110100)
- RowsReturned: 0 (0)
..
HDFS_SCAN_NODE
- RowsRead: 0 (0)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 0 (0)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 0 (0)
- RowsReturned: 0 (0)
...

Scan한 row 수가 약 600만개로, 기존에 비해 0.2% 정도로 급감했습니다. 기존에는 모든 Parquet 파일들이 mallSeq = ‘24’에 해당하는 데이터를 가지고 있을 수 있었지만, 정렬해서 저장한 Parquet 파일들은 아주 적은 수의 파일만 mallSeq = ‘24’에 해당하는 데이터를 가지고 있을 것이고, Spark/Impala는 해당 파일만 읽어왔기에 이런 결과가 도출된 것입니다.

결론

Parquet 파일로 데이터를 저장 시, 특정 Column으로 Predicate Pushdown이 자주 일어날 것으로 예상되면, 해당 Column으로 정렬해서 저장하는 것이 I/O Overhead를 줄이는 데에 크게 도움이 됩니다.

시나리오에선 단순히 ‘mallSeq = ?’라는 조건을 이용했으나, Predicate Pushdown은 ‘mallSeq IN (?)’, ‘mallSeq < ?’등에서도 동작합니다. 물론 Scan하는 Parquet 수는 늘어나겠지만, 전체 Scan하는 것보다는 성능 면에서 훨씬 더 좋을 것입니다.

하지만 정렬해서 저장하는 것이 무조건 좋은 것은 아닙니다.

발생할 수 있는 단점과 제약 사항

단점

만약 제가 mallSeq로 데이터를 정렬해서 저장하려 할 때 특정 mallSeq에 해당하는 데이터가 또다른 mallSeq에 해당하는 데이터보다 훨씬 많다면 어떻게 될까요?

HDFS 캡처

보시면 Parquet의 파일 사이즈가 크게 다릅니다. 정렬 시 어쩔 수 없이 발생하는 문제입니다. 이는 Spark등 분산 컴퓨팅 엔진으로 데이터를 처리할 때 Data Skew 현상을 유발할 수 있습니다.

제약사항

  1. Column 타입이 Array 등의 복합 타입인 경우, Parquet metadata에 min/max statistic이 존재하더라도 Predicate Pushdown이 동작하지 않습니다.
  2. Column을 다른 타입으로 캐스팅할 시 Predicate Pushdown이 동작하지 않습니다.

예시: Impala

--- 원본 쿼리 SELECT * FROM report_primitive WHERE mallSeq = '24'
SELECT * FROM report_primitive WHERE CAST(mallSeq AS BIGINT) = 24

아래의 쿼리를 실행하면 Predicate Pushdown이 동작하지 않습니다.

예시: Spark

--- 원본 쿼리 SELECT * FROM report_primitive WHERE mallSeq = '24'
SELECT * FROM report_primitive WHERE mallSeq = 24

Spark에서는 매우 주의해야합니다. Spark는 비교할 때 타입이 다르면 암시적으로 캐스팅하기 때문에, Predicate Pushdown이 동작하지 않습니다.

읽어주셔서 감사합니다.

참고사항: Impala: version 3.2.0-cdh6.3.2, Spark: version 2.4.0-cdh6.3.2으로 진행되었습니다.

--

--