CDC log를 Snowflake에 적용시키는 방법 — Snowflake Stream & Task with AWS DMS

Hyuna J
Snowflake Korea
Published in
12 min readNov 15, 2022

지난 번 포스팅에서는 Airbyte를 사용해 소스 데이터베이스의 데이터를 Snowflake로 CDC ETL하는 방법을 알아봤습니다.

이번 시간에는 AWS의 DMS(Data Migration Service) 서비스를 사용하여 해당 시나리오를 구현하는 방법을 알아보겠습니다.
AWS의 DMS는 지원하고 있는 모든 데이터베이스 소스에 존재하는 전체 데이터와 CDC 데이터(변경 데이터 캡쳐)를 S3로 쓸 수 있는 기능을 제공하고 있습니다.
그리고 Snowflake에서 우리는 Snowflake의 Stream과 Task 기능을 사용해서 해당 변경사항들을 Snowflake의 타겟 table에 반영시킬 수 있습니다.

그럼 이제 수행되는 과정들을 단계별로 살펴보겠습니다.

Step 1. AWS DMS 타겟을 S3로 지정하여 소스 테이블의 변경 트랜잭션들을 S3로 떨어뜨립니다.
DMS는 사용자가 지정한 S3 버킷 내에서 다음과 같은 형식으로 폴더를 생성후 파일을 떨어뜨립니다.

database_schema_name/table_name/CDCLOAD00000001.csv
database_schema_name/table_name/CDCLOAD00000002.csv

그리고 해당 csv 파일을 열어보면 다음과 같은 형식으로 데이터가 저장되어 있는 것을 확인할 수 있습니다.

I,101,Smith,Bob,4-Jun-14,New York
U,101,Smith,Bob,8-Oct-15,Los Angeles
D,101,Smith,Bob,13-Mar-17,Dallas

첫번째 열이 바로 행 데이터가 어떤 트랜잭션 형태(Insert, Update, Delete)를 기반으로 데이터를 변경했는지를 가르쳐줍니다.

Step 2. S3로 DMS가 생성된 데이터들을 Snowflake 상에서 읽을 수 있도록 해당 버킷위치를 external table로 지정해줍니다.
external table은 외부에 존재하는 데이터를 로딩 없이 바로 snowflake에서 읽을 수 있도록 해주는 기능입니다. 동일한 형식의 데이터 셋들이 존재하는 버킷위치만 알려주시면 해당 위치의 데이터들을 바로 조회해보실 수 있습니다.

create external table ext_employee
location=@AWS_STAGE/HR/EMPLOYEE
auto_refresh = true
file_format = (type = csv);

Step 3. 2단계에서 생성한 external table에 대해 stream을 생성합니다. 만약 버킷에 새로운 트랜잭션을 담은 데이터 파일이 들어온다면 external table에 대해 새로운 레코드들이 insert되었음을 stream에서 인지하고 기록합니다.

create stream employee_stream on external table ext_employee insert_only = true;

stream은 테이블/뷰에 대해 발생하는 모든 트랜잭션을 레코드로 기록해두는 snowflake의 기능입니다. 해당 기능을 활용하여 다양한 Data Pipeline 작업을 수행하실 수 있습니다.

Step 4. stream에 기록된 레코드들을 주기적으로 타겟 table에 적용하도록 타겟 table과 task를 생성합니다.

create table employee(
id number,
last_name varchar,
first_name varchar,
date varchar,
location varchar);

아래 task를 통해 DMS에서 생성한 CDC 로그 레코드들을 타겟 tabel에 적용시켜주는 쿼리를 1분마다 돌리도록 설정했습니다.
task는 특정 SQL문 또는 프로시저를 사용자가 정해준 빈도에 따라 자동으로 수행되도록 도와주는 스케쥴링 기능입니다. 여기서는 CDC 로그들이 주기적으로 타겟 table에 적용되도록 하기 위해 task를 생성했습니다.
(아래 코드는 샘플용으로 간단하게 구현되었습니다. 실환경에 적용하기 위해서는 하나의 파일 안에 같은 레코드가 두번 업데이트 되었을 때 마지막 업데이트만 적용되도록 하는 등의 로직을 함께 고민해야 합니다.)

create task employee_cdc
warehouse = compute_wh
schedule = '1 minute'
as
merge into employee T -- Target table to merge changes from source table
using (select value:c1::varchar as transactions,
value:c2::number as id,
value:c3::varchar as last_name,
value:c4::varchar as first_name,
value:c5::varchar as date,
value:c6::varchar as location
from employee_stream
) S
ON T.id=S.id
when matched -- DELETE condition
and S.transactions = 'D'
then delete
when matched -- UPDATE condition
and S.transactions = 'U'
then update
set T.id=S.id,
T.last_name=S.last_name,
T.first_name=S.first_name,
T.date=S.date,
T.location=S.location
when not matched -- INSERT condition
and S.transactions = 'I'
then insert
values
(S.id,S.last_name,S.first_name,S.date,S.location);

이제 필요한 모든 준비는 끝났습니다!
자 그럼 실제 로그 데이터를 한 건씩 넣어보도록 하겠습니다.

Insert 테스트

Source 데이터베이스에서 데이터를 insert 시 S3 bucket으로 아래와 같은 로그가 떨어질 것입니다.

I,101,Smith,Bob,4-Jun-14,New York

해당 로그가 S3 버킷에 들어오면 ext_employee table은 자동으로 이를 반영합니다.

select * from ext_employee;

(이는 auto_refresh를 true로 해두시고, S3에 대한 이벤트 알림을 설정해두신 경우입니다. 이 때 지정한 버킷 위치에 데이터 파일이 추가되거나 변경될때 snowflake는 이를 자동으로 인지하고 이를 external table 메타데이터에 반영합니다.
이를 설정하지 않으실 경우 alter external table <table_name> refresh; 명령어를 통해 메뉴얼하게 수행할 수 있습니다.)

그리고 해당 table에 새로운 레코드가 들어왔기 때문에 table에 걸려있던 employee_stream에도 새로운 레코드가 들어왔음을 확인할 수 있습니다.

select * from employee_stream;

우리가 만든 task는 1분에 한번씩 돌면서 employee_stream에 있는 레코드들을 target table인 employee table로 적용시켜줄것입니다.
만약 테스트에서 task를 기다리기가 어렵다면 task 아래의 sql을 직접 돌려주셔도 괜찮습니다.

merge into employee T     -- Target table to merge changes from source table
using (select value:c1::varchar as transactions,
value:c2::number as id,
value:c3::varchar as last_name,
value:c4::varchar as first_name,
value:c5::varchar as date,
value:c6::varchar as location
from employee_stream
) S
ON T.id=S.id
when matched -- DELETE condition
and S.transactions = 'D'
then delete
when matched -- UPDATE condition
and S.transactions = 'U'
then update
set T.id=S.id,
T.last_name=S.last_name,
T.first_name=S.first_name,
T.date=S.date,
T.location=S.location
when not matched -- INSERT condition
and S.transactions = 'I'
then insert
values
(S.id,S.last_name,S.first_name,S.date,S.location);

여기서 중요한 것은 task이든지, 직접 수행한 sql이든지 한번 타겟 table로 적용된 레코드들은 stream에서 자동으로 삭제되어 중복 적재를 방지시켜준다는 것입니다.
자, 그럼 이제 employee table을 조회해보겠습니다.

select * from employee;

Update 테스트

이번에는 update 로그를 발생시키고 해당 내용이 동일하게 잘 반영되는지를 확인해보겠습니다.

U,101,Smith,Bob,8-Oct-15,Los Angeles

101번의 id를 가지는 Smith씨가 15년 10월 8일 LA로 지역을 옮기면서 해당 값이 업데이트 된 것을 확인할 수 있습니다.

동일하게 해당 로그가 S3 버킷에 들어오면 ext_employee table은 지금까지 버킷에 들어있는 로그들을 모두 읽어 이를 반영합니다.

select * from ext_employee;

그리고 해당 table에 새로운 레코드가 들어왔기 때문에 table에 걸려있던 employee_stream에도 새로운 레코드가 들어왔음을 확인할 수 있습니다.
ext_employee table과 달리 employee_stream 스트림은 기반영된 로그는 삭제하고 아직 타겟 table에 반영되지 않은 새로운 레코드만을 가지고 있는 것을 확인할 수 있습니다.

select * from employee_stream;

task 주기가 1분이었기 때문에 1분 이상을 기다린 뒤 타겟 table인 employee 테이블에 로그가 잘 반영되었는지를 조회해볼까요?
기존 테이블의 Smith씨의 정보가 업데이트된 내용을 확인할 수 있습니다.

select * from employee;

Delete 테스트

마지막으로 delete 로그를 발생시키고 해당 내용이 동일하게 잘 반영되는지를 확인해보겠습니다.

D,101,Smith,Bob,8-Oct-15,Los Angeles

예상하신것처럼 ext_employee table은 지금까지 버킷에 들어있는 로그들을 모두 읽어 이를 반영합니다.

select * from ext_employee;

그리고 해당 table에 새로운 레코드가 들어왔기 때문에 테이블에 걸려있던 employee_stream에도 새로운 레코드가 들어왔음을 확인할 수 있습니다.
이미 들어왔던 I(insert), U(update) 로그들은 반영된 후 삭제되고 미반영된 새로운 로그만이 보이는 것을 확인할 수 있습니다.

select * from employee_stream;

task 주기가 1분이었기 때문에 1분 이상을 기다린 뒤 타겟 table인 employee table에 로그가 잘 반영되었는지를 조회해보겠습니다.
기존 table의 Smith씨의 정보가 삭제된 것을 확인할 수 있습니다.

select * from employee;

이렇게해서 Insert, Update, Delete에 해당하는 트랜잭션 로그들이 모두 타겟 table에 잘 반영된 것을 확인했습니다.
Snowflake의 Stream & Task는 내부 테이블 또는 뷰에 적용하여 데이터 변경 내역들을 트래킹하고 원하는 방식으로 타겟 테이블에 적용하는 데 아주 유용한 기능이지만, 이렇게 외부 데이터 소스들을 읽어서 내부 테이블에 적용하기에도 유연하게 사용할 수 있는 기능입니다.

--

--

Hyuna J
Snowflake Korea

Interested in working with DATA | Sales Engineer at Snowflake. Previously at AWS and Oracle