29CM 데이터 파이프라인 소개

brownbears
29CM TEAM
Published in
10 min readJan 12, 2023

안녕하세요 데이터그로스팀 이진환입니다. 29CM에선 21년 9월 이후부터 데이터에 기반한 의사결정을 원활하게 할 수 있도록 데이터 파이프라인을 빠르게 개선하여 운영하고 있습니다. 지금부터 29CM의 데이터 파이프라인 변천사와 데이터그로스팀의 엔지니어링파트가 생각하는 다음 단계의 데이터 파이프라인을 소개하도록 하겠습니다.

목차

  • Phase0 (21년 9월 이전)
  • Phase1 (21년 9월 ~ 22년 12월)
  • Phase2 (23년 ~)
  • 마치며

Phase0 (21년 9월 이전)

Phase0 (21년 9월 이전) 데이터 파이프라인 아키텍처

21년 9월 이전까지의 데이터 파이프라인은 위와 같습니다. 운영 데이터의 수집이 ETL 방식으로 진행되어 데이터가 필요할 때마다 데이터 엔지니어, 데이터 분석가들이 필요한 데이터를 요청/개발하고 있었고 워크플로는 명확한 기준 없이 Airflow와 Jenkins, Bigquery Scheduled queries로 나뉘어져 사용하고 있었습니다. 또한 데이터 레이크인 GCS와 데이터 웨어하우스인 Bigquery가 추출/변형이 완료된 형태의 동일한 데이터를 가지고 있었고 데이터 마트인 PostgreSQL에는 별도의 ETL 형식 데이터 파이프라인으로 생성한 데이터가 저장되고 있었습니다.

이렇게 기준이 잡히지 않은 데이터 파이프라인은 다음과 같은 문제를 가지고 있었습니다.

  • ETL 방식으로 데이터를 수집하고 있어서 필요할 때마다 매번 데이터 엔지니어, 데이터 분석가의 리소스가 소요됨
  • 특별한 기준이 없이 여러 워크플로 사용으로 인해 오류를 찾기 힘듦
  • Jenkins, Bigquery Scheduled queries에서 오류가 발생할 경우, 백필에 리소스가 소요됨
  • 데이터 레이크와 데이터 웨어하우스에 불필요하게 동일한 데이터가 저장됨
  • 데이터 파이프라인이 2가지(Jenkins, Airflow)로 이원화되어 관리에 리소스가 소요됨
  • 데이터 마트에 저장된 데이터를 데이터 웨어하우스에서 접근할 수 없음

또한 워크플로인 Airflow 1.10.12 버전을 직접 구축하여 운영하고 있었는데 다음과 같은 문제를 가지고 있었습니다.

  • Kubernetes 기반이 아닌 단일 인스턴스에 컨테이너 기반 동작으로 빠른 확장성의 한계
  • Airflow scheduler의 cpu 과점유 현상
  • Airflow worker에 비즈니스 로직이 적용되어 있어 너무 무거워짐
  • 데이터 엔지니어가 플랫폼 문제를 해결하는데 많은 리소스가 들어감

29CM의 데이터 기반의 의사결정을 돕기 위해선 기존 데이터 파이프라인을 유지하면서 위와 같은 복합적인 문제들을 해결하는 것보단 클라우드 서비스를 활용해 새롭게 파이프라인을 구축/운영하는 것이 좋겠다는 결론을 내리고 GCP 클라우드 서비스를 활용한 데이터 파이프라인을 구축하였습니다.

Phase1 (21년 9월 ~ 22년 12월)

Phase1 (21년 9월 ~ 22년 12월) 데이터 파이프라인 아키텍처

GCP를 선택한 이유는 첫 번째로 Bigquery가 있었고 두 번째로 Firebase에서 Bigquery로 유저 데이터를 손쉽게 가져올 수 있었다는 점이었습니다. 부수적인 이유로는 AWS에 비해 GCS, Bigquery의 데이터 보관 정책이 유연하고 비용이 더 저렴하였습니다.

혼용하여 사용하던 워크플로를 Airflow만 사용하도록 정책을 세웠고 기존에 직접 구축하여 운영하던 Airflow를 GCP의 Composer를 사용해 운영은 GCP에 일임하고 DE는 비즈니스를 담당하도록 역할을 분리했습니다.

  • 물론 어느 정도 리소스를 들여 Composer 모니터링 및 이슈를 확인해야 합니다!!

Airflow의 버전을 2.x로 업그레이드하여 scheduler의 cpu 과점유 현상을 해소했고 Airflow는 본 목적인 워크플로를 관리하는 용도로만 사용하기 위해 비즈니스 로직을 Airflow worker에 구현하여 처리하는 것이 아닌 비즈니스 로직을 담당하는 별도의 VM을 호출하는 방식으로 역할을 분리하였습니다. 즉, Airflow는 작업을 스케줄링하고 호출, 모니터링만 담당하고 실제 동작은 별도의 VM에서 처리하는 구조로 구성하였습니다.

이러한 Airflow를 사용하여 ELT, 3rd Party 데이터 수집, 데이터 분석가가 필요한 집계 테이블 및 데이터 마트 생성/전송을 데이터 엔지니어가 진행하고 있습니다.

이렇게 플랫폼을 GCP로 통일하고 데이터가 필요할 때마다 ETL 방식으로 수집하던 부분을 ELT 방식으로 변경하였습니다. 데이터가 최초, 변형 이후 저장되는 곳은 Bigquery가 담당하고 있습니다. 즉, Bigquery를 데이터 레이크 겸 데이터 웨어하우스로 사용하고 있는데 수집하는 데이터 대부분이 정형화된 형태이고 저장소 가격이 저렴한 부분도 있으며 90일간 수정되지 않은 테이블에 대해선 자동으로 가격이 50% 인하가 되며 성능, 내구성, 가용성이 차이가 나지 않기 때문입니다.

운영 데이터를 추출하는 방식은 embulk 오픈소스를 활용하여 구성했습니다. embulk를 활용해 특정 컬럼을 기반으로 매 시간 등록/수정된 데이터(CDC — Change Data Capture)를 추출하여 main/delta 방식으로 최신성을 유지하고 있습니다. 디비지움을 활용하는 방향은 도입하기까지 개발 시간이 소요될 것으로 예상했고 백엔드팀의 도움이 필요했었습니다. 디비지움 도입에 들어가는 리소스 대비 운영 데이터를 실시간으로 유지하고 이를 활용하여 발생할 수 있는 임팩트가 적다고 판단하여 embulk를 도입하고 디비지움은 백로그로 고이 저장한 상황입니다.

Database replication to BigQuery using change data capture | Cloud Architecture Center | Google Cloud

이벤트 데이터의 경우, Firebase를 사용하여 수집하고 있었지만, 실시간 수집이 보장되지 않아 트렌드 추이 파악이 어렵고 완전한 형태의 데이터는 2일이 지나야만 Bigquery로 전달된다는 큰 한계가 있었습니다.

보통 Firebase에서 Bigquery로 데이터 전달은 1일 정도 소요되지만, Bigquery의 dataset 초기설정 오류로 인해 2일이 지나야만 전달이 되는 구조가 되었습니다.

또한 필요한 지표들을 데이터 분석가가 대시보드로 개발해야만 확인할 수 있다는 한계로 더 빠른 의사결정을 위하여 Amplitude를 도입하였습니다.

Amplitude 도입으로 단순한 지표의 추이는 DA의 도움 없이 Amplitude 접근을 할 수 있는 29CM 직원이면 누구나 확인을 할 수 있게 되었고 Amplitude에서 제공하는 export 기능으로 매시간 Amplitude에 적재되는 모든 이벤트를 GCS에 적재할 수 있었습니다.

export 기능에 Bigquery도 존재하여 5분 ~ 3시간까지 선택해서 가져올 수 있지만 개발 당시, 베타 서비스로 제공이 되었고 누락되는 데이터가 10% 이상으로 정합성을 맞출 수 없다고 판단하여 GCS로 진행하였습니다.

이렇게 Phase0에 비해 구조화된 데이터 파이프라인을 빠르게 구축하였고 약 1년간 이를 활용하여 많은 비즈니스, 분석 및 추천 서비스를 개발, 배포하였습니다. 29CM의 빠른 성장과 함께 개발 당시의 예상보다도 더 빠르게 Phase1의 한계에 봉착했는데 현재 발생하고 있는 한계점은 다음과 같습니다.

  • 데이터 마트, 집계 테이블 생성을 데이터 엔지니어가 생성하는데 요청이 늘어나고 있음
  • 저장된 전체 데이터를 조회하여 데이터 마트 및 집계 테이블을 생성하는 빈도가 늘어나고 있음
  • CDC로 가져오는 운영 데이터가 실시간 반영이 되지 않음
  • CDC는 특정 컬럼을 기반으로 진행되기 때문에 hard delete가 된 row는 감지를 못함
  • CDC는 특정 컬럼을 기반으로 진행되기 때문에 수정이 발생한 다음, 특정 컬럼 또한 수정되지 않으면 누락 발생
  • 이벤트 데이터의 실시간 수집이 지원되지 않음
  • Amplitude의 경우, 비용 이슈로 인해 특정 이벤트만 수집하고 있고 모든 이벤트 데이터는 Firebase로 수집되지만 수집되는 기간이 긺
  • 수집한 데이터나 생성한 데이터의 위치, 담당자 확인, 문서화, 계보 관리가 힘듦

Phase2에서는 이러한 한계점을 해결할 수 있는 데이터 파이프라인 구축을 진행하고자 합니다.

Phase2 (23년 ~)

Phase2 (23년 ~) 데이터 파이프라인 아키텍처

Firebase, Amplitude로 이벤트 수집을 진행하고 있던 부분을 CDP(Customer Data Platform)로 대체하여 1개의 플랫폼에서 처리할 예정입니다. 카프카를 활용하여 직접 구축하는 방향이 가장 좋지만, 구축까지의 시간이 오래 걸릴 것으로 생각하여 1. SaaS, 2. 오픈소스 순서로 도입을 검토할 예정입니다.

현재 확인하고 있는 CDP의 Saas 형태로는 Segment, RudderStack, Snowplow가 있고 오픈소스로는 Snowplow가 있습니다.

What is a Customer Data Platform | RudderStack

운영 데이터의 실시간 수집은 Debezium 오픈소스와 Apache Kafka를 활용하여 구축할 예정입니다.

데이터 마트, 집계 테이블 생성은 DBT(Data Build Tool)라는 서비스를 활용하면 쿼리만으로 변형할 수 있으며 데이터 계보와 문서, 테스트 기능도 제공하여 데이터 엔지니어뿐만 아니라 데이터 분석가도 손쉽게 사용할 수 있습니다. 데이터 엔지니어가 담당하던 변형 부분들을 데이터 분석가도 담당하게 된다면 빠르게 분석을 위한 테이블을 생성할 수 있으며 데이터 엔지니어는 엔지니어링에 초점을 맞출 수 있어 리소스 측면에서도 장점이 있습니다.

또한 해당 서비스를 사용하게 되면 Airflow 내에 변형을 위해 저장된 SQL 및 코드들을 DBT 서비스가 담당하여 Airflow는 워크플로 관리, DBT는 변형 관리라는 좀 더 명확한 역할 분리가 가능해집니다.

이후 수집한 테이블이나 생성된 테이블의 문서화나 담당자, 검색 등을 손쉽게 하기 위해 DDP(Data Discovery Platform)를 도입할 예정입니다. 해당 기능을 도입하게 되면 가장 큰 이점은 데이터의 검색 및 관리에 용이해 집니다. 회사나 팀의 규모가 커짐에 따라 생성되는 테이블도 많아지게 되는 데 중앙화된 위치에서 관리한다면 사용하고자 하는 테이블이 어디에 있는지, 특정 컬럼의 의미가 무엇인지 이해하는데 소요되는 시간이 줄어들게 됩니다.

현재 확인하고 있는 DDP는 GCP의 Dataplex나 오픈소스인 Datahub가 있습니다.

마치며

불과 1년 4개월 전에는 제대로 된 데이터 파이프라인이 존재하지 않아 특정 데이터를 생성하는 데에도 며칠이 걸렸지만, 현재는 구조화된 데이터 파이프라인 구축이 완료되었고 실시간 수집, 사용자 편의성, 기능별 역할 분리 등을 도입하기 위해 고려하고 있습니다. Phase2 다음인 Phase3로는 이벤트 수집, 운영 데이터 CDC를 Apache Kafka를 활용하여 직접 구축하는 것과 DQP(Data Quality Platform) 도입으로 데이터 품질 검증 등이 남아 있습니다.

이것으로 29CM의 데이터 파이프라인 소개를 마치며 데이터 파이프라인 개발에 도움이 되었으면 좋겠습니다.

감사합니다.

함께 성장할 동료를 찾습니다

29CM (무신사) 는 3년 연속 거래액 2배의 성장을 이루었습니다.

다양한 소스 데이터에서 안전하고 빠르게 수집, 처리, 저장, 제공하도록 파이프라인을 개발하고 운영하며, 제공된 데이터를 활용하여 데이터 기반 의사결정을 돕기 위해 다양한 대시보드를 제작 및 분석을 진행하고 있습니다. 데이터를 활용하여 가치를 창출하고 함께 성장할 동료 데이터 엔지니어 및 데이터 분석가를 찾고 있습니다.

많은 지원 부탁드립니다!

🚀 29CM 채용 페이지 : https://www.29cmcareers.co.kr/

--

--