Do you know DynamoDB Stream

안녕하세요. 빙글에서 백엔드 개발을 맡고 있는 권세중입니다.

요즘 많은 분들이 Serverless에 입문함과 동시에 DynamoDB를 접하게 되는데요. DynamoDB의 가장 신기하고 멋진 기능이지만, 잘 다뤄지지 않는 기능이 하나 있으니, 바로 DynamoDB Stream입니다. 아마 많은 분들이 이런 기능이 있다는 것도 모르고 스쳐 지나가지 않았을까 싶네요. 빙글에서는 초기부터 DynamoDB Stream을 통해 많은 기능을 구현해 왔는데요. 오늘은 그 경험을 기반으로, DynamoDB Stream에 대한 전반적인 소개와, 왜 DynamoDB Stream이 멋진 기능인지 이야기 해보려고 합니다.

Data Change Capture의 필요성

서비스를 만들다 보면, “data record의 변화를 안정적이고, 스케일 가능하게 Capture” 하는게 꼭 필요해집니다. 쉽게는 Like를 찍을때마다 likes_count += 1 부터, 어렵게는 wiki에서 문서의 변경사항을 정리해서 이메일로 보내달라던가요.
이런 부분을 우리는 흔히 Application Layer에서 처리하곤 합니다. 아마 이런 코드 익숙하시겠죠.

1: Like.create(user_id, card_id)
2: card = Card.find(card_id)
3: card.likes_count += 1
4: card.save()

이런 코드를 큰 스케일에서 돌려보신분들은 아시겠지만, 이런 로직은 매우 쉽게 고장납니다. 이 예시만 해도,

  1. L1 은 성공했는데 L2에서 에러나면?
  2. 위 코드가 서로 다른 Thread에서 동시에 여러번 실행되면? (L3때문에 Like count가 잘못되게 되죠)
  3. L4 가 실패하면? L1을 Rollback 할껀가?

이런 문제들에 대한 까다롭지만 가장 확실한 대답중 하나가 바로 DynamoDB Stream 이라고 할수 있겠습니다. 빙글에서는 이런류의 business logic에 대해 적극적으로 DynamoDB Stream을 적용했고, 결과적으로 매우 만족하고 있습니다 :)

What is DynamoDB Stream?

DynamoDB Stream 이란 DynamoDB Table 의 변경사항 (INSERT, DELETE, MODIFY) 에 대한 정보들을 Stream 형태로 처리할 수 있도록 해주는 기능입니다. Mysql같은 RDS에 익숙하신 분들이라면, Replication에 자주 쓰이는 binlog 기능을 생각하시면 됩니다. 그것보다 데이터가 훨씬 이해하기 쉽다는 것만 빼면요.
해당 기능을 enable 하게 되면 그 시점부터 해당 Table의 변경사항이 Queue 에 쌓이기 시작합니다. 따로 dequeue해서 처리해주지 않으면, 이벤트가 발생한지 24시간 뒤에 해당 이벤트는 삭제되게 됩니다. random access로 stream 데이터를 지운다거나 변경하는 행위는 불가능하며, 반드시 dequeue를 통해 처리해야 합니다.

재밌는 예외상황이 하나 있는데, 아주 짧은 시간 내에 서로를 상쇄하는 event를 발생 시키는 경우입니다. 예를들자면 record를 insert하자마자 바로 다시 delete하는 경우 말입니다. (혹은 같은 record를 아주 짧은 시간내에 두번 modify한다던지요)

이 경우, DynamoDB Stream은 자동적으로 서로 상쇄 가능한 이벤트를 삭제해 줍니다. Insert Delete의 경우 두 이벤트를 모두 보내지 않고, 연속으로 Modify 하는경우는 마지막 Modify Event만 오게 되죠.

좀 더 자세한 작동원리와 주의사항은 DynamoDB Stream를 참고하시면 되겠습니다.

데이터 처리

그럼 Queue에 쌓인 data는 어떻게 dequeue할까요? 빙글에서 주로 사용하는 방법은 Lambda Trigger입니다. DynamoDB Stream을 Lambda에 연결하면, 주기적으로 DynamoDB Stream에서 Event를 Dequeue하여 Lambda에 넘겨주는 식이죠.
(이때, Lambda가 Error를 내거나 하여 처리가 실패하면, DynamoDB Stream은 Dequeue된 event를 다시 queue에 넣고 다음 Lambda로 재시도 해줍니다).

EC2등으로 처리하는 infra부터 직접 구축하는것도 가능합니다. DynamoDB Stream은 내부적으로 Kinesis Stream과 매우 유사한데, (사실 실제로는 똑같다고 봐도 무방합니다.) 그러니 Kinesis Client Library 를 사용하여 직접 Stream에서 dequeue 하는걸 만드시면 되겠죠.

오늘은 이 둘 중에서도, 저희가 많이 경험한 Lambda를 사용하여 처리하는 방법에 대해 알아보겠습니다.

Let’s use DynamoDB Stream

DynamoDB Stream 이 뭔지 알아봤으니 DynamoDB Stream 을 사용하는 방법에 대해 간략하게 알아보도록 하겠습니다. 예제에서는 AWS Lambda, Node.js, Typescript, Serverless framework 을 사용했습니다.

일단 Stream data 를 쌓기 위해서 Stream 기능을 활성화시키는게 우선이겠죠? DynamoDB Table Overview 탭에서 Manage Stream 버튼을 누르시면 아래와 같은 팝업이 나오게 됩니다.

쌓을 Stream data 종류를 선택하는 건데요. 각 Type 별로 쌓이는 데이터의 타입이 조금씩 달라지게 됩니다. 문서 참고는 RecordsType 을 참고해주시면 됩니다. 문서에서 Response Syntax 에 정의 된 Object 구조를 보시면 Keys, NewImage, OldImage 가 있는데요. View type 에 따라 해당 attributes 들이 내려오는지 아닌지가 결정됩니다.

  • Keys only — Keys
  • New image — Keys, NewImage
  • Old image — Keys, OldImage
  • New and old images — Keys, NewImage, OldImage

이때 Keys는 DynamoDB의 “키값" 만 주는것을 의미합니다. HashKey / SortKey만 Event로 주는거죠. 반대로, Image는 record 전체를 주는것을 의미합니다.
차이는 Record Size입니다. DynamoDB는 record 한개에 최대 400kb 까지 넣을수 있는 document storage이기 때문에, image를 전부 보내면 Stream에서 데이터 처리를 너무 많이 해야하는 경우가 생길수 있습니다. 이를 피하기 위해, Key만 주고 image에 접근해야하는 경우에만 직접 DynamoDB에 다시 Query해서 사용할수 있도록 해주는거죠.

여기서 적절한 type 을 설정한 다음에 Enable 해주시면 아래와 같이 ARN 주소가 나오게 됩니다. 이 ARN 주소는 Stream data 를 받는 쪽에서 사용하게 됩니다. 저는 NewImage and OldImage 로 예제를 진행해보도록 하겠습니다.

AWS Console 에서 설정해주는 건 이게 끝입니다. 이제 코드로 넘어가도록 하겠습니다.

Serverless.yml setting

Serverless framework 는 간단하게 말하면 Lambda function 생성, 설정, 기타등등의 것들을 CLI 에서 처리하게 해주는 npm library 입니다. 자세한 내용은 serverless 를 참고해주세요.

여기서는 serverless.yml의 내용중 DynamoDB Stream Callback 에 붙을 Lambda function 을 생성하는 방법에 대한 설정만 정리해보겠습니다.

이런식으로 작성하시면 DynamoDB Stream Callback 용 Lambda function 이 생성되게 됩니다. 실제로 DynamoDB 에 연결됐는지 확인하려면 table trigger 탭을 확인해보시면 됩니다.

Write Lambda function code

Lambda function 배포설정까지 끝났으니 function을 구현해보도록 하겠습니다.
아래는 Lambda Function에 event parameter로 들어오게 되는 Object입니다.

이때, Record는 Event type 에 따라 Image type 이 다르게 내려옵니다. INSERT 일 경우는 NewImage, REMOVE 일 경우엔 OldImage, MODIFY일 경우 OldImage, NewImage 둘 다 내려오게 됩니다. 데이터를 처리하는 간단한 예제를 보겠습니다.

이런식으로 구현해봤는데요. 로직은 간단합니다. 판매도서 기록을 남기는 table 이 있다고 가정할 때 데이터가 들어올 경우 해당 도서의 sell count 를 올려주고 데이터가 지워질 경우 해당 도서의 sell count 를 내려주는 코드입니다.

An example is just an example

Stream 기능은 훨씬 다양한 용도로 사용할 수 있습니다. 예를 들면,

  1. 특정 record 변경에 따른 Email / Slack / App Notification 발송 (“Follower Record가 생기면 Followee 에게 Notification 발송!”)
  2. 검색 index 수정 (빙글에서 매우 Card / Talk / QnA 검색 모두 이렇게 되고 있습니다)
  3. Log storage 에 데이터 저장 (Card의 변경 내역을 변경이 일어날때마다 모두 기록한다던가..)
  4. Multi region database synchronize (Global Table이 출시되면서 굳이 이렇게 구현할 필요가 없어지긴 했지만, 여전히 가능하긴 합니다. 실제로 Global Table도 내부 구현은 Stream 을 사용하고요.)
  5. DynamoDB 를 S3 / AuroraDB 로 replicate

이렇게 다양하게 활용 가능한 것들을 여러분들이 상황에 알맞게 사용하시면 더 좋은 효율을 낼 수 있지 않을까 싶습니다.

허나 어떤 기술이 그렇듯 DynamoDB Stream 이 항상 올바른 선택은 아닙니다. 예를 들어보죠. user, user_token, user_stat 테이블이 있습니다. user 테이블에서 record 가 삭제되면 나머지 두 테이블의 해당 user 에 대한 record 도 동시에 삭제가 되어야 겠죠. 이때, DynamoDB Stream 은 적합한 툴이 아닙니다. Lambda 가 Stream records 를 1초 마다 polling 해줘서 실시간으로 돌아가는 것처럼 보이지만 Lambda 내부에서 records 가 처리되는 시간까지 감안하면 1초 이상이 걸릴 수도 있습니다. 이런 경우 API 가조금 느려지는 것을 감수하더라도 API Level에서 처리해주는게 맞습니다. 
물론, 빙글에서도 이런 실수를 꽤 했었습니다;; 기술 선택은 언제나 어렵죠.
이 글을 통해, 여러분들은 사용하기 전에 발생할 수 있는 문제와 얻게 되는 이득을 잘 고려해서 올바른 기술 선택을 하실 수 있으면 좋겠네요.

마치며

DynamoDB Stream 동작 방식이 Kinesis stream 과 유사하기 때문에 DynamoDB Stream 을 좀더 다양하게 사 용해보시고 싶으신 분들은 Kinesis stream 문서도 같이 찾아보시는 것을 추천드립니다. 이번 아티클이 여러분들께 도움이 되었으면 좋겠네요. 그럼 다음 아티클로 찾아 뵙겠습니다. 감사합니다.

빙글에는 이런 문제를 함께 풀어갈 사람을 언제나 기다립니다.