Cyber Analytics Use Case: Streaming Beacon Detection with Spark

Jonathan Ticknor

--

I hope to convey two messages with this post: 1) a use case for cyber analytics that isn’t DGA or phishing detection, and 2) a few tricks you can use with Spark’s Structured Streaming to enhance your analytic performance. I chose beaconing because it’s an easy concept to understand, yet the intuition behind the analytic can be misleading if you don’t understand how the data is generated or what artifacts may be present. It’s become clear over the course of my career that data scientists without sufficient domain expertise often leave quite a bit on the table. I’m going to cover the basics of why you would use streaming with Spark and what behavior I’m trying to detect with this analytic. If you want to get straight to the code and use case, jump ahead to Section 2.

Section 1: Spark Streaming and Beacon Basics

Spark Streaming

For anyone who has worked in big data or data science, Spark has probably been a critical component of your workflow at some point. Although many use cases rely on Spark’s batch processing capabilities, it’s the streaming components that can provide immense value for a cyber security data workflow. Spark offers two options for stream processing: 1) Spark Streaming (DStreams) and 2) Structured Streaming. For those familiar with traditional RDD batch processing, Spark Streaming is simply an extension of that methodology. Internally, the stream of data is represented as a sequence of RDDs and processed as such. To aggregate data across multiple time frames, Spark Streaming offers window functions, which effectively combine the RDDs within the desired time window so they can be processed all at once.
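For reference, the DStream windowing pattern looks something like this minimal sketch (my own illustrative example, not from the original post; it assumes a socket source on port 9999 and arbitrary window sizes):

from pyspark import SparkContext
from pyspark.streaming import StreamingContext

sc = SparkContext(appName="DStreamWindowSketch")
ssc = StreamingContext(sc, 5)  # 5-second micro-batches

lines = ssc.socketTextStream("localhost", 9999)
# Re-combine the last 10 minutes of RDDs, re-evaluated every 30 seconds
windowed = lines.window(600, 30)
windowed.count().pprint()

ssc.start()
ssc.awaitTermination()

Every window evaluation reprocesses all of the RDDs it covers, which is exactly the cost that grows with the window size.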

This framework is incredibly powerful, but what if I have a use case with a very large (or unknown) time window? And what if I’m processing very large volumes of log data, as in an enterprise security use case? The answer is that you had better bring a lot of servers to the party. There is another way to do this, though, built on the Spark SQL engine: Structured Streaming. The input data stream is treated as an unbounded table to which new data is appended upon arrival. That data is processed into a results table which can be written to console, pushed to another sink, etc. The power of Structured Streaming is that it processes only the newly received data, updates the results table incrementally, and discards the source data. We can leverage this capability in some neat ways to keep the memory and compute requirements down compared to standard Spark Streaming, saving us money and time in processing logs. In Section 2 I will highlight the areas where we get the most bang for our buck in this use case.
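As a sketch of that model (my own illustrative example; it assumes a SparkSession named spark already exists), a streaming groupBy keeps running state, and each micro-batch only updates the rows affected by newly arrived data:

lines = spark.readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

# Incremental aggregation: new rows update the running counts,
# then the raw input rows can be discarded
counts = lines.groupBy("value").count()

query = counts.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()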

Beaconing

Any team that is trying to build security analytics has most likely tried to build a beacon detector to find command and control communication. Lazy malware authors, or those that don’t care or don’t think they will get caught, will often program the compromised host to beacon to their infrastructure in some periodic fashion (e.g. check for a new command every 5 minutes). The task gets more complicated when the author cares about staying hidden (there are ways to handle that too, but that’s for another post). If we consider some kind of network log (NetFlow, firewall, proxy, etc.) as the data source, we should look at the frequency of connections between an internal IP address and an external IP address. We could make this more complicated by adding ports, protocols, amount of data, etc., but let’s keep it simple for now. What we want to see is some repetitive pattern between two hosts. Any beacon detector will find hundreds to thousands of benign software beacons in an enterprise network, so we will ultimately need smart filters for a production deployment. Now that we know what a beacon is and how we can use Spark for stream processing, let’s build a simple streaming beacon detector that finds these communications in real time.
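To make the target behavior concrete, here is a toy sketch (plain Python, invented numbers of my own) of what beacon timestamps look like next to ordinary browsing:

import random

# A 5-minute beacon with a little jitter from network delay
beacon = [300 * i + random.uniform(-2, 2) for i in range(10)]

# Human browsing: irregular gaps between connections
browsing, t = [], 0.0
for _ in range(10):
    t += random.uniform(1, 600)
    browsing.append(t)

The detector’s job is to score the first pattern high and the second low, using only the timestamps.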

Section 2: Building a Beacon Detector

The first step in building a beacon detector is to decide what technique we are going to use. For this simple example, I’m going to use an entropy calculation, specifically a normalized entropy which returns a value between 0 and 1 (where a value of 0 indicates no entropy). Ultimately I will subtract that value from one, so that a score of one indicates a perfectly periodic connection. The data fed into the entropy calculation are the interstitial times between discrete communications of host pairs in the log data. Now that I know what technique and data I’m going to use, let’s get into some code (find the full code here).
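Before writing any Spark code, it helps to see the arithmetic on a toy example (my own hand calculation, not part of the original gist). Given interstitial times [300, 300, 300, 600], we have p(300) = 0.75 and p(600) = 0.25:

import numpy as np

time_diff = [300, 300, 300, 600]
values, counts = np.unique(time_diff, return_counts=True)
probs = counts / len(time_diff)  # [0.75, 0.25]
# Entropy, normalized by its maximum possible value log(n)
en = sum(-p * np.log(p) for p in probs) / np.log(len(time_diff))
score = 1.0 - en  # ~0.59; a list of identical gaps would score 1.0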

Let’s go ahead and do our standard imports for Spark Structured Streaming, as well as some of the specific functions we are going to need. The udf capability will be very important because that’s how we will write our analytic and apply it to the data table. And would this really be an analytic without importing numpy?

from pyspark.sql import SparkSession
from pyspark.sql.functions import collect_list
from pyspark.sql.functions import split
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, FloatType, StringType
import numpy as np

Now I need to define a few parameters that will be relevant to the periodicity detection algorithm.

# Padding allows for consideration of log time stamp issues,
# network delay, etc.
padding = 1.0
# Minimum data points to consider activity periodic
min_dp = 5
# Minimum score to be deemed periodic
p_criteria = 0.9

The first parameter is absolutely critical for building a useful beacon detector across data types. The padding allows us to smooth out some of the noise in the interstitial time values, which arises for a few reasons: 1) issues with log time stamp reporting (e.g. rounding), 2) small perturbations caused by network delay, or 3) other unknown noise sources. In my experience, leaving this value at 1, which means the analytic uses the timestamps as is, leads to a loss in result fidelity; it is quite rare for the bucketing to make a non-periodic connection look periodic. The min_dp parameter sets how many data points I want to see before I call something periodic. The value of 5 is more of a rule of thumb I have settled on, but the larger the value, the more certain we can be. Finally, p_criteria is an adjustable cutoff for deciding which results are worthy of investigation. Depending on the size of the network, this parameter can be moved up or down to support triage (a higher value means a stricter cutoff and fewer results).
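As a quick illustration of the padding behavior (toy numbers of my own), a padding of 5 collapses jittery five-minute gaps into a single bucket, while a padding of 1 leaves them fragmented:

diffs = [298.0, 302.0, 300.0, 299.0]

padding = 1.0
print([round(d / padding) * padding for d in diffs])
# [298.0, 302.0, 300.0, 299.0] -- looks irregular

padding = 5.0
print([round(d / padding) * padding for d in diffs])
# [300.0, 300.0, 300.0, 300.0] -- clearly periodic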

Now the fun part, the periodicity algorithm itself. This analytic is rather simple, but let’s take a look at some key pieces.

# Periodicity detection
def periodicity(df):
    if len(df) <= min_dp:
        return 0.0
    time_df = sorted([int(val) for val in df])
    # Bucket each interstitial time to the nearest multiple of padding
    time_diff = [round((time_df[i] - time_df[i-1]) / padding) * padding for i in range(1, len(time_df))]
    values, counts = np.unique(time_diff, return_counts=True)
    probs = counts / len(time_diff)
    # Normalized entropy: 0 means perfectly regular interstitial times
    en = sum([-prob * np.log(prob) for prob in probs]) / np.log(len(time_diff))
    entropy = 1.0 - en
    return float(entropy)

First I filter out connections that don’t meet the data threshold criteria, then sort the timestamps. Using the padding value I created above, I build an array of bucketed interstitial times from the original log timestamps. Next, I use the numpy unique function to gather the counts of each interstitial time, e.g. [2, 2, 2, 4] -> [(2,3), (4,1)]. I then apply the entropy function and divide by the log of the array length to get the normalized entropy value. Finally, I subtract that value from one to get the final periodicity score, where connections with a value closer to one are more periodic.
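A quick sanity check (toy timestamps of my own, run as plain Python with the parameters defined above): a perfectly regular connection scores 1.0, while one with all-distinct gaps scores 0.0.

regular = [str(300 * i) for i in range(8)]  # 0, 300, 600, ...
irregular = ["0", "47", "310", "770", "801", "1400", "2222", "2301"]

print(periodicity(regular))    # 1.0
print(periodicity(irregular))  # 0.0 -- every gap is unique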

The next function filters the data before it runs through the periodicity detector. The cutoff used here could be modified for your application, but in my experience the last 25 connections are more than sufficient to make a determination on periodicity.

# Only retain last 25 timestamps
def filter_tstamp(df):
    if len(df) > 25:
        return df[-25:]
    else:
        return df

This udf is incredibly important because it limits the amount of data I carry over time, preventing memory issues and reducing the complexity of the calculation. For example, some hosts communicate multiple times per second, which makes the timestamp array enormous and bogs down computation. Using this udf to control the size of the results table is a huge win compared to DStreams, one I saw very quickly in testing and in production deployments.
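For example (toy data of my own), a host that has logged a thousand connections is trimmed to a fixed-size tail before scoring:

history = [str(t) for t in range(1000)]
print(len(filter_tstamp(history)))  # 25
print(filter_tstamp(history)[0])    # '975'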

Next, I set up some basic udf functions to call later on the table of raw data.

# Declare return types explicitly; the default StringType would
# stringify the filtered array before it reaches the next udf
tstamp_udf = udf(filter_tstamp, ArrayType(StringType()))
periodic_udf = udf(periodicity, FloatType())

Next comes some boilerplate code to create the SparkSession and give my app a name. That is followed by the method for retrieving the data, in this case a socket connection on localhost, port 9999.

spark = SparkSession \
    .builder \
    .appName("StructuredPeriodicity") \
    .getOrCreate()

lines = spark \
    .readStream \
    .format("socket") \
    .option("host", "localhost") \
    .option("port", 9999) \
    .load()

Next I need to take the raw data feed, in this case the fake log I am generating, and split it into its component parts. The data I am feeding in is comma separated (i.e. source_ip, destination_ip, timestamp).

# Split the line into values
split_col = split(lines.value, ",")
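As an aside, to exercise the pipeline locally you can feed lines into port 9999 with nc -lk 9999, or with a small generator like this sketch (the addresses and the 60-second interval are invented; start it before the Spark job, since the socket source connects as a client):

import socket, time

srv = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
srv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
srv.bind(("localhost", 9999))
srv.listen(1)
conn, _ = srv.accept()  # Spark connects here

t = int(time.time())
while True:
    # One internal host beaconing to one external host every 60s
    conn.sendall(f"10.0.0.5,203.0.113.7,{t}\n".encode())
    t += 60
    time.sleep(1)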

Now it’s time to create the table. Using the split_col expression created above, I add new columns to the table and give them useful names (e.g. destination_ip). Finally, I drop the value column, which is simply the raw log line itself. You could keep this field, but since I want to do aggregations and I have already retained all of the component parts, it makes sense to discard it.

n_log = lines.withColumn("source_ip", split_col.getItem(0)) \
    .withColumn("destination_ip", split_col.getItem(1)) \
    .withColumn("timestamp", split_col.getItem(2)) \
    .drop("value")

Here is where all the real work gets done. Now that I have a nice table of raw data, it’s time to do something useful with it. The first step is a simple groupBy to key on the (source_ip, destination_ip) tuple and aggregate all of the corresponding timestamps. Next, I apply the udf created earlier, which discards everything but the last 25 data points. Finally, I apply my periodicity analytic via periodic_udf, producing a score for each unique connection in the environment. I have omitted many of the filtering steps you would likely apply in a real environment (e.g. eliminating internal-to-internal connections, ignoring well-known sites, filtering out specific ports, etc.).

# Group timestamps per connection pair and score periodicity
p_log = n_log.groupBy("source_ip", "destination_ip") \
    .agg(collect_list("timestamp").alias("timestamp")) \
    .select("source_ip", "destination_ip", "timestamp", tstamp_udf("timestamp").alias("timestamp_new")) \
    .select("source_ip", "destination_ip", "timestamp", periodic_udf("timestamp_new").alias("entropy"))

# Filter results
# p_log = p_log.select("source_ip", "destination_ip", "timestamp", "entropy").where(f"entropy > {p_criteria}")

Finally, let’s run this thing. For this use case I have chosen the complete output mode, which means that the entire results table will be written to console. In a real deployment you may only want to send new alerts to a data sink, which helps reduce output data volume/redundancy. I haven’t included the p_criteria filter for this example because I’m generating fake data and want to see my full results.

# Start running the query
query = p_log \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .start()
query.awaitTermination()
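If you would rather emit only the rows whose score changed in the latest micro-batch, and only those above the threshold, an update-mode variant might look like this (a sketch combining the pieces above, not the configuration from the gist):

# Emit only changed rows that cross the alert threshold
alerts = p_log.where(f"entropy > {p_criteria}")

query = alerts.writeStream \
    .outputMode("update") \
    .format("console") \
    .start()
query.awaitTermination()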

And there you have it, a simple 77-line program to detect periodicity in comma separated log data. Of course the program gets more complicated if we are connecting to Kafka or raw syslog and need to send the data to any number of possible sinks. But this simple algorithm has proven time and again that it can detect malicious beacons in both small and large networks, in a streaming fashion, with limited computational overhead. There are other, more complex ways to think about beacons that are beyond the scope of this article, but this should get you started down the road of thinking about how you would solve this problem and deploy a solution.

Gist: https://gist.github.com/jonticknor/dc8bbfe1420d052265ed652842d52529
