Real-time Anomaly Detection in VPC Flow Logs, Part 4: Kinesis Analytics

Igor Kantor
4 min read · Feb 13, 2018


Photo by Dominik Scythe on Unsplash

Now that we have our Kinesis Stream configured in Part 3, let’s hook up a Kinesis Analytics application to it.

For reference, it’s the “Kinesis Analytics Anomaly Detection” box on the right:

Also, please keep in mind that the Kinesis Analytics component is itself a mini-pipeline. If we were to take a peek under the hood, it would look something like this:

Basically, it is your standard Input → Process → Output module.

This process has several steps:

  1. Create a new Kinesis application and hook it up to the Kinesis stream (Input)
  2. Pre-process the data payload (remember, the VPC Flow Logs are base64-encoded and gzipped; see the decoding sketch after this list) (Process)
  3. Feed the records into a machine learning anomaly detection routine (Process)
  4. Assign an anomaly score to the bytes transferred column (Process)
  5. Send the scores somewhere for notifications and trend analysis (Output)
  6. Profit!
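
To make step 2 concrete, here is roughly what the unwrapping involves. This is a minimal Python sketch (the helper name is mine, not part of the blueprint we will use below); the envelope format is the standard CloudWatch Logs subscription payload:

```python
import base64
import gzip
import json

def decode_flow_log_record(data: str) -> dict:
    """Undo the two layers of encoding on a Kinesis record carrying VPC Flow Logs."""
    compressed = base64.b64decode(data)    # Kinesis record data arrives base64-encoded
    payload = gzip.decompress(compressed)  # CloudWatch Logs gzips the payload
    return json.loads(payload)             # leaving a CloudWatch Logs JSON envelope

# Each entry in payload['logEvents'] has a 'message' field holding one
# space-separated flow log line; the bytes-transferred column is the one
# we will score for anomalies later.
```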

In other words, we are looking for anomalies in bytes transferred.

Specifically, we are looking for sudden spikes or sudden drops in traffic. Here, a collapse in network traffic could mean an outage, while a sudden spike could mean our website has gone viral, is under a DDoS attack, or maybe something else entirely.

In either case, an anomaly to be investigated!

Now, let’s go through the steps, one by one.

First, we need to create the application itself.

Navigate to the Kinesis Analytics page in the AWS console and click on Create Application.

Name it VPCFlowLogsAnalytics and give it a good description so you will know what it does later.

Hook it up to the VPCFlowLogs Kinesis stream you created earlier.

VPCFlowLogs as the Kinesis Application source
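
The console is all you need here, but for reference, scripting this step would look roughly like the boto3 sketch below. The ARNs are placeholders, and the stand-in schema is just to make the call complete; in practice you would let schema discovery (covered shortly) generate the real column list:

```python
import boto3

ka = boto3.client('kinesisanalytics')  # the SQL-based Kinesis Analytics API

# Placeholder ARNs; substitute your own stream and role.
ka.create_application(
    ApplicationName='VPCFlowLogsAnalytics',
    ApplicationDescription='Anomaly detection over VPC Flow Logs',
    Inputs=[{
        'NamePrefix': 'SOURCE_SQL_STREAM',
        'KinesisStreamsInput': {
            'ResourceARN': 'arn:aws:kinesis:us-east-1:111111111111:stream/VPCFlowLogs',
            'RoleARN': 'arn:aws:iam::111111111111:role/KinesisAnalyticsReadRole',
        },
        # Minimal stand-in schema; the console's schema discovery normally
        # fills this in for you.
        'InputSchema': {
            'RecordFormat': {
                'RecordFormatType': 'CSV',
                'MappingParameters': {'CSVMappingParameters': {
                    'RecordRowDelimiter': '\n',
                    'RecordColumnDelimiter': ' ',
                }},
            },
            'RecordColumns': [{'Name': 'bytes_transferred', 'SqlType': 'BIGINT'}],
        },
    }],
)
```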

Then, enable Lambda pre-processing and choose to create a new Lambda function. We need this function to decode and unzip the VPC Flow Log payload in Kinesis.

Luckily for us, Amazon already provides a blueprint for a Lambda that does just this.

You want the Compressed Input Processing Lambda blueprint that does the following:

A Kinesis Data Analytics record processor that receives compressed (GZIP or Deflate compressed) JSON or CSV records as input and returns decompressed records with a processing status.

Perfect, just what we need!
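
For context, the heart of the blueprint looks roughly like the sketch below (simplified, not the blueprint verbatim; the real code also handles Deflate): decompress each record and hand it back to Kinesis Analytics with a per-record status.

```python
import base64
import gzip

def lambda_handler(event, context):
    """Decompress each incoming record and return it with a processing status."""
    output = []
    for record in event['records']:
        try:
            decompressed = gzip.decompress(base64.b64decode(record['data']))
            status, data = 'Ok', base64.b64encode(decompressed).decode('utf-8')
        except Exception:
            # Tell Kinesis Analytics this record could not be processed.
            status, data = 'ProcessingFailed', record['data']
        output.append({'recordId': record['recordId'],
                       'result': status,
                       'data': data})
    return {'records': output}
```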

You are now taken to a new page, the Lambda config console. I named my Lambda KinesisAnalyticsProcessCompressedRecord but you can call it whatever you wish.

In general, the code works as is but we need to tweak a few settings to make it deployable.

First, in the Execution role section of the Lambda config, make sure you either create a new role with Kinesis access or assign an existing one that has it.

Role assigned to pre-processing Lambda
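
If you prefer scripting the role instead of clicking through, the setup might look like this boto3 sketch. The role name is hypothetical; the two managed policies are real AWS-provided ones covering Kinesis reads and basic CloudWatch logging:

```python
import json
import boto3

iam = boto3.client('iam')
ROLE = 'KinesisAnalyticsLambdaRole'  # hypothetical name

# Trust policy letting the Lambda service assume the role.
trust_policy = {
    'Version': '2012-10-17',
    'Statement': [{
        'Effect': 'Allow',
        'Principal': {'Service': 'lambda.amazonaws.com'},
        'Action': 'sts:AssumeRole',
    }],
}
iam.create_role(RoleName=ROLE, AssumeRolePolicyDocument=json.dumps(trust_policy))

# AWS-managed policies: Kinesis read access plus basic Lambda logging.
for policy_arn in (
    'arn:aws:iam::aws:policy/service-role/AWSLambdaKinesisExecutionRole',
    'arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole',
):
    iam.attach_role_policy(RoleName=ROLE, PolicyArn=policy_arn)
```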

Second, you need to increase the Timeout setting to at least 1 minute to allow Lambda to process all the records.

Increased timeout for the Lambda function
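
The same tweak via the API, if you prefer scripting it (the function name is whatever you chose above):

```python
import boto3

# Raise the pre-processor's timeout to 60 seconds.
boto3.client('lambda').update_function_configuration(
    FunctionName='KinesisAnalyticsProcessCompressedRecord',
    Timeout=60,
)
```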

Now, onto the less pleasant part. When you click on Discover Schema, the pre-processor Lambda will run, decoding the records.

Unfortunately, this step fails frequently due to the high number of records handed to the Lambda to process. If that happens, you will see an error message and be unable to save your application.

Moreover, aside from old and still-open GitHub issues, I have not been able to find any workarounds other than clicking Discover Schema over and over until it works.

You really need to either shut off your stream so the Kinesis application can be instantiated or simply keep trying until it works. If you have any suggestions on how best to fix this, please let me know!
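
One brute-force option is to script the retries instead of clicking. Here is a rough sketch using boto3's discover_input_schema with placeholder ARNs and a simple exponential backoff; I have not battle-tested this, so treat it as a starting point:

```python
import time
import boto3

ka = boto3.client('kinesisanalytics')

# Placeholder ARNs; the role needs permission to read the stream
# and to invoke the pre-processing Lambda.
STREAM_ARN = 'arn:aws:kinesis:us-east-1:111111111111:stream/VPCFlowLogs'
ROLE_ARN = 'arn:aws:iam::111111111111:role/KinesisAnalyticsDiscoveryRole'
LAMBDA_ARN = ('arn:aws:lambda:us-east-1:111111111111:function:'
              'KinesisAnalyticsProcessCompressedRecord')

for attempt in range(8):
    try:
        response = ka.discover_input_schema(
            ResourceARN=STREAM_ARN,
            RoleARN=ROLE_ARN,
            InputStartingPositionConfiguration={'InputStartingPosition': 'NOW'},
            InputProcessingConfiguration={'InputLambdaProcessor': {
                'ResourceARN': LAMBDA_ARN,
                'RoleARN': ROLE_ARN,
            }},
        )
        print(response['InputSchema'])
        break
    except (ka.exceptions.UnableToDetectSchemaException,
            ka.exceptions.ResourceProvisionedThroughputExceededException):
        time.sleep(2 ** attempt)  # back off and try again
```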

But once this is all done correctly, you should be able to expand your application, click on Application details and see something similar to the following:

Newly instantiated Kinesis Analytics application

Now, let’s add some analytics to actually perform the anomaly detection, finally! Part 5 does exactly this.
