Real-time Anomaly Detection in VPC Flow Logs, Part 5: Anomaly Detection

Igor Kantor
7 min read · Feb 15, 2018


Something doesn’t seem right here…

Let’s do a quick recap.

In Part 4 we configured our Kinesis Analytics application and hooked it up to our Kinesis Stream from Part 3.

In turn, the Kinesis Stream sends our VPC Flow Logs to the Kinesis Application created above. Again, here is a quick diagram so you can keep track of where we are:

So, we have the Source part done, time to do the SQL-based analysis! Specifically, time to perform the machine learning magic using Amazon’s own RANDOM_CUT_FOREST_WITH_EXPLANATION anomaly detection algorithm to see if we can detect anomalies in the bytes transferred.

At this point, I think it’s worth asking, “What exactly constitutes an anomaly?”

From the link above:

The anomaly score for a record indicates how different it is from the trends that have recently been observed for your stream.

In other words, the algorithm “learns” what is typical and what is not and then flags values that it considers to be atypical.

In fact, it is not hard to see how this approach (if it works!) is superior to standard threshold-based problem detection.

For example, you may set Nagios or CloudWatch to page you if a server’s CPU is pegged at 100% for more than 5 minutes. Of course, very few seasoned admins will actually do this in practice.

The reason is that, while this might work OK for a while, the sheer number of false positives will overwhelm any human who actually likes to sleep at night!

Moreover, a CPU utilization of 100% may not necessarily be indicative of a problem. A busy machine is a Good Thing, after all — isn’t that what we are paying machines to do?

However, a CPU utilization of 0%, in the middle of a working day, on a machine that is normally serving thousands of requests per second is definitely an issue. But… it is only an issue if it’s 0% during the daytime hours; not really an issue in the middle of the night when the customers are asleep.

So, the whole thing quickly becomes a nightmare of thresholds, alerts, noise suppression algorithms, etc. Surely, there is a better way of doing this?

Maybe… In theory, a sophisticated algorithm that learns what is normal and what isn’t, and that is able to adapt to changes all by itself, would be highly valuable and superior to static thresholds.
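To make the contrast concrete, here is a minimal Python sketch of a static threshold next to a baseline that adapts. The adaptive half is a simple rolling z-score swapped in purely for illustration; it is not Random Cut Forest, and the function names, window size and cutoff are all assumptions.

```python
from collections import deque
from statistics import mean, pstdev


def static_alerts(values, threshold):
    """Flag every value above a fixed threshold."""
    return [v > threshold for v in values]


def adaptive_alerts(values, window=5, z_cutoff=3.0):
    """Flag values far from the mean of the previous `window` values.

    Assumption: a rolling z-score stands in for a learned baseline.
    """
    recent = deque(maxlen=window)
    flags = []
    for v in values:
        if len(recent) == window:
            mu = mean(recent)
            sigma = pstdev(recent)
            # Guard against a zero spread when the baseline is perfectly flat.
            flags.append(abs(v - mu) > z_cutoff * max(sigma, 1e-9))
        else:
            flags.append(False)  # still learning the baseline
        recent.append(v)
    return flags


# A busy server whose CPU suddenly drops to 0%.
cpu = [97, 99, 98, 100, 99, 98, 0]
static_flags = static_alerts(cpu, 95)
adaptive_flags = adaptive_alerts(cpu)
```

With this made-up series, the static rule pages on every busy reading and stays silent on the drop to 0%, while the adaptive rule does the opposite.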

And with that out of the way, let’s configure our Kinesis Analytics anomaly detection routine!

First, please keep in mind that Kinesis Analytics runs data through the SQL. This is like a standard database turned upside down: in a regular RDBMS scenario, you run a SQL query through the data; Kinesis Analytics is the opposite — you run data through the SQL query.

Conceptually, you can think of the Kinesis Analytics as a sieve through which the Kinesis Streams flow:

Kinesis Analytics high-level overview.
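If the "upside down database" idea feels abstract, this small Python sketch may help. It is only an analogy: the record shape, the function names and the endless source are hypothetical, and nothing here talks to Kinesis.

```python
from itertools import count, islice


def query_table(rows, min_bytes):
    """RDBMS style: the data sits still and the query runs over it once."""
    return [row for row in rows if row["bytes"] >= min_bytes]


def query_stream(records, min_bytes):
    """Streaming style: the query sits still and records flow through it."""
    for record in records:
        if record["bytes"] >= min_bytes:
            yield record


def endless_source():
    """Hypothetical unbounded source standing in for a Kinesis Stream."""
    for i in count():
        yield {"bytes": 100 + (i % 3)}


table_result = query_table([{"bytes": 50}, {"bytes": 150}], 100)
# The stream never ends, so we only peek at the first three matches.
stream_sample = list(islice(query_stream(endless_source(), 101), 3))
```

The table query returns once and is done; the stream query keeps producing results for as long as records keep arriving.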

OK, with the theory out of the way, let’s write some code!

Next, click on the blue Go to SQL Editor button to add our processing logic:

SQL Editor

At this point, you can either click “Add SQL from templates”, pick the Anomaly Detection template, and customize it,

or you can paste in the code below, which should work as well:

-- ** Anomaly detection **
-- Compute an anomaly score for each record in the source stream using Random Cut Forest
-- Creates a temporary stream and defines a schema
CREATE OR REPLACE STREAM "TEMP_STREAM" (
    -- "APPROXIMATE_ARRIVAL_TIME" timestamp,
    -- "srcaddr" varchar(16),
    -- "dstaddr" varchar(16),
    "bytes" DOUBLE,
    "ANOMALY_SCORE" DOUBLE,
    "ANOMALY_EXPLANATION" varchar(512));

-- Creates an output stream and defines a schema
CREATE OR REPLACE STREAM "DESTINATION_SQL_STREAM" (
    -- "APPROXIMATE_ARRIVAL_TIME" timestamp,
    -- "srcaddr" varchar(16),
    -- "dstaddr" varchar(16),
    "bytes" DOUBLE,
    "ANOMALY_SCORE" DOUBLE,
    "ANOMALY_EXPLANATION" varchar(512));

-- Compute an anomaly score for each record in the source stream
-- using Random Cut Forest
-- Arguments after the cursor: numberOfTrees, subSampleSize, timeDecay, shingleSize, withDirectionality
CREATE OR REPLACE PUMP "STREAM_PUMP" AS INSERT INTO "TEMP_STREAM"
    -- SELECT STREAM "APPROXIMATE_ARRIVAL_TIME", "srcaddr", "dstaddr", "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION"
    -- SELECT STREAM "srcaddr", "dstaddr", "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION"
    SELECT STREAM "bytes", "ANOMALY_SCORE", "ANOMALY_EXPLANATION"
    FROM TABLE(RANDOM_CUT_FOREST_WITH_EXPLANATION(
        CURSOR(SELECT STREAM "bytes" FROM "SOURCE_SQL_STREAM_001"), 100, 256, 100000, 1, true
    )
);

-- Sort records by descending anomaly score, insert into output stream
CREATE OR REPLACE PUMP "OUTPUT_PUMP" AS INSERT INTO "DESTINATION_SQL_STREAM"
    SELECT STREAM * FROM "TEMP_STREAM"
    -- WHERE ANOMALY_SCORE > 3.0
    ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

OK, let’s take a few minutes to go through this code in detail.

First, the lines that start with two dashes (--) are comments. I left those in on purpose, to show the various things I’ve tried that didn’t work.

In fact, I’m pretty sure including srcaddr and dstaddr in the anomaly detection routine messed up the anomaly scores. So, I had to take them out.

NOTE: The fix above is why I owe a very special and huge thanks to my colleague Curtis Mitchell who not only pointed out the error in the ML code but is altogether an awesome engineer in many other areas also!

OK, back to the code. The first two statements, both CREATE OR REPLACE STREAM, create Kinesis Analytics “in-application” streams. You can think of these streams as SQL “tables”.

NOTE: Kinesis Analytics in-app streams are not the same as Kinesis Streams. The latter feed data into Kinesis Analytics, which runs its own streams internally.

As you can see, two in-app streams are being created here. Park that thought for a second, while we go through the rest of the code to see why we need two.

NOTE: We are using RANDOM_CUT_FOREST_WITH_EXPLANATION anomaly detection. There is another function, called RANDOM_CUT_FOREST. We are using the _WITH_EXPLANATION one because of this snippet in the documentation:

To detect anomalies in individual record columns, see RANDOM_CUT_FOREST_WITH_EXPLANATION.

Since we need to detect anomalies in the “bytes” column of the VPC Flow Log, we need the _WITH_EXPLANATION variant.

Next come the pump definitions. A pump is a

continuous insert query running that inserts data from one in-application stream to another in-application stream.

So, a pump (just like its name suggests) feeds data into an in-application stream.

In fact, if you are familiar with Linux pipes, this is a similar construct. You can daisy-chain these pumps and streams to achieve some very powerful streaming analytics. For now, however, we are keeping it simple with two streams and two pumps.
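For the pipe-minded, the two-pump layout can be sketched as a pair of chained Python generators. This is an analogy only: `score_pump`, `output_pump` and `toy_score` are hypothetical names, and `toy_score` is a made-up stand-in that has nothing to do with how Random Cut Forest actually scores records.

```python
def score_pump(source, score_fn):
    """First pump: read source records and emit them with a score attached."""
    for record in source:
        yield {**record, "ANOMALY_SCORE": score_fn(record["bytes"])}


def output_pump(temp_stream):
    """Second pump: forward every scored record to the destination stream."""
    for record in temp_stream:
        yield record


def toy_score(value):
    """Hypothetical stand-in for Random Cut Forest; not the real algorithm."""
    return 0.0 if value < 1000 else 5.0


# source -> score_pump -> (temp stream) -> output_pump -> destination
source = [{"bytes": 120}, {"bytes": 90000}]
destination = list(output_pump(score_pump(source, toy_score)))
```

Each generator only knows about the stream it reads from, which is what makes it easy to add more stages later.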

OK, let’s go back to why we need two streams and pumps.

Take a look at the diagram below:

Kinesis Analytics conceptual overview

So, we need two because the first stream and pump merge the VPC Flow Log data with the corresponding anomaly scores.

The second pump and stream pick up those scored records and send them to the output stream. Specifically, this line here:

ORDER BY FLOOR("TEMP_STREAM".ROWTIME TO SECOND), ANOMALY_SCORE DESC;

It rounds the special ROWTIME column (a column whose value never decreases, also known as “monotonic”) down to the whole second, then sorts first by that rounded ROWTIME and then by the ANOMALY_SCORE, in descending order.

NOTE: To be honest, I’m not sure why this is needed. For now, this was recommended by Amazon, so I’m leaving it in but I do plan on going back and taking that line out to see what happens.
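To see what ordering that ORDER BY clause describes, here is a small Python sketch over a finite list. It only illustrates the sort keys; Kinesis Analytics applies them continuously to a stream, which this batch version does not attempt to model, and the sample records are invented.

```python
from datetime import datetime


def order_like_output_pump(records):
    """Sort by ROWTIME truncated to the second, then by score, highest first."""
    def sort_key(record):
        # Dropping microseconds mimics FLOOR(ROWTIME TO SECOND).
        second = record["ROWTIME"].replace(microsecond=0)
        return (second, -record["ANOMALY_SCORE"])
    return sorted(records, key=sort_key)


# Invented sample: two records in the same second, one in the next.
records = [
    {"ROWTIME": datetime(2018, 2, 15, 12, 0, 0, 100000), "ANOMALY_SCORE": 0.8},
    {"ROWTIME": datetime(2018, 2, 15, 12, 0, 0, 900000), "ANOMALY_SCORE": 3.1},
    {"ROWTIME": datetime(2018, 2, 15, 12, 0, 1, 200000), "ANOMALY_SCORE": 1.5},
]
ordered_scores = [r["ANOMALY_SCORE"] for r in order_like_output_pump(records)]
```

Within the first second the 3.1 record jumps ahead of the 0.8 record, but neither moves past the boundary into the next second.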

OK, let’s test this! Click on “Save and run SQL”, pick the Real-time analytics tab and make sure DESTINATION_SQL_STREAM is selected.

Assuming everything was set up correctly, you should see output similar to this:

Working anomaly detection pipeline!

How amazing is this, truly?!

A fully working, ingest → process → output anomaly detecting, super awesome, machine learning pipeline!

You can see the automatically generated ROWTIME (the one that’s monotonically increasing). To its right is the bytes value, the one we are trying to detect anomalies in. Further right is the actual ANOMALY_SCORE. The higher the score, the more the algorithm thinks the value differs from the other values it has seen so far.

Finally, the ANOMALY_EXPLANATION field contains the following data fields:

Attribution score: A nonnegative number that indicates how much this column has contributed to the anomaly score of the record. In other words, it indicates how different the value of this column is from what’s expected based on the recently observed trend. The sum of the attribution scores of all columns for the record is equal to the anomaly score.

Strength: A nonnegative number representing the strength of the directional recommendation. A high value for strength indicates a high confidence in the directionality that is returned by the function. During the learning phase, the strength is 0.

Directionality: This is either HIGH if the value of the column is above the recently observed trend or LOW if it’s below the trend. During the learning phase, this defaults to LOW.

The emphasis is mine. If you have more than one numeric value (we only have one, the bytes column) then you would want to know which column exactly contributed to the anomaly score.

For example, if you are tracking memory utilization, CPU utilization, network I/O, disk I/O, etc. and you get a high anomaly score, you would want to know which component exactly contributed to the high score.
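Here is a hedged Python sketch of how you might pick out the biggest contributor from an explanation. It assumes the ANOMALY_EXPLANATION value is a JSON object keyed by column name, with DIRECTION, STRENGTH and ATTRIBUTION_SCORE stored as strings; check that against your own output before relying on it. The two-column sample is invented.

```python
import json


def parse_explanation(explanation_json):
    """Return {column: (attribution, strength, direction)}.

    Assumption: the field names and string-encoded numbers match your output.
    """
    parsed = json.loads(explanation_json)
    return {
        column: (
            float(fields["ATTRIBUTION_SCORE"]),
            float(fields["STRENGTH"]),
            fields["DIRECTION"],
        )
        for column, fields in parsed.items()
    }


def top_contributor(explanation_json):
    """Name the column with the largest attribution score."""
    columns = parse_explanation(explanation_json)
    return max(columns, key=lambda name: columns[name][0])


# Invented sample with two numeric columns.
sample = (
    '{"cpu":{"DIRECTION":"HIGH","STRENGTH":"0.9","ATTRIBUTION_SCORE":"2.5"},'
    '"memory":{"DIRECTION":"LOW","STRENGTH":"0.1","ATTRIBUTION_SCORE":"0.5"}}'
)
culprit = top_contributor(sample)
total_attribution = sum(a for a, _, _ in parse_explanation(sample).values())
```

In this sample the attribution scores add up to 3.0, which per the description above would be the record's anomaly score, and cpu accounts for most of it.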

Now, if you are like me and you believe visualization aids comprehension, then you want to plot these values and see what is going on here!

Part 6 does exactly this!
