Automating Container Remediation with Copacetic and AquaSec

Nov 14, 2024

Craig Thompson, Head of Infrastructure & Security, and Ilan Rodriguez, Application Developer

About Us

In today’s rapidly evolving healthcare landscape, collaboration among patients, healthcare providers, community-based organizations, and administrators is a key success factor for improving outcomes and reducing costs. Building a cloud-hosted healthcare collaboration platform that meets the diverse needs of all participants has many challenges. Getting such a diverse group of players to collaborate can be a formidable task. Likewise, putting in place a technical approach that provides reliability and scalability, meets the stringent security requirements of healthcare data exchange, and remains cost-effective can be daunting.

The Hyphen Platform enables the healthcare ecosystem to collaborate around their patients, so data and value flow in a way that benefits all. Our mission is to be the trusted platform for healthcare stakeholders to share data and demonstrate value, so they can work together to improve the lives of more people. We do that by creating a connected-care ecosystem that makes it easy for healthcare stakeholders to work together, around and with their patients, to deliver high-quality, value-driven care within their preferred workflows. We achieve this by embedding our products into electronic medical record systems (EMRs), pharmacy information systems, and case management systems.

Hyphen views security as a core feature of our platform. Our customers must have confidence that security is at the core of Hyphen and that it is baked into our culture. As part of our efforts to keep the Hyphen platform secure, one initiative we have worked on is a pipeline that automatically patches vulnerabilities in our containers.

Introducing Project Copacetic

Project Copacetic was accepted into the CNCF Sandbox on September 13, 2023. The project is hosted on GitHub at https://github.com/project-copacetic/copacetic.

Project Copacetic provides a CLI tool called Copa that allows users to directly patch container image vulnerabilities without needing a full rebuild. It works with popular vulnerability scanning tools like Trivy and uses the BuildKit engine to apply updates efficiently. This approach reduces complexity, storage, and deployment time by applying patches as additional layers instead of rebuilding images. It supports a wide variety of images, including distroless containers, making it accessible to DevSecOps and other engineers.

As an example, let’s check the vulnerabilities in the nginx:1.21.6 Docker container using Trivy:

$ export IMAGE=docker.io/library/nginx:1.21.6
$ trivy image --vuln-type os --ignore-unfixed $IMAGE
nginx:1.21.6 (debian 11.3)
==========================
Total: 231 (UNKNOWN: 0, LOW: 12, MEDIUM: 111, HIGH: 86, CRITICAL: 22)
...

According to Trivy, this container has 22 Critical, 86 High, 111 Medium, and 12 Low vulnerabilities.

Now, let’s patch the container with Copacetic (note that you must first install the Copa CLI).

On a Mac, we needed to run BuildKit in Docker for Copacetic to work. BuildKit can be started with the following command:

export BUILDKIT_VERSION=v0.12.4
export BUILDKIT_PORT=8888
docker run \
  --detach \
  --rm \
  --privileged \
  -p 127.0.0.1:$BUILDKIT_PORT:$BUILDKIT_PORT/tcp \
  --name buildkitd \
  --entrypoint buildkitd \
  "moby/buildkit:$BUILDKIT_VERSION" \
  --addr tcp://0.0.0.0:$BUILDKIT_PORT

Next, we need to run Trivy and export the results as JSON:

$ export IMAGE=docker.io/library/nginx:1.21.6
$ trivy image --vuln-type os --ignore-unfixed -f json -o $(basename $IMAGE).json $IMAGE

With the Trivy report, we can patch the container:

copa patch \
  -i docker.io/library/nginx:1.21.6 \
  -r nginx:1.21.6.json \
  -t 1.21.6-patched \
  -a tcp://0.0.0.0:$BUILDKIT_PORT

Upon completion, there will be a new image tagged nginx:1.21.6-patched. We can now run a Trivy scan against the patched container:

trivy image --vuln-type os --skip-update --docker-host unix:///$HOME/.colima/default/docker.sock --ignore-unfixed nginx:1.21.6-patched
Total: 0 (UNKNOWN: 0, LOW: 0, MEDIUM: 0, HIGH: 0, CRITICAL: 0)

Magic! No vulnerabilities! Note that since we use Colima on our Mac, we had to include the "--docker-host" flag.

Identifying Containers with Vulnerabilities Using AquaSec

AquaSec is a comprehensive security platform designed to protect cloud-native environments, including containers, Kubernetes, serverless functions, and other microservices-based applications. AquaSec’s capabilities include:

1. Container Security

2. Kubernetes Security

3. Infrastructure as Code (IaC) Security

4. Compliance and Governance

5. Vulnerability Management

6. Cloud Security Posture Management (CSPM)

AquaSec continuously scans containers running in our Kubernetes cluster for vulnerabilities and provides reports detailing the findings. In addition, AquaSec provides APIs for retrieving those reports. We leverage those APIs as part of our solution.

To get a list of running containers that have High or Critical vulnerabilities, we call the following Aqua API endpoints:

# get a token from AquaSec
curl -X POST "https://api.cloudsploit.com/v2/signin" \
  -H "Content-Type: application/json" \
  -d '{
    "email": "<your-username>",
    "password": "<your-password>"
  }'

# use the token obtained above to get the list of containers with vulnerabilities
curl -X GET "https://<tenant>.cloud.aquasec.com/api/v2/containers?status=running&cluster=k8s-cluster-1&container_type=containers&risk_high=true&page=1&pagesize=500&risk_critical=true" \
  -H "Content-Type: application/json" \
  -H "Authorization: Bearer <token>"

Automating the Process

To automate the process, we have implemented the following high-level flow using a scheduled GitLab pipeline.

1. Pull the list of containers running in our Kubernetes cluster with High or Critical vulnerabilities from Aqua using their API.

2. Loop over that list of containers and use Copacetic to patch them.

3. Push the patched container to our development container repository.

4. Update a table in Confluence that displays the vulnerabilities before and after patching. (Code to update Confluence not included here.)

This process currently runs as a scheduled pipeline in GitLab, executing once per day, and Python scripts carry out the steps.
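
To make the shape of this concrete, a pipeline definition along the following lines could drive the flow. This is a rough sketch, not our exact configuration: the stage, job, and script module names are placeholders, and the buildkitd service assumes a privileged runner.

# Hypothetical sketch of a scheduled GitLab pipeline; names are illustrative
stages:
  - remediate

patch-containers:
  stage: remediate
  image: python:3.11           # assumes docker, trivy, and copa are available to the job
  services:
    - name: moby/buildkit:v0.12.4
      alias: buildkitd         # matches the tcp://buildkitd:8888 address copa connects to
      command: ["--addr", "tcp://0.0.0.0:8888"]
  rules:
    - if: '$CI_PIPELINE_SOURCE == "schedule"'   # run only from the daily schedule
  script:
    - python -m src.aqua_report        # step 1: writes containers.json
    - python -m src.patch_containers   # steps 2-3: patch images and push to dev
  artifacts:
    paths:
      - containers_remediated.json
      - containers_failed.json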

The code below retrieves a token from Aqua and calls the Aqua API to retrieve containers with High and Critical vulnerabilities.

import logging
import json
import os
import requests
from src.util import is_valid_container, write_data_to_file_path


AQUA_USER_NAME = os.getenv("AQUA_USER")
AQUA_USER_PASS = os.getenv("AQUA_PW")
AQUA_AUTH_ENDPOINT = "https://api.cloudsploit.com/v2/signin"
AQUA_REPORT_ENDPOINT = "https://<tenant>.cloud.aquasec.com/api/v2/containers?status=running&cluster=k8s-cluster-1&container_type=containers&risk_high=true&page=1&pagesize=500&risk_critical=true"


# Create logger
logger = logging.getLogger(__name__)


def get_access_token_from_aqua(aqua_endpoint: str, username: str, password: str) -> str:
    json_string = json.dumps(
        {
            "email": username,
            "password": password,
        }
    )
    response = requests.post(aqua_endpoint, data=json_string)
    response_data = response.json()
    access_token = response_data["data"]["token"]
    return access_token


def get_aqua_scan_report(aqua_endpoint: str, access_token: str) -> dict:
    report = requests.get(
        aqua_endpoint,
        headers={
            "Content-Type": "application/json",
            "Authorization": "Bearer {}".format(access_token),
        },
    )
    report_data = report.json()
    return report_data


def sort_report(aqua_scan_report: dict) -> list:
    report_items: list = aqua_scan_report["result"]
    # sort the results by the number of criticals and highs, worst first
    sorted_list = sorted(
        report_items,
        key=lambda item: (item["critical"], item["high"]),
        reverse=True,
    )
    return sorted_list


def create_container_report_from_list(report_list: list) -> dict:
    # create a list of qualified containers to display in the table
    processed_containers = []
    container_objects = []
    for item in report_list:
        namespace_name = item["namespace_name"]
        image_name = item["image_name"]
        image_origin = item["origin_image_name"]
        if not is_valid_container(namespace_name, image_name):
            continue

        container_full_name = f"{image_name}:{namespace_name}"
        if container_full_name in processed_containers:
            continue

        container = {}
        container["image_name"] = image_name
        parsed_name, *rest = image_name.split(":")
        parsed_tag = rest[0] if rest else ""
        container["name"] = parsed_name
        container["tag"] = parsed_tag if len(parsed_tag) > 1 else ""
        container["registry_name"] = image_origin
        container["namespace_name"] = namespace_name
        container["critical"] = str(item["critical"])
        container["high"] = str(item["high"])
        container_objects.append(container)

        # mark container_full_name as "processed" to avoid reprocessing in future iterations
        processed_containers.append(container_full_name)

    container_report = {"containers": container_objects}
    return container_report


def main() -> None:
    access_token = get_access_token_from_aqua(
        AQUA_AUTH_ENDPOINT, AQUA_USER_NAME, AQUA_USER_PASS
    )
    aqua_scan_report = get_aqua_scan_report(AQUA_REPORT_ENDPOINT, access_token)
    sorted_list = sort_report(aqua_scan_report)
    container_report = create_container_report_from_list(sorted_list)
    write_data_to_file_path(container_report, "containers.json")


if __name__ == "__main__":
    main()
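
The scripts above and below import a small src.util module that is not shown here. For completeness, a minimal sketch of what those helpers might look like follows; the exclusion rules in is_valid_container are placeholders, since our actual filtering logic is specific to our clusters.

# Hypothetical sketch of the src.util helpers used by both scripts
import json


def is_valid_container(namespace_name: str, image_name: str) -> bool:
    # placeholder rules: skip system namespaces and empty image names;
    # the real implementation encodes cluster-specific filtering
    excluded_namespaces = {"kube-system", "aqua"}
    return bool(image_name) and namespace_name not in excluded_namespaces


def write_data_to_file_path(data: dict, file_path: str) -> None:
    # serialize the report as JSON so later pipeline steps can read it back
    with open(file_path, "w") as f:
        json.dump(data, f, indent=2)


def read_json_from_file_path(file_path: str) -> dict:
    with open(file_path) as f:
        return json.load(f)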

Next, we run the Copacetic patch. We use AWS ECR as our container registry; this process pulls the container from our prod account and pushes the patched container to our dev account so that it can be tested. As part of this code, we also keep track of the patched version; this can be eliminated if it is not necessary for your use case.

from datetime import datetime
import os
import re
import logging
import subprocess
from src.util import read_json_from_file_path, write_data_to_file_path


AWS_PROD_ACCOUNT = os.getenv("AWS_PROD_ACCOUNT")
AWS_DEV_ACCOUNT = os.getenv("AWS_DEV_ACCOUNT")

# Create logger
logger = logging.getLogger(__name__)


## System functions


def docker_pull(registry_name: str) -> subprocess.CompletedProcess[bytes]:
    process_result = subprocess.run(
        ["docker", "pull", "-q", registry_name], capture_output=True
    )
    return process_result


def docker_push(registry_name: str) -> subprocess.CompletedProcess[bytes]:
    process_result = subprocess.run(
        ["docker", "push", registry_name], capture_output=True
    )
    return process_result


def docker_tag(registry_name: str, tag: str) -> subprocess.CompletedProcess[bytes]:
    process_result = subprocess.run(
        ["docker", "tag", registry_name, tag], capture_output=True
    )
    return process_result


def trivy_scan(
    registry_name: str, output_filename: str, ignore_unfixed: bool = False
) -> subprocess.CompletedProcess[bytes]:
    command = [
        "trivy",
        "image",
        "--vuln-type",
        "os",
        "--scanners",
        "vuln",
        "-f",
        "json",
        "-o",
        output_filename,
        registry_name,
    ]
    if ignore_unfixed:
        command.append("--ignore-unfixed")
    process_result = subprocess.run(
        command,
        capture_output=True,
    )
    return process_result


def copacetic_patch(
    registry_name: str, trivy_report_filename: str, output_tag: str
) -> subprocess.CompletedProcess[bytes]:
    process_result = subprocess.run(
        [
            "copa",
            "patch",
            "--image",
            registry_name,
            "--report",
            trivy_report_filename,
            "--tag",
            output_tag,
            "--addr",
            "tcp://buildkitd:8888",
            "--timeout",
            "30m",
        ],
        capture_output=True,
    )
    return process_result


## Report functions


def extract_vulnerability_counts(trivy_scan: dict) -> dict:
    if "Results" not in trivy_scan:
        artifact_name = trivy_scan["ArtifactName"]
        return dict(container=artifact_name, critical=0, high=0)

    scan_result = trivy_scan["Results"][0]
    registry_name = scan_result["Target"]
    vulnerabilities = scan_result.get("Vulnerabilities", None)
    if vulnerabilities is None:
        return dict(container=registry_name, critical=0, high=0)

    critical_count = len([v for v in vulnerabilities if v["Severity"] == "CRITICAL"])
    high_count = len([v for v in vulnerabilities if v["Severity"] == "HIGH"])
    return dict(container=registry_name, critical=critical_count, high=high_count)


def add_or_update_patched_suffix(input_string: str, version: int) -> str:
    suffix = re.compile(r"\-patched\.\d*$")
    # use search(), not match(): the suffix sits at the end of the string,
    # while match() only anchors at the beginning
    if suffix.search(input_string):
        new_string = suffix.sub(f"-patched.{version}", input_string)
    else:
        new_string = input_string + f"-patched.{version}"
    return new_string


def remove_patched_suffix(input_string: str) -> str:
    suffix = re.compile(r"\-patched\.\d*$")
    new_string = suffix.sub("", input_string)
    return new_string


def is_new_patch(patch_state: dict, detected_vulnerabilities: dict) -> bool:
    # for a container that has never been patched the remediated counts are
    # None, so the comparison is truthy and the patch is treated as new
    return (
        patch_state["critical_remediated"] != detected_vulnerabilities["critical"]
        or patch_state["high_remediated"] != detected_vulnerabilities["high"]
    )


def get_or_create_patch_state_from_table(table: dict, image_name: str) -> dict:
    table_entry = table.get(image_name, None)
    if not table_entry:
        table_entry = {
            "image_name": image_name,
            "namespace": None,
            "critical_initial": None,
            "high_initial": None,
            "critical_remediated": None,
            "high_remediated": None,
            "patched_version": None,
            "patched_image": None,
            "created_date": None,
            "last_patched_date": None,
            "notes": None,
        }
    return table_entry


def main():
    container_report = read_json_from_file_path("containers.json")
    patch_table = read_json_from_file_path("patch_table.json")

    failed_patch_containers = []
    failed_push_containers = []
    unchanged_containers = []
    for container in container_report["containers"]:
        print(f"Processing {container['name']}")
        registry_name = container["registry_name"]
        container_name = container["name"].replace("/", "-")

        container_tag = container["tag"]
        # the container tag could carry a '-patched.{number}' suffix; remove it
        original_tag = remove_patched_suffix(container_tag)

        image_name = f"{container_name}:{original_tag}"
        patch_state = get_or_create_patch_state_from_table(patch_table, image_name)

        docker_pull(registry_name)

        report_filename = f"{container_name}.json"
        trivy_scan(registry_name, report_filename, ignore_unfixed=True)
        initial_counts = extract_vulnerability_counts(
            read_json_from_file_path(report_filename)
        )
        container["critical_initial"] = initial_counts["critical"]
        container["high_initial"] = initial_counts["high"]
        container["critical_remediated"] = patch_state["critical_remediated"]
        container["high_remediated"] = patch_state["high_remediated"]
        container["patched_version"] = patch_state["patched_version"]
        container["patched_image"] = patch_state["patched_image"]
        container["last_patched_date"] = patch_state["last_patched_date"]
        container["notes"] = patch_state["notes"]

        # Assume the upcoming patch is newer than (not the same as) the
        # previous patch, if there was one. Compare against None explicitly,
        # since version 0 is falsy and would otherwise never increment.
        patched_version = patch_state["patched_version"]
        if patched_version is None:
            new_patched_version = 0
        else:
            new_patched_version = patched_version + 1

        # Patch the container and give it the '-patched.{version}' tag.
        # This tag is used to identify the patched container later in the code.
        patched_tag = add_or_update_patched_suffix(original_tag, new_patched_version)
        output = copacetic_patch(registry_name, report_filename, patched_tag)
        if output.returncode:
            container["last_patched_date"] = None
            container["notes"] = (
                f"Error: failed to create new patch version {new_patched_version}"
            )
            failed_patch_containers.append(registry_name)
            continue

        # Run the Trivy scan again, this time on the patched container.
        # Trivy finds the container by the patched tag, in this case
        # "{container_name}:{container_tag}-patched.{new_patched_version}".
        patched_registry_name_prod = add_or_update_patched_suffix(
            registry_name, new_patched_version
        )
        patched_registry_name_dev = patched_registry_name_prod.replace(
            AWS_PROD_ACCOUNT, AWS_DEV_ACCOUNT
        )
        patched_report_filename = f"{container_name}-patched.json"
        trivy_scan(patched_registry_name_prod, patched_report_filename)
        remediated_counts = extract_vulnerability_counts(
            read_json_from_file_path(patched_report_filename)
        )
        if not is_new_patch(patch_state, remediated_counts):
            print(
                f"Warning: patched container '{patched_registry_name_prod}' is the same as previous patch."
                " Skipping container for now..."
            )
            unchanged_containers.append(patched_registry_name_prod)
            continue

        # Push the patched container to DEV, never to PROD
        docker_tag(patched_registry_name_prod, patched_registry_name_dev)
        output = docker_push(patched_registry_name_dev)
        if output.returncode:
            container["last_patched_date"] = None
            container["notes"] = (
                f"Error: failed to publish new patch version {new_patched_version}"
            )
            failed_push_containers.append(patched_registry_name_dev)
            continue

        current_time = str(datetime.now())

        # Update the report only if the patched container exists in DEV
        container["critical_remediated"] = remediated_counts["critical"]
        container["high_remediated"] = remediated_counts["high"]
        container["patched_version"] = new_patched_version
        container["patched_image"] = patched_registry_name_dev
        if new_patched_version == 0:
            container["created_date"] = current_time
        container["last_patched_date"] = current_time

        patch_state["namespace"] = container["namespace_name"]
        patch_state["critical_initial"] = initial_counts["critical"]
        patch_state["high_initial"] = initial_counts["high"]
        patch_state["critical_remediated"] = remediated_counts["critical"]
        patch_state["high_remediated"] = remediated_counts["high"]
        patch_state["patched_version"] = new_patched_version
        patch_state["patched_image"] = patched_registry_name_dev
        if new_patched_version == 0:
            patch_state["created_date"] = current_time
        patch_state["last_patched_date"] = current_time

        patch_table[image_name] = patch_state

        print(f"Remediated High: {remediated_counts['high']}")
        print(f"Remediated Critical: {remediated_counts['critical']}")
        print(f"New Container: {patched_registry_name_dev}")

    containers_failed_report = {
        "failed_patch": failed_patch_containers,
        "failed_push": failed_push_containers,
        "unchanged": unchanged_containers,
    }
    write_data_to_file_path(container_report, "containers_remediated.json")
    write_data_to_file_path(containers_failed_report, "containers_failed.json")
    write_data_to_file_path(patch_table, "patch_table.json")


if __name__ == "__main__":
    main()

At this point, we have some additional code that updates a Confluence page to show the containers and their patched state. For brevity, we have not included the full code here, but a rough sketch follows.
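
For a sense of what that step involves, here is a minimal sketch using the Confluence Cloud REST API; the site URL, page ID, and the pre-rendered table HTML passed in are placeholders, not our actual implementation.

# Hypothetical sketch of updating a Confluence page via the Confluence Cloud
# REST API; CONFLUENCE_URL and PAGE_ID are placeholders
import os
import requests

CONFLUENCE_URL = "https://<your-site>.atlassian.net/wiki"
PAGE_ID = "<page-id>"
AUTH = (os.getenv("CONFLUENCE_USER"), os.getenv("CONFLUENCE_API_TOKEN"))


def update_confluence_page(title: str, table_html: str) -> None:
    # read the current version; Confluence requires it to be incremented on update
    page = requests.get(
        f"{CONFLUENCE_URL}/rest/api/content/{PAGE_ID}", auth=AUTH
    ).json()
    new_version = page["version"]["number"] + 1

    body = {
        "id": PAGE_ID,
        "type": "page",
        "title": title,
        "version": {"number": new_version},
        "body": {"storage": {"value": table_html, "representation": "storage"}},
    }
    requests.put(
        f"{CONFLUENCE_URL}/rest/api/content/{PAGE_ID}",
        json=body,
        auth=AUTH,
    ).raise_for_status()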

Summary

In summary, to strengthen our platform’s security posture and improve our developers’ experience, we have implemented automated container patching for our production workloads. We accomplish this by combining the CNCF project Copacetic with AquaSec APIs, which identify containers with vulnerabilities, in a scheduled GitLab CI/CD pipeline.
