AWS Data Analytics — Kinesis

How do you move data on AWS?

There are 3 different categories:

1) Real time

  • Kinesis Data Streams
  • Simple Queue Service (SQS)
  • IoT

2) Near-Real time (Reactive action)

  • Kinesis Data Firehose
  • Database Migration Service

3) Batch — History Analysis

Note: this is usually used when you want to move a large amount of data.

  • Snowball
  • Data Pipeline

AWS KINESIS OVERVIEW

  • A managed alternative to Apache Kafka
  • It’s great if you want to gather data such as application logs, metrics, IoT data or click streams.
  • It integrates with a lot of stream-processing frameworks such as Spark or NiFi.

Kinesis Streams: Low latency streaming

Kinesis Analytics: Perform real time analytics on Streams using SQL

Kinesis Firehose: Load streams into S3, Redshift, ES, Splunk

Kinesis Architecture

Step by step:

Sources: click streams, IoT devices, metrics and logs

The sources send data into Amazon Kinesis Streams. If you want to analyse the data, compute metrics or raise alerts, you use Amazon Kinesis Analytics; if you want to store the data or build real-time dashboards, you use Amazon Kinesis Firehose. And what does Kinesis Firehose do? It delivers the data to S3, ES, Redshift or Splunk.

Kinesis Streams Overview

Streams are divided into ordered shards (a shard is the equivalent of a partition).

For example: Producer -> 3 shards (shard-1, shard-2, shard-3) -> Consumers

Consumers read data from the shards. Kinesis Streams does not store your data forever: by default it is retained for 24 hours, so you basically store just 1 day. But if your data is critical, you can extend retention up to 7 days.
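
As a minimal sketch with the AWS SDK for Python (boto3), assuming a hypothetical stream called clickstream-demo, extending retention to the 7-day maximum could look like this:

```python
import boto3

kinesis = boto3.client("kinesis")

# Extend retention from the 24-hour default up to 168 hours (7 days).
kinesis.increase_stream_retention_period(
    StreamName="clickstream-demo",   # hypothetical stream name
    RetentionPeriodHours=168,
)
```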

  • Kinesis has the ability to reprocess / replay data.
  • Multiple applications can consume the same stream.
  • It’s not a database!
  • Once data is inserted into Kinesis it can’t be deleted (immutability); it’s an append-only stream.

Kinesis Streams Shards

  • One stream is made of many different shards (partitions).
  • An important point is billing: the more shards, the more you pay. You get billed per shard provisioned.
  • Batching is available, or you can put per message.
  • The number of shards can evolve over time: you can reshard (split) or merge.
  • Records are ordered per shard.

Producer -> 3 shards (shard-1, shard-2, shard-3) -> Consumers

So what does our producer send to these shards?

Kinesis Streams Records

  • A producer record is made of a data blob: bytes serialized up to 1 MB, which can represent anything.
  • Record key: helps Kinesis know which shard to send the data to. It’s something like a user ID.
  • Sequence number: not something the producer sends; it gets added by Kinesis after ingestion. It’s the unique identifier for each record put into a shard (see the example after this list).
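
For illustration, here is what a single record put could look like with boto3 (the stream name clickstream-demo and the partition key are made up); note how the sequence number comes back in the response rather than being sent by the producer:

```python
import json
import boto3

kinesis = boto3.client("kinesis")

# The data blob is anything serialized as bytes, up to 1 MB.
payload = json.dumps({"event": "click", "page": "/home"}).encode("utf-8")

response = kinesis.put_record(
    StreamName="clickstream-demo",   # hypothetical stream
    Data=payload,                    # the data blob
    PartitionKey="user-1234",        # record key (e.g. a user ID) picks the shard
)

# The sequence number is assigned by Kinesis after ingestion, not by the producer.
print(response["ShardId"], response["SequenceNumber"])
```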

LIMITS!

  • A producer can write 1 MB/s or 1,000 messages per second per shard. For example, if you have 10 shards you get 10 MB/s or 10,000 messages per second.
  • If you go over that limit you get a ProvisionedThroughputExceeded exception.

Kinesis has two types of consumers.

  1. Consumer Classic
  • 2 MB/s read per shard across all consumers.
  • 5 API calls per second per shard.

2. Consumer Enhanced Fan-Out

  • No GetRecords API calls needed (Kinesis pushes the data)
  • 2 MB/s read per shard, per enhanced consumer

Kinesis Producer

The first one is the SDK (Software Development Kit), which allows you to write code or use the CLI to send data directly into Kinesis. The second one is the Kinesis Producer Library (KPL), the third one is the Kinesis Agent, which basically lets you ship a log file, and the last part is third-party libraries: Spark, Flume, etc. All of these will send data to Kinesis.

Kinesis Producer SDK

  • Uses PutRecord (one record) and PutRecords (many records)
  • PutRecords uses batching: you can send many records as part of one HTTP request, saving on HTTP overhead (see the example below)
  • ProvisionedThroughputExceeded if we go over the limit
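
A small sketch of PutRecords batching with boto3, again against the hypothetical clickstream-demo stream:

```python
import json
import boto3

kinesis = boto3.client("kinesis")

# Batch several records into a single HTTP request with PutRecords.
records = [
    {
        "Data": json.dumps({"event": "click", "n": i}).encode("utf-8"),
        "PartitionKey": f"user-{i % 3}",   # spreads records across shards
    }
    for i in range(100)
]

response = kinesis.put_records(StreamName="clickstream-demo", Records=records)

# Individual records can still fail even when the call succeeds, so check this.
print("Failed records:", response["FailedRecordCount"])
```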

Managed AWS sources that use the SDK behind the scenes:

  • AWS IoT
  • CloudWatch
  • Kinesis Data Analytics

AWS Kinesis Exceptions

  • ProvisionedThroughputExceeded: exceeding the MB/s or TPS limit for any shard
  • Often caused by a hot shard / bad partition key (too much data going to a single key)

Basic Solutions for Exceptions

  • Retries with backoff (see the sketch below)
  • Increase shards
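
A rough sketch of the retries-with-backoff idea using boto3; the helper name and stream are invented for illustration, not an official pattern:

```python
import time
import boto3

kinesis = boto3.client("kinesis")

def put_with_backoff(data: bytes, partition_key: str, max_retries: int = 5):
    """Retry PutRecord with exponential backoff when the shard limit is exceeded."""
    for attempt in range(max_retries):
        try:
            return kinesis.put_record(
                StreamName="clickstream-demo",   # hypothetical stream
                Data=data,
                PartitionKey=partition_key,
            )
        except kinesis.exceptions.ProvisionedThroughputExceededException:
            time.sleep(0.1 * (2 ** attempt))   # back off: 0.1s, 0.2s, 0.4s, ...
    raise RuntimeError("Still throttled after retries; consider adding shards.")
```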

Kinesis Producer Library

It has a synchronous and an asynchronous API, but performance is better with async, and you can monitor it with CloudWatch. A very important mechanism is batching, which is divided into two parts: the first part is Collect and the second is Aggregate.

Collect: gather records and write to multiple shards in the same PutRecords API call.

Aggregate: adds latency, but gives the capability to store multiple records in one record, so you can go beyond the 1,000 records per second limit (up to the 1 MB/s limit).

Important: for example, you have 3 pieces of data, 20 KB, 30 KB and 200 KB, arriving at 3 ms, 5 ms and 6 ms respectively. All of this data is < 1 MB, so it can be sent with the PutRecords API called just once. But how? By introducing some delay with RecordMaxBufferedTime (default 100 ms). Basically you’re saying “I’m willing to wait 100 ms”, so you’re adding a little bit of latency as the trade-off for being more efficient.
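
The KPL itself is a Java library, so the snippet below is only a toy Python sketch of the buffer-then-flush idea behind RecordMaxBufferedTime, not the KPL:

```python
import time
import boto3

kinesis = boto3.client("kinesis")

MAX_BUFFERED_TIME = 0.1      # like RecordMaxBufferedTime = 100 ms
buffer, first_record_at = [], None

def send(data: bytes, partition_key: str):
    """Buffer records and flush them together in a single PutRecords call."""
    global first_record_at
    buffer.append({"Data": data, "PartitionKey": partition_key})
    first_record_at = first_record_at or time.monotonic()
    # Flush once we have waited long enough or the batch is getting big.
    if time.monotonic() - first_record_at >= MAX_BUFFERED_TIME or len(buffer) >= 500:
        kinesis.put_records(StreamName="clickstream-demo", Records=buffer)
        buffer.clear()
        first_record_at = None
```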

The last part is the Kinesis Agent. This agent monitors log files and sends them to Kinesis Data Streams. It is built on top of the KPL (Kinesis Producer Library), can write from multiple directories, handles file rotation, checkpointing and retry upon failure, and emits metrics to CloudWatch for monitoring.

Kinesis Consumer Classic

Streams can go everywhere: for example Lambda, Firehose, Spark, an SDK client, etc.

Classic Kinesis: records are polled by consumers from a shard. Each shard has 2 MB/s total aggregate throughput. How does it work? The consumer talks to, for example, shard-1 and says “hey dude! GetRecords()”, and shard-1 answers “ok buddy, this is for you: data”. GetRecords returns up to 10 MB of data (then throttles for 5 seconds) or up to 10,000 records. There is a maximum of 5 GetRecords API calls per shard per second, i.e. 200 ms between calls.

Important: if 5 clients consume from the same shard, it means every consumer can poll once a second and receives less than 400 KB/s.
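
A minimal polling sketch with boto3 (stream name and shard ID are placeholders), showing GetShardIterator plus GetRecords and pacing that respects the 5-calls-per-second limit:

```python
import time
import boto3

kinesis = boto3.client("kinesis")

# Get an iterator for one shard, starting at the oldest available record.
iterator = kinesis.get_shard_iterator(
    StreamName="clickstream-demo",       # hypothetical stream
    ShardId="shardId-000000000000",      # hypothetical shard ID
    ShardIteratorType="TRIM_HORIZON",
)["ShardIterator"]

while iterator:
    # Each call returns up to 10 MB / 10,000 records; only 5 calls per shard per second.
    response = kinesis.get_records(ShardIterator=iterator, Limit=1000)
    for record in response["Records"]:
        print(record["PartitionKey"], record["Data"])
    iterator = response.get("NextShardIterator")
    time.sleep(0.2)  # stay under the 5 GetRecords calls per second limit
```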

Kinesis Client Library

It reads records from Kinesis that were produced with the KPL (it de-aggregates them).

It can share multiple shards between multiple consumers in one “group”; this is shard discovery.

It has a checkpointing feature to resume progress! So if the application goes down and comes back up, it is able to remember exactly where it was consuming last, in order to resume progress.

So how do checkpointing and shard discovery work?

Well, basically it uses a DynamoDB table to checkpoint the progress over time and to synchronize who is going to read which shard. So DynamoDB is used for checkpointing, and the table has one row for each shard to consume from.

Kinesis Connector Library

It is used to write Kinesis Data Streams output to S3, DynamoDB, Redshift or ES. This connector must run on an EC2 instance.

Kinesis Enhanced Fan Out

It works with KCL 2.0 and Lambda. So what is Kinesis Enhanced Fan-Out? Each consumer gets 2 MB/s per shard. It looks similar to before, but it is not exactly the same thing: the producer sends data to Kinesis Data Streams, and the consumer calls SubscribeToShard() just once to start, and that’s all; there is no more polling. That means if we have 20 consumers, overall we get 40 MB/s per shard. No more 2 MB/s aggregate limit! The reason is that Kinesis pushes the data to the consumers over HTTP/2. Latency is reduced to about 70 ms.
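
Here is a hedged boto3 sketch of enhanced fan-out (the stream ARN, consumer name and shard ID are placeholders): register a consumer once, then subscribe and receive pushed events:

```python
import boto3

kinesis = boto3.client("kinesis")

# Register this application as an enhanced fan-out consumer (ARN is a placeholder).
consumer = kinesis.register_stream_consumer(
    StreamARN="arn:aws:kinesis:eu-west-1:123456789012:stream/clickstream-demo",
    ConsumerName="dashboard-app",
)["Consumer"]
# (In practice, wait until the consumer status is ACTIVE before subscribing.)

# Subscribe once; Kinesis then pushes records over HTTP/2 for up to 5 minutes,
# after which you simply re-subscribe. No GetRecords polling.
subscription = kinesis.subscribe_to_shard(
    ConsumerARN=consumer["ConsumerARN"],
    ShardId="shardId-000000000000",
    StartingPosition={"Type": "LATEST"},
)

for event in subscription["EventStream"]:
    for record in event["SubscribeToShardEvent"]["Records"]:
        print(record["PartitionKey"], record["Data"])
```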

Enhanced vs Standard Consumer

Standard consumers: use them when you have a low number of consuming applications (1, 2, 3, and so on), can tolerate about 200 ms latency, and want to minimize cost.

Enhanced consumers: multiple consumer applications for the same stream, low latency of about 70 ms. There is a default limit of 5 consumers using enhanced fan-out per data stream.

Kinesis Operations -> Adding Shards

Also called shard splitting, it can be used to increase the stream’s capacity (1 MB/s of data in per shard): if you have 10 shards you have 10 MB/s.

It can be used to divide a “hot shard”.

So what happens when you split a shard? Well, the old shard is closed and will be deleted once its data expires.

For example, we have 3 shards, all covering an equal key space.

| Shard-1| Shard-2| Shard-3|

Let’s imagine shard-2 is very hot and we want to split it to increase throughput on the key space of shard-2. So we do a split operation, and what happens is that shard-4 and shard-5 get created.

| Shard-1| Shard-4|Shard-5| Shard-3|

So shard-2 will still be available as long as the data in it has not expired, but once it expires it will be gone.
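
A sketch of how such a split could be done with boto3; all names are placeholders, and shard-2 is assumed to map to the shard ID shown:

```python
import boto3

kinesis = boto3.client("kinesis")

# Find the hot shard; the shard ID for "shard-2" is assumed here.
shards = kinesis.list_shards(StreamName="clickstream-demo")["Shards"]
hot = next(s for s in shards if s["ShardId"] == "shardId-000000000001")

# Split it in the middle of its hash-key range, producing two child shards
# (the "shard-4" and "shard-5" from the example above).
start = int(hot["HashKeyRange"]["StartingHashKey"])
end = int(hot["HashKeyRange"]["EndingHashKey"])

kinesis.split_shard(
    StreamName="clickstream-demo",
    ShardToSplit=hot["ShardId"],
    NewStartingHashKey=str((start + end) // 2),
)
```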

Kinesis Operations -> Merging Shards

Merging decreases the stream capacity and saves cost; it can be used to group two shards with low traffic.

Again, the old shards are closed and deleted based on data expiration.

| Shard-1| Shard-4|Shard-5| Shard-3|

For example, we merge shard-1 and shard-4 because they did not get much traffic, so we can merge them together and save some cost, as sketched below.

| Shard-6|Shard-5| Shard-3|
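
A minimal boto3 sketch of that merge; the shard IDs are placeholders, and the two shards must be adjacent in the hash-key space:

```python
import boto3

kinesis = boto3.client("kinesis")

# Merge two adjacent, low-traffic shards into one (shard IDs are placeholders).
kinesis.merge_shards(
    StreamName="clickstream-demo",
    ShardToMerge="shardId-000000000000",          # "shard-1" above
    AdjacentShardToMerge="shardId-000000000003",  # "shard-4" above
)
```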

So what about auto-scaling?

  • Auto scaling is not a native feature of Kinesis
  • The API call to change the number of shards is UpdateShardCount (example below)
  • We can implement auto-scaling with AWS Lambda
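
As a hedged example, the UpdateShardCount call with boto3 (stream name and target count are made up):

```python
import boto3

kinesis = boto3.client("kinesis")

# Scale a 3-shard stream up to 6 shards (you can at most double per call).
kinesis.update_shard_count(
    StreamName="clickstream-demo",
    TargetShardCount=6,
    ScalingType="UNIFORM_SCALING",
)
```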

Notes:

  • Resharding can’t be done in parallel. You need to plan capacity in advance.

And you can’t do the following:

  • Scale up to more than double your current shard count for a stream
  • Scale up to more than 500 shards in a stream
  • Scale up to more than the shard limit for your account

As said, resharding cannot be done in parallel, and it basically takes a few seconds per shard.

Kinesis Security

  • Control access / authorization using IAM
  • Encryption in flight using the HTTPS endpoint
  • Encryption at rest using KMS (see the example below)
  • VPC endpoints are available to access Kinesis from within a VPC
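
For example, enabling encryption at rest with boto3 and the AWS-managed KMS key (the stream name is a placeholder):

```python
import boto3

kinesis = boto3.client("kinesis")

# Enable server-side encryption at rest, here with the AWS-managed KMS key.
kinesis.start_stream_encryption(
    StreamName="clickstream-demo",
    EncryptionType="KMS",
    KeyId="alias/aws/kinesis",
)
```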

AWS Kinesis Data Firehose

  • Fully managed service, no administration
  • Near real time! (60-second latency) OK, but why? When you look at the batching, there is a minimum 60-second latency if your batch is not full, so we don’t have a guarantee that the data will be at the destination right away.
  • Loads data into S3, ES, Splunk, Redshift
  • Auto scaling
  • Data conversion from JSON to Parquet (only for S3)
  • Data transformation with AWS Lambda (e.g. CSV -> JSON)
  • Compression: GZIP, ZIP, SNAPPY
  • Only GZIP if the data is further loaded into Redshift
  • Spark or the Kinesis Client Library can’t read from Kinesis Data Firehose

The SDK, the Kinesis Agent, Kinesis Data Streams, CloudWatch and IoT send data to Kinesis Firehose. Firehose is able to do some transformation (we can use a Lambda function), and after that we store the data in S3, ES, Splunk or Redshift.

For example, with a buffer size of 32 MB and a buffer time of 2 minutes, the buffer is flushed whenever either one is reached.
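
A minimal boto3 sketch of sending a record to a hypothetical Firehose delivery stream, which Firehose then buffers and flushes to S3:

```python
import json
import boto3

firehose = boto3.client("firehose")

# Send one record to a hypothetical delivery stream; Firehose buffers it and
# flushes to the destination once the buffer size or buffer time is reached.
firehose.put_record(
    DeliveryStreamName="clickstream-to-s3",
    Record={"Data": json.dumps({"event": "click"}).encode("utf-8") + b"\n"},
)
```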

Kinesis Data Streams vs Firehose

Streams

  • You are going to write custom code (producer/consumer)
  • Real time
  • Must manage scaling yourself (merging/splitting)
  • Data is stored for 1 to 7 days

Firehose

  • Fully managed; sends to S3, ES, Splunk, Redshift
  • Serverless data transformation with Lambda
  • Near real time
  • Auto scaling
  • No data storage
