Tutorial: Making a Kubernetes Risk Analyser from Scratch

Imran Roshan
Google Cloud - Community
5 min read · Jun 5, 2024

A detailed approach to designing a Kubernetes risk management system.

Kubernetes is scalable and agile, but its complexity can lead to resource management problems and security flaws. That is exactly what I get paid for!

This blog is meant to serve as your guide, regardless of your level of experience with Kubernetes or where you are in your container journey. We’ll simplify the intricacies into digestible portions, offering concise justifications and useful code samples.

Let’s break it down.

Data Collection

To communicate with the Kubernetes API and obtain information about deployments, pods, services, and other resources, we’re going to use the Kubernetes Python client library. The main emphasis is on extracting characteristics that could point to risk, such as:

  • Container resource requests and limits
  • Liveness/readiness probe failures
  • Age of the container image
  • Number of pod restarts
  • Pods’ security context configurations

Finally, we save the gathered information in a Pandas DataFrame or another structured format.
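As a rough sketch of the collection step, the function below pulls a few of the signals listed above out of a pod in its dict form (the shape you get from kubectl get pod -o json). The function name pod_risk_signals and the output keys are my own illustrative choices, not part of any library. Probe failures and image age need event or registry data, so they are left out here.

```python
from typing import Any, Dict, List


def pod_risk_signals(pod: Dict[str, Any]) -> Dict[str, Any]:
    """Flatten one pod (dict form of a Kubernetes Pod) into a feature row.

    Hypothetical helper for illustration; the output keys are assumptions.
    """
    spec = pod.get("spec") or {}
    status = pod.get("status") or {}
    containers: List[Dict[str, Any]] = spec.get("containers") or []
    statuses: List[Dict[str, Any]] = status.get("containerStatuses") or []

    # Containers that declare no resource limits at all
    missing_limits = sum(
        1 for c in containers if not (c.get("resources") or {}).get("limits")
    )
    # Containers that ask for privileged mode in their security context
    privileged = sum(
        1 for c in containers
        if (c.get("securityContext") or {}).get("privileged", False)
    )
    # Total restarts across all containers in the pod
    restarts = sum(s.get("restartCount", 0) for s in statuses)

    return {
        "pod": (pod.get("metadata") or {}).get("name", ""),
        "containers": len(containers),
        "missing_limits": missing_limits,
        "privileged": privileged,
        "restarts": restarts,
    }


# A made-up pod used only to exercise the helper
example_pod = {
    "metadata": {"name": "web-0"},
    "spec": {
        "containers": [
            {
                "name": "app",
                "resources": {"requests": {"cpu": "250m"}, "limits": {"cpu": "500m"}},
                "securityContext": {"privileged": True},
            },
            {"name": "sidecar", "resources": {}},
        ]
    },
    "status": {
        "containerStatuses": [
            {"name": "app", "restartCount": 3},
            {"name": "sidecar", "restartCount": 1},
        ]
    },
}

row = pod_risk_signals(example_pod)
print(row)
```

A list of such rows can be handed straight to pandas.DataFrame(rows) to get one row per pod.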

Preprocessing

The gathered data should be cleaned and formatted in a Pandas DataFrame. Where required, we encode categorical features, handle missing values, and normalize numerical features. Feature engineering may also be needed to build new features from existing ones (such as the ratio of resource request to resource limit).
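To make the request-to-limit ratio concrete, here is a minimal sketch that converts Kubernetes quantity strings into numbers and then divides them. The helper names are hypothetical, and the memory parser only covers the common suffixes (Ki, Mi, Gi, Ti, k, M, G, T), which is an assumption that keeps the example short.

```python
from typing import Callable, Optional


def parse_cpu(quantity: str) -> float:
    """Convert a CPU quantity such as "500m" or "2" into cores."""
    quantity = quantity.strip()
    if quantity.endswith("m"):
        # "m" means millicores: 500m is half a core
        return int(quantity[:-1]) / 1000
    return float(quantity)


# Binary suffixes are listed first so "Mi" is matched before "M"
_MEMORY_SUFFIXES = [
    ("Ki", 2**10), ("Mi", 2**20), ("Gi", 2**30), ("Ti", 2**40),
    ("k", 10**3), ("M", 10**6), ("G", 10**9), ("T", 10**12),
]


def parse_memory(quantity: str) -> float:
    """Convert a memory quantity such as "256Mi" or "1G" into bytes."""
    quantity = quantity.strip()
    for suffix, factor in _MEMORY_SUFFIXES:
        if quantity.endswith(suffix):
            return float(quantity[: -len(suffix)]) * factor
    return float(quantity)  # Plain number of bytes


def request_to_limit_ratio(
    request: Optional[str],
    limit: Optional[str],
    parse: Callable[[str], float],
) -> Optional[float]:
    """Return request / limit, or None when either value is missing or the limit is zero."""
    if not request or not limit:
        return None
    limit_value = parse(limit)
    if limit_value == 0:
        return None
    return parse(request) / limit_value


print(request_to_limit_ratio("250m", "500m", parse_cpu))      # 0.5
print(request_to_limit_ratio("128Mi", "256Mi", parse_memory))  # 0.5
print(request_to_limit_ratio("250m", None, parse_cpu))         # None
```

Returning None for a missing limit keeps the gap visible, so you can decide later whether to impute it or treat "no limit set" as a risk feature of its own.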

Model Selection and Training

Select a machine learning model that suits the kind of risk you wish to detect.

For example:

  • Classification models (e.g., Random Forest, Logistic Regression) help identify high-risk deployments.
  • Anomaly detection models (e.g., Isolation Forest, Local Outlier Factor) help find unusual resource utilization patterns.

Next, we divide our data into training and testing sets and use the training set to train the selected model (like that wasn’t self-explanatory).

Model Evaluation

Utilizing metrics pertinent to our risk analysis objectives, we assess the trained model’s performance on the testing set (e.g., accuracy, precision, recall for classification models). If necessary, we can adjust the model’s hyperparameters to enhance performance.
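To show what those numbers actually measure, here is a small hand-rolled sketch of accuracy, precision, and recall for a binary "risky or not" label. The function name and the sample labels are made up for illustration. In practice you would let scikit-learn compute these for you.

```python
from typing import Dict, Sequence


def classification_metrics(
    y_true: Sequence[int], y_pred: Sequence[int], positive: int = 1
) -> Dict[str, float]:
    """Compute accuracy, precision and recall for one positive class."""
    pairs = list(zip(y_true, y_pred))
    tp = sum(1 for t, p in pairs if t == positive and p == positive)
    fp = sum(1 for t, p in pairs if t != positive and p == positive)
    fn = sum(1 for t, p in pairs if t == positive and p != positive)
    tn = sum(1 for t, p in pairs if t != positive and p != positive)

    total = tp + fp + fn + tn
    return {
        # Share of all predictions that were right
        "accuracy": (tp + tn) / total if total else 0.0,
        # Of the deployments flagged as risky, how many really were
        "precision": tp / (tp + fp) if (tp + fp) else 0.0,
        # Of the truly risky deployments, how many we caught
        "recall": tp / (tp + fn) if (tp + fn) else 0.0,
    }


# 1 = high risk, 0 = not high risk (made-up labels)
y_true = [1, 1, 1, 0, 0, 0, 0, 0]
y_pred = [1, 1, 0, 1, 0, 0, 0, 0]

metrics = classification_metrics(y_true, y_pred)
print(metrics)
```

For a risk analyser, recall on the high-risk class is often the number to watch, since a missed risky deployment usually costs more than a false alarm.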

Risk Assessment and Evaluation

Based on the features of new deployments, we use the trained model to predict a risk score. Then define risk thresholds to classify deployments as low, medium, or high risk.
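A minimal sketch of the thresholding step, assuming the risk score is a probability between 0 and 1. The cut-offs of 0.4 and 0.7 are arbitrary placeholders that you would tune for your own cluster, and the function name is hypothetical.

```python
def risk_level(score: float, medium: float = 0.4, high: float = 0.7) -> str:
    """Map a risk score in [0, 1] to "low", "medium" or "high".

    The default thresholds are illustrative assumptions, not recommendations.
    """
    if not 0.0 <= score <= 1.0:
        raise ValueError(f"score must be between 0 and 1, got {score}")
    if score >= high:
        return "high"
    if score >= medium:
        return "medium"
    return "low"


# Made-up scores for three deployments
scores = {"payments": 0.82, "frontend": 0.45, "batch-jobs": 0.10}
levels = {name: risk_level(score) for name, score in scores.items()}
print(levels)  # {'payments': 'high', 'frontend': 'medium', 'batch-jobs': 'low'}
```

With a scikit-learn classifier trained on 0/1 labels, one way to get such a score is the second column of predict_proba, which holds the predicted probability of the positive class.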

Integration (Optional)

Think about combining your RiskAnalyzer with additional resources:
A dashboard that shows the risk scores for your Kubernetes cluster, perhaps built with Grafana. But I will leave that up to you to figure out.

Preparations

  • Python Version: Ensure you have Python 3.6 or later installed. You can verify this using python --version or python3 --version in your terminal.
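  • Libraries: Install the necessary libraries using pip (the scikit-learn package provides the sklearn module; the two google-cloud packages are needed for the Pub/Sub and Cloud Storage snippets later on):
pip install kubernetes scikit-learn pandas numpy tensorflow google-cloud-pubsub google-cloud-storage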

THE CODE

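# Step 1: Data Collection
# For demonstration purposes, we collect Kubernetes pod logs with the Python client.
from kubernetes import client, config

def get_pod_logs(namespace, pod_name):
    # Fetch a pod's logs from Kubernetes as a single string.
    # Assumes a local kubeconfig; inside a cluster use config.load_incluster_config().
    config.load_kube_config()
    return client.CoreV1Api().read_namespaced_pod_log(name=pod_name, namespace=namespace)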

# Step 2: Data Preprocessing
# Preprocess the collected data, for example, cleaning and structuring the logs.
def preprocess_logs(logs):
    # Perform cleaning and preprocessing steps here
    cleaned_logs = logs.replace('\n', ' ').lower()
    return cleaned_logs

# Step 3: Feature Engineering
# Extract relevant features from the preprocessed data.
def extract_features(logs):
    # Extract relevant features from the logs
    features = []
    # Example feature extraction:
    features.append(len(logs))  # Length of logs
    features.append(logs.count('error'))  # Count of error occurrences
    return features

# Step 4: Model Training
# Train a machine learning model on the extracted features.
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import train_test_split

# Assuming you have prepared your dataset: X is a list of feature vectors
# (e.g., one extract_features() result per pod) and y holds the matching risk labels
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

# Initialize and train the Random Forest Classifier
rf_classifier = RandomForestClassifier()
rf_classifier.fit(X_train, y_train)

# Step 5: Model Evaluation
# Evaluate the performance of the trained model.
from sklearn.metrics import accuracy_score, classification_report

# Predict on the test set
y_pred = rf_classifier.predict(X_test)

# Evaluate the model
accuracy = accuracy_score(y_test, y_pred)
print("Accuracy:", accuracy)
print("Classification Report:")
print(classification_report(y_test, y_pred))

# Step 6: Deployment
# Deploy the trained model as a Kubernetes service.
# This step typically involves creating a Docker image and deploying it to a Kubernetes cluster.
# Kubernetes-specific deployment code is omitted here for simplicity.

# Step 7: Integration
# Integrate the Kubernetes Risk Analyzer with your existing Kubernetes infrastructure.
# This step involves configuring the analyzer to monitor Kubernetes events/logs and trigger alerts.

# Step 8: Continuous Monitoring and Improvement
# Continuously monitor the performance of the deployed model.
# Collect feedback and update the model periodically to adapt to changing risks and environments.

Now, let’s come to reporting! Instead of simply logging the details of the resources, we can publish the findings to a Pub/Sub topic:

# Example Pub/Sub message publishing with vulnerability details
import json

from google.cloud import pubsub_v1

publisher = pubsub_v1.PublisherClient()
# topic_path takes the project ID and the topic name, in that order
topic_path = publisher.topic_path("your-project-id", "vulnerability-alerts")

# ... (vulnerability detection logic)
# Assumed to provide `deployment` (a Deployment object from the Kubernetes client)
# and `scan_result` (a dict holding the scanner's findings)

vulnerability_data = {
    "deployment_name": deployment.metadata.name,
    "image_name": deployment.spec.template.spec.containers[0].image,
    "vulnerability_count": scan_result.get("vulnCount", 0),
    "vulnerability_details": scan_result.get("vulnerabilities", []),
}

# Publish message to Pub/Sub topic; publish() returns a future
future = publisher.publish(topic_path, data=json.dumps(vulnerability_data).encode("utf-8"))
future.result()  # Wait until Pub/Sub has accepted the message

Finally, on Cloud Functions, configure the following:

# Example Cloud Function to isolate image (Python)
# Assumes a Pub/Sub-triggered background function with the (event, context) signature
import base64
import json

from google.cloud import storage

def isolate_vulnerable_image(event, context):
    # Pub/Sub delivers the published payload base64-encoded under event["data"]
    message = json.loads(base64.b64decode(event["data"]).decode("utf-8"))
    deployment_name = message["deployment_name"]
    vulnerability_count = message["vulnerability_count"]
    image_name = message["image_name"]

    # Connect to Cloud Storage client
    storage_client = storage.Client()

    # Get the bucket name (replace with yours)
    bucket_name = "your-bucket-name"

    # Locate the image in Cloud Storage
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(image_name)

    # You can perform actions on the image here (e.g., download, quarantine)
    print(f"{vulnerability_count} vulnerabilities detected in deployment: {deployment_name} - Image: {image_name}")

# ... (Cloud Function deployment configuration)

The Pub/Sub message triggers the function, which reads the message data to find the name of the vulnerable image. It then uses the Cloud Storage API to locate that image in your bucket (assuming your images are stored in a bucket).

There you go!!! Look at you, ready to battle the world!!

Imran Roshan

Your security sherpa | Google Developer Expert (GCP) | Ethical Hacker | Cloud Security