Apache Airflow와 Amazon SageMaker Feature Store 연동하기

Sungin Lee
Cloud Villains
Published in
11 min readSep 13, 2021

지난 AWS re:Invent 2020에서 새로 출시된 서비스인 Amazon SageMaker Feature Store가 프로덕션 환경에서 사용 가능한지 검증해 볼 일이 있었다. Apache Airflow를 이용해 머신 러닝 워크플로우를 관리하고 있는 곳이었기 때문에 Airflow와 SageMaker Feature Store가 연동이 가능한지도 검증해야 했다.

이 게시물에서는 검증을 위한 테스트 환경을 어떻게 구축했는지 공유하면서, SageMaker Feature Store를 도입하려고 할 때 고민해봐야 할 점들을 함께 살펴보고자 한다.

배경 지식

Apache Airflow?

Airbnb에서 만든 Python 기반의 워크플로우 관리 오픈소스 플랫폼이다. Python 기반이라 머신 러닝을 하는 사람들에게 접근성이 좋아 머신 러닝 파이프라인을 구축하는 데 많이 쓰인다.

Feature Store?

출처: Logical Clocks 블로그

조직 내부에서 데이터에 대한 feature를 공유하기 위해 사용되는 feature 저장소이다. 동일한 원본 데이터를 가지고 여러 머신 러닝 모델을 학습시키다 보면 복수의 모델이 중복되는 feature를 사용하게 될 수 있다. 이렇게 자주 사용되는 feature는 모델을 학습시킬 때마다 직접 추출하기보다 미리 추출하여 별도의 저장소에 보관하는 것이 더 효율적일 것이다.

또한 feature store를 활용하면 모델 학습 시에 원본 데이터 소스(DB 등)에 접속할 일이 그만큼 줄어드는 것이기 때문에 기존 아키텍처에 따라 메인 DB의 부하를 줄이는 효과도 있을 수 있다.

Architecture

Feature Store를 사용하는 단계는 크게 다음과 같은 두 단계로 나누어진다.

  1. 원본 데이터 소스에서 데이터를 불러와 feature engineering 작업 후 feature 저장
  2. 저장된 feature를 불러와 모델 학습

이번 검증에서는 1단계, 즉 feature engineering 후 feature를 저장하는 단계만 검증하였다.

구체적으로는 AWS 기본 VPC 내 public subnet에 EC2 인스턴스를 생성하고, 이 인스턴스에 도커를 이용해 Airflow를 설치하였다. 이후 S3 버킷에서 데이터 불러와 전처리 후 SageMaker Feature Store에 업로드하는 DAG를 작성하였다.

검증에 사용한 데이터 용량이 크지 않아 Airflow webserver 컨테이너 내부에서 전처리를 수행하도록 코드를 작성하였으나, 대용량 데이터 처리 시에는 worker 인스턴스 혹은 컨테이너를 분리하여야 할 것이다.

Demo

주의사항

SageMaker Python SDK 버전 2.38 이상에서는 FeatureGroup.ingest() 메소드 실행 시 NotImplementedError가 발생하는 이슈가 있다 (2021년 9월 기준 아직 해결되지 않았다).

pip list | grep sagemaker

명령어로 SageMaker SDK 버전을 확인하고, 2.38 이상의 버전이 설치되어 있다면 2.37 버전으로 다운그레이드 해야 한다.

Feature Group 생성

Feature를 업로드하기 전에 feature group을 생성해야 한다. 여기서는 이미 feature group이 생성된 상태라고 가정하고 진행하겠다. Feature group 생성에 대한 자세한 내용은 공식 문서를 참고하도록 하자.

이미지 빌드

기본 VPC 내 public subnet에 EC2 인스턴스를 생성하고 도커를 이용해 Airflow를 설치하였다. 그런데 Airflow 베이스 컨테이너 이미지에는 S3 및 SageMaker SDK와 관련된 일부 패키지가 설치되어 있지 않았다. 그래서 airflow 베이스 이미지에다 필요한 패키지를 설치하여 이미지를 새로 빌드하였다.

docker build --tag custom-airflow .

그리고 Airflow 설치용 docker-compose.yaml 파일에 있는 airflow-common 이미지 소스를 위에서 빌드한 이미지로 변경하였다.

DAG 파일 작성

데이터를 소스(S3 버킷)에서 불러와 간단한 전처리 후 SageMaker Feature Store로 저장하는 DAG 파일을 작성하였다.

load_data 함수 내부를 한 줄씩 살펴보자.

데이터 불러오기

data = pd.read_csv(kwargs['dag_run'].conf['path'])

데이터가 csv 파일이라고 가정하고, pandas 패키지로 원하는 path에서 데이터를 불러온다. path는 나중에 DAG를 실행시킬 때 입력하도록 설정하였다.

Feature Engineering

data = data[['prd_no', 'datetime', 'adjusted_cnt']]

원하는 feature를 추출한다. 이번 검증의 경우 feature engineering 자체가 목적이 아니었기 때문에 단순히 특정 컬럼을 추출하는 방식으로 처리하였다.

Record Identifier 컬럼 추가

data = data.reset_index()

모든 feature group에는 각 행(record)마다 고유한 값을 갖는 record identifier 컬럼이 필요하다. 검증 시 사용한 샘플 데이터는 시계열 데이터로 제품번호(prd_no)와 시간(datetime)의 다중 컬럼 인덱스로 이루어져 있었는데, SageMaker Feature Store는 하나의 컬럼만을 identifier로 사용할 수 있어 임시로 pandas DataFrame의 index를 identifier로 사용하였다. 실제 프로덕션에서 다중 컬럼 인덱스 데이터를 SageMaker Feature Store에 저장할 때는 해시 등을 통해 인덱스를 관리해야 할 것으로 보인다.

위의 코드는 reset_index() 메소드를 이용하여 index를 새로운 컬럼으로 가져오는 코드이다.

event_time 컬럼 추가

data['event_time'] = time.time()

모든 feature group에는 feature의 버전 관리를 위해 Eventtime(feature를 입력한 시간) 컬럼이 필요하다. Eventtime 컬럼이 지원하는 데이터 타입은

  • yyyy-MM-dd'T'HH:mm:ssZ 형태의 문자열 (e.g. 2021-09-08'T'15:21:00Z)
  • yyyy-MM-dd'T'HH:mm:ss.SSSZ 형태의 문자열 (e.g. 2021-09-08'T'15:21:00:000Z)
  • unix timestamp (float 형태)

이다.

위의 코드는 time.time() 함수를 통해 현재 unix timestamp를 입력하는 코드이다.

데이터 업로드

FeatureGroup(
name=kwargs['dag_run'].conf['feature_group_name'],
sagemaker_session=feature_store_session
).ingest(
data_frame=data,
max_workers=3,
wait=True
)

ingest() 메소드를 이용해 feature group에 데이터를 업로드한다. feature group 이름은 DAG 실행 시에 입력하도록 설정하였다.

wait=True로 설정하면 모든 데이터가 업로드된 후에 DAG가 종료되고, wait=False로 설정하면 업로드 API 요청만 보낸 후에 DAG가 종료된다. 그러나 이 경우 데이터 업로드 완료 여부를 파악하는 API가 제공되지 않는다는 문제가 있다 (2021년 9월 기준).

한편, wait=True로 설정할 경우 일정 시간이 지나면 ERROR-Received SIGTERM. Terminating subprocesses. 라는 메시지와 함께 DAG가 실패한 것으로 처리되는 이슈가 있었다. 이는 Airflow Scheduler의 설정 문제로, airflow.cfg 에서 AIRFLOW__SCHEDULER__SCHEDULE_AFTER_TASK_EXECUTIONFalse로 변경하여 해결하였다. Airflow 버전 2.1.3에서 이 이슈가 해결되었다고 하는데 실제로 해결되었는지는 확인이 필요하다.

DAG 실행시키기

DAG 파일을 잘 작성하였다면 Airflow webserver에서 방금 생성된 DAG를 확인할 수 있다.

Airflow webserver에서 방금 작성한 DAG를 확인할 수 있다.

필요한 config (원본 데이터 path, feature를 업로드할 feature_group_name)를 json 형태로 입력하여 DAG를 trigger한다.

‘Trigger DAG w/ config’ 버튼을 클릭한다.
{
"path": "s3://<Bucket>/<Prefix>/<File>.csv",
"feature_group_name": "<FeatureGroupName>"
}
config 입력

Trigger 이후 DAG가 success 상태가 될 때까지 기다린다. 성공했다면 S3 콘솔 상에서 feature group 생성 시 설정했던 offline store 위치에 parquet 파일이 저장된 것을 확인할 수 있을 것이다.

offline store 위치에 parquet 파일이 저장된 것을 확인할 수 있다.

이렇게 Airflow를 사용해 원본 데이터 소스에서 데이터를 불러와 feature 추출 후 Feature Store에 저장해 보았다.

Next Steps?

다음 단계는 Feature Store에 저장된 feature를 불러와 모델 학습 및 추론에 사용하는 것이다. Online store를 사용할 경우 GetRecord(API 문서/Boto3) 혹은 BatchGetRecord(API 문서/Boto3) API를 통해 가장 최근에 저장된 feature를 불러올 수 있고, offline store를 사용할 경우 Amazon Athena 쿼리를 통해 원하는 버전의 feature를 불러올 수 있다.

Athena 쿼리로 가장 최근의 feature를 불러오는 예시. 쿼리 예문은 AWS 공식 문서를 참고하였다.

SageMaker Feature Store 사용 시 고려할 점

마지막으로 이번 검증을 수행하면서 느낀 애로사항을 적어 보았다. 실제로 SageMaker Feature Store를 프로덕션에 도입하려고 한다면 아래와 같은 점을 고려하여야 할 것이다.

  1. Record Identifier: 단일 컬럼 identifier만을 지원한다. 다중 컬럼 index를 사용하는 feature의 경우 별도의 해시 등을 통한 관리가 필요하다.
  2. 지원하는 데이터 타입이 제한적임: 지원되는 데이터 타입이 string, integral, fractional 뿐이다. Datetime 정도는 추가로 지원해 줄 만하지 않았나 하는 아쉬움이 있다.
  3. Feature 불러오기 방법이 제한적임: 공식적으로 지원하는 feature 불러오기 방법이 개별 record identifier 값을 입력해야 하는 GetRecord API와 Athena 쿼리 뿐이다. 대규모 feature를 불러오기 위해서 사용할 수 있는 방법은 SQL 쿼리밖에 없는 셈이다.
    오프라인 저장소 사용 시 feature가 S3에 parquet 형태로 저장되기 때문에 Apache PyArrow를 사용하여 직접 불러올 수도 있을 것 같지만, 이 방법은 추가로 테스트가 필요하다.

--

--