Real-time Anomaly Detection in VPC Flow Logs, Part 3: Kinesis Stream

Igor Kantor
4 min read · Feb 12, 2018



To recap Part 2, we are setting up a pipeline to capture and stream all of our VPC Flow Logs, for long-term archival and immediate anomaly detection.

In other words, we are building the pieces highlighted in red in the architecture diagram: a pipeline that accepts our VPC Flow Logs as input, analyzes the stream for anomalies, and produces an anomaly score as its output.

Let’s do this!

First, we need to set up our Kinesis stream.

aws kinesis create-stream --stream-name "VPCFlowLogs" --shard-count 1

We use a shard count of 1. Shards are well covered in the AWS documentation; in essence, a shard is a unit of streaming capacity:

The data capacity of your stream is a function of the number of shards that you specify for the stream. The total capacity of the stream is the sum of the capacities of its shards.
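
For reference, at the time of writing a single shard handles up to 1 MB (or 1,000 records) per second of writes and 2 MB per second of reads, which is plenty for a modest VPC. If you outgrow one shard later, you don’t have to recreate the stream; something along these lines reshards it in place (the target count of 2 here is just an example, pick whatever you need):

aws kinesis update-shard-count --stream-name "VPCFlowLogs" --target-shard-count 2 --scaling-type UNIFORM_SCALING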

Make sure the Kinesis stream was successfully created:

aws kinesis describe-stream --stream-name "VPCFlowLogs"

You should see something like this:

A sample Kinesis describe output
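
The output is shaped roughly like this (abridged; your ARN, account number, and sequence numbers will differ). The key thing to check is a StreamStatus of ACTIVE:

{
    "StreamDescription": {
        "StreamName": "VPCFlowLogs",
        "StreamARN": "arn:aws:kinesis:us-east-1:31415926:stream/VPCFlowLogs",
        "StreamStatus": "ACTIVE",
        "RetentionPeriodHours": 24,
        "Shards": [
            {
                "ShardId": "shardId-000000000000",
                ...
            }
        ]
    }
}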

Create a JSON file with the trust policy for the IAM role. Note the us-east-1 region in the Service principal; substitute your own region as needed.

cat > allowCloudWatchAccesstoKinesis.json
{
  "Statement": {
    "Effect": "Allow",
    "Principal": { "Service": "logs.us-east-1.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }
}

Ctrl+D to save the file.
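
If you’d rather not type into cat interactively, a here-document gets you the same file without needing Ctrl+D; a minimal sketch:

# same content as above, just non-interactive
cat > allowCloudWatchAccesstoKinesis.json <<'EOF'
{
  "Statement": {
    "Effect": "Allow",
    "Principal": { "Service": "logs.us-east-1.amazonaws.com" },
    "Action": "sts:AssumeRole"
  }
}
EOF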

Next, let’s create a role with the JSON file from above:

aws iam create-role --role-name CloudWatchToKinesisRole --assume-role-policy-document file:///home/igor.kantor/bash/allowCloudWatchAccesstoKinesis.json

You should get something like this back (instead of 31415926, you’ll see your own account number):

{
    "Role": {
        "AssumeRolePolicyDocument": {
            "Statement": {
                "Action": "sts:AssumeRole",
                "Effect": "Allow",
                "Principal": {
                    "Service": "logs.us-east-1.amazonaws.com"
                }
            }
        },
        "RoleId": "AROAJLKEBEZGORPSXGMIA",
        "CreateDate": "2018-02-06T16:24:31.670Z",
        "RoleName": "CloudWatchToKinesisRole",
        "Path": "/",
        "Arn": "arn:aws:iam::31415926:role/CloudWatchToKinesisRole"
    }
}
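
Hang on to that Arn; we’ll need it for the subscription filter below. If you lose the output, you can always fetch it again, using the same jq trick we’ll lean on later in this post:

aws iam get-role --role-name CloudWatchToKinesisRole | jq -r '.Role.Arn'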

Next, we need a policy to attach to the role (swap out 31415926 for your account number):

cat > cloudWatchPermissions.json
{
  "Statement": [
    {
      "Effect": "Allow",
      "Action": "kinesis:PutRecord",
      "Resource": "arn:aws:kinesis:us-east-1:31415926:stream/VPCFlowLogs"
    },
    {
      "Effect": "Allow",
      "Action": "iam:PassRole",
      "Resource": "arn:aws:iam::31415926:role/CloudWatchToKinesisRole"
    }
  ]
}

Again, Ctrl+D to save (or use the same here-document trick as above).

Now, let’s attach the policy to the role:

aws iam put-role-policy --role-name CloudWatchToKinesisRole --policy-name Permissions-Policy-For-CWL --policy-document file:///home/igor.kantor/bash/cloudWatchPermissions.json

That is all for the permissions.
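
If you want to double-check that the inline policy actually landed on the role, you can read it back (optional, but a quick sanity check):

aws iam get-role-policy --role-name CloudWatchToKinesisRole --policy-name Permissions-Policy-For-CWL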

Next, we need to create a subscription filter to send the VPC Flow Log entries to our Kinesis stream.

aws logs put-subscription-filter \
--log-group-name "VPCFlowLogs" \
--filter-name "VPCFlowLogsAllFilter" \
--filter-pattern '[version, account_id, interface_id, srcaddr != "-", dstaddr != "-", srcport != "-", dstport != "-", protocol, packets, bytes, start, end, action, log_status]' \
--destination-arn "arn:aws:kinesis:us-east-1:31415926:stream/VPCFlowLogs" \
--role-arn "arn:aws:iam::31415926:role/CloudWatchToKinesisRole"
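
To confirm the filter registered against the log group, describe-subscription-filters will list it (again, optional):

aws logs describe-subscription-filters --log-group-name "VPCFlowLogs"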

And we are done!

Let’s check out our Kinesis stream to make sure data is actually flowing.

Get ready for some mad jq-fu!

Here’s a command to get you started:

aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r '.ShardIterator') | jq -r '.Records[].Data' | base64 -d | zcat

Let’s break this down.

  1. First, the aws kinesis get-records command simply gets data records from a Kinesis data stream’s shard. It accepts a shard-iterator as a parameter.
  2. How do we get that? With the aws kinesis get-shard-iterator command, of course! That’s the command in parentheses above.
  3. If you are not familiar with bash, the syntax of $(doStuff) means, run the doStuff command and return its value.
  4. However, the value we get back is actually JSON. So, we pipe the output to jq, a very powerful command-line JSON processor. Here, we are only interested in the ShardIterator property of the JSON payload.
  5. Therefore, we tell jq to return the raw value (the -r flag), grab only the ShardIterator, and feed it back to the aws kinesis get-records command.

So, at this point, we are here:

aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r '.ShardIterator')

And, surprise — that command also returns JSON. See the pattern here?

Once again, we need to extract the Data property from the Records array.

This jq does the trick:

| jq -r '.Records[].Data'

But the data we get back is base64-encoded and gzip-compressed to boot!

No problem, we pipe the whole thing to these two handy utilities:

| base64 -d | zcat

The first one decodes the base64 and the second one decompresses the gzip.

Done!

If it all works, you will get a massive output dump that will look something like this:

...
account_id":"31415926","interface_id":"eni-22222","log_status":"OK","bytes":"212","srcport":"58237","action":"ACCEPT","end":"1517935827"}},{"id":"33851098767603148713170041907970295510985708680010596642","timestamp":1517935767000,"message":"2 31415926 eni-8175917b 10.64.34.7 10.64.32.54 51143 2370 6 4 216 1517935767 1517935827 ACCEPT OK","extractedFields":{"srcaddr":"10.64.34.7","dstport":"2370","start":"1517935767","dstaddr":"10.64.32.54","version":"2","packets":"4","protocol":"6","account_id":"31415926","interface_id":"eni-22222","log_status":"OK","bytes":"216","srcport":"51143"
...

Needless to say, this is not easy on the eyes and is quite obviously not meant for direct human consumption.
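
If you do want a more readable view, one more pass through jq pretty-prints it; each Kinesis record decodes to a single JSON document, so jq happily handles the stream:

aws kinesis get-records --limit 10 --shard-iterator $(aws kinesis get-shard-iterator --stream-name VPCFlowLogs --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON | jq -r '.ShardIterator') | jq -r '.Records[].Data' | base64 -d | zcat | jq .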

However, the fields are the same ones named in the subscription filter pattern above. You can also read the official Amazon documentation on the VPC Flow Log record format, if you so desire.

That’s all for the Kinesis setup. Next, we create a Kinesis Analytics application that will read the records from the stream and (hopefully!) automatically detect anomalies in the stream.

Machine learning for the win!

Read on for Part 4.
