Streaming Data Analytics Using Kinesis Streams for Anomaly Detection via the Random Cut Forest Algorithm (ML)


Author: Faizan Rafiq

Introduction

This article covers temperature data anomaly detection using the Random Cut Forest (RCF) machine learning algorithm. We use the cloud vendor Amazon Web Services as the development platform for our anomaly detection application. To simulate an actual IoT temperature sensor device, a Python program is developed that sends data in real time to a Kinesis data stream; the ingested data is then processed by a Kinesis Data Analytics application using the Random Cut Forest algorithm. Normal temperature data is in the 10 to 20 degree range, while data in the 100 to 120 degree range is sent to the stream as a known anomaly that the system is expected to detect.

The algorithm assigns a score to each real-time temperature record, and the processed data is stored in the target location, an Amazon S3 processed bucket, via a Kinesis Firehose delivery stream. For graphical visualization of the processed data with anomaly classification, we use the Amazon QuickSight tool.

Use Case

Temperature data anomaly detection using the RANDOM_CUT_FOREST ML function of Kinesis Data Analytics.

Implementation Architecture Diagram

Article Architecture Diagram

Introduction to Cloud Services Used in This Article

Amazon Kinesis Data Streams

You can use Amazon Kinesis Data Streams to build your own streaming application. This application can process and analyze streaming data by continuously capturing and storing terabytes of data per hour from hundreds of thousands of sources.

The Kinesis Data Streams service is not part of the AWS Free Tier, so please consult the pricing page: https://aws.amazon.com/kinesis/data-streams/pricing/

Amazon Kinesis Data Analytics

Kinesis Data Analytics provides an easy and familiar standard SQL language to analyze streaming data in real-time. One of its most powerful features is that there are no new languages, processing frameworks, or complex machine learning algorithms that you need to learn.

Kinesis Data Analytics Cost

With Amazon Kinesis Data Analytics, you pay only for what you use. There are no resources to provision or upfront costs associated with Amazon Kinesis Data Analytics.

You are charged an hourly rate based on the number of Amazon Kinesis Processing Units (or KPUs) used to run your streaming application. A single KPU is a unit of stream processing capacity comprised of 1 vCPU compute and 4 GB memory. Amazon Kinesis Data Analytics automatically scales the number of KPUs required by your stream processing application as the demands of memory and compute vary in response to processing complexity and the throughput of streaming data processed.

Amazon Kinesis Data Firehose

Kinesis Data Firehose is the easiest way to load streaming data into AWS. It can capture, transform, and load streaming data into Amazon S3, Amazon Redshift, and Amazon Elasticsearch Service.

Solution Summary, Step by Step

The following is a quick walkthrough of the use case which has been implemented.

Step 1: Kinesis Data Stream (Source)

Created a data stream with a single shard.

Step 2: Real-Time Stream Data Ingestion — Live Data Feed

The simulated IoT sensor sends streaming data into the Kinesis data stream. For this, a Python script is used to simulate an IoT temperature sensor device that sends the streaming data.

Step 3: Create a Kinesis Analytics Application on AWS

By using the built-in RANDOM_CUT_FOREST function in Kinesis Data Analytics, you can detect anomalies in real time in the sensor data stored in Kinesis Data Streams. RANDOM_CUT_FOREST is also an appropriate algorithm for many other kinds of anomaly-detection use cases beyond sensor data.

Step 4: Store Processed Data to S3 (Processed)

The processed anomaly data is then loaded into the Kinesis Data Firehose delivery stream. By using the built-in integration that Kinesis Data Firehose has with Amazon S3, you can easily export the processed anomaly data into the S3 bucket.

Step 5: Visualize the S3 (Processed) Data

Use Amazon QuickSight to visualize insights from the processed data.

Implementation Details for the Steps Mentioned Above

Step 1: Kinesis Data Stream (Live)

Created a Kinesis data stream to provide a real-time data feed from a live source. We will use a Python script as a dummy IoT sensor, which will ingest data into the Kinesis stream in JSON format. The data stream has been set to a data retention period of 24 hours.
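If you prefer to create the stream programmatically instead of through the console, a minimal boto3 sketch would look like the following; the stream name and region are assumptions and should match your own setup:

import boto3

kinesis = boto3.client('kinesis', region_name='us-east-1')

# Create a single-shard stream; the name must match the one used by the producer script below
kinesis.create_stream(StreamName='STREAM-NAME', ShardCount=1)

# Wait until the stream becomes ACTIVE before sending data to it
kinesis.get_waiter('stream_exists').wait(StreamName='STREAM-NAME')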

Step 2: Stream Ingestion

Developed and executed a Python program that sends random temperature data to the Kinesis stream; this script is the main source of real-time data for the stream. For our anomaly detection use case, normal temperatures are in the 10 to 20 degree range; as an artificial anomaly, readings in the 100 to 120 degree range are randomly ingested into the stream. When a record is successfully sent to the stream, the program prints a message such as “Data Placed on Kinesis Stream, Shard Id:shardId-000000000000”.

import json
import random
import boto3

# Kinesis boto3 client to place/put data on the AWS Kinesis stream
KINESIS_CLIENT = boto3.client('kinesis', region_name='us-east-1')

kinesis_stream_name = "STREAM-NAME"


def getData(iotName, lowVal, highVal):
    data = {}
    data["iotName"] = iotName
    data["iotValue"] = random.randint(lowVal, highVal)
    return data


while True:
    rnd = random.random()
    if rnd < 0.01:
        data = json.dumps(getData("DemoSensor", 100, 120))
        kinesis_put_response = KINESIS_CLIENT.put_record(StreamName=kinesis_stream_name,
                                                         Data=data,
                                                         PartitionKey=str(rnd))
        # Get shard_id from put response of kinesis stream where data is placed
        if not kinesis_put_response["ShardId"]:
            raise Exception('Error: Data was not placed on kinesis stream')
        else:
            data_placed_shard_id = kinesis_put_response["ShardId"]
            print("Data Placed on Kinesis Stream, Shard Id:" + data_placed_shard_id)
    else:
        data = json.dumps(getData("DemoSensor", 10, 20))
        kinesis_put_response = KINESIS_CLIENT.put_record(StreamName=kinesis_stream_name,
                                                         Data=data,
                                                         PartitionKey=str(rnd))
        # Get shard_id from put response of kinesis stream where data is placed
        if not kinesis_put_response["ShardId"]:
            raise Exception('Error: Data was not placed on kinesis stream')
        else:
            data_placed_shard_id = kinesis_put_response["ShardId"]
            print("Data Placed on Kinesis Stream, Shard Id:" + data_placed_shard_id)

Real-Time Data Ingestion

The Python program above is in execution, streaming data from the local machine to the Kinesis stream for analysis.

Step 3: Create a Kinesis Data Analytics Application

Created a Kinesis Data Analytics application as a consumer of the Kinesis data stream; it uses the Random Cut Forest algorithm to detect anomalies in the live data. Detection is essentially the calculation of an anomaly score: the algorithm scores each temperature record in the stream. We can then expose the result set to visualization, where anomalies appear as records with high scores and normal records receive low scores.

Below are references to the application created in the AWS Kinesis service:

Configuration Details:

Configured the created Kinesis stream as the source of the analytics application and analyzed its data schema.

Successful Configuration of the Source (Kinesis Stream) of the Application

Successfully Configured Kinesis Data Analytics Application (Source)

Write Processing Logic (Random_Cut_Forest Algorithm):

Go to the SQL editor and use the predefined SQL operation templates, as shown below.

Selection of Template given by Kinesis Data Analytics (Logic)

Selected the Anomaly Detection template, which uses the Random Cut Forest machine learning algorithm. It is an ensemble algorithm that builds a forest of random trees and aggregates their scores to rate how anomalous each record is.

Kinesis Data Analytics Application — Logical Processing Code

Applied the SQL operation on the schema with the RANDOM_CUT_FOREST ML function, which is a built-in operation within Kinesis Data Analytics:
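For reference, the anomaly detection template boils down to a short piece of SQL that scores every incoming record with RANDOM_CUT_FOREST and pumps the results into the in-application stream “TEMP_STREAM”. In this article the SQL was simply pasted into the console's SQL editor; the sketch below shows how roughly equivalent code could be applied to an existing application with boto3. The application name is an assumption, and the column names are assumed to match the schema discovered from the producer's JSON records.

import boto3

# Hypothetical application name; use the name of the application created in the console
APPLICATION_NAME = 'temperature-anomaly-detection-app'

# Anomaly-detection SQL along the lines of the built-in Kinesis Data Analytics template:
# every record from the source stream gets an ANOMALY_SCORE from RANDOM_CUT_FOREST
APPLICATION_CODE = """
CREATE OR REPLACE STREAM "TEMP_STREAM" (
    "iotName"       VARCHAR(40),
    "iotValue"      INTEGER,
    "ANOMALY_SCORE" DOUBLE);

CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM"
SELECT STREAM "iotName", "iotValue", ANOMALY_SCORE FROM
    TABLE(RANDOM_CUT_FOREST(
        CURSOR(SELECT STREAM * FROM "SOURCE_SQL_STREAM_001")));
"""

analytics = boto3.client('kinesisanalytics', region_name='us-east-1')

# An update call needs the current application version id, so fetch it first
version_id = analytics.describe_application(
    ApplicationName=APPLICATION_NAME)['ApplicationDetail']['ApplicationVersionId']

analytics.update_application(
    ApplicationName=APPLICATION_NAME,
    CurrentApplicationVersionId=version_id,
    ApplicationUpdate={'ApplicationCodeUpdate': APPLICATION_CODE})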

Kinesis Data Analytics Application:

Data Ingestion Graph by Kinesis Data Analytics:

The graph below shows data ingestion statistics for the application.

Real-time data ingestion graph of Kinesis Streams

Stream data is ingested from the Python script and analyzed by the Kinesis Data Analytics application; we can see live data arriving on the stream and being processed by the application.

Real-Time Processing By Random Cut Forest Kinesis Application

Finalized Configuration of the Kinesis Data Analytics Application:

  • Source: Kinesis Data Stream
  • Processing: Random_Cut_Forest Algorithm anomaly detection
  • Destination: Firehose (Step 4 Below)
Successfully Configured Kinesis Data Analytics Application (Output)

Kinesis Processing Units Consumption Graph:

Below are the processing units being used during our live data feed and data analytics processing.

An in-application data stream is an entity that continuously stores data in your application for you to perform processing. Your application continuously writes to and reads from in-application data streams. Kinesis Data Analytics elastically scales your application to accommodate the data throughput of your source stream and your query complexity for most scenarios. Kinesis Data Analytics provisions capacity in the form of Amazon Kinesis Processing Units (KPU). One KPU provides you with 1 vCPU and 4 GB of memory.

You are charged an hourly rate based on the number of Amazon Kinesis Processing Units (or KPUs) used to run your streaming application.

Amazon CloudWatch Analysis on Compute Units used by Streams

Step 4: Store Processed Data to S3 (Processed)

As the destination for the data processed in Step 3 (the Kinesis Data Analytics application), we created a Kinesis Firehose delivery stream configured with an S3 bucket, where files are placed in a timestamped folder structure. The files are stored in CSV format automatically by the Kinesis Firehose stream.

Configured Amazon Kinesis Firehose Stream with S3
Kinesis Firehose Setup Console (Demo Data to S3)
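The delivery stream can also be created programmatically. Below is a minimal boto3 sketch; the delivery stream name, IAM role, bucket ARN, and buffering values are assumptions and should be replaced with your own:

import boto3

firehose = boto3.client('firehose', region_name='us-east-1')

firehose.create_delivery_stream(
    DeliveryStreamName='processed-temperature-to-s3',
    DeliveryStreamType='DirectPut',
    ExtendedS3DestinationConfiguration={
        # Role that allows Firehose to write into the processed bucket (assumed to exist)
        'RoleARN': 'arn:aws:iam::123456789012:role/firehose-s3-delivery-role',
        'BucketARN': 'arn:aws:s3:::YOUR-PROCESSED-BUCKET',
        # Firehose automatically appends a YYYY/MM/DD/HH prefix, giving the timestamped folders
        'Prefix': 'processed/',
        # Buffer up to 1 MB or 60 seconds before writing an object to S3
        'BufferingHints': {'SizeInMBs': 1, 'IntervalInSeconds': 60},
        'CompressionFormat': 'UNCOMPRESSED'})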

Destination Processed Data:

Amazon S3 Bucket for Processed Files

Sample values of processed data (iotName, iotValue, ANOMALY_SCORE):

DemoSensor,14,0.0

DemoSensor,10,0.0

DemoSensor,100,3.0

DemoSensor,20,0.0

Step 5: Visualize the S3 (Processed) Data

Created an Amazon QuickSight account on the free tier, which includes 1 GB of SPICE storage; we need to import the data into SPICE in order to visualize it with graphs and metric visuals.

To create visual below are the steps:

  1. Created a data analysis against the AWS S3 bucket (processed data), which contains the temperature records scored for anomalies by Random Cut Forest (a sketch of the S3 manifest used for this data source follows this list).
  2. Once the data set is created in QuickSight, its columns are loaded into SPICE.
  3. Select the columns to use as dimensions and measures (X-axis and Y-axis) to visualize the data as graphs. You can choose whichever visual fits your report, e.g., a donut chart or pie chart.
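QuickSight reads S3 data through a manifest file that points at the bucket and prefix where Firehose writes the processed records. A minimal sketch that writes such a manifest is shown below; the bucket name and prefix are placeholders for your own processed location:

import json

# Hypothetical processed-data location; point this at the Firehose output prefix
manifest = {
    "fileLocations": [
        {"URIPrefixes": ["s3://YOUR-PROCESSED-BUCKET/processed/"]}
    ],
    "globalUploadSettings": {
        "format": "CSV",
        "delimiter": ",",
        "containsHeader": "false"  # the Firehose CSV output has no header row
    }
}

with open("quicksight-manifest.json", "w") as f:
    json.dump(manifest, f, indent=2)

Upload this manifest (or its S3 URL) when creating the S3 data source in QuickSight, and the processed files are imported into SPICE.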

Analysis Report on Amazon QuickSight:

  • Column-2
    This column shows the temperature values, ranging from 0 to 120, where the normal temperature is between 10 and 20.
  • Column-3
    This is the anomaly score assigned by the Random Cut Forest ML algorithm. For example, a temperature reading of 107 was detected as an anomaly and scored around 2 by the algorithm, reflecting how strongly it stands out from the normal data.

FAQs | Important Questions

How many virtual sensors were used in this article?

There was only one simulated IoT sensor. The simulation was implemented as a Python script that sends raw temperature data in exactly the same way a real temperature sensor would send it to a stream or any other data source.

How much temperature data is generated?

More than 38,000 temperature samples were generated by the Python program and ingested into the Kinesis stream, where they were processed by the Kinesis Data Analytics application.

Below is the snapshot of the dataset processed, viewed in Amazon QuickSight.

How much processing time and memory were used?

A single KPU was used as the unit of stream processing capacity, consisting of 1 vCPU of compute and 4 GB of memory. The source data stream used 1 Kinesis shard, with a raw-data retention period of 24 hours.

At which stage is anomaly data detected?

As soon as data is ingested into the Kinesis data stream from the Python simulation program, the Kinesis Data Analytics application runs the Random Cut Forest calculation on the live data in real time. The application detects anomalies within its in-application stream “TEMP_STREAM”, where each real-time temperature record carries an anomaly score; the higher the score, the more likely the record is an anomaly.

So, the anomaly is detected before the data arrives at the target (the processed S3 bucket).

Are detected anomalies stored in S3?

Yes, the processed data is stored in the processed S3 bucket.

The Kinesis Data Analytics application's destination is configured with a Kinesis Firehose delivery stream, which sends the data to the S3 bucket in a timestamped folder structure.

What happens to the detected data?

The Random Cut Forest algorithm assigns an anomaly score to each temperature record; a high score indicates a high likelihood that the value is an anomaly. The anomaly score is appended as an additional column to the raw data.
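As an illustration of how that extra score column can be used downstream, the sketch below reads one of the processed CSV objects from S3 and prints the records whose score exceeds a threshold. The bucket, object key, and threshold are assumptions made for this example:

import csv
import io
import boto3

s3 = boto3.client('s3', region_name='us-east-1')

# Hypothetical object key; Firehose writes objects under a YYYY/MM/DD/HH prefix
obj = s3.get_object(Bucket='YOUR-PROCESSED-BUCKET',
                    Key='processed/2020/04/16/12/sample-object')

# Each row looks like: DemoSensor,107,2.0  (iotName, iotValue, ANOMALY_SCORE)
rows = csv.reader(io.StringIO(obj['Body'].read().decode('utf-8')))
for row in rows:
    if len(row) != 3:  # skip any blank or malformed lines
        continue
    iot_name, iot_value, anomaly_score = row
    if float(anomaly_score) > 1.0:  # assumed cut-off for flagging a record as anomalous
        print(f"Anomaly: {iot_name} reading {iot_value} scored {anomaly_score}")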
