Process Events with Kinesis and Lambda

Serkan Özal
9 min readMay 4, 2020

--

Processing Kinesis Events with Lambda

Real-time data processing, also known as stream processing, has become almost a must-have feature in different applications covering various scenarios, from handling pizza orders to processing data from gauges on a spaceship. In this guide, we will learn the basics of stream processing with AWS Kinesis and Lambda, as serverless is perfectly suited for such tasks.

Why Serverless?

AWS Lambda allows developers to concentrate on the code rather than on servers or infrastructure. The serverless concept includes such important features as auto-scaling according to load and a pay-as-you-go billing model, making AWS Lambda the most cost-effective tool for building stream processing applications.

AWS Kinesis Data Streams

AWS Kinesis Data Streams is a service designed for real-time capturing and streaming of huge amounts of data. Messages can come to Kinesis from hundreds and thousands of different sources, allowing you to implement scalable, complex systems for real-time or batch data analytics. All these messages, called “data records,” are collected into the Kinesis data stream and are then ready to be retrieved by the consumers: a Lambda function, an EC2-based application, Kinesis Data Analytics, or even Apache Spark on AWS EMR.

AWS Kinesis vs. AWS SQS

Those who are familiar with AWS SQS will probably notice some similarities between these services, as at first sight both are implementations of message storage and delivery mechanisms. However, there are several significant distinctions between them.

One of the key differences is that Kinesis allows you to read / replay streams from any point at any time, while SQS doesn’t provide selective message retrieval and actually just takes the last messages from the stack. Another important difference is that Kinesis supports multiple consumers, while SQS supports only one consumer per message. And of course, Kinesis was designed to handle real-time ingestion of huge data volumes with minimal delays, so if that is applicable to your app, Kinesis should do the trick better than SQS.

In turn, SQS has its own features, like the options to delay messages and to configure separate dead letter queues (DLQs) for messages that cannot be processed for any reason. DLQs are especially useful because they let you isolate incorrect or corrupted messages for further investigation or special processing. This means that you don’t have to delete them to prevent them from blocking the rest of the queue. It is also worth noting that SQS infinitely scales itself according to the load, while the Kinesis streams’ performance depends on the number of shards you’ve configured that are currently running.

Finally, your monthly SQS bill depends on how many requests you’ve made to its API, while Kinesis stream usage is billed per hour per shard plus the number of PUT operations. For this reason, improper use of this service may lead to dramatically increased cost without any improvement in performance.

A Sample Implementation Scenario

Imagine gas-cooled reactors in a regular nuclear plant, where the normal mean temperature of the coolant leaving the reactor is around 640° C. We are now going to build an oversimplified application for monitoring, logging temperature sensor data, and notifying the rest of the system about abnormal values to prevent overheating and thus emergencies.

The system will continuously send temperature readings to the Kinesis data stream. The Lambda function will consume events from the stream by polling records; check the temperature, and send messages to the dedicated SNS topic if the readings are too high. The same Lambda will also save all the readings to the DynamoDB table.

Development Prerequisites

You will need an AWS account, together with an installed and configured AWS Serverless Application Model ( SAM), which allows you to easily define the necessary infrastructure and deploy the application. It is also recommended that you install the AWS CLI, as it will be useful for initial testing of the app. After successfully installing all the items listed, you are ready to set up the basic project from one of the default boilerplates. To do that, open terminal and run the following command:

$ sam init -r nodejs10.x -n reactor-monitor

Figure 1: Initializing the project

Select “AWS Quick Start Templates” and on the next prompt, select “Quick Start from Scratch.” This will create a directory named reactor-monitor with a number of files inside. The ones that interest us are template.yml, which contains the list of all resources needed for our application, and .js files in the /src/handlers/ directory, which is where we will write the JavaScript code.

Prepare the Resources Needed

According to the scenario, our application consists of at least one Kinesis stream, one Lambda function, one DynamoDB table, and a SNS topic. There are several ways to create them: manually with web console, with AWS CLI, or by defining and deploying the AWS CloudFormation template. AWS SAM, which we mentioned previously, is actually the extension of CloudFormation, with some additional functionality related to serverless. Given this fact, the most suitable way for us to create the necessary resources is to declare all the resources in our project’s template.yml file.

First, let’s delete everything below the “Resources” section and write the description of the first resource needed:

Resources:
KinesisStream:
Type: AWS::Kinesis::Stream
Properties:
Name: indications-stream
ShardCount: 1

Figure 2: Declaring the Kinesis stream

We have declared the Kinesis stream responsible for capturing gauge readings. In the same way, we are going to define the DynamoDB table and SNS topic, so let’s just add these lines right below the stream declaration:

IndicationsDynamoDBTable:
Type: AWS::DynamoDB::Table
Properties:
AttributeDefinitions:
- AttributeName: recordId
AttributeType: S
- AttributeName: temp
AttributeType: N
KeySchema:
- AttributeName: recordId
KeyType: HASH
- AttributeName: temp
KeyType: RANGE
ProvisionedThroughput:
ReadCapacityUnits: 5
WriteCapacityUnits: 5
NotificationsTopic:
Type: AWS::SNS::Topic
Properties:
TopicName: reactor-notifications

Figure 3: Declaring DynamoDB table and SNS topic

The above code will tell AWS to create the necessary resources during deployment, but if this is not clear enough, AWS provides great documentation on template syntax and structure.

Before diving into the Lambda code, we also need to include the Lambda template in the template file:

ProcessIndicationsLambda:
Type: AWS::Serverless::Function
Properties:
Handler: src/handlers/processIndications.handlerCOMPANY
Runtime: nodejs10.x
MemorySize: 256
Timeout: 10
Environment:
Variables:
NOTIFICATIONS_TOPIC_ARN: !Ref NotificationsTopic
Events:
Stream:
Type: Kinesis
Properties:
Stream: !GetAtt KinesisStream.Arn
StartingPosition: LATEST
BatchSize: 1
Policies:
- AmazonDynamoDBFullAccess
- AmazonSNSFullAccess

Figure 4: Declaring the Lambda function

According to the template, the future Lambda function will have access to the DynamoDB and SNS. (Note: We’ve given it full access just for simplicity, but you can manage policies more granularly.) Since “BatchSize” is set to 1, the function will always take a maximum of one event from the stream (the default is 100). “StartingPosition” is set to LATEST, meaning that the function should process only fresh events appearing after it is deployed. The “HANDLER” property targets the JavaScript file with the code of our Lambda, which is expected to export a function called “handler.” We have also added an environment variable for storing the ARN of the SNS topic, which we will need in the code so we can publish messages to the topic.

Coding the Lambda Function

Remove everything from the /src/handlers/ directory and create a processIndications.js file with the following code:

const AWS = require('aws-sdk');
const dynamodb = new AWS.DynamoDB();
const sns = new AWS.SNS();

const WARN = 700; // warning threshold
const EMERG = 800; // emergency threshold

module.exports.handler = async (event) => {
const [record] = event.Records;
try {
const payload = Buffer.from(record.kinesis.data, 'base64').toString();
const {temp, reactor, date} = JSON.parse(payload);
// save records to the dynamodb
await dynamodb.putItem({
TableName: 'indications-table',
Item: {
recordId: { S: record.eventID },
temp: { N: String(temp) },
reactor: { S: reactor },
date: { S: date }
}
}).promise();

if (temp >= WARN) {
console.log('Overheat!');
// publish messages to special SNS topic
await sns.publish({
TopicArn: process.env.NOTIFICATIONS_TOPIC_ARN,
Subject: temp >= EMERG ? 'emergency' : 'warning',
Message: `${reactor} coolant temperature is ${temp}°C!`
}).promise();
}
} catch (err) {
console.log(err);
}
};

Figure 5: Coding the Lambda function

Note that we first decode the record’s data, as it initially comes encoded in base64. Then, the readings received are saved to DynamoDB and a message is published to SNS if the temperature is higher than the warning or emergency level. We did not install the “aws-sdk” module via npm. This is because it is needed only for local testing of the application, since the module is included by default in the AWS Lambda environment and doesn’t have to be part of the final package.

Deploy and Test the Application

Now we are ready to run the main command:

$ sam deploy --guided

Figure 6: Starting deployment

This will start the deployment process, but first we will need to answer a few questions appearing in the CLI, such as stack name and region. If everything is all right and the process is finished correctly, you should see a message saying “Successfully created/updated stack — reactor-monitor in eu-west-1.”

In order to test the functionality, we’ll need to put some records into the Kinesis stream. Let’s do that using the AWS CLI.

$ aws kinesis put-record --stream-name indications-stream --partition-key 1 --data '{"temp":750,"reactor":"reactor 1","date":"2020-03-07T16:29:50+02:00" }'

Figure 7: Sending a record to the Kinesis stream

Next, navigate to the AWS web console, select “Lambda,” find your function, and check the logs: You should see only the “Overheat!” log and no errors. Then navigate to the “DynamoDB” panel, select “table,” and check the list of items.

Figure 8: Item added to the DynamoDB

Change values in the “data” property and send a few more events, and you will see new items in the database. For a more realistic data stream you can use the AWS Kinesis Data Generator, which lets you fill the Kinesis stream with the records according to the specified template. But first you’ll have to carefully configure the tool step by step and then set up the template and run-generator.

Tracing and Monitoring

Having a monitoring tool for your nuclear plant is awesome, but it’s also important for you to be able to observe the tool to be sure that it works as expected. To achieve this, let’s enable Thundra for our project using the Quick Start Guide.

Now we need to instrument our “reactor-monitor” function to be able to get insights and detailed information about how the system works: Select the function from the list, click “Instrument,” and then click “OK.”

Figure 9: Instrumenting the Lambda function to achieve detailed observability

Great! Instrumenting may take some time, so go grab some coffee and return in a few minutes.

Make some new records into the stream using the methods mentioned above and check Thundra’s functions list, where you’ll already see the basic statistics.

Figure 10: General statistics for the function

Click on the function’s name and you’ll go to the invocations list, where you can check details on every particular invocation, such as latency breakdown, resource usage, and logs.

Navigate to the “Unique Traces” section via the left menu and check out one of the coolest features, which allows you to track the whole chain of calls to AWS services involved in the business flow of your application. Thundra detects unique flows automatically and groups all the executions by the corresponding flow. You can give each known flow its own alias to make everything even cleaner!

Figure 11: Unique traces in the application

Here you can see that there are three main flows, which differ according to the services involved. The first utilizes only DynamoDB, as there is no need to send SNS messages if the temperature is okay; the second uses both services; and the third doesn’t use any service due to an error resulting from a corrupted json in the input data.

Click on any of the aliases to go to the list of all traces, and select any of them to open the map with a detailed description of that particular trace:

Figure 12: Trace map

Kinesis Is Not a Single Service

Today we have built a very simple application, demonstrating the basics of the Kinesis + Lambda implementation. However, Kinesis can be used in much more complicated scenarios, with multiple sources and consumers involved.

It’s also important to know that data streaming is only one of four services from the Kinesis group. There is also Kinesis Data Firehose for streaming data directly to storage, Kinesis Video Streams for handling media streaming, and Kinesis Data Analytics for analyzing and querying streamed data on-flight.

Appropriate use of these services, together with Lambda and other AWS features, is a powerful combination for creating amazing real-time applications!

Originally published at https://blog.thundra.io.

--

--

Serkan Özal

AWS Serverless Hero | Founder & CTO @ Thundra | Serverless Researcher | JVM Hacker | Oracle OpenSource Contributor | AWS Certified | PhD Candidate