Securing Anthos Workloads With Chronicle Backstory — A comprehensive approach

Imran Roshan
Google Cloud - Community
6 min read · May 11, 2024

Implementation process, threat detection strategies, and remediation workflows to get started.

How cool would it be to have a single pane of glass for securing Anthos as well as GKE clusters on the go, with automation that acts on granular security findings before they escalate?

The world of multiple clouds and hybrid systems poses distinct security challenges. The Anthos platform from Google Cloud makes it easier to deploy applications in a variety of environments, but securing these workloads calls for a thorough strategy. With the recent integration of Chronicle Backstory, a threat detection and investigation tool, with Anthos, we can now leverage extended detection and response (XDR) capabilities throughout this intricate environment. This blog delves deeply into the technical aspects of using Chronicle Backstory to secure Anthos workloads, including data ingestion, threat hunting queries, and utilizing the integrations that are already built in.

Implementing Backstory with Anthos

Chronicle Backstory uses pre-existing data sources to identify potential threats. Our main goal in integrating it with Anthos will be to consume information from two main sources:

Cloud Audit Logs: Your GCP projects’ administrative activity, including that of Anthos clusters, is recorded in these logs.
GKE Logs: Google Kubernetes Engine (GKE) logs offer valuable information about container activity and possible security incidents in your Anthos workloads.

Configuring cloud audit logging for Backstory


We start by enabling Cloud Audit Logs for our GCP project. Run the command below to add an IAM binding for the Chronicle ingestor service account.

gcloud projects add-iam-policy-binding PROJECT_ID \
  --member="serviceAccount:chronicle-ingestor@backstory.iam.gserviceaccount.com" \
  --role="roles/logging.logWriter"

Now that we have a log ingestor in place, we can create a logging sink to Backstory. With this setup, Anthos cluster activity from Cloud Audit Logs can be ingested by Backstory.

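# gcloud takes the sink destination as a positional argument.
# Assumption: the Pub/Sub topic below is a placeholder for whichever
# destination your Backstory feed reads from; adjust the filter as needed.
gcloud logging sinks create backstory-sink \
  pubsub.googleapis.com/projects/PROJECT_ID/topics/TOPIC_ID \
  --log-filter='resource.type="k8s_cluster"'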
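
The sink's filter decides which entries reach Backstory, so it is worth building it deliberately. Here is a minimal Python sketch that assembles an audit-only filter in the Cloud Logging query language. The helper name `build_audit_filter` and the default resource types are assumptions for illustration, not part of any Google library.

```python
def build_audit_filter(project_id, resource_types=("k8s_cluster", "gke_cluster")):
    """Return a Cloud Logging filter matching audit entries for the given resource types."""
    if not resource_types:
        raise ValueError("at least one resource type is required")
    # Audit log names start with projects/<id>/logs/cloudaudit.googleapis.com;
    # the ":" operator is a substring match in the Logging query language.
    log_clause = f'logName:"projects/{project_id}/logs/cloudaudit.googleapis.com"'
    # Hypothetical default resource types; swap in the ones your clusters emit.
    type_clause = " OR ".join(f'resource.type="{t}"' for t in resource_types)
    return f"{log_clause} AND ({type_clause})"


if __name__ == "__main__":
    print(build_audit_filter("PROJECT_ID"))
```

The resulting string can be passed to the sink's log filter. Keeping it in one helper makes it easier to review what is being exported.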

Enabling GKE logging for Backstory involves two steps:

  • Enable Stackdriver Kubernetes Engine Monitoring for your Anthos cluster.
  • Create a sink within Stackdriver Monitoring to export logs to Backstory:
# Illustrative manifest: confirm the API group and kinds against the CRDs installed in your cluster.
# Configure the Stackdriver Logging Agent
apiVersion: logging.k8s.io/v2
kind: LoggingDeployment
metadata:
  name: backstory-agent
spec:
  sinkRefs:
    - name: "backstory-sink"
      namespace: "logging"
  # Replace with your Backstory ingestion endpoint
  outputDataset: "projects/your-project-id/datasets/anthos-logs"
  # Filters to select relevant container logs
  selectors:
    - expression: "resource.type=k8s_container"
---
# Define the Stackdriver Logging Sink to route logs to Backstory
apiVersion: logging.k8s.io/v2
kind: LoggingSink
metadata:
  name: backstory-sink
  # Must match the namespace used in sinkRefs above
  namespace: logging
spec:
  # Replace with your Backstory ingestion credentials
  secretRef:
    name: backstory-credentials
  destination:
    # Configure secure HTTPS destination for Backstory
    destination: "https://your-backstory-endpoint.google.com/v2/ingest"
    # Define the log format for Backstory ingestion
    outputFormat: "json"

With this setup, Backstory receives GKE logs from your Anthos workloads that show container activity.

Threat Detection With Backstory Queries

Backstory detects threats through queries written in Chronicle Query Language (CQL). Here are a few examples:

Detecting Suspicious APIs

This query finds instances of unauthorized users within Anthos clusters making API calls to the Secrets API.

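SELECT resource.labels.cluster_name,
       timestamp,
       protoPayload.methodName,
       protoPayload.authenticationInfo.principalEmail
FROM audit_log
-- Kubernetes audit entries name methods like io.k8s.core.v1.secrets.get
WHERE protoPayload.methodName LIKE '%.v1.secrets.%'
  -- The caller identity is recorded under authenticationInfo
  AND protoPayload.authenticationInfo.principalEmail NOT LIKE '%admin@yourdomain.com'
ORDER BY timestamp DESC;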
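
The same check can be prototyped offline against exported audit entries before committing to a detection rule. The sketch below assumes the entries are already parsed into dictionaries, that the caller identity sits under `protoPayload.authenticationInfo.principalEmail`, and that method names contain `.secrets.`. The function name `find_secret_access` is hypothetical.

```python
def find_secret_access(entries, trusted_suffixes=("admin@yourdomain.com",)):
    """Return Secrets API calls made by principals outside the trusted list, newest first."""
    hits = []
    for entry in entries:
        payload = entry.get("protoPayload", {})
        method = payload.get("methodName", "")
        email = payload.get("authenticationInfo", {}).get("principalEmail", "")
        # Skip anything that is not a Secrets API call.
        if ".secrets." not in method:
            continue
        # Skip callers whose address ends with a trusted suffix.
        if email.endswith(tuple(trusted_suffixes)):
            continue
        hits.append({
            "cluster": entry.get("resource", {}).get("labels", {}).get("cluster_name"),
            "timestamp": entry.get("timestamp", ""),
            "method": method,
            "principal": email,
        })
    # ISO 8601 timestamps in the same format sort correctly as strings.
    return sorted(hits, key=lambda h: h["timestamp"], reverse=True)
```

Running it over a day of exported logs gives a quick sense of how noisy the rule would be with your own trusted list.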

Unusual Container Activity Detection

This query finds containers that crash frequently in Anthos workloads, which may be a sign of suspicious activity.

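SELECT resource.labels.cluster_name,
       container.name,
       timestamp,
       jsonPayload.reason
FROM container
WHERE jsonPayload.reason LIKE '%CrashLoopBackOff%'
ORDER BY timestamp DESC;

A companion query flags containers whose CPU or memory usage sits more than three standard deviations above the mean:

-- Find container executions with unusual resource usage.
-- Aggregates cannot appear directly in WHERE, so compute them once in a CTE.
WITH stats AS (
  SELECT AVG(resource.usage.cpu.usage_in_cores) AS cpu_avg,
         STDDEV(resource.usage.cpu.usage_in_cores) AS cpu_sd,
         AVG(resource.usage.memory.usage_in_bytes) AS mem_avg,
         STDDEV(resource.usage.memory.usage_in_bytes) AS mem_sd
  FROM logs
  WHERE resource.type = 'k8s_container'
)
SELECT process.name, container.name, resource.usage.cpu.usage_in_cores, resource.usage.memory.usage_in_bytes
FROM logs, stats
WHERE resource.type = 'k8s_container'
  AND (resource.usage.cpu.usage_in_cores > stats.cpu_avg + 3 * stats.cpu_sd
       OR resource.usage.memory.usage_in_bytes > stats.mem_avg + 3 * stats.mem_sd)
ORDER BY resource.usage.cpu.usage_in_cores DESC, resource.usage.memory.usage_in_bytes DESC;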
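
The three-sigma rule behind the resource-usage query is easy to try out in plain Python. This is a sketch under the assumption that usage samples are available as dictionaries; `flag_outliers` and the field names are illustrative. One caveat: with only a handful of samples, no value can sit three sample deviations above the mean, so the rule needs a reasonably sized baseline.

```python
from statistics import mean, stdev


def flag_outliers(samples, key, sigmas=3.0):
    """Return samples whose `key` value exceeds mean + sigmas * standard deviation."""
    values = [s[key] for s in samples]
    # A standard deviation needs at least two data points.
    if len(values) < 2:
        return []
    threshold = mean(values) + sigmas * stdev(values)
    return [s for s in samples if s[key] > threshold]


if __name__ == "__main__":
    # Twenty quiet containers and one that is burning CPU.
    usage = [{"container": f"web-{i}", "cpu": 0.1} for i in range(20)]
    usage.append({"container": "miner", "cpu": 5.0})
    for sample in flag_outliers(usage, "cpu"):
        print(sample["container"], sample["cpu"])
```

The same function works for memory by passing a different key, which mirrors the OR condition in the query.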

Suspicious Login Attempts

This query looks for login attempts made from unexpected locations during a specified time period. It can be narrowed further by user account or restricted to unsuccessful login attempts.

SELECT user_email, source_ip, timestamp
FROM events
WHERE event_type = 'login.attempt'
  AND timestamp >= TIMESTAMP_SUB(CURRENT_TIMESTAMP(), INTERVAL 1 DAY)
  AND geo_country(source_ip) NOT IN ('US', 'GB') -- Replace with trusted countries
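
For a quick sanity check of the time-window and location logic, the filter can be mirrored in Python. The sketch assumes each event already carries a resolved `country` field (the geo lookup itself is out of scope here) and a timezone-aware `timestamp`. The function name `suspicious_logins` is hypothetical.

```python
from datetime import datetime, timedelta, timezone


def suspicious_logins(events, trusted_countries=("US", "GB"),
                      window=timedelta(days=1), now=None):
    """Return login attempts inside the window that come from untrusted countries."""
    now = now or datetime.now(timezone.utc)
    cutoff = now - window
    return [
        e for e in events
        if e["event_type"] == "login.attempt"
        and e["timestamp"] >= cutoff
        # Assumption: "country" was resolved from source_ip upstream.
        and e["country"] not in trusted_countries
    ]
```

Passing `now` explicitly keeps the function deterministic, which helps when replaying historical events.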

Potential Lateral Movements

This query looks for user activity that may indicate lateral movement across clusters involving multiple GCP resources. Events can be narrowed down by particular resource kinds or activities that take place within a constrained time frame.

SELECT user_email, resource_type, resource_name,
       COUNT(*) AS event_count,
       MAX(timestamp) AS last_seen -- timestamp must be aggregated when grouping
FROM events
WHERE event_type IN ('resource.create', 'resource.access')
GROUP BY user_email, resource_type, resource_name
HAVING COUNT(*) > 5 -- Adjust threshold based on expected activity
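
A variation worth considering counts distinct resources per user rather than events per resource, which is closer to the idea of one account spreading across many resources. The Python sketch below shows that variation. It is not a translation of the query above, and `lateral_movement_candidates` is a hypothetical name.

```python
from collections import defaultdict


def lateral_movement_candidates(events, threshold=5):
    """Map each user to the number of distinct resources touched, if above threshold."""
    touched = defaultdict(set)
    for e in events:
        if e["event_type"] in ("resource.create", "resource.access"):
            # A set keeps repeated access to the same resource from inflating the count.
            touched[e["user_email"]].add((e["resource_type"], e["resource_name"]))
    return {user: len(res) for user, res in touched.items() if len(res) > threshold}
```

As with the query, the threshold should be tuned to what normal activity looks like in your environment.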

Unusual File Access

This query looks for file access events coming from unexpected source IP addresses or user accounts. Additional filters can be applied based on particular file types or attempts to access data after business hours.

SELECT user_email, source_ip, file_path, timestamp
FROM events
WHERE event_type = 'file.access'
AND (user_email NOT IN ('admin@example.com', 'service_account@project.com') -- Trusted accounts
OR geo_country(source_ip) NOT IN ('US')) -- Trusted location
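
The after-hours filter mentioned above is not part of the query, so here is a minimal Python sketch of how it could be layered on. It assumes events carry a resolved `country` field and a local-time `timestamp`. The business hours (09:00 to 18:00, Monday to Friday) and both function names are illustrative assumptions.

```python
from datetime import datetime, time


def is_after_hours(ts, start=time(9, 0), end=time(18, 0)):
    """True when ts falls on a weekend or outside the start-end window."""
    return ts.weekday() >= 5 or not (start <= ts.time() < end)


def unusual_file_access(events, trusted_accounts, trusted_countries=("US",)):
    """Return file access events from untrusted accounts, locations, or hours."""
    return [
        e for e in events
        if e["event_type"] == "file.access"
        and (
            e["user_email"] not in trusted_accounts
            or e["country"] not in trusted_countries
            or is_after_hours(e["timestamp"])
        )
    ]
```

Treating after-hours access as a third OR condition widens the net, so expect more findings until the trusted lists are tuned.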

Remediation?

For automated remediation, Backstory integrates with a number of tools. Here are some examples using Cloud Functions (because that’s what I found closest at hand).

To isolate infected workloads on the cluster, set up a Cloud Function that is triggered by Backstory findings, then add the following function:

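from kubernetes import client, config


def isolate_workload(data, context):
    """Taint the node that hosts the flagged pod so no new pods land on it."""
    # Extract cluster name and pod details from Backstory alert.
    cluster_name = data['resource']['labels']['cluster_name']
    pod_name = data['container']['name']
    # Assumption: the alert also carries the pod's namespace.
    namespace = data['resource']['labels'].get('namespace_name', 'default')

    # Assumes kubeconfig credentials for cluster_name are available at runtime.
    config.load_kube_config()
    v1 = client.CoreV1Api()

    # Taints belong on the Node object, so find the node running the pod.
    node_name = v1.read_namespaced_pod(pod_name, namespace).spec.node_name
    taint = {"key": "infected", "value": "true", "effect": "NoSchedule"}
    # Note: this patch may replace taints already set on the node.
    v1.patch_node(node_name, body={"spec": {"taints": [taint]}})
    print(f"Tainted node {node_name} in cluster {cluster_name}")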

With the addition of a taint to stop additional pod scheduling, this Cloud Function automatically isolates the compromised node.
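
One thing to watch when tainting a node: depending on the patch type, writing `spec.taints` can replace the node's existing taint list instead of appending to it. A small sketch of building the patch body so that existing taints survive is shown below. It assumes you have already read the node's current taints as a list of dictionaries. The helper names are hypothetical and nothing here calls the Kubernetes API.

```python
def with_taint(existing_taints, key="infected", effect="NoSchedule", value="true"):
    """Return a new taint list that keeps existing taints and adds ours exactly once."""
    kept = [
        t for t in (existing_taints or [])
        # Drop an older copy of the same taint so repeated runs stay idempotent.
        if not (t.get("key") == key and t.get("effect") == effect)
    ]
    return kept + [{"key": key, "value": value, "effect": effect}]


def build_node_patch(existing_taints, **taint_kwargs):
    """Build the body for a node patch that sets the merged taint list."""
    return {"spec": {"taints": with_taint(existing_taints, **taint_kwargs)}}
```

The returned dictionary can be used as the `body` of a node patch, so an isolation step does not silently remove taints other tooling relies on.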

Further, you can implement something like:

import base64
import json


def remediate_backstory_finding(data, context):
    """Cloud Function triggered by Backstory detection."""
    # Pub/Sub delivers the payload base64-encoded in the event's "data" field
    backstory_finding = json.loads(base64.b64decode(data["data"]).decode("utf-8"))

    # Extract relevant details from the detection
    finding_name = backstory_finding["findingName"]
    threat_type = backstory_finding["externalSystems"][0]["threatType"]

    # Implement logic for remediation based on threat type
    if threat_type == "MALWARE":
        # Example: Isolate the affected workload
        print(f"Isolating workload associated with finding: {finding_name}")
        # Replace with your specific isolation workflow (e.g., API call to Anthos)
    elif threat_type == "PORT_SCAN":
        # Example: Block suspicious IP addresses
        print(f"Blocking suspicious IP addresses from finding: {finding_name}")
        # Replace with your specific IP blocking workflow (e.g., firewall rule update)
    else:
        print(f"Unrecognized threat type: {threat_type} for finding: {finding_name}")
        # Implement logic for handling unknown threats or sending notifications
A Pub/Sub message with the Backstory detection details in JSON format initiates the function. After parsing the message data, the threat type and finding name are extracted.
The function then carries out remediation actions based on the type of threat. The examples here are workload isolation for malware and IP blocking for port scans.
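
As the list of threat types grows, a lookup table tends to scale better than a long if/elif chain. The sketch below assumes the finding arrives base64-encoded in the event's `data` field, as Pub/Sub-triggered functions deliver it, and uses the same `findingName` and `externalSystems` fields as above. The handler functions are hypothetical stand-ins that only return a label.

```python
import base64
import json


def isolate(finding_name):
    # Placeholder for a real isolation workflow.
    return f"isolate:{finding_name}"


def block_ips(finding_name):
    # Placeholder for a real firewall update.
    return f"block:{finding_name}"


def notify(finding_name):
    # Fallback for threat types without a dedicated handler.
    return f"notify:{finding_name}"


HANDLERS = {"MALWARE": isolate, "PORT_SCAN": block_ips}


def decode_finding(event):
    """Decode the base64 JSON payload carried in a Pub/Sub event."""
    return json.loads(base64.b64decode(event["data"]).decode("utf-8"))


def route_finding(event):
    """Pick a handler by threat type and run it on the finding name."""
    finding = decode_finding(event)
    threat_type = finding["externalSystems"][0]["threatType"]
    handler = HANDLERS.get(threat_type, notify)
    return handler(finding["findingName"])
```

Adding a new remediation then means registering one more entry in `HANDLERS`, and unknown threat types fall through to a notification instead of being ignored.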

Conclusion

These are but a few simple instances. Depending on your unique Anthos environment, security posture, and the threats you want to find, you’ll need to modify the queries. As the integration develops, it’s also advised to refer to the official Backstory documentation for the most recent syntax and functionalities.

Get in touch??


Your security sherpa | Google Developer Expert (GCP) | Ethical Hacker | Cloud Security