Airbyte를 활용해 Snowflake로 소스 데이터베이스 CDC ELT하기

Hyuna J
Snowflake Korea
Published in
8 min readSep 29, 2022

운영환경의 데이터를 분석계에서 활용하기 위해서 운영환경의 데이터를 분석계인 snowflake로 지속적인 적재작업을 수행하는 것은 snowflake를 처음 시작할 때 마주치는 단계입니다.
실제로 S3에 적재되어 있는 데이터들을 Snowflake로 옮기는 방법은 굉장히 간단합니다. 먼저 COPY INTO 명령어를 사용하면 Snowflake에서 해당 파일이 이전에 로딩된적이 있는지를 판단하여 중복 없이 타겟 테이블에 데이터를 적재시켜줍니다. 두번째로는 S3 버킷에 새로운 파일이 생겼을 때 이를 자동으로 감지해서 미리 설정해 둔 타겟 테이블로 데이터를 적재시켜주는 SNOWPIPE 기능을 활용하여 적재 과정을 자동화 시킬 수도 있습니다.
일반적으로 기존 데이터에 대한 변경없이 데이터가 추가되기만 하는(예를 들어 로그데이터 같은) 경우들은 S3에 지속적으로 파일을 추가해서 데이터를 쉽고 간단하게 로딩할 수 있습니다.
Kafka를 사용 중이라면 kafka snowflake connector를 통해서 kafka 메세지를 snowflake로 로딩시킬 수도 있습니다.

snowpipe

접근 방식이 조금 달라지는 경우는 바로 소스데이터가 데이터베이스일 때입니다. 운영데이터베이스의 데이터를 분석계에 우선 1차적으로 적재하려고 하거나, 운영데이터 형태 그대로 분석단계에서 활용하려고 하는 두 경우 모두 운영 데이터베이스를 분석계로 적재하는 과정이 필요합니다. 적재하는 과정에는 크게 1) 전체 로딩(full loading) 2) CDC 증분 로딩(CDC incremental loading) 두 과정이 발생하게 됩니다. 즉 운영 데이터베이스를 전체 로딩한 다음 그 다음부터는 추가적으로 변경된 데이터만 로딩하는 과정을 반복적으로 수행함으로써 운영 데이터베이스의 데이터를 분석계에서도 활용하게 되는거죠.
하지만 데이터베이스에서는 데이터가 새롭게 추가(insert)만 되는 것이 아니라 데이터가 삭제(delete)되기도 하고, 업데이트(update)되기도 하기 때문에 단순히 추가된 데이터를 적재하는 것만으로는 데이터베이스의 업데이트 사항들을 반영했다라고 말하기는 어렵습니다.
그래서 데이터베이스의 데이터를 그대로 분석계에서 활용하기 위해서는 CDC(Capture Data Change) 데이터를 적재하는 개념이 필요해지게 됩니다. 소스 데이터베이스에서 발생되는 트랜잭션(insert, update, delete 등)을 추출해서 타겟 데이터베이스에도 동일하게 적용시켜주는 것이죠.

이 과정을 수행하기 위한 많은 방법들과 솔루션이 있지만, 오늘은 ELT managed 플랫폼인 Airbyte를 활용해서 MySQL 소스 데이터를 Snowflake로 지속적 로딩하는 과정을 수행해보겠습니다.
Airbyte는 ETL(Extract, Transform, Load)와 반대되는 ELT 도구입니다. Airbyte는 데이터를 먼저 대상에 로딩시키고 그 다음 변환 단계를 거치게 됩니다.

Airbyte를 Snowflake와 연결하는 방법은 매우 간단합니다. DB 접속 정보들을 넣어주고 replication 타입을 설정해주면 모든 단계가 끝납니다.

1) [Sources] 탭에서 먼저 소스가 될 데이터베이스의 연결 정보를 입력해줍니다. 저는 MySQL을 소스 데이터베이스로 쓸 것이기 때문에 소스 MySQL의 연결 정보를 입력해주었습니다.
[Replication Method]는 CDC를 선택해줍니다.

Airbyte — source
Airbyte — source

2) [Destinations] 탭에서 목적지가 될 snowflake의 연결 정보를 입력해줍니다.
Host에는 여러분들의 snowsight URL에서 보여지는대로 locator.region.snowflakecomputing.com을 사용하시면 됩니다.
예를 들어 xxxxxxx.ap-northeast-2.aws.snowflakecomputing.com 이 될 수 있겠습니다.

Airbyte — Destination

3) Connection을 통해 생성한 source와 destination을 연결해줍니다.
[Conncetions] 탭에서 [New Connection] 버튼을 클릭합니다. 각 단계에서 방금 생성한 connection들을 선택해줍니다.
[Set up connection] 단계에서는 좀 더 자세한 내용들을 설정하게 됩니다.

Airbyte — conncetion
  • Transfer > Replication frequency
    데이터를 복제하는 주기를 설정합니다. 미리 설정되어있는 값들을 선택할 수도 있고, cron 표현식을 통해 여러분이 원하는 주기를 설정해줄 수도 있습니다.
  • Streams > Destination Namespace
    목적지로 데이터를 복제할때 가져갈 스키마의 구조를 설정합니다.
    airbyte에서는 namespace의 단위를 connection 대상의 종류별로 다르게 설정합니다. 예를 들어 MySQL은 DATABASE를 하나의 namespace 단위로 보고 Snowflake는 SCHEMA 하나의 namespace 단위로 봄으로서 그 기준을 같은 레벨로 맞춰 서로의 구조를 맞춰가게 됩니다.
    - Mirror source struncture : 소스 데이터의 구조를 타겟 데이터에 그대로 적용합니다. 예를 들어 MySQL의 HR DATABASE 아래 EMPLOYEE라는 TABLE이 있다면, Snowflake상에 데이터를 보낼때에는 동일한 레벨을 맞추어 HR SCHEMA 아래 EMPLYOEE라는 TABLE을 생성하게 됩니다.
    - Destination Default : Destination에서 설정한 기본 namespace 복제되고 저장됩니다. Snowflake 연결에서는 SCHEMA를 namespace로 보기 때문에 destination 연결정보에 적었던 schema 아래에 테이블들을 복제합니다.
    - Custom format : 사용자가 정의한 이름의 namespace로 데이터를 복제합니다.
  • Activate the streams you want to sync
    연결한 소스 데이터베이스의 테이블 정보들이 보입니다. 테이블들마다 싱크 여부, 싱크 모드 등을 설정할 수 있습니다.
    - Sync mode > Full refresh overwrite : 싱크시마다 소스의 전체 데이터를 전체 추출해서 타겟에 덮어씌웁니다.
    - Sync mode > Full refresh append : 싱크시마다 소스의 전체 데이터를 전체 추출해서 타겟에 추가합니다.
    - Sync mode > Incremental Deduped + history : 소스의 변경데이터만을 추출해서 타겟 테이블에 적용합니다. 우리가 일반적으로 생각하는 CDC의 기능입니다. (참고로 Incremental 모드는 소스 데이터베이스에 primary key가 있는 경우에만 활성화됩니다. 테스트용으로 테이블을 만들어서 해보다가 incremental 기능이 활성화 안되어서 한참 고민했네요.)
    - Sync mode > Incremental append : 소스의 변경데이터만을 추출해서 타겟테이블에 적용합니다. 하지만 변경 전 데이터를 그대로 남겨두고 변경 후 데이터를 추가하는 방식입니다. 예를 들어 사원정보 50번 데이터의 전화번호에 대해 update가 발생했다면 기존 전화번호 데이터와 새로운 전화번호 데이터가 모두 남게 됩니다.
  • Normalization & Transformation
    Airbyte는 데이터들을 json 타입으로 전송합니다. 이를 다시 소스테이블처럼 보기를 원하는 경우 Normalized tabular data 버튼을 클릭합니다.
    이 외에도 DBT를 통한 형 변환을 지원하고 있습니다.

4) 테스트를 위해 [Sync now] 버튼을 클릭합니다. sync가 이뤄지는 과정들을 로그를 통해 확인할 수 있습니다.

5) 이제 목적지였던 Snowflake에서도 소스 데이터와 동일한 테이블이 생성되고 데이터가 조회되는 것을 확인할 수 있습니다.

--

--

Hyuna J
Snowflake Korea

Interested in working with DATA | Sales Engineer at Snowflake. Previously at AWS and Oracle