Real-time Security Data Lake

Farid Gurbanov
Published in Data Analytics
13 min read · Jul 29, 2020

The Security Operations team is interested in real-time log analysis capabilities to monitor and prevent anomalous activities. Data scientists need tools to discover anomalous patterns, and the Compliance team needs cheap storage to keep logs for up to 7 years as forensic evidence.

Organizations traditionally deploy specialized SIEM tools that are expensive and inflexible when it comes to deploying ML patterns or keeping log history for extended periods.

In this article, I provide a reference architecture for a real-time Security Data Lake — a comprehensive, highly customizable real-time log management system with an integrated set of tools for pattern discovery, operationalization, and visualization. The setup is massively scalable with known use at Keystone (Netflix Real-time Stream Processing Platform) for 3 trillion events per day [1].

Along with the design, I’ve created a working prototype (a POC box) with a basic Syslog data pipeline. Before jumping into the POC configuration playbook, I suggest you have a look at the concepts below so that you can reuse the components for your specific use case.

Key features

  • Scalable architecture up to 3 trillion events per day and more [1].
  • Low cost of ownership and richer capabilities compared to specialized tools [2]
  • Quick deployment times. Basic deployments could be done in 4–6 weeks [3]
  • Small DevOps team with traditional skills (Architect/PM, Cloud Engineer, Java Developer, Visual Designer) [4]

Requirements

  • Streaming data collection, curation, indexing and storage of your event logs,
  • Automated normalization and correlation of data,
  • Combination of internal data with third-party data on threats and vulnerabilities,
  • Federation of logs for complex pattern matching,
  • Real-time security monitoring and alerting on anomalies (based on patterns),
  • Flexible visualization and self-discovery tool,
  • Tools for new pattern discovery and operationalization for Security Scientists,
  • Ad hoc querying of data for Security Engineers.

There are tools on the market that provide similar functionality. However, my goal was to build a solution using standard open-source components for flexibility, cost and scalability.

Technology stack

I picked the following technology stack for its quick deployment time, its scalability and its flexibility to integrate with existing SOC/SIEM systems and formats.

It is worth noting that there are numerous deployment options depending on the IT landscape and organizational standards, e.g. Azure or GCP instead of AWS, or on-prem Hadoop instead of the cloud.

My technology stack for this solution is: Kafka, Flink, Elastic and AWS for heavy lifting.

  • Apache Kafka — an event broker,
  • Apache Flink — a streaming event processor,
  • Elastic Stack — a set of tools to collect, normalize, deliver and visualize the event data,
  • AWS managed platform services

Conceptual Architecture

Here is the conceptual architecture for a real-time security data lake. There are six main elements in this architecture (the sixth being the persistent storage).

Log Collection and Normalization (1)

Log collection and normalization is one of the main challenges in SIEM. First, we need to understand what to look at before we can start building rules and analyzing patterns.

I picked the Elastic Filebeat for this layer as it is rich in features, very simple to use and I know it well. But in principle, any other external tool or even hardware box could be utilized here.

Log normalization patterns are defined as YAML templates using regular expressions. I provide a few examples from my POC box described below.

Elastic Filebeat can integrate natively with Elastic Logstash or with Kafka directly. For this solution, I use Elastic Logstash as a routing gateway.

Routing (2)

Routing helps with consolidating the logs and channelling them to the destination broker, which optimizes network traffic. It also keeps the nodes from being exposed to external networks. Routers need to be close to the log sources.

I’ve picked Elastic Logstash as a routing engine. It perfectly fits the purpose and could be configured in numerous ways. Logstash has an adaptive disk-based buffering system that will absorb incoming throughput, therefore mitigating backpressure.

In complex regionally diverse networks I would add one more level (regional) for log consolidation. In that case, I would create separate regional Kafka clusters and one central, then use Logstash to stream from sources to regional Kafka Brokers as shown below:

Streaming Broker (3)

We’ve reached the blood vessels of the streaming solution. Apache Kafka combines high throughput and low (millisecond-scale) latency with a massively scalable architecture. It is becoming a widely popular and key component in streaming applications.

I’m using Kafka at the centre of all data pipelines to serve normalized, correlated, aggregated and federated events. The processing engine transforms each stream into the next layer as part of event correlation or aggregation and sends the transformed stream back into the broker. It works as shown below.

This creates a simple architecture for external systems to integrate at any level of the pipeline. As an example, existing SIEM tools can send correlated logs directly into the right topic. Another example could be sending critical alerts to IPS for an immediate automated response.

To deploy Kafka on AWS I will be using Amazon Managed Streaming for Apache Kafka (Amazon MSK), as it is a fully managed service and very easy to deploy.

Real-time Stream Processing (4)

I was choosing between two well-known and popular tools for stream processing: Spark and Flink. After reading a few interesting comparison articles [8,9], I decided to focus on Flink for this use case. It is also worth noting that Amazon has integrated Flink deeply into the Kinesis stack: Kinesis Data Analytics is essentially a Flink service managed by Amazon.

Persistent Storage and Data Archival (6)

S3 with the Glacier storage class provides cost-effective storage for all your logs. My design keeps logs for 365 days in S3 Standard storage and then migrates them to Glacier for up to 7 years, as per regulatory requirements.

Visualization, Self-discovery and Research (5)

This section probably deserves three separate chapters for each of the tools included in the solution. For simplicity, I provide here just a high-level overview and some core capabilities that we need.

Elasticsearch + Kibana

Elastic is my favourite open-source tool for visualization and I use it extensively in my solutions. The main differentiators for me are the high quality of visualization, the quick deployment time for new dashboards and the NoSQL nature of the underlying Elasticsearch.

There are also some drawbacks compared to Tableau and Oracle BI tools, as you cannot do even simple computations on the report. You have to prepare your final data set before it hits Elasticsearch. That takes some time to get used to, but in principle it is a reasonable price for the speed you get.

It is worth noting that Elasticsearch is expensive storage compared to S3, and it doesn’t make sense to keep data there any longer than needed. My personal impression is that fast-search data should be limited to no more than 30 days. Reducing this parameter dramatically reduces the cost of the solution. I will look at cost optimization options separately.
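The retention figures mentioned so far (about 30 days of fast search, 365 days in S3 Standard, up to 7 years in Glacier) can be sketched as a simple age-to-tier lookup. This is only an illustration in plain Java: the class and tier names are hypothetical, and treating a year as 365 days is an assumption. In a real deployment the transitions would more likely be handled by index and bucket lifecycle settings than by application code.

```java
import java.time.Duration;

/** Picks a storage tier for a log record from its age (illustrative sketch). */
final class RetentionPolicy {
    // Hypothetical tier names, one per retention stage described in the text.
    enum Tier { HOT_SEARCH, S3_STANDARD, S3_GLACIER, EXPIRED }

    // Limits taken from the article; a year is assumed to be 365 days.
    static final Duration FAST_SEARCH_LIMIT = Duration.ofDays(30);
    static final Duration S3_STANDARD_LIMIT = Duration.ofDays(365);
    static final Duration ARCHIVE_LIMIT = Duration.ofDays(7 * 365);

    static Tier tierFor(Duration age) {
        if (age.compareTo(FAST_SEARCH_LIMIT) <= 0) return Tier.HOT_SEARCH;
        if (age.compareTo(S3_STANDARD_LIMIT) <= 0) return Tier.S3_STANDARD;
        if (age.compareTo(ARCHIVE_LIMIT) <= 0) return Tier.S3_GLACIER;
        return Tier.EXPIRED;
    }
}
```

For example, `RetentionPolicy.tierFor(Duration.ofDays(400))` falls into the Glacier tier under these assumptions.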

Amazon Athena ad hoc queries

Now imagine you have 365 days of normalized, correlated, aggregated and federated log data sets at your fingertips (S3). Amazon Athena is your tool to run the queries. You can also run Spark jobs in your preferred language. So essentially all your batch jobs can be easily integrated into this architecture at the S3 level.

Amazon SageMaker

SageMaker is one of the cool new tools that Amazon has recently introduced. It supports many interesting features, from pattern discovery to operationalization. It is slightly off-topic for this article; I’m planning to spare a few days to get closer to it and share the experience.

Streaming Pipelines

Pipeline Design

Data Flow Layers

The Security Data Lake operates at five layers, from raw event logs to federated alerts, as shown in the diagram below. Processing occurs in real time based on predefined rules, patterns and analysis windows.
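One way to picture the layers in code is an ordered enum in which each processing job reads one layer and writes the next. The layer names below are inferred from the text (raw, normalized, correlated, aggregated, federated), and the `<source>.<layer>` topic naming scheme is purely hypothetical; the POC itself uses a single `syslog` topic.

```java
import java.util.Locale;

/** The five data-flow layers in pipeline order (names inferred from the article). */
enum PipelineLayer {
    RAW, NORMALIZED, CORRELATED, AGGREGATED, FEDERATED;

    /** Hypothetical topic naming scheme: "<source>.<layer>", e.g. "syslog.normalized". */
    String topicFor(String source) {
        return source + "." + name().toLowerCase(Locale.ROOT);
    }

    /** The layer a processing job writes to when it reads from this one. */
    PipelineLayer next() {
        if (this == FEDERATED) {
            throw new IllegalStateException("FEDERATED is the last layer");
        }
        return values()[ordinal() + 1];
    }
}
```

With this convention, a correlation job for Syslog would read from `PipelineLayer.NORMALIZED.topicFor("syslog")` and write to the topic of `PipelineLayer.NORMALIZED.next()`, which also gives external systems an obvious place to plug in at any layer.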

Windowing Concepts

Real-time processing introduces a few specific concepts that are important to define and keep in mind in order to spot and define new patterns:

  • Time window
  • Event count window
  • Federated window across different streams

Time window

A time window is a simple clock-based rolling window of a pre-defined size. For security monitoring, commonly used windows are 1, 5, 15, and 60 min.

The example below demonstrates a correlation rule based on host IP and a 1 min time window.
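As a rough illustration of that rule, the sketch below counts events per host IP in fixed, non-overlapping 1-minute windows. It is plain Java rather than the Flink API, the class name is hypothetical, and it assumes non-negative epoch-millisecond timestamps. It also never evicts old windows, which a real job would have to do.

```java
import java.util.HashMap;
import java.util.Map;

/** Counts events per host IP in fixed (tumbling) time windows. Not the Flink API. */
final class TumblingWindowCounter {
    private final long windowMillis;
    // Key: "<windowStart>|<hostIp>", value: number of events seen in that window.
    private final Map<String, Integer> counts = new HashMap<>();

    TumblingWindowCounter(long windowMillis) {
        if (windowMillis <= 0) throw new IllegalArgumentException("window must be positive");
        this.windowMillis = windowMillis;
    }

    /** Start of the window that contains the given event timestamp. */
    long windowStart(long eventTimeMillis) {
        return eventTimeMillis - (eventTimeMillis % windowMillis);
    }

    /** Records one event and returns the running count for its (window, host) pair. */
    int add(String hostIp, long eventTimeMillis) {
        String key = windowStart(eventTimeMillis) + "|" + hostIp;
        return counts.merge(key, 1, Integer::sum);
    }
}
```

A correlation rule would then compare the returned count against a threshold, for example raising an alert when one host IP produces more than a chosen number of failures within the same minute.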

Event count window

An event count window is usually bounded by a long time window (60 min or more) to give a different view of events, e.g. 100 connection attempts from one source.
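A count window bounded by time can be sketched as follows: keep the recent event timestamps per source and report when their number reaches a threshold inside the bounding window. This is a plain-Java illustration with hypothetical names, and it assumes events arrive in timestamp order; out-of-order handling is what a stream processor's watermarks are for.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

/** Flags a source that produces at least `threshold` events within a bounding time window. */
final class CountWindowDetector {
    private final int threshold;
    private final long boundMillis;
    private final Map<String, Deque<Long>> recent = new HashMap<>();

    CountWindowDetector(int threshold, long boundMillis) {
        if (boundMillis <= 0) throw new IllegalArgumentException("bound must be positive");
        this.threshold = threshold;
        this.boundMillis = boundMillis;
    }

    /** Returns true while the source has at least `threshold` events inside the bound. */
    boolean onEvent(String source, long eventTimeMillis) {
        Deque<Long> times = recent.computeIfAbsent(source, k -> new ArrayDeque<>());
        times.addLast(eventTimeMillis);
        // Drop events that fell out of the bounding time window.
        while (eventTimeMillis - times.peekFirst() >= boundMillis) {
            times.removeFirst();
        }
        return times.size() >= threshold;
    }
}
```

The rule from the text would correspond to `new CountWindowDetector(100, 60 * 60 * 1000L)`, keyed by source address.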

Federated window across different streams

A federated window comes into play when we are interested in complex event patterns involving different log streams. Those patterns are passed through ML classification to identify the malicious ones, which are then deployed into production.
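In its simplest form, a federated window can be thought of as a keyed join across two streams: an event on one stream matches if the other stream saw the same key within the window. The sketch below is a minimal plain-Java illustration of that idea with hypothetical names. It remembers only the latest timestamp per key and is not how a stream processor implements joins internally.

```java
import java.util.HashMap;
import java.util.Map;

/** Matches events from two streams that share a key and occur within one time window. */
final class FederatedWindowJoin {
    private final long windowMillis;
    private final Map<String, Long> lastSeenA = new HashMap<>();
    private final Map<String, Long> lastSeenB = new HashMap<>();

    FederatedWindowJoin(long windowMillis) {
        this.windowMillis = windowMillis;
    }

    /** Records an event on stream A; true if stream B saw the same key within the window. */
    boolean onStreamA(String key, long eventTimeMillis) {
        lastSeenA.put(key, eventTimeMillis);
        return matches(lastSeenB, key, eventTimeMillis);
    }

    /** Records an event on stream B; true if stream A saw the same key within the window. */
    boolean onStreamB(String key, long eventTimeMillis) {
        lastSeenB.put(key, eventTimeMillis);
        return matches(lastSeenA, key, eventTimeMillis);
    }

    private boolean matches(Map<String, Long> other, String key, long eventTimeMillis) {
        Long seen = other.get(key);
        return seen != null && Math.abs(eventTimeMillis - seen) <= windowMillis;
    }
}
```

For instance, stream A could carry failed SSH logins and stream B FTP connections, both keyed by remote IP, so that a match means the same address touched both services within the window.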

Correlation Rules

I’ve found that correlation rules are a topic of their own that requires extensive research and experimentation on data. Below are some of the commonly used rules, borrowed from Ertugrul Akbas [12]:

  • Alert if a user accessed more than their 95th percentile number of assets
  • Alert if a user’s HTTP-to-DNS protocol ratio is 300% higher than that of 95% of the other users, based on the ratio over the last four weeks for the 4th day of the week
  • Alert if the ratio of failed authentications to successful authentications reaches 10%
  • Detect data loss by monitoring all endpoints for an abnormal volume of data egress
  • Measure the similarity between well-known process names and the running ones using Levenshtein distance in real time to detect process masquerading

…and many more
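To make the last rule in the list concrete, here is a small sketch of a Levenshtein-based check in plain Java. The distance function is the standard dynamic-programming algorithm; the class name, the `isSuspicious` helper and the idea of a fixed `maxDistance` threshold are assumptions for illustration, not taken from [12].

```java
/** Flags process names that are close to, but not equal to, a well-known name. */
final class MasqueradeDetector {

    /** Classic dynamic-programming Levenshtein distance using two rolling rows. */
    static int levenshtein(String a, String b) {
        int[] prev = new int[b.length() + 1];
        int[] curr = new int[b.length() + 1];
        for (int j = 0; j <= b.length(); j++) {
            prev[j] = j; // distance from the empty prefix of a
        }
        for (int i = 1; i <= a.length(); i++) {
            curr[0] = i;
            for (int j = 1; j <= b.length(); j++) {
                int cost = a.charAt(i - 1) == b.charAt(j - 1) ? 0 : 1;
                curr[j] = Math.min(Math.min(curr[j - 1] + 1, prev[j] + 1), prev[j - 1] + cost);
            }
            int[] tmp = prev;
            prev = curr;
            curr = tmp;
        }
        return prev[b.length()];
    }

    /** Suspicious if within maxDistance edits of a known name without matching it exactly. */
    static boolean isSuspicious(String running, String known, int maxDistance) {
        int distance = levenshtein(running, known);
        return distance > 0 && distance <= maxDistance;
    }
}
```

Under this check, a process called `svch0st.exe` is one edit away from `svchost.exe` and would be flagged, while an exact match would not.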

Capacity planning and Costs

Capacity and costs are naturally interrelated. Below I outline architectural aspects that need to be considered for your specific use case. They help to define the non-functional requirements that impact the capacity and cost of the solution.

Amazon MSK (Managed Streaming for Apache Kafka) as a Streaming Broker

Pricing examples could be found here: https://aws.amazon.com/msk/pricing/

Pricing example 1 with two broker entry-level nodes: $72.85 monthly, which is $2.35/day.

Pricing example 2 with three medium-size nodes: $620.33 monthly

It is really tough to translate EPS accurately into node capacity. The Kafka documentation suggests that you can usually take network throughput as an upper bound.

My suggestion is to start with a small cluster and upgrade it as workloads increase.
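As a back-of-the-envelope aid for the network-throughput heuristic, the sketch below converts EPS into ingest bandwidth and derives a rough lower bound on broker count. Everything here is an assumption for illustration: the average event size, the replication factor, the NIC speed and the target utilization are inputs you would need to measure or choose yourself, and consumer traffic is ignored.

```java
/** Rough Kafka sizing: compares replicated ingest bandwidth with NIC bandwidth. */
final class KafkaSizing {

    /** Ingest bandwidth in megabits per second for one copy of the data. */
    static double ingestMbps(long eventsPerSecond, int avgEventBytes) {
        return eventsPerSecond * (double) avgEventBytes * 8 / 1_000_000;
    }

    /** Lower bound on brokers so replicated writes stay under a target NIC utilization. */
    static int minBrokers(double ingestMbps, int replicationFactor,
                          double nicMbps, double maxUtilization) {
        double totalMbps = ingestMbps * replicationFactor;
        return (int) Math.ceil(totalMbps / (nicMbps * maxUtilization));
    }
}
```

For example, 8k EPS at an assumed 500 bytes per event is 32 Mbit/s of ingest, which even with three replicas is far below a single 1 Gbit/s interface. That is consistent with starting small and scaling up as workloads grow.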

Amazon Kinesis Data Analytics (Flink)

There are a few price estimation examples here: https://aws.amazon.com/kinesis/data-analytics/pricing/

Workloads with 8k EPS would result in $515.20 monthly

Amazon Elasticsearch Service

This service includes both Elasticsearch and Kibana for visualization.

Managed Elasticsearch Service is very useful and helps concentrate on your ultimate solution rather than dealing with specific service setup.

Basic setup for production use with 1TB storage would cost $1419.80 monthly.

You can play with parameters at https://calculator.aws/

Storage

100 TB Standard S3 storage would cost you $2,304.28 monthly

700 TB S3 Glacier for archival would cost you $2,867.20 monthly

Cost summary

There will be more costs than those above, e.g. network, VPN, log routing and, of course, operational costs.

The above should give you a rough idea: a (very complex) stream processing solution at this level would most probably cost you less than $10k a month in Opex and no Capex, with deployment times in weeks rather than months.
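As a quick sanity check of that figure, the monthly estimates quoted above can simply be added up. The sketch below assumes the larger MSK example (three medium-size nodes) is the one used; network, VPN, log routing and operational costs are not included.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

/** Adds up the monthly AWS estimates quoted in this section (USD). */
final class MonthlyCost {

    static Map<String, BigDecimal> lineItems() {
        Map<String, BigDecimal> items = new LinkedHashMap<>();
        items.put("MSK, three medium-size nodes", new BigDecimal("620.33"));
        items.put("Kinesis Data Analytics, 8k EPS", new BigDecimal("515.20"));
        items.put("Elasticsearch Service, 1 TB", new BigDecimal("1419.80"));
        items.put("S3 Standard, 100 TB", new BigDecimal("2304.28"));
        items.put("S3 Glacier, 700 TB", new BigDecimal("2867.20"));
        return items;
    }

    /** Sum of all line items; BigDecimal avoids floating-point rounding on currency. */
    static BigDecimal total() {
        return lineItems().values().stream().reduce(BigDecimal.ZERO, BigDecimal::add);
    }
}
```

With these five items the total comes to $7,726.81 per month, which leaves some headroom under $10k for the costs that are not listed.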

My POC box configuration details

A small working prototype can be run on your laptop; a more complex one for live data is better configured properly on AWS.

Configuration of AWS networking and IAM is off-topic for this article. Here I provide only some basic scripts that show how simple it is to configure these components and get them working.

Component versions that I’ve used for the POC:

  • java-1.8.0
  • Kafka 2.4.1
  • Scala 2.11
  • Flink 1.11.0
  • Elastic Stack 7.8

Elastic Filebeat

The code below collects Syslog (Red Hat/CentOS format from /var/log/messages) and adds some additional host-specific information. The resulting message is streamed to Logstash in JSON format.

filebeat.inputs:
# Each - is an input. Most options can be set at the input level, so
# you can use different inputs for various configurations.
# Below are the input specific configurations.
- type: log
  paths:
    - "/u01/tmp/var/log/messages*"
  tags: ["syslog"]
  processors:
    - add_fields:
        target: geo
        fields:
          name: nyc-dc1-rack1
          location: 40.7128, -74.0060
          continent_name: North America
          country_iso_code: US
          region_name: New York
          region_iso_code: NY
          city_name: New York
    - add_fields:
        target: log
        fields:
          type: syslog
- type: log
  paths:
    - "/u01/tmp/var/log/httpd/*"
  fields:
    apache: true
  fields_under_root: true
  tags: ["httpd"]

output.logstash:
  # The Logstash hosts
  hosts: ["localhost:5044"]

Elastic Logstash

I’m doing the log normalization with Grok using pattern matching. The example below parses Syslog for “authentication failure” events. All parsed fields will be added to the resulting JSON as separate key-value pairs.

input {
    beats {
        port => "5044"
    }
}

# in syslog
filter {
    if "syslog" in [tags] {
        # to replace timestamp with logdate
        grok {
            match => { "message" => "%{SYSLOGTIMESTAMP:logdate}" }
        }

        # sshd
        # Sample: "Oct 16 06:20:09 combo sshd(pam_unix)[19775]: authentication failure; logname= uid=0 euid=0 tty=NODEVssh ruser= rhost=218.206.193.55  user=root"
        grok {
            patterns_dir => ["/u01/demo-siem/config/logstash/patterns"]
            match => { "message" => "%{SYSLOGTIMESTAMP:syslog.timestamp} %{WORD:system.auth.hostname} sshd\(pam_unix\)\[%{BASE10NUM:sshd.pid}\]: authentication failure; logname=(%{WORD:sshd.logname})? uid=%{BASE10NUM:sshd.uid} euid=%{BASE10NUM:sshd.euid} tty=%{WORD:sshd.tty} ruser=(%{USERNAME:sshd.ruser})? rhost=%{IPORHOST:sshd.rhost}  user=%{USERNAME:sshd.user}" }
            add_field => {
                "host.daemon" => "sshd"
                "sshd.message" => "authentication failure;"
            }
        }

        # ftpd
        # Sample: Feb 22 01:55:44 combo ftpd[5781]: connection from 210.212.240.242 () at Wed Feb 22 01:55:44 2006
        grok {
            patterns_dir => ["/u01/demo-siem/config/logstash/patterns"]
            match => { "message" => "%{SYSLOGTIMESTAMP:syslog.timestamp} %{WORD:system.auth.hostname} ftpd\[%{BASE10NUM:ftpd.pid}\]: connection from (%{IP:ftpd.rhost_ip})? (\(%{IPORHOST:ftpd.rhost_name}\))?" }
            add_field => {
                "host.daemon" => "ftpd"
            }
        }

        # add year as we are using the old logs
        mutate {
            replace => ["logdate", "%{logdate} 2005"]
        }

        # use timestamp from the log file
        date {
            match => [ "logdate", "MMM  d HH:mm:ss YYYY", "MMM dd HH:mm:ss YYYY", "ISO8601" ]
        }

        geoip {
            source => "ftpd.rhost_ip"
        }
    }
}

output {
    kafka {
        bootstrap_servers => "localhost:9092"
        topic_id => "syslog"
    }
}
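For readers who want to test a normalization pattern outside Logstash, the sshd rule above can be approximated with a plain Java regular expression. This is a hand-written approximation, not a translation produced by Grok: the class name is hypothetical, the character classes are simpler than the Grok patterns, and the field names drop the dotted prefixes because Java group names cannot contain dots.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/** Parses sshd "authentication failure" Syslog lines into key-value pairs. */
final class SshdAuthFailureParser {
    private static final Pattern SSHD_FAILURE = Pattern.compile(
        "^(?<timestamp>\\w{3}\\s+\\d{1,2} \\d{2}:\\d{2}:\\d{2}) (?<hostname>\\w+) "
        + "sshd\\(pam_unix\\)\\[(?<pid>\\d+)\\]: authentication failure; "
        + "logname=(?<logname>\\w+)? uid=(?<uid>\\d+) euid=(?<euid>\\d+) tty=(?<tty>\\w+) "
        + "ruser=(?<ruser>\\S+)? rhost=(?<rhost>\\S+)\\s+user=(?<user>\\S+)$");

    private static final String[] FIELDS = {
        "timestamp", "hostname", "pid", "logname", "uid", "euid", "tty", "ruser", "rhost", "user"
    };

    /** Returns the parsed fields, or an empty map if the line does not match. */
    static Map<String, String> parse(String line) {
        Map<String, String> fields = new LinkedHashMap<>();
        Matcher m = SSHD_FAILURE.matcher(line);
        if (!m.matches()) {
            return fields;
        }
        for (String name : FIELDS) {
            String value = m.group(name);
            if (value != null) {
                fields.put(name, value); // optional groups (logname, ruser) may be absent
            }
        }
        return fields;
    }
}
```

Feeding it the sample line from the config comment should yield `rhost`, `user`, `pid` and the other populated fields, while the empty `logname` and `ruser` values are simply left out of the map.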

Kafka

I used the out-of-the-box Kafka configuration.

# Create a new topic
bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic syslog

# Get the number of messages in the topic
./bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic syslog --time -1 --offsets 1 | awk -F ":" '{sum += $3} END {print sum}'

Flink

Sample Flink job to consume the topic from Kafka and sink it to S3

import java.util.Properties;
import java.util.concurrent.TimeUnit;

import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

/**
 * Consumes the "syslog" topic from Kafka and writes the events to S3 as text files.
 */
public class SiemPersistJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.enableCheckpointing(5000); // checkpoint every 5000 msecs

        // use event time for the application
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        // configure interval of periodic watermark generation
        env.getConfig().setAutoWatermarkInterval(10 * 1000L); // 10 sec

        // Configure Kafka as the data source
        Properties properties = new Properties();
        properties.setProperty("bootstrap.servers", "localhost:9092");
        properties.setProperty("group.id", "group1"); // Kafka consumer group

        FlinkKafkaConsumer<String> myConsumer =
                new FlinkKafkaConsumer<>("syslog", new SimpleStringSchema(), properties);

        DataStream<String> stream = env
                .addSource(myConsumer)
                // Partitions elements round-robin, creating equal load per partition if there is any skew
                .rebalance();

        // Configure S3 sink
        // @TODO configure dead letter queue
        // https://stackoverflow.com/questions/60548609/flink-kafka-to-s3-with-retry
        final StreamingFileSink<String> sink = StreamingFileSink
                .forRowFormat(
                        new Path("s3a://security-data-lake/staging-zone/syslog/"),
                        new SimpleStringEncoder<String>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                // roll a part file after 15 min, after 5 min of inactivity, or at 1 GB
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(5))
                                .withMaxPartSize(1024L * 1024 * 1024)
                                .build())
                .build();

        stream.addSink(sink);

        env.execute("Syslog pipeline");
    }
}

As you can see, it is quite straightforward.

Another very good example, with stream aggregation, was written by Siddharth Sharma: https://medium.com/@sid_sharma/click-stream-processing-on-apache-flink-using-kafka-source-and-aws-s3-sink-b12e6ece783e

S3, Elasticsearch, Kibana

S3 and Elasticsearch are straightforward to configure, so I’ve decided to skip them here for the sake of simplicity.

Disclaimer

The materials provided in this article represent work that I’ve done in my free time; they are not taken from any of my past projects and do not relate to any of my previous clients. They are free from any copyright and may be reused for any personal, commercial or non-commercial purpose.

I would be happy to deploy this or a similar solution and to work on research for specific correlation rules. I’m sure that the technology components used in this solution are able to overcome the most complex challenges.

If you have any questions or comments feel free to reach out via https://www.linkedin.com/in/fgurbanov/

References

[1] Keystone Routing Pipeline at Netflix (presented at Flink Forward San Francisco 2018)

https://www.slideshare.net/FlinkForward/flink-forward-san-francisco-2018-steven-wu-scaling-flink-in-cloud

https://www.youtube.com/watch?v=w2AF8Ld-Ofs

[2] An approach to the correlation of security events based on machine learning techniques. Published: 11 March 2013

https://jisajournal.springeropen.com/articles/10.1186/1869-0238-4-7

[3] Event Correlation across Log Files: What is it and Why is it Important?

https://www.accenture.com/us-en/blogs/blogs-event-correlation-across-log-files-what-is-it-and-why-is-it-important

[4] Elastic Common Schema

https://github.com/elastic/ecs

[5] ArcSight Common Event Format (CEF) Implementation Standard

https://community.microfocus.com/t5/ArcSight-Connectors/ArcSight-Common-Event-Format-CEF-Implementation-Standard/ta-p/1645557

[6] Demo Data Sets that I’ve used for POC

https://log-sharing.dreamhosters.com/

[7] Book: Stream Processing with Apache Flink by Fabian Hueske and Vasiliki Kalavri (O’Reilly), © 2019

https://www.oreilly.com/library/view/stream-processing-with/9781491974285/?utm_source=dlvr.it&utm_medium=gplus

[8] Apache Flink vs Apache Spark

https://dzone.com/articles/apache-flink-vs-apache-spark-brewing-codes

[9] The past, present, and future of streaming: Flink, Spark, and the gang

https://www.zdnet.com/article/the-past-present-and-future-of-streaming-flink-spark-and-the-gang/

[10] Sliding Vs Tumbling Windows

https://stackoverflow.com/questions/12602368/sliding-vs-tumbling-windows

[11] Apache Kafka Quick Start

https://kafka.apache.org/quickstart

[12] Why SureLog is the Right SIEM Solution?

https://medium.com/@eakbas/why-surelog-is-the-right-siem-solution-6bad5a6f88e3

Farid Gurbanov
Data Analytics

I’m a solution architect and engineer specializing in cloud migration initiatives. My core focus is security, cost, performance, and time to market.