AWS Kinesis Data Stream 으로 대용량 데이터 수집을 편하게!

Heewin Kim
webling-tech
Published in
14 min readAug 9, 2021

--

Amazon Kinesis?

실시간으로 비디오 및 데이터 스트림을 손쉽게 수집, 처리 및 분석 하기 위한 AWS(Amazon Web Service)에서 제공하는 클라우드 서비스입니다.

Kinesis에서 제공되는 기능

Kinesis Data Streams
데이터 스트림으로 스트리밍 데이터를 수집합니다.
Kinesis Data Firehose
데이터 전송 스트림으로 스트리밍 데이터를 처리 및 전송합니다.
Kinesis Data Analytics
데이터 분석 애플리케이션을 사용하여 스트리밍 데이터를 분석합니다.

본 포스팅에서는 Kinesis Data Stream, Kinesis Data Firehouse 에 대해 다루며 내용이 꽤 길기 때문에 내용 순서를 요약하자면
Kinesis Data Stream 설명, Kinesis Data Firehouse 설명, Kinesis Data Stream 와Kinesis Data Firehouse를 활용한 데이터 수집/처리 예시 순으로 정리합니다.

Kinesis Data Stream?

Kinesis Data Stream Architecture

대량의 데이터를 실시간으로 수집,처리하는 서비스. 내부적으로 샤드로 구성되어 있으며 이를 동적으로 증가/감소할 수 있고 샤드 수에 따른 보장된 전송시간을 제공하기 때문에 이점이 많다. 주로 연속적으로 생산되는 대량의 데이터소스( 1MB 이하 )를 처리해야하는 파이프라인에 사용하기 좋다.

이와 비슷한 서비스로 Kafka, Rabbitmq, … 등이 있으며 이와 구별되는 Kinesis stream 의 특징으로는 아래와 같다.

  • AWS 클라우드 서비스로 제공되어 서버관리가 필요없음
  • AWS 보안을 사용하기 때문에 보안성이 좋음
  • 이미 AWS 클라우드 서비스를 사용하고 있다면 도입이 간편
  • 수집/처리 하는 데이터량을 당장 몰라도 손쉽게 변경/유지관리가 가능

Kinesis Data Stream에서 사용되는 개념

데이터 레코드
데이터 레코드 는 저장되는 데이터의 단위입니다. 데이터 레코드는 시퀀스 번호, 파티션 키 및 데이터가 있으며 데이터는 최대 1MB 입니다.

보존 기간
보존 기간은 데이터 레코드를 스트림에 추가한 후 데이터 레코드에 액세스할 수 있는 시간의 길이입니다. 스트림 보존 기간은 기본적으로 생성 후 24시간으로 설정됩니다. 이 기간은 추가적으로 더 늘릴 수 있으나 그에 할당하는 추가 요금이 적용됩니다. 일반적으로 기본 24시간으로 사용됩니다.

Producer
생산자는 Amazon Kinesis Data Stream으로 데이터를 입력하는 주체이며, 그림에서 와 같이 웹서버, 디바이스 등이 있습니다.

Consumer
소비자는 Amazon Kinesis Data Stream으로부터 데이터를 가져와 사용하는 주체이며, 기본적으로 그림 b와 같이 Kinesis Data Firehouse를 바로 연결하여 처리합니다.

Shard
샤드는 스트림에서 고유하게 식별되는 데이터 레코드 시퀀스입니다. 스트림은 하나 이상의 샤드로 구성되며 각 샤드는 고정된 용량 단위를 제공합니다. 각 샤드는 읽기에 대해 초당 최대 5개의 트랜잭션, 최대 총 데이터 읽기 속도 (초당 2MB), 초당 최대 1,000개의 레코드를 지원할 수 있습니다. 최대 총 데이터 쓰기 속도는 초당 1MB (파티션 키 포함) 입니다. 스트림의 데이터 용량은 스트림에 지정하는 샤드 수의 함수입니다. 스트림의 총 용량은 해당 샤드의 용량의 합계입니다. 이러한 샤드를 구축 후에도 언제든지 변경 가능하여 데이터속도, 용량에 대한 대비가 가능합니다.

파티션 키
A파티션 키는 스트림 내에서 샤드별로 데이터를 그룹화하는 데 사용됩니다. Kinesis Data Streams 스트림에 속한 데이터 레코드를 여러 샤드로 나눕니다. 각 데이터 레코드와 연결된 파티션 키를 사용하여 해당 데이터 레코드가 속한 샤드를 확인합니다. 파티션 키는 각 키에 대한 최대 길이 제한이 256자인 유니코드 문자열입니다. 파티션 키를 128비트 정수 값에 매핑하고 샤드의 해시 키 범위를 사용하여 연결된 데이터 레코드를 샤드에 매핑하기 위해 MD5 해시 함수가 사용됩니다. 애플리케이션이 데이터를 스트림에 넣을 때는 파티션 키를 지정해야 합니다.

시퀀스 번호
Kinesis Data Stream에서 내부적으로 할당하는 시퀀스 번호 입니다.

Kinesis Data Firehouse?

Amazon Kinesis Data Firehose는 실시간 스트리밍 데이터 를 제공하기 위한 완전 관리형 서비스입니다. 생산자(데이터 입력) — Kinesis Data Firehouse — 소비자와 같이 Kinesis Data Stream과 동일한 입출력 개념으로 사용되며 전송과정에서 데이터 변환이 가능하며 AWS의 S3, RDS 등을 소비자로 사용하기에 간편하게 구성되어있습니다.

일반적인 사용의 예시

Kinesis Data Firehouse 에서 사용되는 개념

기록
데이터 생산자가 Kinesis Data Firehose 전송 스트림으로 보내는 관심 데이터입니다. 레코드의 최대 크기는 1MB 입니다.
데이터 생산자
생산자는 레코드를 Kinesis Data Firehose 전송 스트림으로 보냅니다. 예를 들어 로그 데이터를 전송 스트림으로 보내는 웹 서버는 데이터 생산자입니다.
또한 기존 Kinesis Data Stream 에서 데이터를 자동으로 읽고 대상으로 로드하도록 Kinesis Data Firehose 전송 스트림을 구성할 수 있습니다.
버퍼 크기 및 버퍼 간격
Kinesis Data Firehose는 수신 스트리밍 데이터를 특정 크기로 또는 특정 기간 동안 버퍼링하여 대상으로 전달합니다. 버퍼 크기 는 MB 단위이고 버퍼 간격 은 초 단위입니다.

Kinesis Data Stream 사용법

Kinesis Data Stream를 사용하기 위한 인스턴스(?) 생성은 어렵지 않습니다만, 이후 Consumer 를 설정하기 위하여 즉 실시간 스트림 전송되는 데이터를 처리/전송/소비 하기 위한 설정은 다양한 방식이 있습니다.
스냅스에서는 Kinesis Data Firehouse를 Consumer로 사용하여 설정하는 방법을 채택하였으며 이후 포스팅에서도 그러한 흐름으로 정리됩니다.

본 사용법 정리는 보안그룹, VPC, IAM등의 설정이 다 완료되었다는 가정하에 진행됩니다.

Kinesis Data Stream 생성

아마존 Kinesis Data Stream의 생성은 위의 그림에서 1번을 클릭 함으로써 진행 가능합니다. 또한 비용계산을 위한 계산기는 그림의 2번을 누르면 사용 가능합니다.

데이터 스트림 생성

먼저 데이터 스트림 이름에 원하는 이름을 적고 샤드 수를 선택합니다. 샤드 수에 따른 데이터 처리속도와 요금이 설정되지만 이후 유연하게 수정이 가능합니다. 본 포스팅에서는 테스트용으로 열린 샤드 수를 1로 설정합니다.

Amazon Kinesis 메인 화면

다음으로 Kinesis Data Firehouse를 생성합니다. 위의 그림에서 Create Delivery Stream을 클릭하여 생성하면 됩니다.

Kinesis Data Firehouse 설정

이름을 입력하고, source(생산자)에서 이전에 만들어둔 Kinesis Data Stream을 선택합니다. 그리고 Next!

테스트에서는 데이터 Transform 과정을 하지 않기때문에 기본 Disable값들 그대로 Next를 누릅니다. ( 이 과정에서 Lambda를 통해 데이터를 변환하거나, 레코드 자체의 포맷을 변환 가능합니다.)

원하는 Destination을 선택합니다. 테스트에서는 S3를 선택후 저장될 버킷을 생성 or 선택하면 됩니다.

레코드들이 S3에 파일로 저장될 때의 파일 경로/이름 형식을 지정해야 합니다.
예를들어 Prefix에는
data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/

Error-Prefix에는
data/year=!{timestamp:yyyy}/month=!{timestamp:MM}/day=!{timestamp:dd}/hour=!{timestamp:HH}/!{firehose:error-output-type}

와 같이 입력합니다. 이후 Next를 클릭!
위와같이 입력하는 이유는, 후에 데이터를 분석하기위해 Athena에서 분석을 하기 위해 테이블 생성시 파티셔닝에 유리하기 때문입니다. 자세한 prefix 문법은Custom Prefixes for Amazon S3 Objects — Amazon Kinesis Data Firehose 를 참조하세요

Buffer Size 및 Buffer Interval은 테스트이므로 위와 같이 입력하며 아래의 S3 저장시에 압축과 암호화는 Disable의 기본값을 사용합니다.

위의 옵션들은 모두 기본값으로 냅두면 됩니다. (Cloud Watch를 통한 에러 로깅, 태그, IAM 역할 설정 ) 그리고 Next! 후에 설정된 옵션들을 모두 확인하고 최종적으로 Create Delivery Stream 을 클릭하여 생성!

수고하셨습니다. 이미지가 많아 스크롤 압박이 심하지만 의외로 어려운 부분은 없었다고 생각합니다. 그리고 Kinesis Data Firehouse 생성중 옵션들이 꽤나 많은데 본 포스팅에서 다 설명하기에는 복잡해지고 공식 문서에서도 충분히 잘 설명되어있기 때문에 다루지 않았습니다.

Kinesis Stream 에 데이터 입력하기

AWS Kinesis에서 데이터를 입력하는 방식은 공식적인 문서 Sending Data to an Amazon Kinesis Data Firehose Delivery Stream — Amazon Kinesis Data Firehose 에서 다양한 방법을 확인할 수 있습니다.
그 내용을 요약하면 아래와 같습니다.

지원되는 데이터 입력 방법들

본 포스팅에서는 가장 확장성이 좋은 AWS SDK를 이용하여 데이터를 입력해 보겠습니다. 전체적인 흐름을 다시 상기시켜보면 Kinesis Data Stream과 Firehouse가 연결되어있으므로 Kinesis Data Stream으로 데이터를 입력시 소비자인 Firehouse가 그 데이터를 받아 지정된 S3에 저장하는 흐름이 진행될 것입니다.

AWS SDK 설정

EC2에서 python기반 어플리케이션이 운영중이며 해당 어플리케이션의 로그를 Kinesis Data Stream으로 보내는 상황을 전제로 합니다.
AWS SDK를 사용하기 위해서는 서버에 awscli가 설치 되어있어야 합니다. https://aws.amazon.com/ko/cli/ 참조. 또한 설치된 awscli의 사용을 위해 아래와 같이 데이터를 보낼 서버에서 aws configure 명령어를 통한 aws key값이 설정되어있어야합니다.

$ aws configure
AWS Access Key ID [****************EETA]:****************** (보안)
AWS Secret Access Key [****************CixY]:***************** (보안)
Default region name [None]: ap-northeast-2
Default output format [None]:

본 포스팅에서는 python언어를 사용하며 위의 awscli를 파이썬에서 이용하기 위해 aws에서 제공되는 파이썬 패키지인 boto3가 설치되어야합니다.

pip3 install boto3 -y

전송을 하기 위한 파이썬 코드와 전송할 샘플데이터를 다운로드 합니다 (파이썬이 설치되어있어야합니다.)
소스 코드

샘플 데이터 csv 파일

https://github.com/heewinkim/kinesis-example/blob/main/sample_stream_data.csv

준비가 되었으면 서버 안에서 아래와 같이 gen_kinesis_data.py를 실행하여 데이터를 전송합니다.

python3 gen_kinesis_data.py --stream-name={생성한 kinesis data stream의 이름}

실행 후 S3 Prefix로 설정된 경로를 따라가보면 (본 포스팅 내용대로라면 data/year=2021/month=08/day=09/hour=14 위치에 생성된 파일에 전송된 데이터가 잘 저장되어있음을 확인할 수 있습니다.

본 포스팅에서 Kinesis에 대한 설명과 활용을 위한 간단한 예제까지 진행해 보았습니다. 다른 프레임워크의 앱보다 훨씬 간편하다는 장점이 있는것 같습니다. 또한 여기에서는 S3에 저장하는 예시만 정리했지만 Document검색을 위한 ElasticSearch,RDS 등을 Destination으로 하거나 데이터 전후처리를 위한 AWS Lambda를 활용하는 등 다양한 ETL 작업을 손쉽게 할 수 있어 확장성에 유리하다고 생각되었습니다.

다만 AWS의 보안,권한을 위한 시스템을 잘 이해하고 시스템 구축시에 필요한 설정값을 잘 설정해야한다는 진입장벽이 있다고 생각됩니다.

추가적으로 궁금증이 생길만한 점을 정리하고 본 포스팅을 마치겠습니다.

  • Kinesis Data Firehouse를 왜 단독으로 사용하지 않고 굳이 앞에 Stream을 붙이나요?? → Stream을 앞단에 연결함으로써 많은 요청이 한꺼번에 들어오거나 여러 오류로부터 데이터의 안정성, 무결성을 유지할 수 있습니다.
  • 레코드(데이터 한 건)의 크기는 1MB 보다 더 크게 사용할 순 없나요? → 정식적으로 지원하는 기능은 없으나 요청데이터 분할 및 후처리로 분할된 데이터 병합 등의 방법으로 사용할 순 있을것 같습니다만… 권유되지 않습니다.

--

--