SOC-X: Part 2 — dataShark Framework Deep-Dive

In this era, unlike any other, we have seen explosive growth in the volume of data captured and generated. Data growth has undergone a huge change, driven primarily by cheaper computing power and the ubiquity of the internet. This has led to a paradigm shift in the e-commerce sector: data is no longer seen as a byproduct of business activity, but as the business's biggest asset, providing key insights into customers' needs, predicting trends in customer behaviour, democratising advertising to suit consumers' varied tastes, and providing a performance metric to assess how effectively those needs are being met.

But with this kind of data growth, we need to understand the problems that come with it, and how traditional Security & Network analysis tools, which rely mostly on rules/thresholds, fail to capture the smarter and more efficient attacks that occur on the platform.

Problems that we have observed with traditional Security & Network analysis tools/Frameworks:

1. Bad bots are getting smarter, aren't they?
Price/detail scraping has always been a problem for e-commerce platforms, and as bots get smarter with every passing day, patterns & signatures fail to classify them efficiently. To add to the problem, infrastructure is getting cheaper by the day!
2. Attackers are getting smarter too!
Attackers no longer rely solely on traditional mechanisms to scan & attack an infrastructure. They deploy evasive techniques: using proxies, constantly changing IP addresses, attack vectors, patterns and user agents, and even making malicious traffic look very real by including a general user's cookies and spawning multiple user-tracking JavaScript calls. Applications (more so, business logic) are the prime target, not infrastructure alone.
3. Data data data…
With TBs of data being churned into the system, identifying malicious traffic/patterns is the real challenge. A lot of tools/frameworks do this, but what matters is which ones do it efficiently and, in the case of website security, which ones do it in real time or near real time.
4. I'm a security analyst, not a developer!
Not everyone wants to get their hands dirty with code; many would rather leverage a tool that runs out of the box to solve analytics/ML use cases and helps a security analyst uncover attack patterns or anomalies in data at scale.

Now that we're clear on what the problem statements look like, I'll move on to how dataShark aims to address some of these.

Introducing dataShark

dataShark is a Security & Network Event Analytics Framework built on Apache Spark that enables security researchers, Big Data analysts and operations teams to carry out the tasks below with minimal effort:
1. Ingest data from various sources such as the file system, syslog and Kafka
2. Write custom map/reduce and ML algorithms that operate on the ingested data, using abstracted Apache Spark functionality
3. Send the output of these operations to destinations such as syslog and Elasticsearch, or persist it to the file system or HDFS
Since we're clear on what dataShark is, let's now deep-dive into one of the use cases it can be leveraged for.

Bot detection & Mitigation

For this use case to work we will need the following:

  1. Live Apache Access logs
  2. Training data for our model, e.g. 60 seconds of good/bad traffic
  3. Grok Parser for Apache HTTPD logs to extract fields that we need to work on

We're mainly interested in the access logs (for this use case I'll be concentrating on Apache HTTPD access logs) generated by a standard public-facing web application.

This use case attempts to detect bot/attack traffic and distinguish it from legitimate user traffic. We'll use Apache Spark to analyse fast-flowing or large static access logs to find interesting patterns/indicators.

Let’s look at a sample log line.

127.0.0.1 - - [05/Feb/2012:17:11:55 +0000] "GET / HTTP/1.1" 200 140 "-" "Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.19 (KHTML, like Gecko) Chrome/18.0.1025.5 Safari/535.19"

Interesting fields for us:

  1. Source IP
  2. Date/timestamp
  3. Request Method
  4. Response Code
  5. User agent

First, we extract these fields from the log lines (we are using a standard grok pattern for Apache HTTPD; this can be changed for customized log lines):

 HTTPD_COMMONLOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)

For more details on grok patterns, you can visit https://github.com/logstash-plugins
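
As a quick illustration of what the parsed fields look like, here is a minimal standalone sketch using the third-party pygrok library with its stock COMBINEDAPACHELOG pattern, which yields the same field names as the pattern above plus referrer and agent. This is an assumption for illustration only; dataShark itself is driven by the pattern supplied in its configuration file.

# Standalone sketch (not part of dataShark): parse the sample log line with the
# third-party pygrok library and its stock COMBINEDAPACHELOG pattern.
from pygrok import Grok

line = ('127.0.0.1 - - [05/Feb/2012:17:11:55 +0000] "GET / HTTP/1.1" 200 140 "-" '
        '"Mozilla/5.0 (Windows NT 6.1; WOW64) AppleWebKit/535.19 (KHTML, like Gecko) '
        'Chrome/18.0.1025.5 Safari/535.19"')

fields = Grok('%{COMBINEDAPACHELOG}').match(line)
if fields:
    # e.g. {'clientip': '127.0.0.1', 'verb': 'GET', 'response': '200', ...}
    print({k: fields[k] for k in ('clientip', 'timestamp', 'verb', 'response', 'agent')})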

Now that we've extracted the fields from the HTTPD logs flowing into the system, let's start passing these to the dataShark model.

We require two files to get dataShark up and running:

  1. The use case configuration file
  2. The dataShark driver code (use-case specific)

Let’s take a look at our configuration file (web-anomaly-detection.conf):

type = streaming
training = train_access_log
code = web-anomaly-detection.py
name = Web Anomaly Detection
enabled = true
input = file
output = csv
apache_access_logs_pattern = 'HTTPD_COMMONLOG %{IPORHOST:clientip} %{HTTPDUSER:ident} %{HTTPDUSER:auth} \[%{HTTPDATE:timestamp}\] "(?:%{WORD:verb} %{NOTSPACE:request}(?: HTTP/%{NUMBER:httpversion})?|%{DATA:rawrequest})" %{NUMBER:response} (?:%{NUMBER:bytes}|-)'
[in_file]        
folder_path = /opt/logs
[out_csv]        
path = /tmp/UseCase-web_anomaly.csv
separator = ','
quote_char = '"'
title = Use Case #1
debug = false

The configuration file assumes the logs reside in a folder called /opt/logs and writes the output to a flat CSV file. P.S.: there are multiple ingest/output modes available in dataShark; kindly refer to the GitHub repo for more details: https://github.com/makemytrip/dataShark

The dataShark driver code, let’s name it web-anomaly-detection.py:

import re
from math import sqrt

from pyspark.mllib.clustering import KMeans


# Function to create the feature-set
def make_features(a, b):
    # Add up counts of 2xx and 5xx responses against a source IP
    twoxx = a[0] + b[0]
    nxx = a[1] + b[1]

    return [twoxx, nxx]


# Function to process raw training and streaming data
def process_logs(data, pattern):
    line = re.match(pattern, data)
    logline = line.groupdict()

    nxx = 0
    twoxx = 0

    # If the response code is 2xx, we set the count for twoxx as 1
    if str(logline['response_code']).startswith("2"):
        twoxx = 1
    # If the response code is 5xx, we set the count for nxx as 1
    elif str(logline['response_code']).startswith("5"):
        nxx = 1

    # Return data in the form (1.1.1.1, [1, 0])
    return (logline['source'], [twoxx, nxx])


# Cluster Prediction
def predict_cluster(row, model):

    # Predict the cluster for the current data row
    cluster = model.predict(row[1])

    # Find the center of that cluster (useful for distance-based scoring later)
    center = model.centers[cluster]

    return (row[0], {"cluster": cluster, "twoxx": row[1][0], "nxx": row[1][1]})


def load(streamingData, trainingData, context, conf):

    # Process raw training logs
    rawTrainingData = trainingData.map(lambda s: process_logs(s, conf['apache_access_logs_pattern']))

    # Reduce training logs to get counts of 2xx and 5xx response codes against an IP
    rawTrainingData = rawTrainingData.reduceByKey(make_features)

    # Convert data to the format accepted by K-Means
    training_set = rawTrainingData.map(lambda s: s[1])

    # Cluster count - needs tweaking based on the use case
    k = 2

    # Train the K-Means model with the training data
    model = KMeans.train(training_set, k)

    # Print out the centers of the trained model
    for i in range(k):
        print(model.centers[i])

    # Process incoming streaming logs
    rawTestingData = streamingData.map(lambda s: process_logs(s, conf['apache_access_logs_pattern']))

    # Reduce incoming data in a 60-second sliding window and predict the cluster each IP belongs to
    testing_data = rawTestingData.reduceByKeyAndWindow(make_features, lambda a, b: [a[0] - b[0], a[1] - b[1]], 60, 60).map(lambda s: predict_cluster(s, model))

    return testing_data

Fire up dataShark in standalone mode and see the magic happen!

./standalone.sh -d conf/web-anomaly-detection/web-anomaly-detection.conf

In our use case, we've asked dataShark to look at the log lines and, for each IP address, build a view of the number of times it got a 2xx response versus a 5xx response (which we call nxx).

An illustrative output per function:

# Function to create the feature-set
def make_features(a, b):
    # Add up counts of 2xx and 5xx responses against a source IP
    twoxx = a[0] + b[0]
    nxx = a[1] + b[1]

    return [twoxx, nxx]

Let’s look at what one log line (or row) would look like as a result of the above:

1.1.1.1 - 20 - 500  # where 1.1.1.1 is the IP address, 20 is the 2xx (twoxx) response count and 500 is the nxx response count
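
To make that aggregation concrete, here is a small self-contained sketch (assuming a plain local SparkContext outside of dataShark, with made-up per-request tuples) that runs the same reduceByKey step to build the per-IP [twoxx, nxx] counts:

# Standalone sketch of the per-IP aggregation against a local SparkContext;
# the tuples below are made up for illustration and mirror what process_logs() returns.
from pyspark import SparkContext

def make_features(a, b):
    # Add up counts of 2xx and 5xx responses for a source IP
    return [a[0] + b[0], a[1] + b[1]]

sc = SparkContext("local[*]", "feature-demo")

requests = sc.parallelize([
    ("1.1.1.1", [1, 0]),   # one 2xx response from 1.1.1.1
    ("1.1.1.1", [1, 0]),
    ("1.1.1.1", [0, 1]),   # one 5xx response from 1.1.1.1
    ("1.1.1.2", [0, 1]),
])

print(requests.reduceByKey(make_features).collect())
# e.g. [('1.1.1.1', [2, 1]), ('1.1.1.2', [0, 1])]

sc.stop()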

Now we pass this data to our clustering model that we’ve built into dataShark using Apache Spark (PySpark):

# Cluster Prediction
def predict_cluster(row, model):

    # Predict the cluster for the current data row
    cluster = model.predict(row[1])

    # Find the center of that cluster (useful for distance-based scoring later)
    center = model.centers[cluster]

    return (row[0], {"cluster": cluster, "twoxx": row[1][0], "nxx": row[1][1]})

When this row enters our model, the K-Means clustering algorithm places it in either Cluster A or Cluster B.

The labelling of a cluster as good/bad is decided by the training data we provide to the model — which is the key to the success of the ML function.
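
To see how the training data drives that labelling, here is a small self-contained sketch (assuming a local SparkContext and made-up [twoxx, nxx] vectors, purely for illustration): the cluster whose training points are dominated by 5xx responses naturally becomes the "bad" one, and a new 5xx-heavy observation falls into it.

# Standalone sketch: train K-Means on made-up [twoxx, nxx] vectors and check
# which cluster a new observation lands in. The local SparkContext and the
# synthetic numbers are assumptions for illustration only.
from pyspark import SparkContext
from pyspark.mllib.clustering import KMeans

sc = SparkContext("local[*]", "kmeans-demo")

training_set = sc.parallelize([
    [120, 2], [95, 0], [110, 5],     # mostly 2xx: good-looking IPs
    [10, 300], [5, 450], [20, 380],  # mostly 5xx: bad-looking IPs
])

model = KMeans.train(training_set, 2)
print(model.centers)

# An unseen IP with 20 twoxx and 500 nxx responses falls into the 5xx-heavy cluster
print(model.predict([20, 500]))

sc.stop()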

testing_data = rawTestingData.reduceByKeyAndWindow(make_features, lambda a, b: [a[0] - b[0], a[1] - b[1]], 60, 60).map(lambda s: predict_cluster(s, model))  
return testing_data

Let’s now analyze the output from the model:

IP 1.1.1.1 — ClusterA (or 0)
IP 1.1.1.2 — ClusterB (or 1)
IP 1.1.1.3 — ClusterA (or 0)
IP 1.1.1.4 — ClusterB (or 1)

Let's say Cluster A identifies good users and Cluster B malicious users. We can now determine how good or bad a user is by fine-tuning the algorithm further; for example, we can calculate the distance of an object from its cluster center to determine how far the subject is from good or bad behaviour. Alternatively, the algorithm could be used to classify different types of users and then determine how abnormal a subject is based on its distance from the respective cluster center.
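
As a rough illustration of that idea, the helper below (hypothetical, not part of dataShark) scores a row by the Euclidean distance of its [twoxx, nxx] features from the center of the cluster K-Means assigned it to; it could be mapped over the windowed stream in place of predict_cluster:

from math import sqrt

# Hypothetical helper (not part of dataShark): distance from the predicted
# cluster's center as a simple anomaly score.
def anomaly_score(row, model):
    features = row[1]                  # [twoxx, nxx]
    cluster = model.predict(features)
    center = model.centers[cluster]
    distance = sqrt(sum((f - c) ** 2 for f, c in zip(features, center)))
    return (row[0], {"cluster": cluster, "distance": distance})

# e.g. rawTestingData.reduceByKeyAndWindow(...).map(lambda s: anomaly_score(s, model))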

As we mentioned in Part 1 of this blog series, using ML comes with certain considerations; what matters most is the model we use, the data we train it on, and the feature set we build the model upon.

Taking this towards an actionable implementation, we could set up alerting or actions based on the distance from the center (or another calculated anomaly score). These scores can be persisted to data stores such as Elasticsearch or HDFS, or sent over syslog to integrating systems such as a SIEM.
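
For example, shipping such a score to a SIEM's syslog listener could look like the generic sketch below, using Python's standard logging module; the host, port and payload are assumptions, and this is not one of dataShark's built-in output plugins:

import json
import logging
import logging.handlers

# Generic sketch: forward an anomaly score to a SIEM's syslog listener.
# "siem.example.com", port 514 and the payload are assumptions for illustration.
logger = logging.getLogger("web-anomaly")
logger.setLevel(logging.INFO)
logger.addHandler(logging.handlers.SysLogHandler(address=("siem.example.com", 514)))

score = {"ip": "1.1.1.2", "cluster": 1, "distance": 42.7}
logger.info(json.dumps(score))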

Below is a sample dashboard built on the ELK stack once such data is available in Elasticsearch:

Now that we've wrapped up our use case, dataShark can be extended to anything you can think of; you just need the idea, the data and the model.

dataShark is on github at https://github.com/makemytrip/dataShark

Since dataShark has many aspects to it (the ingest, processing and output layers), there are certain deployments that can significantly enhance its capabilities, Apache Metron being one of them.

More on Apache Metron in "Part 3 — Building The Pipeline".