Informed Decisions with LGTM

Imran Roshan
Google Cloud - Community
6 min read · Apr 26, 2024

Utilising the LGTM stack in collaboration with BigQuery and Security Command Center to facilitate informed threat intelligence on GKE

Disclaimer: If the title of this blog brought you here, come on! Are you going to let smart tools take your job? Shh! This stays between you and me. Let’s begin.

It is critical to secure your cloud environment, particularly when using Google Kubernetes Engine (GKE) for containerized workloads. Mitigating damage and maintaining a robust cloud infrastructure depend heavily on detecting and responding to security threats early. This guide explains how to combine Grafana, Loki, Tempo, and Metrics (the LGTM stack) with BigQuery and Security Command Center (SCC) to achieve thorough security threat detection on your GKE workloads.

A little bit about the tools:

  • Grafana: An open-source platform for monitoring and visualizing data. It lets you make dashboards to see logs, traces, and metrics from different sources.
  • Loki: A horizontally scalable, highly available log aggregation system. It stores and retrieves logs from your GKE clusters efficiently.
  • Tempo: A distributed tracing backend that stores and retrieves the traces your applications generate. Traces give important context about how requests move through your system.
  • Metrics (Prometheus): A well-known open-source monitoring system that gathers and stores metrics from your infrastructure and applications. (In Grafana's own naming, the M in LGTM stands for Mimir, a Prometheus-compatible metrics store.)
  • BigQuery: A serverless, fully managed data warehouse provided by Google Cloud Platform (GCP). It makes it possible to scale and save costs while storing and analyzing big datasets.
  • Security Command Center (SCC): Google Cloud's centralized security and risk management platform. It aggregates security findings from multiple sources and offers tools for incident response, threat hunting, and investigation.

The Steps

First comes the very obvious step: installation, where we get our hands dirty with the LGTM stack. Now, pay close attention to the very intricate installation process.

Run the command:

helm repo add lgtm https://charts.lgtm.com/
helm install lgtm lgtm/lgtm \
  --namespace <your-namespace> \
  --set image.tag=<lgtm-agent-version> \
  --set lgtm.token=<your-LGTM-access-token>

and done. Hope you were able to follow.

Getting proper LGTM Logs

For proper operation, the LGTM agent needs access to particular resources within the cluster. Edit the DaemonSet YAML and add the following security context to the agent container under spec.template.spec.containers (capabilities and readOnlyRootFilesystem are container-level settings):

securityContext:
  runAsNonRoot: true
  capabilities:
    add:
      - CHOWN
      - SETUID
      - SETGID
  readOnlyRootFilesystem: false

You have the option to set up alerting rules according to the security metrics that LGTM sends you. This enables the agent to notify you of any potential security issues. For information on configuring alerts, consult the Cloud Monitoring documentation.

Logs from the LGTM agent are located in your GKE cluster’s system namespace (kube-system). Depending on the version of your LGTM agent, the exact log names may change, but they usually have the format lgtm-agent-. To find the pertinent log names, use the Logs Explorer in the Google Cloud console.

Create a filter that captures only the security-related logs from the LGTM agent. Cloud Logging offers an expressive query language for this. Here is an example of a filter that matches logs containing the word “vulnerability”:

resource.type="k8s_container" AND resource.labels.container_name=~"^lgtm-agent-" AND log_id("stdout") AND textPayload:"vulnerability"

This filter gathers logs from the standard output stream (“stdout”), picks logs from containers whose names begin with “lgtm-agent-” (all LGTM agent containers), and looks for messages that contain the word “vulnerability.” This filter can be changed to suit your needs and to reflect the language LGTM uses for security logs.
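A filter like this can also be assembled programmatically, which helps when you want to swap the keyword or the container prefix. The sketch below is a minimal example, not an official helper: build_log_filter is a hypothetical name, and it emits Cloud Logging query-language operators (=~ for a regular expression match, log_id() for the log name, and : for a substring match on textPayload). To avoid escaping issues, it simply rejects inputs containing characters that would need escaping.

```python
import re


def build_log_filter(container_prefix: str, keyword: str) -> str:
    """Build a Cloud Logging filter for container logs that mention a keyword.

    Hypothetical helper: the prefix and keyword are whatever your agent uses.
    """
    # Kubernetes container names are lowercase alphanumerics and hyphens,
    # so anything else is rejected rather than escaped.
    if not re.fullmatch(r"[a-z0-9-]+", container_prefix):
        raise ValueError(f"Unexpected container prefix: {container_prefix!r}")
    # Quotes and backslashes would need escaping inside the quoted string.
    if '"' in keyword or "\\" in keyword:
        raise ValueError(f"Unsupported character in keyword: {keyword!r}")

    clauses = [
        'resource.type="k8s_container"',
        f'resource.labels.container_name=~"^{container_prefix}"',
        'log_id("stdout")',
        f'textPayload:"{keyword}"',
    ]
    return " AND ".join(clauses)


print(build_log_filter("lgtm-agent-", "vulnerability"))
```

The resulting string can be pasted into the Logs Explorer or passed to a logging client as its filter argument.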

All we need to do now is store these logs. The LGTM agent has built-in support for log destinations, which can be used to route logs to Cloud Storage. Edit the deployment YAML file for the LGTM agent and find the section of the agent container where the environment variables are defined. Replace the placeholder values below with your own.

spec:
  template:
    spec:
      containers:
        - name: lgtm
          env:
            - name: LGTM_TOKEN
              value: "<your-LGTM-access-token>" # Replace with your access token
            - name: LGTM_LOG_DESTINATION
              value: "gcs://<your-bucket-name>" # Replace with your bucket name

Write log data to BigQuery

We now create a Cloud Function that loads data into BigQuery whenever a new detection or scan writes logs to the Cloud Storage bucket.

Here is an example of Python Cloud Function code that parses log data and sends it to BigQuery (modify project_id, dataset_name, and table_name to reflect your information):

from google.cloud import bigquery
from google.cloud import storage

# Replace these with your own values
project_id = "your-project-id"
dataset_name = "your_dataset"
table_name = "lgtm_logs"


def parse_lgtm_log(data, context):
    """Parses LGTM security log data and writes it to BigQuery

    Args:
        data: The Cloud Storage event data (JSON).
        context: The Cloud Function execution context.
    """
    # Download the log file that triggered the event; the event carries only
    # the bucket and object name, not the file contents
    blob = storage.Client().bucket(data["bucket"]).blob(data["name"])
    log_entry = blob.download_as_text()

    # Parse the log entry (implement your parsing logic here)
    # This example assumes basic key-value separation
    log_dict = {}
    for line in log_entry.splitlines():
        if ":" not in line:
            continue  # Skip lines that are not "key: value" pairs
        key, value = line.split(":", 1)
        log_dict[key.strip()] = value.strip()

    # Create a BigQuery client
    client = bigquery.Client()

    # Reference the BigQuery table by its fully qualified ID
    table_id = f"{project_id}.{dataset_name}.{table_name}"

    # Insert the parsed log data into BigQuery
    rows_to_insert = [log_dict]
    table = client.get_table(table_id)  # Make sure the table exists
    errors = client.insert_rows(table, rows_to_insert)

    if errors:
        print(f"Errors while inserting data: {errors}")
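The key-value parsing step is the part most likely to break on real log files, so it can help to pull it into a function you can test locally without any Google Cloud dependencies. Here is a minimal sketch, assuming each log line has the form key: value. The helper name parse_key_value_log and the sample field names are hypothetical and are not taken from actual LGTM agent output.

```python
def parse_key_value_log(text: str) -> dict:
    """Parse 'key: value' lines into a dict, skipping lines that do not match."""
    parsed = {}
    for line in text.splitlines():
        # partition splits on the first colon only, so values such as
        # timestamps may contain further colons.
        key, separator, value = line.partition(":")
        if not separator or not key.strip():
            continue  # No colon, or an empty key: not a key-value line
        parsed[key.strip()] = value.strip()
    return parsed


# Hypothetical sample; real agent logs may look different.
sample = """\
finding_type: vulnerability
cve_id: CVE-0000-0000
severity: critical
this line has no separator
timestamp: 2024-04-26T10:15:00Z
"""

print(parse_key_value_log(sample))
```

Skipping malformed lines instead of raising keeps one bad line from discarding the whole file, at the cost of silently dropping data, so you may prefer to log the skipped lines.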

Analyse

Use BigQuery to examine the parsed LGTM security logs. You can write custom SQL queries against the log data to find trends, patterns, and possible security issues. Here are a few examples:

Identify CVEs:

SELECT * FROM `project_id.dataset_name.table_name` 
WHERE cve_id IS NOT NULL;

Identify frequently occurring security events:

SELECT finding_type, COUNT(*) AS count
FROM `project_id.dataset_name.table_name`
GROUP BY finding_type
ORDER BY count DESC;

Analyse event trends:

SELECT TIMESTAMP_TRUNC(timestamp, HOUR) AS event_hour, COUNT(*) AS event_count
FROM `project_id.dataset_name.table_name`
GROUP BY event_hour
ORDER BY event_hour;

Find vulnerabilities by image:

SELECT container_image, vulnerability_id, COUNT(*) AS occurrences
FROM `your_project.your_dataset.lgtm_logs`
WHERE vulnerability_id IS NOT NULL
GROUP BY container_image, vulnerability_id
ORDER BY occurrences DESC;

Track threats over time:

SELECT DATE(timestamp) AS scan_date, threat_type, COUNT(*) AS count
FROM `your_project.your_dataset.lgtm_logs`
GROUP BY DATE(timestamp), threat_type
ORDER BY scan_date, count DESC;
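These queries can also be run from Python, for instance inside a scheduled job. The sketch below shows one way to do it with the google-cloud-bigquery client library. build_frequency_query and run_frequency_query are hypothetical helper names, and the table ID you pass in is your own. Because BigQuery does not accept a table name as a query parameter, the helper checks the ID against a deliberately conservative pattern before formatting it into the SQL.

```python
import re

# Conservative pattern for "project.dataset.table"; real table names may
# allow more characters, so loosen this if your IDs are rejected.
TABLE_ID_PATTERN = re.compile(r"[A-Za-z0-9_-]+\.[A-Za-z0-9_]+\.[A-Za-z0-9_]+")


def build_frequency_query(table_id: str) -> str:
    """Return SQL that counts rows per finding_type, most frequent first."""
    if not TABLE_ID_PATTERN.fullmatch(table_id):
        raise ValueError(f"Unexpected table ID: {table_id!r}")
    return (
        "SELECT finding_type, COUNT(*) AS event_count\n"
        f"FROM `{table_id}`\n"
        "GROUP BY finding_type\n"
        "ORDER BY event_count DESC"
    )


def run_frequency_query(table_id: str) -> list:
    """Run the query and return (finding_type, event_count) tuples."""
    # Imported here so the query builder works without the library installed.
    from google.cloud import bigquery

    client = bigquery.Client()
    rows = client.query(build_frequency_query(table_id)).result()
    return [(row["finding_type"], row["event_count"]) for row in rows]


print(build_frequency_query("your_project.your_dataset.lgtm_logs"))
```

Calling run_frequency_query requires Application Default Credentials and an existing table with a finding_type column.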

Secure

Create a Cloud Function that sends the findings to Security Command Center:

import base64
import datetime
import json
import uuid

from googleapiclient.discovery import build
from googleapiclient.errors import HttpError

# Replace these with your own values
source_name = "organizations/<your-org-id>/sources/<your-source-id>"
resource_name = "//cloudresourcemanager.googleapis.com/projects/<your-project-number>"


def analyze_findings(data, context):
    """Analyzes BigQuery data and sends high-risk findings to SCC

    Args:
        data: The BigQuery Pub/Sub message data (JSON).
        context: The Cloud Function execution context.
    """
    # Extract data from the Pub/Sub message. The payload arrives base64-encoded;
    # this assumes it decodes to a JSON object holding one BigQuery row
    row = json.loads(base64.b64decode(data['data']).decode('utf-8'))

    # Analyze the BigQuery row data (replace logic with your threat identification)
    if 'CVE-2023' in row['finding'] or 'critical' in row['severity'].lower():
        finding_text = f"LGTM Security Finding: {row['finding']}"
        send_finding_to_scc(finding_text)


def send_finding_to_scc(finding_text):
    # Build SCC service object (uses Application Default Credentials)
    scc_service = build('securitycenter', 'v1')

    # Prepare SCC finding resource
    now = datetime.datetime.now(datetime.timezone.utc)
    finding = {
        'state': 'ACTIVE',
        'resourceName': resource_name,
        'category': 'SECURITY_EVENT',
        'description': finding_text,
        'eventTime': now.strftime('%Y-%m-%dT%H:%M:%SZ'),
    }

    # Send the finding to SCC; findings are created under a source,
    # and each one needs a unique alphanumeric ID of at most 32 characters
    try:
        response = scc_service.organizations().sources().findings().create(
            parent=source_name, findingId=uuid.uuid4().hex, body=finding
        ).execute()
        print(f"Finding sent to SCC: {response.get('name')}")
    except HttpError as error:
        print(f"Error sending finding to SCC: {error}")
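The risk check in analyze_findings indexes the row directly, so a row without a finding or severity field would raise a KeyError and stop the function. A more defensive variant might look like the sketch below. is_high_risk is a hypothetical helper, and the CVE-2023 prefix and the critical severity are simply the example criteria used above, not a recommended policy.

```python
def is_high_risk(row: dict, cve_prefix: str = "CVE-2023") -> bool:
    """Return True when a row matches the example high-risk criteria.

    Missing or null fields are treated as empty strings instead of raising.
    """
    finding = str(row.get("finding") or "")
    severity = str(row.get("severity") or "").lower()
    return cve_prefix in finding or "critical" in severity


# Hypothetical rows for illustration only.
print(is_high_risk({"finding": "CVE-2023 example entry", "severity": "low"}))
print(is_high_risk({"finding": "outdated base image", "severity": "CRITICAL"}))
print(is_high_risk({"finding": None}))
```

Keeping the decision in its own function also makes it easy to unit test the criteria before any finding is sent to SCC.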

Set up insights to automatically take action in case a high-risk LGTM finding is discovered. Here are a few options:

  • Patching: Using the CVE found in the LGTM finding, integrate SCC with Patch Management to automatically apply security patches to compromised virtual machines.
  • Updates to IAM Policies: If a finding suggests that access permissions are inappropriate, use SCC insights to initiate updates to IAM policies on GCP resources.
  • Resource Quarantine: In severe circumstances, SCC insights may start the process of isolating resources that have serious vulnerabilities found by LGTM.
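One way to wire findings to actions like these is a small dispatch table that defaults to a dry run, so nothing changes until a person has reviewed the plan. This is a sketch only: the category names and handler functions are hypothetical placeholders that return a description of the action, and none of them call a real SCC or Google Cloud API.

```python
from typing import Callable, Dict


def patch_resource(finding: dict) -> str:
    return f"patch {finding['resource']} for {finding['cve_id']}"


def tighten_iam(finding: dict) -> str:
    return f"review IAM policy on {finding['resource']}"


def quarantine_resource(finding: dict) -> str:
    return f"quarantine {finding['resource']}"


# Hypothetical mapping from finding category to remediation handler.
ACTIONS: Dict[str, Callable[[dict], str]] = {
    "VULNERABILITY": patch_resource,
    "EXCESSIVE_PERMISSIONS": tighten_iam,
    "CRITICAL_VULNERABILITY": quarantine_resource,
}


def plan_remediation(finding: dict, dry_run: bool = True) -> str:
    """Describe the action for a finding; unknown categories go to a human."""
    handler = ACTIONS.get(finding.get("category"))
    if handler is None:
        return "no automated action; route to manual review"
    action = handler(finding)
    return f"[dry run] would {action}" if dry_run else action


example = {"category": "VULNERABILITY", "resource": "vm-1", "cve_id": "CVE-0000-0000"}
print(plan_remediation(example))
```

Defaulting dry_run to True means an operator has to opt in explicitly before any handler is allowed to act.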

To Conclude

Fully automating the remediation of such findings is rarely possible: expect to need custom scripting, along with manual intervention to weed out false positives and analyse important information. This strategy involves several components and calls for constant maintenance. Before implementing automated actions that are prompted by SCC insights in production, make sure you are aware of their implications.

Imran Roshan
Google Cloud - Community

Your security sherpa | Google Developer Expert (GCP) | Ethical Hacker | Cloud Security