Data Loader, Better, Faster, Stronger

t.k.woo
네이버 쇼핑 개발 블로그
43 min readJan 21, 2021

Large parquet dataset을 위한 PyTorch dataset, dataloader 제작 일기

딥러닝을 하면서 처음 마주하는 고민은 “데이터 로더를 어떻게 만들까?”인 것 같습니다. 필자는 가지고 있는 데이터의 형식에 따라, 양이 많고 적음에 따라, 사용할 프레임워크에 따라 항상 다르게 만들어 왔는데요, 예를 들어서 TensorFlow의 경우 빠른 로딩을 위해 tfrecord로 만들어서 tf.data를 사용하기도 하고 데이터 크기가 아주 작은 경우에는 데이터 전체를 메모리에 업로드해서 접근하기도 했습니다.

만약 아주 많은 양의 데이터가 있다면 데이터 로더를 어떻게 구성하는 게 좋을까요? 이 글에서는 좋은 데이터 로더란 무엇인지, 어떤 특징이 필요한지에 대해 다음과 같은 순서로 다뤄보려 합니다.

  • 15억 개의 쇼핑 데이터
  • 데이터 로더의 조건
  • 다른 곳은 어떻게 하고 있을까? - Tensorflow, pytorch, petastorm
  • 좋은 데이터 로더 만들기

15억 개의 쇼핑 데이터

분명 데이터 많은 게 좋다고 배웠는데… 그랬는데….

네이버 쇼핑에는 매일 15억 건이 넘는 상품이 서비스되고 있다. 특히 가격비교와 같은 서비스는 사용자가 더 쉽게 쇼핑할 수 있는 편의성을 제공한다.

가격비교 기능을 구현하기 위해서는 상품 카테고리 분류와 같이 상품을 묶어주는 클러스터링 기술이 필요하다. 이 중 카테고리 분류의 경우 약 2천만 개의 데이터를 사용해서 머신러닝 모델을 학습하고 있다(카테고리 분류, 매칭에 대한 이야기가 궁금하다면 TensorFlow를 활용한 네이버쇼핑의 상품 카테고리 자동 분류 참고).

상품 데이터는 상품명, 상품 이미지, 브랜드, 메이커, 가격 등의 다양한 정보를 포함하고 있다. 1000만 건이 넘는 큰 데이터를 처리하기 위해서 네이버 쇼핑에서는 Hadoop 파일 시스템(HDFS)과 같은 분산 데이터베이스를 적극적으로 활용하고 있다. 학습 데이터도 상당히 큰 용량을 차지하고 주기적으로 데이터 변경이 일어나기 때문에 hive table 형태 또는 HDFS의 분산 데이터베이스로 보관하고 있다. 이 방법의 큰 장점은 데이터엔지니어링 툴을 직접 사용할 수 있기 때문에 데이터 전처리를 빠른 속도로 적용할 수 있다는 점이다.

문제는 ‘이 데이터를 학습 환경에 어떻게 가지고 올 것인지’ 고민하는 것부터 시작된다.

데이터가 원격 저장소에 보관되어 있기 때문에 그대로 사용하는 것은 네트워크 접근 속도가 병목이 될 확률이 크다. 따라서 로컬 머신으로 데이터를 가지고 오는 것을 고려해야 한다.

또, 원격 저장소에 보관되어 있는 데이터의 형식이 h5나 tfreccord 형태가 아니기 때문에 일반적으로 학습에 익숙한 형태로 변환할 것인지도 생각해봐야 할 문제다. 예를 들어 10억 개의 데이터에 간단히 transform을 적용해서 새로운 저장 형식으로 만든다고 생각해 보겠다. 적용하는 transform이 1밀리초가 소요되는 작은 작업이라도 약 30분의 시간이 필요하다는 사실을 항상 가슴 속 깊이 새겨두어야 한다. 간단하게 테스트 하나 해보고 싶은데 1시간씩 걸린다면….

사용 환경을 간략하게 정리해보면 다음과 같다.

  • 100GB~수 TB급의 데이터
  • 데이터는 원격 저장소에 보관
  • 사용 데이터 포맷은 분산처리 플랫폼 포맷: parquet
  • PyTorch 프레임워크에서 로딩 가능해야 함

참고 — Parquet 파일과 간단한 사용 팁

Apache Parquet는 Hadoop 에코시스템에서 사용하는 columnar storage이다. Parquet의 디자인 목표는 다음과 같다.

  • interoperability
  • space efficiency
  • query efficiency

즉, Hadoop 에코시스템 내에서 프레임워크, 데이터 모델, 언어에 관계없이 상호운용이 가능한 데이터 저장 포맷을 의미한다. hive table, hadoop file system에서 사용되고 있다.

columnar storage의 의미

출처: Hadoop Summit 발표 자료

일반적인 관계형 데이터베이스 테이블에서는 각 행(row)마다 단일 레코드의 필드값이 저장된다. row based storage는 한 행의 여러 열을 동시에 작업할 때 효율이 극대화된다. 분석하는 열이 전체의 일부분이라면 어떻게 될까? row based storage에서는 불필요한 다른 열까지 모두 읽어야 하므로 비효율적인 연산이 발생한다.

columnar storage는 row based storage의 단점을 극복하기 위한 방법으로 많이 사용된다. 다른 열에 영향을 주지 않고 특정 열의 값을 추가/대체할 때 효율적으로 사용할 수 있다. 주로 같은 열 내에서는 데이터 타입이 동일하게 사용되므로 행 기반일 때와 다른 압축 방식을 사용할 수 있어 공간 효율 또한 향상시킬 수 있다는 장점이 있다.

많은 메타 데이터를 다뤄야 하는 쇼핑의 특성상 columnar storage를 사용하면 데이터 처리 성능에 더 유리할 것이다.

columnar 방식만의 장점이 있지만 단점 또한 생각해보아야 한다. 10억 개의 상품명 데이터를 하나의 열에 보관한다면 어떻게 될까? 높은 압축률과 접근 용이성의 장점은 있지만 데이터의 사이즈, 행의 개수 자체가 너무 크다는 것을 항상 염두에 두어야 한다. 한 번에 저장하기 어려운 양이므로 columnar 방식에서는 “Row group”이라는 개념을 통해 데이터를 분리해두는 기법을 사용한다.

parquet에서는 1개의 파일을 N개의 row group으로 구성할 수 있다. 또한 columnar 기반 스토리지이기 때문에 column 길이가 길수록 속도가 빠르다는 것을 기억하면 row group 크기를 조절하는 데 좋은 참고가 될 수 있다. 예를 들어, block size가 128MB인 경우 row group을 128MB보다 작으면서 가장 근접한 값으로 유지할 때 IO 비용을 최소화할 수 있다.

실제 HDFS에 parquet 파일이 저장된 경우를 살펴보겠다.

예시는 block size 128MB 환경에서 상품명에 wordpiece subtoken parsing 전처리를 적용한 후 저장한 데이터이다. 일부 row group의 크기가 128MB를 초과하는 것을 볼 수 있다(여기서는 parquet 파일 하나가 1개의 row group을 가짐).

part-00000–451817c-xxx.parquet 파일을 자세히 보겠다.

row group 크기가 150MB로 128MB를 초과하기 때문에 2개의 block을 사용해서 저장한 것을 볼 수 있고 HDFS 상에서 해당 파일에 접근할 때 실제로 두 번의 IO 접근이 필요해져서 연산의 비효율이 발생하는 것을 짐작해볼 수 있다.

따라서 row group 개수를 지정할 때 block size 이내(128MB)의 최대 크기가 되도록 partition을 나누는 것이 좋다.

데이터 로더의 조건

좋은 데이터 로더는 어떤 특징이 있을까? 필자는 좋은 데이터 로더라면 다음과 같은 기능을 제공해야 한다고 생각한다.

  • 여러 파일로 쪼개져서 저장/읽기 가능
  • 간단한 key, value 구조
  • lazy data loading(메모리 보호)
  • IO 병목을 막기 위한 데이터 캐시
  • 데이터에 custom transform을 유연하게 적용 가능

하나씩 설명해보자면 다음과 같다.

  • 여러 파일로 쪼개져서 저장/읽기 가능
    분산 처리 데이터베이스에서는 데이터를 partition 또는 shard로 나누어 저장한다. 이는 병렬 처리에 유리한 구조이기 때문에 가급적 여러 파일인 형태를 그대로 유지하는 것이 좋다(repartition에도 시간 비용이 들어가기 때문).
    또한 data shuffle이 제공되어야 한다.
  • 간단한 key, value 구조
    pandas나 Spark, h5의 경우 column과 데이터 타입 조회가 쉽다. tfrecord는 직렬화되어 있기 때문에 feature key 값과 데이터 타입 조회가 어렵다.
  • lazy data loading(메모리 보호)
    메모리가 TB급이 아니라면 학습 데이터 전체를 메모리에 업로드하는 것이 불가능하다. 따라서 학습에 필요한 부분만 로드하는 기능이 필요하다.
  • IO 병목을 막기 위한 데이터 캐시
    모든 데이터를 메모리에 로드하고 사용할 수 없기 때문에 반드시 학습 중간에 IO 로딩을 거쳐야 한다. 이 과정에서 IO 로딩 속도 문제로 병목이 발생하므로 IO 로딩 병렬화와 캐시 기능이 제공되어야 한다.
  • 데이터에 custom transform을 유연하게 적용 가능
    데이터 변형에는 문자열 형식의 label을 정수형인 index로 바꾸는 정적인 변환 뿐만 아니라 image augmentation, NLP token masking 같은 동적인 변환도 존재한다. 데이터 로더를 만드는 과정에서 동적 변환이 쉽게 지원되도록 구현해야 한다.

우리에게 필요한 데이터 로더는

  1. hive table 형식인 parquet 파일을 그대로 이용하면 좋겠다(기존에는 tfrecord 사용).
  2. 빠른 실험을 위해 PyTorch에서 사용할 수 있어야 한다.
  3. 앞에서 설명한 데이터 로더의 조건을 만족하면 좋겠다.
  4. (+ 코드가 직관적이면 좋겠다.)

다른 곳은 어떻게 하고 있을까

우리가 원하는 기능을 누군가 이미 만들어두지 않았을까? 데이터셋과 데이터 로더 관련 기술을 찾아보면 다음과 같은 도구를 찾을 수 있다.

  • Google: TensorFlow — tfrecord, tf.data API
  • Facebook: Pytorch — torch.utils.data API
  • Uber: Petastorm
  • Nvidia: Dali

이 도구들을 살펴보면서 우리에게 적합한 데이터 로더를 찾아보겠다.

TensorFlow — tfrecord

TensorFlow는 대표적인 딥러닝 프레임워크인 만큼 데이터 로딩에 대한 고민이 충분히 이루어져 있다. 공식 문서의 가이드 항목에서 데이터 캐싱, transform 병렬화에 대한 분석, prefetch의 정의와 실행 시간 분석이 제공된다(참고: tf.data API로 성능 향상하기).

TensorFlow를 사용한다면 tfrecord를 사용하는 것이 좋은 방법이다. 하지만 실제로 사용해보면서 다음과 같은 불편한 점도 있었다.

  1. 엄격한 포맷
    tf.train.Feature(float_list=tf.train.FloatList(value=[value]))와 같이 tfrecord에 저장할 모든 데이터는 tf에서 제공하는 데이터 타입으로 변환해야 한다.
  2. parquet와의 상호 변환 문제
    tfrecord 변환 과정에서 사용한 tokenize, 이미지 인코딩 등의 값을 기존에 사용하던 parquet에 추가 후 업로드 할 때 tf.data.example 등의 tfrecord 역변환 과정을 거쳐야 한다.

쇼핑 도메인은 데이터가 주기적으로 변하고 이미지, 텍스트, 상품 정보를 동시에 확인해야 하는 multi-modal 특징이 있기 때문에 학습 데이터뿐만 아니라 기타 메타데이터(상품 속성, 판매자 정보 등)를 쉽게 확인할 수 있어야 한다. 즉, tfrecord를 사용하는 경우 Python object 타입이나 numpy 등을 자유롭게 사용하지 못하고 데이터 열람을 위해 뷰어나 변환 도구를 새로 만들어야 한다는 점이 다소 불편하게 느껴졌다.

Parquet를 그대로 사용하는 방법은 없을까?

Petastorm

우버에서도 큰 데이터에 대한 학습 방법을 고민한 것 같다. 그 결과로 TB급의 parquet 데이터를 직접 다룰 수 있는 Petastorm이라는 오픈소스를 발표했다. Petastorm은 Python 기반의 ML 프레임워크(TensorFlow, PyTorch, PySpark)를 모두 지원한다는 큰 장점이 있고 pure Python 코드로 작성되었다는 것이 특징이다.

실질적으로 원격 저장소에서 직접 데이터를 읽을 수 있도록 PySpark를 지원한다는 점이 큰 장점으로 느껴졌다.

출처: Introducing Petastorm: Uber ATG’s Data Access Library for Deep Learning

Petastorm 시나리오를 보면 PySpark를 통해 읽은 원격 데이터셋(dataframe)을 Petastorm으로 읽어서 petastorm.Dataset으로 처리한 후 TensorFlow나 PyTorch의 dataloader로 변환하는 형태로 구성되어 있는 것을 확인할 수 있다.

즉, 다음과 같은 형태로 사용할 수 있다.

  1. parquet(HDFS, 로컬)를 PySpark로 읽어서 전처리(column 선택 등)
  2. Petastorm-PySpark dataframe 처리
    1) PySpark dataframe → 로컬 캐시 디렉터리에 write
    2) petastorm.reader 등의 형태로 로드
  3. petastorm → pytorch dataloader
    1) petastorm.spark.converter → petastorm.pytorch.dataloader : context manager
    2) iterator 생성 (iter(petastorm.pytorch.dataloader))
  4. batch = next(iter())

내부 구조는 다음과 같이 구현되어 있다.

출처: Introducing Petastorm: Uber ATG’s Data Access Library for Deep Learning

ETL은 PySpark를 통해 parquet 파일을 읽고 데이터셋을 생성하는 기능을 제공한다. Unischema를 통해 지정된 data column, 데이터 타입을 Reader를 통해 읽는다. 이때 Reader에서 사용하는 parquet backend는 PyArrow로 구현되어 있으며 데이터 로딩의 주요 엔진 기능을 수행한다. tf, PyTorch 데이터 로더에 대한 변환 역시 Reader에서 지원한다.

Petastorm의 특징과 데이터 로더 조건 비교

1. parallel execution strategy 지원

단일 프로세스로 아무리 노력해도 IO에서 병목인 상황이라면 학습이 느려지기 때문에 캐시와 더불어 병렬화 기능은 아주 중요하다. Petastorm은 2가지 병렬화 실행 전략을 지원한다.

  • thread pool
  • process pool

읽어야할 데이터에 따라 두 전략 중 하나를 선택하면 된다. 일반적으로는 data row가 인코딩되어 있는 고차원 이미지 같은 경우 c++ 기반 패키지를 통해 디코딩할 수 있으므로 thread pool을 사용하는 것이 유리하고 row 크기가 작은 데이터이면서 pure Python 코드로 접근해야 하는 경우는 process pool을 사용하는 것이 유리하다.

2. shuffling

Parquet 파일은 앞서 설명한 것처럼 row group 단위로 사용하기 때문에 row group 단위로 메모리에 업로드할 수 있다. 이 구조를 그대로 사용한다면 같은 row group 내에 포함된 데이터끼리 상관(correlation)이 높아질 가능성이 커진다.

또한 row group 0의 데이터로 학습을 진행한 후 row group 1을 학습하면 row group 0의 학습 정보가 소실될 문제가 생길 수도 있다. 이를 catastrophic forgetting 문제라고 한다.

이 문제를 해결하려면 학습 batch를 구성할 때 다양한 row group에서 샘플을 추출하면 된다. 하지만 row group 전체를 메모리에 로드하는 것이 불가능하므로 해당 기능을 구현하려면 높은 IO 비용을 부담하는 수 밖에 없다.

Petastorm과 tfrecord는 그림과 같은 shuffling 전략을 통해 문제를 해결한다. 전체 row group을 메모리에 업로드할 수 없으므로 row group들 중 일부를 random sampling을 통해 메모리에 캐싱한다. 캐시된 row group 내에서 random sampling을 통해 모든 데이터가 적절히 추출되도록 조절한다.

3. 로컬 캐싱

Petastorm에서는 원격 저장소의 데이터에 접근할 수 있도록 시나리오가 작성되어 있다. PySpark를 사용해서 원격 저장소 데이터를 직접 사용할 수 있는 인터페이스가 열려있지만 사실 원격 저장소에 직접 접근하는 방법은 네트워크 접근 비용이 로컬 디스크의 IO 접근 비용보다 높기 때문에 학습에 용이한 방법은 아니다.

따라서 Petastorm은 Spark에서 처리된 column 데이터를 로컬 캐시 디렉터리에 저장하는 방법을 지원한다. Spark에서 다루는 데이터 역시 columnar data이므로 Spark dataframe을 전처리로 사용하고 학습에 사용할 column data만 추출해서 캐시 디렉터리에 저장하는 방법을 사용한다.

데이터는 학습의 1 epoch 때 로딩하는 과정에서 복제되고 2 epoch부터는 로컬 디렉터리에서 로딩이 이루어진다.

4. (단점) 메모리 캐싱

3번에서 설명한 것처럼 원격 저장소에 있는 데이터를 로컬 디스크에 캐시하는 것도 중요하지만 실제 학습 속도를 빠르게 하기 위해서는 병렬적으로 전처리된 배치 데이터를 메모리에 queue 형식으로 보관하는 것이 중요하다.

Petastorm 공식 블로그에는 해당 기능에 대한 언급이 없어서 코드를 직접 확인해봤다.

converter.maketorchdataloader() 함수를 보면 TorchDatasetContextManager를 출력하게 되어 있다.

def make_torch_dataloader(self,  
batch_size=32,
num_epochs=None,
workers_count=None,
**petastorm_reader_kwargs):
self._check_and_set_overriden_petastorm_args(
petastorm_reader_kwargs, num_epochs=num_epochs, workers_count=workers_count)
return TorchDatasetContextManager(
self.file_urls,
batch_size=batch_size,
petastorm_reader_kwargs=petastorm_reader_kwargs)

이 TorchDatasetContextManager는 https://petastorm.readthedocs.io/en/latest/modules/petastorm/spark/sparkdataset_converter.html#TorchDatasetContextManager를 열어보면 pytorch.utils.data.DataLoader가 아닌 petastorm.pytorch.DataLoader를 사용하고 있는 것을 확인할 수 있다.

class TorchDatasetContextManager(object):    ### ... (중략) ....    def __enter__(self):
from petastorm.pytorch import DataLoader
_wait_file_available(self.parquet_file_url_list)
self.reader = make_batch_reader(self.parquet_file_url_list,
**self.petastorm_reader_kwargs)
self.loader = DataLoader(reader=self.reader, batch_size=self.batch_size)
return self.loader

petastorm.pytorch.DataLoader를 보면 shufflingqueuecapacity를 인자로 가지고 있는 것을 확인할 수 있다. shufflingqueuecapacity 설정 값만큼 캐시 버퍼를 RandomShufflingBuffer() 함수에서 가지고 오는 것을 보아 메모리 캐시 기능도 제공하는 것을 확인할 수 있었다.

class DataLoader(LoaderBase):  
### ... (중략) ...
def _iter_impl(self):
keys = None
if self.shuffling_queue_capacity > 0:
# We can not know what is the reasonable number to use for the extra capacity, so we set a huge number
# and give up on the unbound growth protection mechanism.
min_after_dequeue = self.shuffling_queue_capacity - 1
self._shuffling_buffer = RandomShufflingBuffer(self.shuffling_queue_capacity,
min_after_retrieve=min_after_dequeue,
extra_capacity=100000000)
else:
self._shuffling_buffer = NoopShufflingBuffer()
### ... (생략) ...

하지만 아직 해당 기능이 완벽히 수행되지 않는 것 또한 확인할 수 있다. 실제로 capacity 값을 1 이상으로 설정해서 PyTorch 데이터 로더를 만들 경우 무한루프에 빠지는 버그를 쉽게 만날 수 있었다. 소스코드 주석에서도 다음과 같은 내용이 있는 만큼 아직 완벽하게 제공되는 기능은 아닌 것 같다.

# We can not know what is the reasonable number to use for the extra capacity, so we set a huge number
# and give up on the unbound growth protection mechanism.

5. (단점) 사용 편의성

편의성의 경우 사용자의 관점, 취향에 따라 다르지만 필자는 3가지 정도의 이유에서 Petastorm의 불편함을 느꼈다.

1. Spark가 지원되는 것은 좋지만 Spark에서 데이터를 읽는 시간을 기다려야 한다.

  • 300GB 기준 30분 정도 소요. 즉 학습 데이터를 한 번 로딩할 때 30분이 소요된다.
  • 학습 서버에 HDFS 클라이언트와 PySpark를 세팅하는 다소 번거로운 과정을 거쳐야 하는 것도 단점으로 생각할 수 있을 것이다.

2. sparkconverter.maketorch_dataloader()의 출력이 torch.utils.data.DataLoader()가 아닌 context manager가 출력된다.

  • PyTorch의 직관적인 코드와 자유도를 방해한다고 생각한다.
  • loader를 사용할 때 with 구문을 이용해야 한다.
def train_and_evaluate(_=None):  
with converter_train.make_torch_dataloader() as loader:
model = train(loader)

3. 메모리 캐시와 멀티 프로세스 병렬 기능에 버그가 있다.

PyTorch 데이터 로더

일반적인 데이터 로더의 흐름은 다음과 같다.

parquet 등의 raw 파일 형태에서 파일 읽기(메모리에 로딩), augmentation과 같은 dynamic transform, 모델 학습 순서로 이루어져 있다.

순차적으로 이루어지는 경우 실행 시간을 그래프를 그려보면 학습 시간 전체는 각 과정이 소요하는 시간의 합이라고 할 수 있다.

  • y축(위쪽부터): open, read+transform, train, epoch total
  • x축: 시간

학습 시간을 단축시키기 위해서 파일 open 과정과 transform 과정에 소모되는 시간을 단축시키는 것이 중요하다. GPU에서 모델 학습이 이루어지기 때문에 CPU는 학습을 위한 데이터 준비를 병렬적으로 준비해야 한다. TensorFlow에서는 read + transform 병렬화 과정을 prefetch라는 용어로 표현한다.

이 prefetch 과정을 CPU에서 병렬로 수행해야 한다.(GPU 메모리가 남는다면 GPU에서 수행할 수도 있다.) 이때 생성된 학습 batch들은 메모리 캐시에 보관해두어야 접근이 편리할 것이다.

PyTorch에서 prefetch 기능을 잘 지원하고 있는지 확인할 필요가 있다. 그 전에 PyTorch의 데이터 로더 방식을 간단하게 리뷰해보겠다.

https://pytorch.org/tutorials/beginner/dataloadingtutorial.html

class PytorchDataset(torch.utils.data.Dataset):  
def __init__(self, data_files):
''' initialize '''
self.data = <data_files>
def __len__(self):
return len(self.data)
def __getitem__(self, idx):
sample = self.transform(self.data[idx])
return sample

PyTorch에서는 custom data를 사용할 때 1개의 sample에 접근할 수 있는 Dataset 클래스를 정의한다. 이 클래스 객체를 torch.utils.data.DataLoader()에 인자로 넘기는 것만으로 손쉽게 배치 데이터를 얻을 수 있다.

dataset = PytorchDataset(data_files)  
dataloader = torch.utils.data.DataLoader(dataset, batch_size=8)

예에서 볼 수 있듯이 직관적인 데이터 접근 인터페이스를 제공하는 것과 데이터셋 구성의 자유도가 높다는 것이 장점이지만 자유도가 높은 만큼 복잡한 구성에서 코드 가독성이 떨어지는 단점이 발생할 수 있다.

본론으로 돌아가서 우리가 원하는 prefetch 기능이 제공되는지 확인해 보겠다.

torch.utils.data.DataLoader에서 num_workersprefetch_factor를 사용해 병렬화와 메모리 캐시에 대한 기능을 제공한다. num_workers > 0 일 때 두 기능 모두 활성화된다.

workers들은 torch.multiprocess로 구현되어 있으며 backend는 cpython.multiprocessing 모듈인 것을 확인할 수 있다.

좋은 데이터 로더 만들기

앞서 각 데이터 로딩 방법이 데이터 로더로서의 기능을 제공하는 것을 확인해 보았다. 데이터 로더의 기능적인 측면이 중요하다는 것과 만들어진 툴에서 해당 기능을 충분히 지원하려는 것도 확인할 수 있었는데, 실질적으로 중요한 부분은 ‘지원되는 기능이 얼마나 편리하게 제공되는가’이다.

가장 먼저 확인해야 할 지표는 동등한 조건에서의 실행 시간이다.

순차 로딩에서의 실행 시간 비교

비교 조건은 다음과 같다.

  • batch_size 100
  • 1000 iteration당 수행 시간 측정
  • 이미지, tokenized text, wordpiece id, label loading
  • fastparquet 기반 새로 제작한 툴
    - 기존 데이터(이미지, 텍스트) 로딩 + image augmentation 추가(albumentations)
    - torch.utils.data.Dataset, DataLoader 사용
  • (*) 데이터 전체를 메모리에 로드한 후 테스트
  • (**) workers = 12 적용
표 1 데이터 로더별 순차 로딩 실행 시간 비교

기준 지표는 pandas(*)로 메모리 전체가 업로드 가능한 환경에서의 병렬 prefetch가 적용되지 않았을 때의 실행 시간이다. 실험을 위해 batch size는 100, iteration 1000회, multi modal data loading(이미지, 토큰화된 텍스트, 텍스트 토큰들의 id, label)을 가정했다. 실제로는 메모리 전체에 업로드가 불가능하므로 IO 병목이 생겨 이 속도보다는 늦어질 수 밖에 없다.

Petastorm은 기준 대비 약 23%의 속도를 기록했다. 비교를 위해 naive한 PyTorch h5 데이터셋을 구현했으며 두 경우 실행 속도가 비슷한 것을 확인했다. dali의 경우 file open 도구를 무엇을 쓰는지에 따라 다르기 때문에 정확한 비교는 어렵지만 pandas를 사용한 경우 Petastorm보다 약 2배 정도 빠른 결과를 보였다.

어떻게 Petastorm보다 빠르게 만들 수 있을까

데이터 로더 구성 중 실행 속도에 가장 큰 영향을 주는 요소는 3가지 정도로 나눌 수 있다.

  • file IO (read)
  • transform (augmentation)
  • parallelize

3개 요소에 해당하는 부분을 개선할 수 있는지 확인해 보겠다.

parquet read — PyArrow vs fastparquet

Petastorm에서 사용하는 parquet backend는 PyArrow이다. parquet 접근 라이브러리를 바꿔주는 것만으로도 속도 개선에 도움이 된다.

# pyarrow == 2.0.0
for parquet in list_parquet:
df = pd.read_parquet(parquet, engine='pyarrow', columns=['unique_id', 'img_jpg])
# fastparquet == 0.4.1
for parquet in list_parquet:
df = pd.read_parquet(parquet, engine='fastparquet', columns=['unique_id', 'img_jpg'])

pandas에서도 read_parquet 함수에서 2개의 backend를 지원하기 때문에 pandas만으로도 테스트해볼 수가 있다(pip 등으로 backend 라이브러리도 설치 필요).

표 2 string column 1개 선택 시 parquet(snappy 압축) 데이터 2000만 건 read 속도

데이터 양과 타입에 따라 다르지만 약 1.6~2배에 가까운 속도 차이를 보인다. backend 라이브러리 교체만으로도 성능을 2배 정도 향상시킬 수 있다.

또한 Petastorm의 경우 Spark read 시간 + Spark to Petastorm dataset(로컬 캐시 디렉터리에 복사)에 시간이 오래 걸리는 편이다. Petastorm에서 parquet 파일 2000개(snappy 압축 기준 300GB 정도)의 데이터를 읽는데 30분 정도가 소모된다(2000만 건 데이터).

로컬 캐시에 적재하는 초기화 시간이 30분 이상이 걸리는 것은 전체 학습 시간에 작은 부분을 차지할 수 있으나 초기 로딩 테스트 등 매 학습마다 30분이 고정적으로 소모되기 때문에 실험 시간을 상당히 비효율적으로 사용할 수 밖에 없게 된다.

shuffling

모든 데이터를 메모리에 적재할 수 없기 때문에 Petastorm과 같은 shuffling 방식을 사용한다.

parquet read와 shuffling이 적용된 PyTorch 데이터셋을 만들면 다음과 같다.

class ParquetCategoryDataset(data.Dataset):  
def __init__(self,
parquet_path,
raw_cols=['unique_id'],
use_cols=[],
num_cached_parquet=5
):
self.raw_cols = raw_cols # parquet file에서 사용할 column
self.use_cols = use_cols # 출력 sample에서 사용할 column
self.parquet_list = sorted(glob(join(parquet_path, '*.parquet')))
self.num_cached_parquet = num_cached_parquet # 캐시할 파일 개수
self.steps_cache = int(np.ceil(
len(self.parquet_list) / self.num_cached_parquet)) # cache step
self.current_parquet_idx = 0
self.current_pd_parquets = None # cached parquets
self.current_indices_in_cache = [] # data index in cached parquet
self.steps_per_epoch = 0
self.total_len = self.get_total_length()
self._cache_setting() def _cache_setting(self):
cur_pd, cur_indices = self._cache_parquet(self.current_parquet_idx)
self.current_pd_parquets = cur_pd
self.current_indices_in_cache = cur_indices
def get_total_length(self):
fdf = fastparquet.ParquetFile(self.parquet_list)
total_len = 0
for df in fdf.iter_row_groups(columns=['unique_id']):
total_len += len(df)
return total_len
def _cache_parquet(self, idx):
next_idx = (idx+1)*self.num_cached_parquet
next_idx = None if next_idx &gt; len(self.parquet_list) else next_idx
list_part_parquet = self.parquet_list[
idx*self.num_cached_parquet:next_idx
]
fparquet = fastparquet.ParquetFile(list_part_parquet) list_df = []
for df in fparquet.iter_row_groups(columns=self.raw_cols):
list_df.append(df)
df_data = pd.concat(list_df)
now = time.time()
seed = int((now - int(now))*100000)
rng = np.random.RandomState(seed=seed)
np_indices = rng.permutation(len(df_data)) \
if self.shuffle else np.arange(len(df_data))
list_indices = np_indices.tolist()
return df_data, list_indices def _transform_raw_to_array(self, pd_raw_data):
dict_array_data = {}
if 'prod_id' in self.use_cols:
dict_array_data['prod_id'] = np.array(int(pd_raw_data['unique_id']))
if 'output' in self.use_cols:
dict_array_data['output'] = np.array(
label_dict[pd_raw_data['cat_id']])
if 'img_input' in self.use_cols:
dict_array_data['img_input'] \
= jpg_array_to_img_matrix(pd_raw_data['img_jpg'])
def __len__(self):
return self.total_len
def __getitem__(self, idx):
# idx는 사용하지 않고 parquet data queue에서 pop
refresh_idx = 1
# 현재 캐시 파일 내 데이터를 모두 탐색한 경우
if len(self.current_indices_in_cache) &lt; refresh_idx:
# 다음 parquet 파일들을 로딩
self.current_parquet_idx += 1
# 모든 parquet 파일 로딩한 경우 첫 번째 파일로
if self.current_parquet_idx &gt;= self.steps_cache:
self.current_parquet_idx = 0
# parquet cache loading
self._cache_setting()
pd_idx = self.current_indices_in_cache.pop()
pd_raw = self.current_pd_parquets.iloc[pd_idx]
sample = self._transform_raw_to_array(pd_raw) return sample

transform — augmentation

albumentations는 numpy, opencv 기반으로 구현된 image transform 라이브러리이다. 사용법은 torchvision.transforms과 유사하며 torchvision보다 약 2~15배 빠른 속도를 나타낸다.

다음은 하나의 image에 대해 transform을 적용하는 작업의 1000회 실행 시간을 측정하여 평균을 낸 값이다.

표 3 라이브러리별 image transform 1000회 실행 시간 비교

torchvision.transforms과 albumentations은 여러 변환 기법을 compose 함수로 묶어 처리할 수 있기 때문에 하나의 객체로 관리할 수 있다.

album_transforms = A.Compose([  
A.Resize(250, 250),
A.ShiftScaleRotate(
shift_limit=0.05,
scale_limit=0.05,
rotate_limit=15,
p=0.5),
A.RandomCrop(250, 250),
A.HorizontalFlip(p=0.5),
#A.RandomBrightnessContrast(p=0.2),
#A.Normalize(mean=(0.485, 0.456, 0.406), std=(0.229, 0.224, 0.225)),
ToTensorV2(),
])

만들어 둔 dataset class에 적용하는 방법 또한 매우 간단하다. __getitem__() 함수 내에서 적용하고 싶은 numpy image에 적용하면 결과를 얻을 수 있다.

### ... (생략)
def __getitem__(self, idx):
# idx는 사용하지 않고 parquet data queue에서 pop
refresh_idx = 1
# 현재 캐시 파일 내 데이터를 모두 탐색한 경우
if len(self.current_indices_in_cache) &lt; refresh_idx:
# 다음 parquet 파일들을 로딩
self.current_parquet_idx += 1
# 모든 parquet 파일 로딩한 경우 첫 번째 파일로
if self.current_parquet_idx &gt;= self.steps_cache:
self.current_parquet_idx = 0
# parquet cache loading
self._cache_setting()
pd_idx = self.current_indices_in_cache.pop()
pd_raw = self.current_pd_parquets.iloc[pd_idx]
sample = self._transform_raw_to_array(pd_raw) # self.transform == album_transform
if self.transform:
transformed = self.transform(image=sample['img_input'])
sample['img_input'] = transformed['image']
return sample

병렬화

캐시 방식 데이터셋 구현은 캐시의 특성상 dataset[idx]인 index를 사용하는 것이 아닌 cache.pop() 구조를 사용한다. index를 사용하지 않기 때문에 PyTorch 데이터 로더의 병렬화 관점에서는 추가로 생각해야 할 부분이 있다.

PyTorch 데이터셋은 1개의 sample을 만들어내는 클래스이기 때문에 위 그림처럼 1개의 프로세스일 때는 모든 데이터에 중복 없이 순차적 접근이 가능하다.

하지만 병렬 프로세스로 만들면 parquet 캐시도 그대로 복제된다. 불행하게도 모든 프로세스가 공유되지 않는 캐시 메모리를 갖기 때문에 해당 버퍼에서 데이터를 가지고 올 때 중복이 발생할 수 밖에 없다.

데이터셋 내에 캐시 버퍼를 두는 경우는 해당 문제를 피하기 어려우므로 PyTorch 데이터 로더 소스코드를 수정하거나 redis와 같은 인메모리 데이터베이스를 사용하지 않는 이상 근본적인 해결은 어렵다.

우회해서 해결하기 위해 중복이 일어나는 경우를 고려해보겠다.

  1. batch 내 중복: batch 내에 같은 데이터가 존재하는지
  2. epoch 내 중복: 전체 데이터를 어느 시점에 다 볼 수 있는지

중복을 피하는 것이 가장 좋지만 전체 중복을 피하는 것이 불가능한 경우 어떤 경우를 피해야 할까?

딥러닝은 대체로 여러 epoch를 통해 모델을 학습하는 만큼 epoch 내 중복은 허용할 만하다고 생각할 수 있다. 예를 들어 5 epoch 동안 같은 데이터를 5번 중복해서 보게 되는 만큼, 같은 iteration 동안 epoch에 관계 없이 5번 중복이 일어나는 경우는 허용 가능하다고 생각할 수 있다.

반면 batch 내 중복의 경우는 해당 데이터에 correlation이 생기는 형태로 학습에 영향을 미칠 수 있으므로 최대한 피해야 한다.

즉, __getitem__() 함수에서 parquet 캐시를 빠른 주기로 교체해서 최대한 중복을 피하도록 디자인하면 병렬화된 데이터 로더를 만들 수 있다.

def __getitem__(self, idx):        # parquet file을 교체하는 refresh_idx를 전체 주기보다 짧게 디자인한다.
# 이때 교체 주기는 하이퍼파라미터로 중복 비율에 영향을 미친다.
parquet_refresh_freq = 4
refresh_idx = 1 \
if self.num_workers == 0 \
else len(self.current_pd_parquets) \
- len(self.current_pd_parquets) // (parquet_refresh_freq*self.num_workers)
if len(self.current_indices_in_cache) &lt; refresh_idx:
self.current_parquet_idx += 1
# 캐시 파일 교체 주기가 빨라지므로 교체할 때 parquet list도 무작위로 섞는다.
if self.current_parquet_idx &gt;= self.steps_cache:
self.current_parquet_idx = 0
if self.num_workers &gt; 0:
now = time.time()
seed = int((now - int(now))*100000)
rng = np.random.RandomState(seed=seed)
rng.shuffle(self.parquet_list)
self._cache_setting() # 단일 프로세스에서 같은 리스트를 pop으로 꺼내는 경우는 병렬 프로세스에서 중복을 일으키므로
# indices 행렬에서 random sampling으로 수행 후 해당 index를 삭제하는 형태로 변형한다.
if self.num_workers != 0:
now = time.time()
seed = int((now - int(now))*100000)
rng = np.random.RandomState(seed=seed)
rand_idx = rng.randint(
len(self.current_indices_in_cache)
)
pd_idx = self.current_indices_in_cache[rand_idx]
del self.current_indices_in_cache[rand_idx]
else:
pd_idx = self.current_indices_in_cache.pop()
pd_raw = self.current_pd_parquets.iloc[pd_idx]
sample = self._transform_raw_to_array(pd_raw) if self.transform:
transformed = self.transform(image=sample['img_input'])
sample['img_input'] = transformed['image']
return sample

iteration에 따른 누적 unique id 수를 그래프로 그리면 다음과 같다(batchsize: 100, 전체 데이터 수: 102,768).

단일 프로세스의 경우 순차 탐색이 가능하기 때문에 1 epoch 만에 10만 개를 데이터 전체를 볼 수 있다(사실 실제로는 캐시 파일 교체 시 약간의 차이가 존재하기 때문에 1 epoch + 수 batch_idx 필요). 반면 worker > 1인 멀티 프로세스의 경우 순차 탐색이 불가능하기 때문에 앞서 설명한 캐시 내에서 shuffle 효과를 최대화하고 캐시 파일 주기를 변경하는 방법으로 중복을 최소화한다.

epoch 내에서 발생하는 중복은 피하기 어렵다. 캐시 파일 교체 주기를 조절해서 coverage를 조절할 수 있다. 5 epoch에 해당하는 iteration에서 99% 데이터를 학습할 수 있게 조절할 경우 coverage는 다음과 같다.

표 4 epoch별 unique id coverage
표 5 batch 내 unique id 비율

12개의 프로세스를 병렬적으로 수행할 때 단일 프로세스보다 6배 정도 빠른 속도로 학습할 수 있다. 즉, 같은 시간 동안 6배의 데이터를 학습할 수 있기 때문에 5 epoch에 해당하는 시간 동안 모든 데이터를 볼 수 있고 6배에 해당하는 데이터를 학습할 수 있다.

표 6 병렬화 수별 실행 시간

참고로 batch size 100 기준 12 이상은 큰 성능 향상은 없었다(물리코어 14, 논리코어 48 CPU 기준).

실험값에 대한 내용은 Appendix를 참고한다.

Appendix. 복원 추출과 비복원 추출

복원 추출은 모집단에서 표본을 추출할 때 하나의 자료를 뽑은 뒤 다시 모집단에 넣고 다음 자료를 뽑는 방법이다.

비복원 추출은 모집단에서 표본을 추출할 때 하나의 자료를 뽑은 뒤 모집단에 넣지 않고 다음 자료를 뽑는 방법이다.

기본적으로 데이터셋은 epoch 내에서 비복원 추출을 전제로 한다. 복원 추출할 때는 어떤 결과를 보일까?

10000개의 데이터가 있다고 가정하고 iteration 만큼 복원 추출하면 고유한 id의 비율은 얼마나 될까?
(iteration당 1개의 데이터를 추출)

data = np.arange(10000)  
iteration = 50000
unique_id_ratio = np.unique(np.random.choice(data, iteration)).shape[0] / len(data)

비복원 추출의 경우 1epoch에서 선형적으로 증가하면서 1만 개를 보는 시점에 누적 비율이 100%를 달성한다. 반면 복원 추출의 경우 중복이 발생하므로 1 epoch에서는 약 63% 정도의 누적 unique id 비율을 달성하고 epoch별로 87%, 95%, 98%, 99%를 달성하는 것을 볼 수 있다.

해당 비율은 제안한 데이터 로더의 unique id 실험 결과와 일치한다.

def aver_unique_ratio(x, samples, times):  
_sum = 0
for i in range(times):
_sum += np.unique(np.random.choice(x, samples)).shape[0] / len(x)
return _sum / times
x = np.arange(1000, 50000, 1000)
y = [aver_unique_ratio(np.arange(10000), t, 10) for t in x]
plt.plot(x, y)
plt.show()

실제 사례에서는 parquet 파일을 일부만 로드해서 사용하는 형태이기 때문에 몇 가지 트릭을 넣어주는 것이 초기 학습에 도움이 될 수 있다.

10000개의 데이터 중 초기 1000개의 복원 추출 unique 비율을 보면 중복 비율이 더 작은 것을 확인할 수 있다. 즉, 10000개짜리 parquet 파일이 여러 개 존재할 때 refresh를 자주 할수록 초기 중복 비율을 낮추는 데 유리하다.

마치며

지금까지 큰 데이터를 위한 데이터 로더 제작 방법을 살펴보았다.

데이터 엔지니어링 툴과 직접적인 연계를 할 수 있는 방법을 찾아보았고 Petastorm과 같은 우수한 선행 연구가 있는 것을 확인했다. Petastorm에서 다룬 parquet backend 사용 방법, data shuffling 방법을 기반으로 더 빠르고 편하게 사용할 수 있는 데이터 로더를 개발했다.

새롭게 제안한 데이터 로더는 parquet backend 교체만으로 멀티 프로세싱을 사용하지 않더라도 기존 솔루션 대비 우수한 성능을 보였다(Petastorm 대비 220%, dali-cpu 대비 126% 속도).

멀티 프로세싱을 사용하는 경우에는 PyTorch 데이터 로더의 특징으로 인해 복원 추출을 해야 하는 단점이 있지만 학습에는 큰 문제가 없고 같은 시간 동안 2배~6배 더 많은 데이터를 볼 수 있는 것을 확인했다(Petastorm 대비 약 10배, dali-cpu 대비 약 6배).

복원 추출 기반 데이터 로더는 평가용으로 사용할 수 없기 때문에 학습용과는 따로 구성해야 한다. 하지만 일반적으로 평가용 데이터는 크기가 작기 때문에 전체 데이터를 메모리에 업로드할 수 있다.

만약 데이터 로더를 반드시 비복원 추출로 만들어야 한다면 process 간 공유가 가능한 메모리를 반드시 사용해야 한다. redis와 같은 인메모리 데이터베이스 등이 대표적인 방법일 것 같다. 다른 해결법으로는 torch.utils.data.DataLoader 대신 ray 기반의 새로운 로더를 만드는 방법도 가능하다. 다만 두 방법 모두 코드의 복잡성이 증가하게 되므로 꼭 필요한 상황이 아니라면 학습용은 복원 추출, 평가용은 비복원 추출 데이터 로더를 구성해서 사용하는 것도 많은 상황에서 사용 가능한 해결 방법이라고 생각한다.

네이버 쇼핑에서는 우수한 머신러닝 모델을 만드는 것만큼 데이터 엔지니어링에도 집중하고 있다. 실험 속도를 빠르게 만들어줄 데이터 파이프라인부터 서비스를 위한 추론 환경 구성까지 많은 고민을 하고, 또 풀어나가고 있다. 앞으로도 편하고 행복한 쇼핑이 될 수 있게 한 발씩 나아가겠다.

--

--