Parquet Min/Max Statistics를 이용한 Spark/Impala의 Predicate Pushdown 최적화
요약
저희 쇼핑검색데이터개발팀에서는 Spark/Impala를 통해 Parquet 파일을 읽는 배치 작업이 많습니다. 그 중 특정 파일은 항상 특정 Column으로 필터링(predicate)하여 사용하였습니다.
하지만 필요한 필터링된 데이터 양은 적은데도, 데이터를 읽을 때마다 전체 데이터를 모두 읽어와서 I/O Overhead가 너무 크다는 문제가 있었습니다. 이는 Parquet에는 빠른 검색을 할 수 있는 Index가 따로 없기 때문입니다.
이러한 문제를 저희는 Parquet의 특징을 이용한 Predicate Pushdown 최적화로 해결했습니다. 이는 바로 Parquet을 특정 Column으로 정렬해서 저장하는 것입니다. 이 글을 통해 어떤 식으로 최적화가 이루어졌는지, 어떻게 그게 I/O Overhead를 줄이는 게 가능한 건지 보도록 하겠습니다.
Parquet
Parquet은 Apache Hadoop 에코시스템에서 빅데이터를 저장하기 위해서 만들어진 데이터 저장 포맷입니다. 가장 큰 특징은 Column Format 기반이며, 스키마를 가지고 있다는 것입니다.
그림에서 보시는 바와 같이, 일반적으로 Row Format으로 저장되는 것과 다르게 Parquet은 Column Format이기에 Parquet 파일을 읽을 때 특정 Column만 필요하다면 손쉽게 특정 Column만 읽어올 수가 있습니다.
또한 Column끼리 저장되어 있다는 것은, 같은 데이터 타입끼리(String은 String, Int는 Int끼리) 저장할 수 있다는 뜻입니다. 이로 인해 데이터 타입에 맞는 인코딩을 사용할 수 있게 되어 압축률이 좋아집니다. 이러한 장점들로 인해 Parquet은 빅데이터 저장에 활발히 사용되고 있습니다.
Predicate Pushdown
이런 Parquet의 또다른 특징 중 하나는, 메타데이터에 Column의 통계 정보를 가지고 있다는 것입니다. 그 중, 이 글에서 중점적으로 살펴볼 것은 Row Group별로 컬럼의 최소값과 최대값을 저장해두는 부분입니다.
> parquet-tools meta part-00001-...snappy.parquet...row group 1: RC:34971 TS:589487 OFFSET:4
--------------------------------------------------------------------mallSeq: BINARY SNAPPY ... ST:[min: 1, max: 500, num_nulls: 0]
...
Spark와 Impala는 Parquet 파일을 읽을 때 이러한 통계 정보를 이용해서 Predicate Pushdown을 제공합니다. 이 Parquet 파일을 특정 Column으로 필터링할 때, 통계 정보를 사용해서 해당 파일에 원하는 데이터가 있는지 없는지를 판별하는 것입니다.
예를 들면 지금 이 part-00001-…parquet 파일은 mallSeq라는 이름의 String 컬럼이 있고, 최소값은 1, 최대값은 500이라는 것을 알 수 있습니다. 그럼 이 데이터를 (mallSeq = ‘501’)라는 조건으로 읽는다면 이 Parquet 파일에는 원하는 데이터가 없음을 알기에, 이 파일은 전혀 읽을 필요가 없어집니다.
이는 하나의 예시입니다. Parquet은 내부적으로 이러한 메타데이터가 많고 더 세부적인 구조로 나누어져 있어서, 실제로는 Predicate Pushdown이 일어나면 파일 단위로 읽거나 읽지 않는 것이 아니고, 파일의 일부만 읽어오는 경우가 많습니다.
문제
앞에서 말했던, 특정 Parquet 데이터 중 하나를 예시로 들어보겠습니다. 저희 팀이 사용하는 ‘report_primitive’라는 Parquet 데이터는, 대부분의 경우 ‘mallSeq’라는 Column으로 필터링을 해서 사용했습니다.
--- Query
SELECT * FROM report_primitive WHERE mallSeq = ‘24’;
이 Parquet 데이터는 Row 수가 약 20억개로 매우 큰 반면에, mallSeq로 필터링한 우리가 직접 필요한 데이터 수는 훨씬 적었습니다. 그럼에도 불구하고 사용할 때 항상 전체 데이터를 모두 읽어왔습니다.
--- Impala Profile로 확인한 쿼리 실행 결과
HDFS_SCAN_NODE
- RowsRead: 22.99M (22991127)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 22.58M (22577492)
- RowsReturned: 1 (1)
...
HDFS_SCAN_NODE
- RowsRead: 22.05M (22054044)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 22.73M (22734021)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 22.15M (22154184)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 22.73M (22734021)
- RowsReturned: 0 (0)
...
보시면 Spark에서도 약 20억개의 Row를 모두 읽었고 Impala에서도 90개의 Node가 각 2천만개 정도씩 모든 Row를 읽었음을 알 수 있습니다. 우리는 이 문제를 Predicate Pushdown을 최적화하여 해결해보기로 했습니다.
Predicate Pushdown 최적화
row group 106: RC:816742 TS:156769360 OFFSET:2818572288
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 1, max: 999995, num_nulls: 0]
...row group 107: RC:38582 TS:7525102 OFFSET:2929261817
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 1, max: 999934, num_nulls: 0]
...row group 108: RC:30219 TS:6009142 OFFSET:2934799976
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 1, max: 999983, num_nulls: 0]
Parquet의 메타데이터를 살펴보았을 때, mallSeq Column의 min/max를 보시면 각 row group마다 비슷하여 데이터가 무작위로 퍼져 있음을 알 수 있었습니다. 이 데이터를 mallSeq 기준으로 정렬하여 저장하면 어떻게 될까요?
row group 131: RC:15127 TS:533495 OFFSET:393457554
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 967336, max: 96734, num_nulls: 0]
...row group 132: RC:12941 TS:477921 OFFSET:393795779
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 96734, max: 967458, num_nulls: 0]
...row group 133: RC:12941 TS:1033830 OFFSET:394108622
--------------------------------------------------------------------
mallSeq: BINARY SNAPPY ... ST:[min: 967458, max: 967465, num_nulls: 0]
mallSeq를 기준으로 데이터가 분리되어 있는 것을 확인해 볼 수 있습니다. 이 데이터를 다시 Spark/Impala로 읽어 보겠습니다.
HDFS_SCAN_NODE
- RowsRead: 68.33K (68335)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 3.04M (3040100)
- RowsReturned: 2 (2)
...
HDFS_SCAN_NODE
- RowsRead: 3.11M (3110100)
- RowsReturned: 0 (0)
..
HDFS_SCAN_NODE
- RowsRead: 0 (0)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 0 (0)
- RowsReturned: 0 (0)
...
HDFS_SCAN_NODE
- RowsRead: 0 (0)
- RowsReturned: 0 (0)
...
Scan한 row 수가 약 600만개로, 기존에 비해 0.2% 정도로 급감했습니다. 기존에는 모든 Parquet 파일들이 mallSeq = ‘24’에 해당하는 데이터를 가지고 있을 수 있었지만, 정렬해서 저장한 Parquet 파일들은 아주 적은 수의 파일만 mallSeq = ‘24’에 해당하는 데이터를 가지고 있을 것이고, Spark/Impala는 해당 파일만 읽어왔기에 이런 결과가 도출된 것입니다.
결론
Parquet 파일로 데이터를 저장 시, 특정 Column으로 Predicate Pushdown이 자주 일어날 것으로 예상되면, 해당 Column으로 정렬해서 저장하는 것이 I/O Overhead를 줄이는 데에 크게 도움이 됩니다.
시나리오에선 단순히 ‘mallSeq = ?’라는 조건을 이용했으나, Predicate Pushdown은 ‘mallSeq IN (?)’, ‘mallSeq < ?’등에서도 동작합니다. 물론 Scan하는 Parquet 수는 늘어나겠지만, 전체 Scan하는 것보다는 성능 면에서 훨씬 더 좋을 것입니다.
하지만 정렬해서 저장하는 것이 무조건 좋은 것은 아닙니다.
발생할 수 있는 단점과 제약 사항
단점
만약 제가 mallSeq로 데이터를 정렬해서 저장하려 할 때 특정 mallSeq에 해당하는 데이터가 또다른 mallSeq에 해당하는 데이터보다 훨씬 많다면 어떻게 될까요?
보시면 Parquet의 파일 사이즈가 크게 다릅니다. 정렬 시 어쩔 수 없이 발생하는 문제입니다. 이는 Spark등 분산 컴퓨팅 엔진으로 데이터를 처리할 때 Data Skew 현상을 유발할 수 있습니다.
제약사항
- Column 타입이 Array 등의 복합 타입인 경우, Parquet metadata에 min/max statistic이 존재하더라도 Predicate Pushdown이 동작하지 않습니다.
- Column을 다른 타입으로 캐스팅할 시 Predicate Pushdown이 동작하지 않습니다.
예시: Impala
--- 원본 쿼리 SELECT * FROM report_primitive WHERE mallSeq = '24'
SELECT * FROM report_primitive WHERE CAST(mallSeq AS BIGINT) = 24
아래의 쿼리를 실행하면 Predicate Pushdown이 동작하지 않습니다.
예시: Spark
--- 원본 쿼리 SELECT * FROM report_primitive WHERE mallSeq = '24'
SELECT * FROM report_primitive WHERE mallSeq = 24
Spark에서는 매우 주의해야합니다. Spark는 비교할 때 타입이 다르면 암시적으로 캐스팅하기 때문에, Predicate Pushdown이 동작하지 않습니다.
읽어주셔서 감사합니다.
참고사항: Impala: version 3.2.0-cdh6.3.2, Spark: version 2.4.0-cdh6.3.2으로 진행되었습니다.