AWS Kinesis Data Streaming with Lambda and Serverless

Pranay Mendhe
4 min readMay 5, 2020

--

Today we are going to explore AWS Kinesis Data Streaming with Lambda functions. So, Amazon Kinesis is a managed, scalable, cloud-based service that allows real-time processing of streaming large amounts of data per second.
Kinesis Data Streams is part of the Kinesis streaming along with Kinesis Data Firehose, Kinesis Video Streams, and Kinesis Data Analytics.

Implementing

In the first step, you have to log in to your AWS Console and Search for kinesis.

select kinesis form search result, you will see the following screen,

Click on Create Data Stream, you will see Data stream configuration.
write kinesis name and a select number of shards, A stream can be composed of one or more shards. One shard can read data at a rate of up to 2 MB/sec and can write up to 1,000 records/sec up to a max of 1 MB/sec. A user should specify the number of shards that coincides with the amount of data expected to be present in their system.

After clicking on the crate data stream, it will take a few seconds to create a stream, once a stream created you will get Stream ARN.

Next step, Create a serverless project by following steps:

serverless create --template aws-nodejs --path AWS-Kinesis
cd AWS-Kinesis
npm init (use all defaults)
touch index.js
mkdir config
touch development.yml AND touch production.yml

after the above steps, you will get handler.js, package.json and serverless.yml file

In config/development.yml and config/production.yml file :

NODE_ENV: development and production
ACCESS_KEY_ID: ***************
SECRET_ACCESS_KEY: ***********************

In serverless.yml file :

service: aws-kinesiscustom:
configFile: ${file(./config/${self:provider.stage}.yml)}
stage: ${opt:stage, self:provider.stage}
provider:
name: aws
runtime: nodejs12.x
stage: ${opt:stage, 'development'}
region: ap-south-1
environment:
NODE_ENV: ${self:custom.configFile.NODE_ENV}
ACCESS_KEY_ID: ${self:custom.configFile.ACCESS_KEY_ID}
SECRET_ACCESS_KEY: ${self:custom.configFile.SECRET_ACCESS_KEY}
functions:
sendDataToKinesis:
handler: index.sendDataToKinesis
reciveDataFromKinesis:
handler: index.reciveDataFromKinesis
events:
- stream: (copy paste Stream ARN here)

In index.js file :

var AWS = require('aws-sdk')
const uuid = require('uuid')
var kinesis = new AWS.Kinesis({
apiVersion: '2013-12-02',
region: 'ap-south-1',
accessKeyId: process.env.ACCESS_KEY_ID,
secretAccessKey: process.env.SECRET_ACCESS_KEY
})
let awsfun = (type, method, params) => {
return new Promise((resolve, reject) => {
type[method](params, (err, data) => {
if (err) {
reject(err)
} else {
resolve(data)
}
})
})
}
module.exports.sendDataToKinesis = async (event, context, callback) => {
try {
const partitionKey = uuid.v1()
let data = {name: "John", age: 31, city: "New York"}
let params = {
Data: JSON.stringify(data),
PartitionKey: partitionKey,
StreamName: 'kinesis-test'
}
await awsfun(kinesis, 'putRecord', params)
} catch (e) {
console.log(e)
}
}
module.exports.reciveDataFromKinesis = async (event, context, callback) => {
try{
context.callbackWaitsForEmptyEventLoop = false
const output = event.Records.map((record) => {
var payload = Buffer.from(record.kinesis.data, 'base64').toString('ascii')
payload = JSON.parse(payload)
return payload
})
console.log(`Kinesis stream "data-receiver" output :\n${JSON.stringify(output, null, 2)}`)
callback(null, JSON.stringify("done"));
} catch (e) {
console.log(e)
}
}

Deployment

for production deploy command
`serverless deploy — stage production -v`

for development deploy command
`serverless deploy — stage development -v`

Test

to check your deployed lambda functions go to AWS Lambda,

open sendDataToKinesis Lambda function, you can check all configuration of function,

next, click on Test and configure test event and save it.

select your test event and click the Test button again, it will execute lambda function and send result as succeeded or failed,

once you get result as succeeded, check reciveDataFromKinesis function and go to Monitoring Tab and click on View logs in CloudWatch, it will redirect to CloudWatch > Log Groups,

here you can check data received by Kinesis.

Thanks for reading the article, If you have any queries contact me at pranaymendhe@gmail.com.

--

--