Node.js 에서 DynamoDB Streams 다루기

DynamoDB는 aws에서 제공하는 managed NoSQL 서비스이다. DynamoDB를 사용함에 있어서 여러가지 장점이 있겠지만, 이 글에서는 DyanmoDB Streams 를 Node.js (javascript) 에서 다루는 방법에 대해 기술한다.

DynamoDB는 테이블의 데이터 변화를 Stream형태로 제공한다. 이를 통해 DB의 데이터 변화를 실시간으로 유저에게 전달한다거나 cross region 별 replica를 생성한다거나 하는 등의 작업을 수행 할 수 있다. 또한, 데이터의 변화를 시간의 순서대로 전송하기때문에 데이터 변화 히스토리를 정확한 순서대로 재연할 수 있다.

aws lambda 서비스의 경우 event의 소스로 dynamodb streams를 선택 할 수 있기 때문에 데이터 변화의 이벤트를 포착하고 처리하는게 매우 쉽지만, 데이터 변화의 양이 많을경우에는 lambda의 비용압박(?) 때문에 사용하기가 꺼려질 수 있다. 따라서 일반적인 node.js 프로세스에서 dynamodb streams를 획득하고 처리하는 방법에 대해 기술하도록 하겠다.


위의 링크에서 어떻게 스트림을 획득하고 처리하는지에 대한 가이드가 나와있지만 처음 개념이 잡히기 전에는 바로 이해가 가지 않았었기 때문에 각 용어의 개념부터 정리하는 것이 좋을 것 같다.

DynamoDB Streams 구조
  • DynamoDB Stream : 데이터의 변화(추가,수정,삭제) 에 대한 기록이 일련의 데이터구조로(shard) 전달되는 형태
  • Shard: 데이터변화의 묶음(변화되는 데이터들의 특정 그룹으로 묶어놓았다고 이해하면 되겠다)
  • Shard Iterator: 특정 Shard 안에서 data를 찾아갈 수 있도록 해 주는 인덱스 키

이 정도로 데이터 구조를 파악했으면, 실제로 어떻게 사용하고 API는 어떻게 호출해야 하는지 알아보도록 하자.


DynamoDB 테이블의 Overview 탭을 보면 Stream을 활성화 하기 위한 “Manage Stream” 버튼이 있다. 이를 클릭하고 view type을 설정하면 ARN이 생성되면서 해당 테이블은 스트림을 사용할 수 있게 된다.

여기서 view type은 4가지 중 고를 수 있으며 이 타입에 따라 streams 안에 담겨져 오는 데이터가 달라진다.

  • Keys only: 키 값만 전송
  • New Image: 새로운 값만 전송
  • Old Image: 변경되기 전 값만 전송
  • New and Old Images: 변경되기 전,후 값 모두 전송

Node.js에서 사용할 API에 대한 자세한 설명은 아래의 링크에서 확인 할 수 있다.

크게 4가지 API를 사용해서 스트림을 처리 할 수 있다.

  • listStreams: 계정이 가지고 있는 stream 리스트를 반환한다.
  • describeStream: 특정 stream의 현 상태를 반환한다. (Shards 리턴)
  • getShardIterator: 특정 shard의 iterator를 반환한다.
  • getRecords: shardIterator를 기준으로 실제 변화 데이터를 반환한다.

listStream는 사용하지 않을 것이기 때문에 describeStream을 통해 shards를 가져오고, 각 shard 별로 getShardIterator를 호출하여 iterator를 획득 한 후 이를 가지고 getRecords 를 호출해 실제 변화 데이터를 가져온다.

여기서 미리 알고 있으면 좋은 점들을 몇 가지 짚고 넘어가겠다.

  • DynamoDB 스트림은 최대 24시간 저장된다.

이 말은 내부적으로 스트림을 24시간 저장한다는 의미이며, 우리가 만들 프로그램에서 describeStream 을 호출 했을 때 실시간으로 그 이후 변화하는 데이터만을 리턴해주는 것이 아니라 최근 24시간의 shards를 리턴한다는 의미이다. 즉, describeStream을 호출 할 때, 이미 처리한 shards를 제외 하려면 ExclusiveStartShardId 파라미터를 설정해 주어야 하며 이를 설정해 주지 않으면 매번 호출 할 때마다 이미 처리한 shard라도 생성 된 지 24시간 이내라면 다시 리턴해 준다는 뜻이다.

  • 실제로 데이터의 변화가 없더라도 shard는 열리고 닫힌다.

테이블에서 데이터의 변화가 없더라도 describeStream을 통해 shard는 리턴되고 그 shard안에서 데이터를 가져오려고 getRecords를 호출하면 empty array 를 반환한다. 꼭 데이터의 변화가 있어야만 shard를 생성하는 것이 아니라 timeslot을 기준으로 생성한다고 생각하는 편이 이해하기 쉽겠다.

  • 모든 데이터 변화에 대한 처리는 순차적으로 처리되어야 한다.

여러개의 shard가 리턴 되더라도 순차적으로 데이터의 변화과정을 처리해야 데이터 정합성이 맞는다.

  • describeStream은 계정별로 1초당 최대 10번만 호출 할 수 있다.
  • 이미 닫힌 shard는 describeStream 호출 시 EndingSequenceNumber를 반환한다.

closed 된 shard는 한번만 처리하면 되지만, 아직 closed되지 않은 shard는 동일 shard를 여러번 참조해야 할 수 있다.

  • 하나의 shard 안에서 특정 위치부터 찾고자 할때는 getShardIterator 호출 시 SequenceNumber 파라미터를 설정한다.

아직 오픈된 shard를 여러번 참조해야 할 경우 동일한 레코드를 여러번 참조하지 않기 위하여 getRecords 호출 시 결과로 반환받은 sequence Number를 iterator 호출 시 파라미터로 전송하여 다시 참조하지 않도록 한다.

  • getRecords 호출 시, NextShardIterator 를 반환하지 않는 다면 그 shard는 닫혀있고 더이상 해당 shard에서 읽을 데이터가 없다는 의미이다.
  • open되어 있는 shard의 경우 getRecords 호출 시 항상 NextShardIterator를 반환한다.

이를 가지고 다음 번 getRecords 호출의 파라미터로 사용 할 수 있다.


dynamoDB streams를 처리하는 실제 코드는 아래와 같다.

간단히 설명하면, config.aws.dynamodbStreams 에 스트림을 처리하고자 하는 테이블의 arn을 {tableName:arn} 형태로 저장해 놓고 describeStream, getShardIterator, getRecords 의 순으로 API를 호출하여 변화된 데이터의 레코드를 획득한다. 코드 중간에 있는 Immediately invoked function 은 데이터 정합성을 위해 loop를 순서대로 처리해야 할 필요가 있기때문에 recursive 형태로 구현한 것이다.

마지막 _processRecord 함수에서 실제로 획득한 record를 출력해보면 아래 그림과 같은 형태를 가지는 것을 확인 할 수 있다.

eventName을 가지고 분기하여 사용자의 UI에 Notify 해준다거나 하는 작업을 수행 할 수 있을 것이다.