DynamoDB 데이터 변경 이벤트 파이프라인 구축하기 (feat. Kinesis)

Jules
당근 테크 블로그
44 min readDec 27, 2021

안녕하세요, 당근마켓에서 라이브 스트리밍 서버 개발 인턴으로 근무하고 있는 Jules에요.

저는 이번에 당근마켓의 라이브 스트리밍 채팅 데이터를 활용하기 위한 DynamoDB 데이터 변경 이벤트 기반의 파이프라인을 구축했어요. DynamoDB, Kinesis Data Streams, Kinesis Data Firehose, Lambda 및 S3를 활용해서 아래 다이어그램과 같이 AWS-Native 이벤트 파이프라인이 만들어졌어요.

AWS-Native Event Pipeline

저와 같이 DynamoDB에 데이터를 쌓아 두고 이를 기반으로 파이프라인을 구축하고 싶은 여러분에게 도움이 되기를 바라며, 저의 경험을 차근차근 공유해 볼게요.

채팅 데이터 파이프라인의 필요성

출처 : Libby Penner on Unsplash

라이브 스트리밍에서 채팅은 빠질 수 없는 필수 기능이에요. 이러한 라이브 스트리밍 채팅에서 생성되는 메시지를 추후 방송 다시보기, 데이터 분석 등의 다양한 목적으로 사용할 예정이에요. 그래서 이 채팅 데이터를 여러 용도에 맞게 활용할 수 있도록 원하는 형태로 조작해서 저장해야 해요. 모든 것을 자동화하고 싶은 개발자로서, 채팅 데이터가 생성되면 자동으로 변환, 저장 및 조회를 수행하는 파이프라인이 필요하게 됐어요.

데이터 변경 이벤트 기반의 파이프라인의 필요성

DynamoDB는 데이터의 Read/Write를 제공하고, PITR¹ 기반으로 S3 백업 기능도 제공하고 있어요. 이러한 기능들을 사용해서 데이터를 활용할 수도 있겠다고 생각하실 수도 있지만, 우리는 이것들 대신 DynamoDB의 변경 이벤트 포착 기능을 활용해 파이프라인을 구축하기로 결정했어요. 그 이유는 아래와 같아요.

DyanmoDB Capacity Unit 제한

DyanmoDB에서 Read를 반복 수행하면 throttle이 불가피해요. MySQL과 같은 관계형 데이터베이스의 경우, Batch Read를 Read-Only Replica 상에서 수행하는 전략을 취한다면 반복 Read가 전체 온라인 서비스 트랜잭션 수행과 격리되어 성능에 영향을 주지 않아요. 하지만 DynamoDB는 테이블과 파티션 단위로 Capacity Unit을 공유하며 Read를 수행하기 때문에 짧은 시간 동안의 반복적인 Read 수행은 Capacity Unit을 소진시켜 throttle을 발생시켜요.

더 구체적으로 설명 해볼게요. DynamoDB On-Demand Mode는 테이블마다 초당 40,000 RCU & 40,000 WCU, 파티션마다 초당 3,000 RCU & 1,000 WCU를 Hard-limit으로 두고 있으며, 이 제한 범위 내에서 Request load에 따라 Capacity Unit을 자동으로 분배﹒조정﹒최적화 해요. 만약 요청이 너무 많아져서 이 전체 Hard-limit을 초과하면 throttle이 발생하는데, 병목이 된 파티션 뿐 아니라 전체 테이블 요청이 throttle 되어서 모든 요청이 거절될 수 있어요. 또한, 기본적으로 Read 작업은 API에 관계 없이 동시성 원칙에 따라 RCU를 다르게 소비하는데, Eventually Consistent Read가 적용되면 4KB 마다 0.5 RCU를 사용하고, Strong Consistent Read의 경우 4KB마다 1 RCU를 사용해요.²

Scan의 문제점

DynamoDB Read API 중 Scan은 주어진 테이블의 모든 항목을 다 읽어 오는 명령으로, 기본적으로 Eventually Consistent Read를 수행해요. 한 번 데이터를 읽을 때 최대 1MB 씩 읽어올 수 있고, 그 때마다 128 RCU(= 1MB / 4KB * 0.5 RCU)가 소비돼요. 결국 Scan을 수행할 때 (전체 데이터 크기 / 1MB * 128 RCU) 만큼의 RCU를 소비하는 것인데, 전체 데이터를 스캔하는 동안 이 RCU를 계속해서 사용하기 때문에 동시에 다른 Read 작업이 throttle 될 위험이 높아져요. 특히 Scan은 수행 속도가 아주 느리기 때문에³, 많은 Scan 작업을 동시에 수행하는 경우 병목이 발생할 확률이 더 커질 거에요.

Query의 문제점

반면 Query는 주어진 파티션의 모든 항목만 다 읽어 오는 명령이예요. 역시 Eventually Consistent Read를 수행해서 한 번의 Read마다 128 RCU를 사용해요. 그래서 이론적으로 따지자면 한 파티션에 24개(= 3,000 RCU / 128 RCU)의 동시 Read 요청이 들어오면 파티션에 할당된 RCU를 모두 소진해서 throttle이 발생하게 돼요. 뿐만 아니라, 테이블 전체의 RCU도 함께 소진되기 때문에 결국 온라인 서비스 로직에도 악영향을 미치게 돼요.

저희 라이브 스트리밍으로 예를 들어 설명해 볼게요. 라이브 스트리밍 방송이 종료되는 시점에 해당 방송의 전체 채팅 데이터를 DynamoDB에서 읽어 오기 위해 라이브 방송 ID를 파티션 키로 해서 Query를 수행한다고 가정 할게요. 이 방송이 한 개만 있다면 문제가 되지 않겠지만, 여러 방송이 동시다발적으로 진행﹒종료된다면 RCU와 WCU가 순간적으로 아주 많이 소비될 수 있겠죠. 여러 라이브 방송에서 끊임없이 발생하는 채팅 메시지를 DynamoDB에 저장해야 하고, 종료된 방송의 채팅을 동시에 전부 읽어와야 하기 때문이에요. 그래서 결국 라이브 스트리밍을 안정적으로 송출해야 하는 온라인 서비스 로직에도 병목이 발생할 수 있어요.

DynamoDB S3 백업의 한계

출처 : Amazon DynamoDB 테이블 S3 내보내기 기능 출시

S3 백업은 테이블의 모든 항목을 S3에 저장해주는 기능이에요. 데이터는 DynamoDB JSON, Amazon Ion 형태로 저장할 수 있고 암호화 설정도 추가할 수 있어요. Export를 수행하면 기간 내 전체 데이터가 gz 파일로 압축되어 아래의 data/ 디렉토리 안에 저장돼요. 별도의 변형 없이 테이블의 모든 데이터를 백업하는 데에는 활용하기 좋은 기능이에요.

출처 : Amazon DynamoDB 테이블 S3 내보내기 기능 출시

하지만, 고정된 형태로 모든 데이터가 저장되기 때문에 저장 이전에는 조작, 변환, 추가 등의 작업을 할 수 없다는 한계가 있어요. 데이터가 S3에 모두 쓰인 이후에 Athena, Glue 등의 정적 SQL 쿼리를 활용하던지, 직접 데이터를 읽고 변환하고 쓰는 어플리케이션을 작성해야 해요. 또한, S3에 Export를 여러 번 수행하면 그 때마다 모든 데이터를 새로운 디렉토리에 다 새로 쓰기 때문에 시﹒공간적으로 비효율적인 데다가 변경사항을 빠르게 파악하기 어렵다는 단점도 있어요. 저희는 채팅 데이터를 DynamoDB JSON 대신 원하는 형태로 변환하고 싶었고, S3 디렉토리 구조도 설정하고 싶었기 때문에 S3 백업 옵션은 채팅 데이터 파이프라인 구성에 적절하지 않다고 판단했어요.

DynamoDB 이벤트 기반 파이프라인

그래서, 데이터를 직접 읽어 와서 활용하는 대신 DynamoDB 데이터 변경 이벤트를 기반으로 파이프라인을 구축 하기로 결정했어요.

Transactional Outbox 패턴

DynamoDB는 Transactional Outbox 패턴의 Outbox 역할을 수행할 수 있어요.

Transactional Outbox 패턴

Transactional Outbox 패턴은 MSA에서 신뢰도 높은 메시징 구현을 위해 활용하는 패턴이에요. 전통적인 관계형 데이터베이스는 위와 같은 방식으로 이벤트 발행 로직을 서비스 로직과 분리함으로써 Transactional Outbox 패턴을 구현해요.

반면, DynamoDB는 테이블 항목 변경을 자동으로 포착하고 이를 포착해 스트림 레코드 형태로 반환하는 스트림 기능을 제공해요. 이를 통해 DynamoDB를 Outbox로 활용해 테이블 항목이 생성, 업데이트, 삭제될 때마다 거의 실시간(Near-real time)으로 스트림 레코드를 받을 수 있어요. 이러한 이벤트 레코드 발행 기능을 활용하면 별도의 Outbox Table 구현 없이도 DynamoDB 가용성에 독립적으로 원하는 작업을 수행할 수 있기 때문에 더 안정적인 파이프라인과 서비스를 제공할 수 있어요.

S3 데이터 활용

DynamoDB에 생성된 이벤트 데이터는 파이프라인을 거쳐 최종적으로 S3 버킷에 저장되어서 방송 다시보기를 위한 자막 생성과 통계 지표 수집에 활용돼요. 이 기능들은 데이터를 한 번에 많이 조회하고, 요청 횟수가 많지 않고, 아주 즉각적인 응답을 요구하지 않는다는 특징을 가져요. 따라서, S3에 데이터를 저장하고 Athena를 통해 조회﹒분석하는 것이 합리적이에요.

다만, 두 가지 목적은 각각 데이터에 접근하는 방식에 차이가 있어요. 방송 다시보기는 방송(LivestreamID) 단위로 채팅 데이터를 조회하는 반면, 통계 지표는 주로 시간 단위로 데이터를 요구하기 때문이에요. 그래서 각 기능을 효율적으로 운용하기 위해서는 각 특성에 맞게 데이터를 파티셔닝 할 필요가 있어요. 파티셔닝은 이벤트 스트리밍 과정에서 구현할 수 있으며, 어떤 서비스를 활용하는 지에 따라 구현 방식이 달라져요. 지금부터 더 자세히 살펴 볼게요.

이벤트 스토어 서비스 분석

다시 한 번 짚고 넘어 갈게요. 현재까지 결정된 사항들은 아래와 같아요.

  • 채팅 메시지는 DynamoDB에 Write 된다.
  • DynamoDB는 데이터 변경 이벤트를 발행하는 Outbox이다.
  • 데이터의 최종 저장 목적지는 S3 버킷이다.

채팅 데이터가 DynamoDB에 쓰이면 DynamoDB가 그 변경사항을 포착해서 이벤트를 발행하는 단계까지 왔어요. 그러면 이제 그 이벤트 레코드, 이벤트 스트림을 잘 활용해서 S3 버킷까지 전송해야 해요.

DynamoDB가 발행하는 데이터 변경 이벤트 활용을 위해 AWS가 제공하는 이벤트 스토어 서비스에는 DynamoDB Streams(이하 DDB Streams), Kinesis Data Streams(이하 KDS), Managed Streaming for Apache Kafka(이하 MSK) 3가지가 있어요. 그 중 DDB Streams와 KDS는 AWS 콘솔이나 커맨드를 활용해 DynamoDB 테이블과 내부적으로 바로 연결할 수 있지만, MSK는 DynamoDB와의 연결을 개발자가 직접 구현해야 한다는 차이가 있어요. 따라서 앞의 두 가지를 먼저 비교한 후 MSK와의 비교도 이어가 보도록 할게요.

DynamoDB Streams vs Kinesis Data Streams

DDB Streams와 KDS는 모두 DynamoDB가 포착한 테이블 항목 변경 이벤트를 스트림 레코드 형태로 받아 관리하는 이벤트 스토어 서비스에요. 앞서 말했듯, DynamoDB와 연결하는 과정을 AWS 내부에서 핸들링 해주기 때문에 개발자는 연결하는 옵션만 설정 해주면 DynamoDB에서 발생하는 항목 변경 이벤트를 스트림 레코드 형태로 받아 활용할 수 있어요. 이 레코드들은 스트림 내부에 샤딩되어 저장﹒관리 되어 활용 되다가 보존 기간이 지나면 파기되기 때문에 그보다 길게 저장하기 위해서는 별도의 처리가 필요해요. 이처럼, DDB Streams와 KDS는 이벤트 수신, 관리, 송신 프로세스를 모두 AWS 인프라 내부에서 처리하기 때문에 성능, 신뢰성, 확장성을 보장할 수 있어요.

반면, 두 서비스는 아래와 같이 기능의 차이가 있어요.

DynamoDB Streams와 Kinesis Data Streams 기능 비교

각 스트림 레코드에는 이벤트 종류(INSERT/MODIFY/REMOVE)와 변경된 항목의 값이 담겨 있는데, 데이터 형식에도 차이가 있어요. 우선 DDB Streams 레코드는 아래와 같이 생겼어요. 테이블의 스키마와 변경된 데이터가 JSON 형태로 그대로 전송되는 걸 확인할 수 있어요.

DynamoDB Streams Record

반면 KDS 레코드는 아래와 같이 생겼어요. JSON 형태지만 실제 변경된 항목의 값은 Records:Data 필드에 들어가요. DynamoDB에서 생성된 이벤트 레코드(바로 위의 형식)가 base-64 인코딩 되어 저장되고, KDS 안에서는 사용하지 않는다고 해요.

Kinesis Data Streams Record

Records 외의 나머지 필드들은 KDS 접근에 필요한 값이에요. KDS와 Kinesis Data Firehose를 함께 활용하면 KDS 내부에 직접 접근할 필요가 없어서 이 값들을 사용할 일은 없어요. 만약 Kinesis Data Firehose를 사용하지 않는다면 이 문서를 참고하여 클라이언트를 직접 개발할 수 있어요.

스트림 소비자

위에서 이런 저런 차이점을 살펴 봤지만, 사실 가장 결정적인 차이점은 스트림을 활용할 수 있는 소비자 어플리케이션의 종류에 있어요.

DDB Streams에는 AWS Lambda 혹은 개발자가 직접 작성한 커스텀 어플리케이션만 연결할 수 있어요. Lambda를 작성하고 DDB Streams에 Lambda Trigger를 설정하면 원하는 Lambda 함수를 호출할 수 있어요. 이렇게 하면 이벤트 발생부터 Lambda 호출까지의 프로세스는 AWS가 수행하기 때문에 신뢰할 수 있지만, Lambda 호출 이후의 프로세스는 보장되지 않아요. 또한, 커스텀 어플리케이션을 작성해서 직접 DDB Streams의 레코드를 처리하는 경우 역시 어플리케이션이 스트림 레코드를 수신하는 구간부터 그 이후 모든 단계에 오류와 병목이 생길 위험이 추가돼요.

AWS는 이미 내부적으로 안정성, 원자성, 확장성을 모두 고려해 인프라를 구성해 놓았을 뿐 아니라, 수 년간 클라우드 인프라 서비스를 제공하면서 성능을 발전시켜 왔어요. 따라서, 단지 이벤트 파이프라인을 만들고 싶은 이 개발자가 커스텀 어플리케이션 안에서 이 모든 프로세스를 직접 개발하는 것이 비용이 훨씬 많이 들 거에요.

반면, KDS에는 AWS Lambda, 커스텀 어플리케이션 뿐 아니라 AWS의 다른 데이터 프로세싱 서비스에도 바로 연결할 수 있어요. 이러한 AWS 서비스들은 AWS 내부적으로 연결﹒전송 프로세스가 처리되기 때문에 신뢰도 높은 이벤트 파이프라인을 구축하는 데에 유리해요. 사용할 수 있는 서비스는 Kinesis Data Analytics, Kinesis Data Firehose가 있어요.

Kinesis Data Streams + Kinesis Data Firehose → 👍🏼

저희는 Kinesis Data Streams가 Kinesis Data Firehose와 함께 연결하여 활용할 수 있다는 점에서 DDB Streams보다 특히 유용하다고 판단했어요. Kinesis Data Firehose(이하 KDF)는 다양한 소스로부터 대량의 스트리밍 데이터를 받아 캡쳐하고 변환해서 안전하게 전송해주는 Data Delivery 서비스에요. 특징은 Fully-Managed Auto-Scaling 서비스라 별도 관리가 거의 불필요하다는 점이에요. 또한, Lambda를 활용한 데이터 변환, S3 동적 파티셔닝, 암호화, 압축, 백업, Cloudwatch 등을 모두 옵션으로 제공하기 때문에 필요한 경우 편리하게 활용할 수 있어요.

KDF의 좋은 점을 더 구체적으로 살펴 볼게요. KDS에서 받은 스트림 레코드에서 특정 값만 뽑아서, 그 값에 따라 분류하여 S3에 저장해야 하는 상황이라 가정하고, KDF를 쓰지 않았을 때의 상황을 그려 봐요.

기본적으로 요구사항의 모든 내용, 즉 KDS에서 스트림 레코드를 받고, 데이터를 변환한 후, S3 버킷으로 전송하는 기능을 직접 구현해야 해요. 이러면 양쪽 End Node(KDS, S3 버킷)에 변화가 발생하면 우리의 코드와 프로토콜도 그에 따라 수정해야 한다는 점에서 세부사항 변경 비용이 더 커요. 또, 변동성이 큰 트래픽 속에서 성능을 최적화하기 위해 양쪽 End Node 연결 구간에 버퍼를 구현해야 할 것이고, 직접 작성하는 만큼 Point of Failure 범위가 넓어지기 때문에 파이프라인의 안정성을 보장하기 어려워질 거에요. 더 세부적으로 요구사항 하나하나를 생각해 볼게요.

Kinesis Data Firehose 사용 여부에 따른 기능 구현 차이

이런 점들을 종합하여 KDS와 KDF를 함께 사용하는 것이 팀 상황에 더 적합하다는 결론을 내렸어요. DDB Streams를 사용하거나 KDS만 사용했을 때는 개발자가 직접 구축해야 하는 부분이 많아지는데, 이를 충분히 잘 운용하기 위한 노력에 비해 효용이 적다고 판단했기 때문이에요.

지금까지 DDB Streams 보다는 KDS와 KDF를 사용해 파이프라인을 구축하는 것이 더 적절하다는 것을 확인했어요. 그럼 Managed Streaming for Apache Kafka와 비교했을 때는 언제 어떤 서비스를 활용하는 것이 더 좋을까요?

Kinesis Data Streams vs Managed Streaming for Apache Kafka

이벤트 스토어 구조

KDS와 MSK는 모두 이벤트 스트림을 처리하는 이벤트 스토어 서비스에요. 스트림에 레코드를 쓰는 생산자(Producer)와 레코드를 읽는 소비자(Consumer)가 있고, 각 액터가 모두 독립적으로 동작하면서 이벤트 스토어를 병렬적으로 활용해요.

두 서비스는 기본적으로 아래와 같이 기능의 차이가 있어요.

Kinesis Data Streams와 Managed Streaming for Apache Kafka 기능 비교

자유도와 운용 효율

표를 보면 알 수 있겠지만, 두 서비스의 가장 큰 차이점은 자유도에 있어요. KDS는 AWS에서 Fully-Manage 해주는 서비스다보니 명확한 제한 사항이 존재하는 반면, MSK는 Apache Kafka가 구동되는 인프라만 구성해주는 서비스라 제한 사항이 없어요. 따라서, KDS는 자유도가 낮기 때문에 세부적인 성능 튜닝이 불가능하다는 단점과 별도의 관리 없이 안전하게 Auto-Scaling이 가능하다는 장점을 동시에 가지게 돼요. 반면 MSK는 자유도가 높아 세부 성능 최적화가 가능해진다는 점을 장점으로 가지는 동시에, 이러한 튜닝을 위한 프로비저닝이 반드시 적절하게 선행되어야 한다는 특징을 가져요.

확장성

두 번째로 큰 차이점은 확장성에 있어요. 결론부터 말하면 MSK가 KDS보다 확장성이 뛰어난데, 이는 KDS와 MSK가 로드 밸런싱을 수행하는 방식이 다르기 때문에 발생하는 차이에요.

우선, KDS는 파티션 키가 동일한 요청은 무조건 동일한 Leader 노드의 샤드로 라우팅 해요. 파티션 키를 해싱한 값을 바탕으로 어떤 샤드로 요청을 보낼지 결정하기 때문이에요. 각 샤드는 스트림 전체와 별개로 처리율 제한을 갖는데, 동일한 샤드에 해당하는 요청은 이 처리율 제한 안에서 모두 처리되어야 해요. 그러다 보니 요청이 한 샤드에 몰리는 경우 Hot-Shard가 생기고 결국 throttle이 발생하는 문제가 생겨요.

따라서, 파티션 키는 KDS 로드 분산의 가장 중요한 포인트라고 할 수 있어요. 파티션 키가 적절하게 설계되지 않아 로드가 한 샤드에 집중되는 경우 문제가 발생할 수 있지만, 반대로 말하면 Access 패턴을 바탕으로 키를 잘 설계하면 대부분의 로드 밸런싱 문제를 해결할 수 있다고도 말할 수 있어요. 그렇기 때문에, 대부분의 튜닝은 AWS에 맡기고 적은 부분에 집중할 필요가 있는 경우 KDS를 선택하는 것이 효율적이에요.

출처 : The Power of Apache Kafka® Partitions

반면, MSK는 같은 토픽의 레코드인 경우에도 내부 로직에 따라 각기 다른 브로커 노드의 파티션으로 분산 저장해요. 토픽은 1개 이상의 파티션을 가지며, 각 파티션은 Kafka에 의해 브로커 노드에 골고루 분산돼요. 또한, 파티셔닝 규칙으로는 Key 해싱 뿐 아니라 Round-Robin이나 커스텀 로직을 적용할 수 있어요. 만약 어떤 Topic에 대한 요청이 들어오면 파티셔닝 규칙에 따라 파티션을 찾고, Kafka 클러스터에서 그 파티션 Leader를 갖고 있는 브로커 노드를 찾아요. 그러면 요청은 그 브로커 노드로 라우팅되고, 결과적으로 특정 토픽에 요청이 몰린다 해도 병목이 발생할 확률이 줄어들어요.

뿐만 아니라, MSK는 Replica 수에 따라 파티션을 복제하고 로드 밸런싱에 활용해요. Kafka는 복제된 파티션 중에서 하나의 Leader를 정하고 나머지를 Follower로 지정해요. 오직 Leader로 지정된 파티션만 실제 요청을 받아 I/O를 수행하고, Follower는 백업용으로 Leader의 변경 사항을 전달받기만 해요. 앞서 말했듯이 파티션들은 브로커 노드에 분산되는데, replica가 있는 경우 파티션을 더 골고루 분산시킬 수 있어요.

따라서, 파티셔닝 로직과 replica 수 등 파라미터를 잘 설정하는 경우, MSK의 로드 분산은 KDS에 비해 더 효율적으로 구성될 수 있어요. 만약 이러한 파라미터 튜닝에 대한 전문가가 있어 전략을 잘 설계할 수 있는 상황이라면 MSK를 선택하는 것이 더 합리적일 거에요.

안정성

마지막으로, 안정성 측면에서도 두 서비스를 유의미하게 비교할 수 있어요.

MSK는 우리가 이벤트를 받아 올 DynamoDB와는 별개로 단독 EC2 인스턴스에 구성돼요. 그렇기 때문에 DynamoDB Streams로부터 이벤트를 받아 Kafka를 구동 중인 EC2 인스턴스에 전달하는 과정을 별개의 Lambda 함수로 작성해서 Trigger 해야 해요. 이러한 별개의 과정을 구축하는 데에는 추가 비용이 발생할 뿐 아니라, Lambda 함수의 메시지 크기 제약 등으로 인해 병목이 발생할 위험이 추가돼요.

KDS는 앞서 말했듯 AWS Fully-Managed 서비스이기 때문에, 개발자가 관여하지 않아도 안정적인 연결 상태와 데이터 전송을 보장해요. 또한, AWS 가용 리전 3개에 걸쳐 Replica를 구성하여 동기식으로 복제할 뿐 아니라, 최대 365일 간 해당 데이터를 저장함으로써 데이터의 손실을 방지하고 있어요. 그렇기 때문에, 큰 비용을 지불하는 대신 안정성 보장 측면에서는 KDS가 MSK보다 뛰어나고 효율적이라 할 수 있어요.

파이프라인 설계

결론적으로, 우리는 Kinesis 기반의 AWS-Native 이벤트 파이프라인을 구축 하기로 결정했어요. 현재 우리 팀의 상황과 여러 해결책 후보들의 비용﹒효용을 비교했을 때, 직접 개발하는 것보다 인프라 성능과 안정성을 레버리지 하는 것이 유용성과 효용이 더 크다고 판단했어요.

전체 파이프라인은 위와 같이 구성되어 있으며, 파이프라인의 각 리소스가 수행하는 역할은 아래와 같아요.

  • DynamoDB : API로부터 채팅 데이터 수신 & 데이터 변경 이벤트 포착 & 발행
  • Kinesis Data Streams : 이벤트 스트림 저장
  • Kinesis Data Firehose : 이벤트 스트림 레코드 전송
  • Lambda : 레코드 형태 변환
  • S3 Bucket : 최종 저장소

구현

이제 각 리소스를 어떤 식으로 설정하고, 리소스 사이의 연결은 어떻게 구성해야 하는지에 대해 설명할게요. 아래 등장하는 AWS 명령어들은 AWS CLI 2를 설치해야 활용할 수 있어요.

DynamoDB

우선, 최초 채팅 데이터 소스인 DynamoDB 테이블은 이렇게 구성되어 있어요.

채팅 메시지 테이블 스키마

테이블의 Primary Key는 파티션 키와 정렬 키의 조합으로 구성했어요. 파티션 키는 문자열 타입의 LivestreamID, 정렬 키는 타임스탬프와 고유번호를 조합한 숫자 타입의 UID에요. DynamoDB는 파티션 키의 설계가 성능에 직접적인 영향을 미치기 때문에 데이터 접근 패턴에 따라 적절히 설계하는 것이 중요해요. 우리 서비스의 경우, 대부분의 데이터 접근과 질의가 라이브 스트리밍 방송 단위로 이루어질 것을 고려해서 LivestreamID를 파티션 키로 지정했어요.

또한, 빠르게 성장할 신규 서비스를 개발하는 것이기에, 트래픽을 예측해 프로비저닝 값을 지정하는 대신 ON_DEMAND 모드로 DynamoDB를 운영하기로 했어요. 초기에는 DynamoDB가 Auto-Scaling을 수행해 트래픽의 변동에 유연하고 안정적으로 대응하도록 하고, 추후 트래픽 예측을 수행할 수 있는 환경이 되면 Provisioned 모드로 운영할 수 있어요.

S3

앞서 말했듯이, 우리 서비스는 데이터 접근과 질의가 LivestreamID와 시간 단위로 모두 이루어질 것으로 예상돼요. 그래서 S3 버킷의 파티셔닝 역시 두 가지 기준으로 각각 수행해서 데이터 접근 성능을 향상할 거에요.

LivestreamID 기준 동적 파티셔닝 구조
시간 기준 동적 파티셔닝 구조

S3 버킷은 AWS 리전 안에서 고유한 이름을 부여해 생성하면 돼요.

aws s3api create-bucket \
--bucket <YOUR_BUCKET_NAME> \
--region ap-northeast-2 \
--create-bucket-configuration \
LocationConstraint=ap-northeast-2

각각의 방식으로 파티셔닝된 데이터는 동일한 S3 버킷에 저장할 수도 있고 서로 다른 버킷에 저장할 수도 있어요. 단, 동적 파티셔닝을 수행하고 해당 경로로 데이터를 전송하는 주체는 Kinesis Data Firehose이기 때문에 Kinesis Data Firehose는 반드시 각각 별도로 구축해야 해요.

Kinesis Data Streams

DynamoDB에서 발생한 항목 변경 이벤트를 Kinesis Data Streams가 받을 수 있도록 설정해 볼게요. 우선 아래 명령어를 통해 Kinesis Data Streams를 생성해요. 역시 트래픽 프로비저닝보다는 Auto-Scaling을 수행하기 위해 ON_DEMAND 모드를 설정했어요.

aws kinesis create-stream \
--stream-name <YOUR_STREAM_NAME> \
--stream-mode-details \
StreamMode=ON_DEMAND

생성된 Kinesis Data Streams를 우리의 데이터 소스인 DynamoDB 테이블과 연결해요.

aws dynamodb enable-kinesis-streaming-destination \
--table-name <DDB_TABLE_NAME> \
--stream-arn <YOUR_STREAM_ARN>

이렇게 하면 Kinesis Data Streams 구축은 끝이에요. 아주 쉽죠?

여기까지 아주 간단하게 DynamoDB 테이블에서 데이터 변경 이벤트를 포착해 Kinesis Data Streams에 스트림 형태로 전달하도록 설정했어요. 이제 스트림 데이터를 원하는 형태로 바꿔서 S3 버킷까지 전송하는 구간을 구축해 볼게요.

Kinesis Data Firehose

Kinesis Data Firehose의 기본적인 기능은 데이터를 전송(Delivery)하는 것이에요. 우리는 그 전송에 더해 데이터를 원하는 형태로 변환하고 싶기 때문에 몇 가지 값을 추가로 설정해야 해요. 아까 위에서 2가지 형태로 데이터를 파티셔닝하기 위해서는 각각의 Firehose를 별도로 구축해야 한다고 언급했어요. 우선 LivestreamID를 파티션 키로 하는 동적 파티셔닝 구현을 중심으로 살펴본 후에, 시간 기반 파티셔닝 구현에 대해 알아 볼게요.

Processor

Kinesis Data Firehose의 스트림 레코드를 다루는 프로세서 타입은 Lambda를 포함해서 총 4가지에요.

  • AppendDelimiterToRecord : 레코드 사이에 NewLine (\n) 문자를 자동으로 추가하는 기능이에요. S3 오브젝트 안의 레코드 사이에 이 NewLine 문자가 추가되기 때문에 S3 오브젝트 파싱에 유용해요.
  • Lambda : 데이터 변형을 수행하는 Lambda를 지정하는 기능이에요. 또한, 동적 파티셔닝의 파티션 키를 생성하기 위해 partitionKeyFromLambda와 함께 쓰여요. 한 KDF에 Lambda는 딱 한 개만 지정할 수 있어요.
  • MetadataExtraction : 동적 파티셔닝의 파티션 키를 Inline Parsing 방식으로 생성하는 기능이에요. JSON 오브젝트에 JQ 파서가 바로 쿼리하는 방식으로 파티션 키가 추출되고, partitionKeyFromQuery와 함께 사용돼요. 참고로, Inline Parsing은 KDS가 소스인 경우 사용할 수 없고, DirectPut 소스인 경우에만 사용할 수 있어요. KDS는 레코드를 base-64 인코딩된 형태로 사용해서 바로 JQ 파서가 접근할 수 없기 때문이에요.
  • RecordDeAggregation : 레코드를 분리(deaggregate)하는 기능이에요. 파라미터로 분리자를 받으면 JSON 오브젝트가 분리돼요.

이 중에 우리가 이번에 프로세서로 적용할 기능은 Lambda 하나에요. 이유는 2가지예요. 우선, 레코드의 형태를 상세하게 커스터마이징 할 수 있는 기능은 Lambda 뿐이에요. 다른 3가지 프로세서는 특정한 기능을 주어진 형태로만 활용할 수 있는 반면, Lambda 안에서는 레코드 필드에 직접 접근해서 수정할 수도 있어요. 또한, AppendDelimiterToRecord, RecordDeAggregation은 Lambda 안에서 한 번에 수행할 수 있는 기능이에요. 굳이 한 단계의 프로세싱을 추가할 만큼 덩치가 큰 프로세스가 아니기 때문에 Lambda 안에서 NewLine 문자를 추가하기로 결정했어요.

정리하자면, 우리의 Lambda는 DynamoDB 이벤트 레코드를 커스텀 형태로 변환하고, 파티션 키를 Firehose가 동적 파티셔닝에 활용할 수 있도록 추출하고, 레코드마다 NewLine 구분자를 추가하는 3가지 역할을 수행할 거에요.

데이터 형태 변환

이제 먼저 커스텀 형태로 변환하는 과정에 대해 자세히 살펴 볼게요. Kinesis Data Firehose는 이벤트를 아래와 같은 포맷의 JSON으로 주고 받아요.

Kinesis Firehose Event

records 내부 항목의 data 필드가 DynamoDB 이벤트 레코드에요. DynamoDB 이벤트 레코드는 위에서 봤던 형태와 동일한데, base-64 인코딩되어 단일 문자열로 data 필드에 담겨요. Lambda는 이 이벤트 레코드의 data에 접근해서 필요한 값만 취해 아래와 같은 형태로 데이터를 변환해야 해요.

Final data format

동적 파티셔닝 키 추출

그 다음으로, Lambda는 데이터 형태 변환 말고 동적 파티셔닝을 위한 파티션 키도 추출해야 해요. Firehose는 partitionKeyFromLambda 옵션을 통해 Lambda가 뽑아 낸 파티션 키를 받아서 동적 파티셔닝을 수행하는데, 그러기 위해 아래와 같은 Kinesis Firehose Response 데이터의 partitionKeys 필드에 접근해요.

Kinesis Firehose Response

그렇기 때문에, Lambda는 최종 반환 값으로 Kinesis Firehose Response를 만들어 내야 하고, metadatapartitionKeys에 추출한 값을 추가해야 해요.

NewLine 구분자 추가

마지막으로, NewLine 구분자는 Lambda 코드 안에서 추가하도록 구현할 수 있어요. 어차피 코드 안에서 레코드 단위로 데이터를 변형하기 때문에, 레코드마다 NewLine 문자를 추가하면 별도의 AppendDelimiterToRecord 옵션 없이 NewLine을 붙일 수 있어요. 아래 코드에서 Line 72 내용에 해당돼요.

참고로, 만약 Athena를 활용해 JSON 오브젝트를 대상으로 쿼리를 수행할 계획이 있다면 NewLine을 꼭 추가해야 해요. Athena 쿼리의 JSON 직렬화/역직렬화 도구인 Hive JSON SerDe는 레코드 구분자로 NewLine 문자를 사용하고 다른 문자로 변경할 수 없기 때문이에요.

Lambda 구현

데이터 변환, 파티션 키 추출, NewLine 구분자 추가 이렇게 3가지 기능을 수행하는 코드를 아래와 같이 작성했어요. 우리 팀은 Go를 기본 언어로 사용하고 있기 때문에 AWS SDK Go와 기타 라이브러리를 활용해서 원하는 기능을 구현했어요.

이제 Lambda가 구현되었으니, 컴파일된 바이너리 파일을 zip으로 압축해서 아래의 커맨드와 옵션을 부여해 Lambda를 배포해요. 그러면 KDF에서 Lambda를 호출해서 데이터 변형을 수행할 수 있게 돼요.

aws lambda create-function \
--zip-file <YOUR_LAMBDA_ZIP_FILE_PATH> \
--cli-input-json <LAMBDA_DEPLOY_OPTION_JSON_FILE_PATH>
Lambda deploy options

이 때 Timeout과 MemorySize를 파라미터로 설정할 필요가 있어요. 그 이유는 Lambda가 Synchronous invocation mode로 호출되기 때문이에요. Synchronous invocation mode는 Lambda를 호출한 후 응답이 올 때까지 Block 되어 대기하는 방식이에요. 대기하다가 시간이 초과되면 Timeout 에러가 발생하기 때문에 데이터 프로세싱을 완수할 충분한 시간을 파라미터 값으로 설정할 필요가 있어요. 이에 대해 AWS는 공식적으로 1분 이상을 추천하고 있으며, 상한값은 5분으로 제한하고 있어요.¹⁰ 또한, 데이터 프로세싱 과정에서 MemorySize를 동적으로 할당하지 않기 때문에 적절한 메모리 용량을 설정할 필요도 있어요.

참고로, Synchronous invocation mode는 6MB의 payload 크기 제한을 가져요. 애초에 KDF에서 Lambda를 호출할 때 Buffer 최대 사이즈를 3MB로 가지기 때문에 대부분의 경우 문제가 될 가능성이 낮아요. 하지만, Lambda에서 데이터를 많이 추가하는 경우 6MB가 초과되어 문제가 발생할 수 있기 때문에, 염두에 두고 데이터 변형 기능을 구현할 필요가 있어요.

파라미터 설정

이제 거의 다 왔어요! Kinesis Data Streams과 S3 사이의 전송을 책임져 줄 Kinesis Data Firehose를 생성할 차례에요. 아래와 같은 AWS CLI 명령어를 수행하면 돼요. KDS, Lambda, S3 연결에 관한 설정값은 아래에서 조금 더 살펴 볼게요.

aws firehose create-delivery-stream \
--cli-input-json <YOUR_FIREHOSE_OPTION_PATH>
Kinesis Data Firehose create options

ExtendedS3DestinationConfiguration는 전송 목적지로 S3를 지정하는 필드에요. 유의해서 설정할 값들은 버퍼링과 Lambda 호출, 그리고 동적 파티셔닝에 관한 설정이에요.

동적 파티셔닝을 활성화하는 경우, KDF는 런타임에 파티션 키마다 버퍼를 생성하고 독립적으로 관리해요. 이 버퍼링에 대한 설정은 BufferingHints 필드에서 지정해요. 단, 이름에서도 알 수 있듯이, 이 값들은 힌트에 불과해요. 정확하게 이 값만큼 버퍼를 생성하는 것이 아니고, 실제 버퍼 상태가 이 값에 도달하면 그 때 S3 Write를 수행한다는 뜻이에요. 만약 이 값이 작다면 더 자주 Write가 수행돼서 지연 시간이 짧아지지만 그만큼 비용은 더 많이 지불해야 한다는 점을 유의해야 해요. AWS는 최적화를 위해 128MB, 300초로 설정하는 것을 추천하고 있어요.

Lambda 호출은 ProcessingConfiguration 안에서 설정해요. NumberOfRetires는 Lambda 수행이 모종의 이유로 실패한 경우 재시도하는 최대 시간이에요. BufferSizeInMBs, BufferIntervalInSeconds는 Firehose가 Lambda 호출을 최적화하기 위해 설정하는 버퍼링 값이에요. 앞서 봤던 또 다른 버퍼링 값들은 Firehose가 S3를 호출할 때의 값이고, 지금 설정하는 값은 Firehose가 Lambda를 호출할 때 활용하는 값이에요. 기본값은 각각 3MB, 60초로 되어 있고, 최소, 최대 범위는 여기서 확인하면 돼요. 둘 중 하나의 값에 도달하면 Lambda를 호출해요. 참고로, Lambda 프로세싱에서 데이터를 많이 확장(expand)시키는 경우 BufferSizeInMBs를 작게 가져가야 Lambda의 6MB payload 제한에 걸리지 않고 프로세싱을 수행할 수 있어요. 이러한 점들을 바탕으로, 다루는 레코드의 특성과 수행하는 작업에 따라 적합한 값을 설정할 필요가 있어요.

참고로, Lambda 인스턴스는 KDF 샤드마다 1개씩 할당돼요. 즉, 데이터는 파티션 키에 따라 각 버퍼에 버퍼링되고, 동일한 버퍼에 있는 데이터는 동일한 Lambda 인스턴스를 호출하며 FIFO 순서로 처리돼요. 뿐만 아니라, 여러 Lambda 인스턴스는 비동기로 호출되기 때문에 한 시점에 처리되는 데이터의 파티션 키 종류만큼 Lambda 인스턴스가 생성돼요. AWS는 Lambda 동시 호출 인스턴스 수를 기본적으로 1,000개로 제한하며 이 인스턴스 풀은 AWS 계정 내의 모든 Lambda가 공유해요. 그렇기 때문에, 이 수를 넘겨 Lambda 인스턴스를 호출하면 Lambda.InvokeLimitExceeded 에러가 발생해요. 에러가 발생하면 KDF는 NumberOfRetries 값만큼 성공할 때까지 호출을 재시도하지만, throttle이 발생하고 데이터가 계속 들어오면 끝까지 호출에 실패할 가능성이 있어요. 따라서 동시 호출 제한을 예상 범위보다 크게 늘리거나¹¹, 재시도까지 실패한 요청을 Dead-letter queues를 활용해 처리할 필요가 있어요.¹²

동적 파티셔닝은 Prefix와 DynamicPartitioningConfiguration에서 설정해요.¹³ 아까 위에서 Lambda의 동적 파티셔닝 키 추출에 대해 설명했어요. 이 partitionKeyFromLambda 옵션을 Prefix 옵션에서 사용해요. 예를 들어서 보여 드릴게요.

"Prefix": "Livestreams/LivestreamID=!{partitionKeyFromLambda:LivestreamID}/"

이런 식으로 옵션을 제공하면, metadatapartitionKeys에서 LivestreamID라는 키를 찾아와 동적 파티셔닝을 수행해요. 실제로 S3 버킷에는 이런 형태로 S3 오브젝트가 저장 된답니다.

오브젝트 동적 파티셔닝 예시

참고로 동적 파티셔닝 옵션 사용 시 Prefix 뿐 아니라 ErrorOutputPrefix도 반드시 설정하도록 AWS는 강제하고 있어요.

"ErrorOutputPrefix": "hahaha-error"

이렇게 에러가 발생한 오브젝트를 모아 저장할 디렉토리를 설정하면, 아래와 같이 프로세싱 과정에서 발생한 오류 값들이 종류에 따라 자동으로 분류되어 저장되는 것을 확인할 수 있어요.

에러 동적 파티셔닝 예시

Timestamp 기반 파티셔닝

시간 기반 파티셔닝은 동적 파티셔닝 기능을 활용하지 않고 저장할 경로(Prefix)만 지정해도 적용할 수 있어요. 더 정확히 말하자면, Firehose가 기본적으로 제공하는 경로 표현식 namespacetimestamp를 활용하면 시간 기준으로 오브젝트의 저장 경로를 지정할 수 있어요.¹⁴ 하지만 데이터 변형 없이 Prefix만 지정하면 데이터가 DynamoDB Streams 형태로 NewLine 구분 없이 저장돼요. 그렇기 때문에 Lambda를 구현해 데이터 변형을 수행할 필요가 있어요. 위에서 구현한 Lambda에서 파티션 키 추출 부분(Line 71)을 제거하고 나머지 기능은 동일하게 구현하면 돼요.

또한, Firehose 생성 파라미터 옵션 역시 위의 설정값과 비슷하게 지정하면 돼요. 유의해야 할 부분은 3가지 있어요.

  • 시간 기준 파티셔닝이 수행되도록 Prefix를 아래와 설정해야 해요.
"Prefix": "Livestreams/!{timestamp:yyyy/MM/dd/HH/mm/ss/}"
  • 동적 파티셔닝을 사용하지 않을 것이기 때문에 DynamicPartitioningConfiguration을 제거해야 해요.
  • 레코드 소스인 Kinesis Data Streams는 위와 동일하게, S3 버킷과 Lambda는 본인의 상황에 맞게 설정하면 돼요.

정상적으로 파티셔닝이 수행된 경우 아래와 같이 레코드 오브젝트가 저장돼요.

timestamp 기반 파티셔닝 예시

참고로 이 timestamp는 UTC 기준으로 설정되기 때문에, Local 기준 시간으로 파티셔닝을 수행하고 싶다면 동적 파티셔닝을 구현해야 해요. 위에서 LivestreamID를 동적 파티셔닝에 활용한 것처럼 Local 기준 시간 값들을 partitionKeyFromLambda를 통해 접근할 수 있도록 만들면 돼요.

결과

AWS-Native Event Pipeline

이렇게 DynamoDB부터 S3 버킷까지, 채팅 메시지를 이벤트 기반으로 저장하는 파이프라인을 구축했어요. 다시 한 번 저의 목적을 상기해보자면, 이 파이프라인은 라이브 스트리밍에서 발생하는 채팅 메시지를 원하는 형태로 저장해서, 추후 확장 기능 구현에 데이터를 잘 활용할 수 있도록 만들기 위해 구축 되었어요. 이 목적이 잘 달성되기 위해서는 기본적으로 여러 라이브 스트리밍에서 동시 다발적으로 발생하는 채팅 데이터가 S3 버킷에 안정적으로 저장되어야 해요. 이 결과를 두 가지 방법으로 확인을 해볼게요.

Athena S3 조회

데이터 샘플을 DynamoDB에 생성하고, 이 데이터 값이 Athena 쿼리를 통해서 정상적으로 조회되는지 확인할게요. LivestreamID 기반으로 파티셔닝한 버킷에 대해 테스트를 수행해 볼게요.

우선 동일한 LivestreamID를 가진 25개의 아이템을 데이터 샘플로서 테이블에 추가해요. 같은 LivestreamID를 부여한 것은 간단한 쿼리 하나로 데이터를 조회하기 위한 것이라서 다른 아이템을 넣어도 상관 없어요. 조회하는 SQL을 다르게 작성해서 생성한 아이템의 값이 S3까지 잘 전송됐는지 여부만 확인하면 돼요.

DynamoDB 테이블에서 조회한 채팅 아이템

그리고 위 DynamoDB 테이블의 스키마를 그대로 읽어올 수 있도록 Athena 테이블을 아래와 같이 생성해요. 각 필드의 타입에 맞게 컬럼을 정의해요. 저는 UID만 DynamoDB의 Number 타입이니까 BIGINT 타입으로 지정하고, 나머지는 STRING 타입을 지정했어요.

ROW FORMAT SERDE¹⁵는 Athena가 레코드를 해석하는 직렬화/역직렬화 포맷이에요. JsonSerDe 이름에서 알 수 있듯이 위 쿼리에 적힌 옵션은 JSON 오브젝트를 직렬화/역직렬화 하는 데에 쓰여요. 위에서 작성한 Lambda를 보면 데이터 형태를 변환하고 payload에 담을 때 JSON으로 직렬화하고 있어요. 이러한 JSON 오브젝트는 위의 옵션 규칙에 따라 해석되어 Athena에서 조회할 수 있게 돼요.

LOCATION에는 조회할 S3 버킷과 경로를 지정하면 돼요. 지정된 경로와 모든 하위 디렉토리를 재귀적으로 탐색하기 때문에 탐색하고 싶은 대상의 최상위 디렉토리만 지정하면 돼요. Firehose 옵션의 Prefix 값에 최상위 디렉토리를 두고 그 안에 파티션 값에 따라 파티셔닝을 수행하면, 그 최상위 디렉토리만 스캔하면 되기 때문에 간편해요.

참고로 Athena는 쿼리를 수행하다가 테이블 형식에 맞지 않는 레코드를 만나면 에러를 반환하고 쿼리 수행을 중단해요. 그래서 에러가 발생하는 특정 하위 오브젝트를 제외하고 쿼리를 수행하고 싶은 상황이 발생할 수 있어요. 하지만 아쉽게도 특정 하위 오브젝트만 제외하는 옵션은 없어서(!) S3 파티셔닝을 다시 수행해야 해요. 사실 애초에 S3 버킷의 동적 파티셔닝을 활용한다는 건 S3 버킷의 데이터를 적극적으로 잘 활용하겠다는 의미겠죠. 그러니 동적 파티셔닝을 적용하기 전에 데이터 활용 요구사항과 패턴을 예측하고 그걸 바탕으로 파티셔닝 구조를 설계하는 것이 중요해요.

생성된 Athena 테이블에서는 이런 저런 쿼리로 이전에 생성한 데이터가 잘 전송됐는지 확인할 수 있어요. 결과값이 25개인 것을 보니 정상적으로 잘 처리됐네요.

Athena에서 조회한 채팅 아이템

이로써 파이프라인은 올바른 데이터가 주어졌을 때 데이터 변환과 전송, 파티셔닝을 수행하고 데이터를 S3에 정상적으로 저장한다는 것을 확인했어요!

1K TPS 테스트

라이브 스트리밍의 채팅 데이터는 동시 다발적으로 빠르게 생산된다는 특징이 있어요. 위의 테스트는 단순 데이터의 형식만 확인한 거라서 실제 프로덕션 환경에서 이 파이프라인이 잘 작동할지 보장하는 데에는 한계가 있어요. 그래서 동시 다발적인 요청에 대해 Auto-Scaling이 수행되고 파이프라인이 정상적으로 동작하는지 확인해볼 거에요.

Test Flow

테스트는 DynamoDB가 데이터 변경 이벤트를 캡처해 파이프라인에 이벤트 데이터를 전달해주고, S3까지 안전하게 도착해야 성공이에요. 처리 후 저장된 데이터는 즉각적으로 사용되기 보다는 필요해지면 사용되는 특성을 갖기 때문에 Delay와 Latency는 큰 문제가 되지 않아요. 대신 Reliability, Availability가 더 중요한 지표로 작용하니까, 이 테스트에서도 이 지표들을 살펴 볼게요.

테스트는 초당 1,000개의 DynamoDB BatchWrite 요청을 파이프라인이 정상적으로 처리하는지 확인하는 방식으로 진행해요. 각 BatchWrite 요청은 최대 25개의 아이템을 쓸 수 있어서 요청마다 25개의 아이템을 생성해 DynamoDB에 BatchWrite 하도록 할 거에요. 그래서 초당 DynamoDB에 쓰이는 아이템은 25,000개에요. 1,000개의 로드는 10개의 LivestreamID에 분산시켰어요. 즉, 10개의 LivestreamID마다 각각 100개의 요청이 전송되고, 2500개의 아이템이 생성되는 거에요.

1K 로드를 발생시키기 위해 Apache JMeter를 사용해요. 또한 클라이언트 요청을 받아 임의의 채팅 데이터를 생성해 DynamoDB에 BatchWrite하는 API 서버가 있어요. API 서버는 로컬 환경에 독립적으로 EC2에서 운용되어서 최대 처리량을 처리할 수 있어요.

1K TPS 수행 중

테스트 수행 결과는 Throttle 발생 현황과 프로세싱 결과를 보여 주는 KDS, KDF 콘솔 대시보드를 통해 확인했어요.

KDS → KDF 전송
KDF → Lambda 호출
KDF → S3 전송

위 그래프들은 Kinesis Data Streams와 Kinesis Data Firehose 사이의 스트리밍과 Kinesis Data Firehose의 Lambda 호출, 그리고 S3 전송 성공률이 모두 100%라는 것을 나타내요. 이를 통해 DynamoDB부터 Kinesis Data Streams, Kinesis Data Firehose, Lambda, S3까지 어떤 구간에도 throttle이 발생하지 않고 전송에 성공했다는 것을 확인했어요. 이는 파이프라인의 모든 구간에 Auto-Scaling이 적용되어 있어 대량의 트래픽도 유연하게 수용하며, 버퍼링도 정상적으로 수행되어서 데이터의 누락과 병목이 발생하지 않았다는 것을 의미해요.

또한, Athena에서 해당 데이터를 조회했을 때 파티션 키마다 25,000개의 레코드가 존재한다는 것을 확인할 수 있었어요. 즉, 모든 요청과 아이템 쓰기 작업이 정상적으로 처리되었고, 파이프라인을 안정적으로 통과해 S3에 원하는 형태로 적재됐다는 거에요.

이로써 우리의 이벤트 데이터 파이프라인은 대용량 채팅 트래픽에도 안정적으로 Auto-Scaling을 수행하며 데이터의 무결성을 보장한다는 것을 확인했어요.

마무리

지금까지 AWS-Native 이벤트 데이터 파이프라인을 구축하고, 데이터가 정상적으로 목적지까지 원하는 형태로 전송이 되는지 확인해보았어요. 이러한 데이터 파이프라인을 활용하면 데이터베이스 성능에 독립적인 어플리케이션을 구축할 수 있을 뿐 아니라, 데이터를 자동으로 원하는 형태로 저장해 다양한 목적에 활용할 수 있을 것으로 기대돼요.

당근마켓 라이브 스트리밍 팀은 안정적인 라이브 스트리밍을 위한 플랫폼을 개발하는 조직이에요. 당근마켓 유저들이 실시간으로 소통하며 연결될 수 있도록 도와주는 플랫폼을 제공하기 위해 다양한 기능을 개발하고 있어요.

라이브 스트리밍 팀, 그리고 당근마켓 팀과 함께 새로운 문제에 도전하고 싶으신 분은 언제든 당근마켓 채용 사이트를 찾아주세요.

감사합니다!

참고하면 좋은 글

[1] Point-in-time recovery (PITR) for Amazon DynamoDB
https://aws.amazon.com/dynamodb/pitr/

[2] Read Consistency — Amazon DynamoDB
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/HowItWorks.ReadConsistency.html

[3] Best Practices for Querying and Scanning Data
https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/bp-query-scan.html

[4] Transactional Outbox pattern with Azure Cosmos DB
https://docs.microsoft.com/en-us/azure/architecture/best-practices/transactional-outbox-cosmos

[5] Reading Data from Amazon Kinesis Data Streams
https://docs.aws.amazon.com/streams/latest/dev/building-consumers.html

[6] Installing or updating the latest version of the AWS CLI
https://docs.aws.amazon.com/cli/latest/userguide/getting-started-install.html

[7] Creating partitioning keys with inline parsing
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html#dynamic-partitioning-partitioning-keys

[8] Dynamic partitioning of aggregated data
https://docs.aws.amazon.com/firehose/latest/dev/dynamic-partitioning.html#dynamic-partitioning-multirecord-deaggergation

[9] Hive JSON SerDe
https://docs.aws.amazon.com/athena/latest/ug/json-serde.html#hive-json-serde

[10] Duration of a Lambda Invocation
https://docs.aws.amazon.com/firehose/latest/dev/data-transformation.html

[11] Lambda quotas
https://docs.aws.amazon.com/lambda/latest/dg/gettingstarted-limits.html

[12] Dead-letter queues
https://docs.aws.amazon.com/lambda/latest/dg/invocation-async.html#invocation-dlq

[13] Custom Prefixes for Amazon S3 Objects
https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html

[14] The timestamp namespace
https://docs.aws.amazon.com/firehose/latest/dev/s3-prefixes.html#timestamp-namespace

[15] Using a SerDe
https://docs.aws.amazon.com/athena/latest/ug/serde-about.html

--

--

Jules
당근 테크 블로그

라이브 스트리밍 서버 개발 @ 당근마켓