Data-driven Vulnerability Management: Graph Theory based Reachability Analysis (2/2)

Albert-Jan Talsma
Feb 13, 2024

The previous post explained why a reachability metric is needed to prioritize vulnerability remediation more effectively. This part presents the proof-of-concept (PoC) example code, the results and future work.

Proof-of-concept

To use the PoC with your own data and calculate reachability, there are a few prerequisites:

  • Asset and vulnerability data from your vulnerability management solution, ideally accessible via a (RESTful) API
  • ACL data from your firewall (management) solution, ideally accessible via a (RESTful) API
  • Insight into the trust levels of network VLANs/subnets/zones
  • Insight into the number of online assets in a given subnet of the IT network, for example based on statistical analysis (mean, median, random forest, linear regression)
  • Some basic TCP/IP knowledge and knowledge of the relevant design of the IT network
  • A segmented IT network, preferably on OSI model layers 2, 3 and 4
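
As an illustration of the fourth prerequisite, the number of online assets per zone could be estimated from a handful of observed DHCP lease counts. This is a minimal sketch: the sample numbers and the `estimate_online_assets` helper are hypothetical, and the median is only one of the statistics mentioned above.

```python
from statistics import median

# Hypothetical daily DHCP lease counts per zone (illustrative values only)
lease_samples = {
    'client': [168, 174, 181, 90, 176],
    'printer': [23, 22, 23, 24, 23],
}

def estimate_online_assets(samples):
    """Return the median lease count per zone, rounded to a whole number of assets."""
    return {zone: round(median(counts)) for zone, counts in samples.items()}

estimated = estimate_online_assets(lease_samples)
print(estimated)
```

The median is less sensitive than the mean to an outlier such as a quiet day, which is why it is used in this sketch.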

Data massaging and example data

Massaging your data will be the hard part. As with most data-heavy projects, working out the process and model/algorithm takes around 20–30% of the time, and massaging the data takes the remaining 70–80%. This PoC provides some example data to play around with, for example in a Jupyter Notebook. ChatGPT or Copilot can provide you with skeleton code to call a (RESTful) vendor solution API. You will need to develop some parts yourself in order to parse the vendor-specific data.
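
To give an idea of what this parsing step could look like, the sketch below maps one firewall rule from a made-up vendor export onto the column names used by the example data in this post. The JSON field names (`from`, `to`, `source`, `destination`, `proto`, `port`, `device`) and the `normalize_rule` helper are assumptions for illustration only; every vendor uses its own schema.

```python
import json

# Hypothetical vendor export; real field names differ per firewall product
raw = '''
[
  {"from": "client", "to": "server", "source": "192.168.2.0/24",
   "destination": "172.16.0.7", "proto": "TCP", "port": 3389, "device": "fw02"}
]
'''

def normalize_rule(rule):
    """Map one vendor rule to the column names used by the example data."""
    return {
        'srcintf': rule['from'],
        'dstintf': rule['to'],
        'srcaddr': rule['source'],
        'dstaddr': rule['destination'],
        'service': f"{rule['proto'].lower()}/{rule['port']}",
        'firewall': rule['device'],
    }

rules = [normalize_rule(r) for r in json.loads(raw)]
print(rules[0])
```

A list of dictionaries in this shape can be passed directly to `pd.DataFrame`.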

NetworkX, trust and in-degree centrality

Once all the data has been ingested into the graph, the relationships between the different types of nodes allow us to:

  • Determine the assets and services in the IT network that are actually affected by a firewall ACL
  • Use the (potential) number of assets that are able to reach a certain asset via a certain network service as a weight for the in-degree centrality calculation
  • From the asset point of view, map the ‘incoming’ network services as configured in the firewall ACL to the ‘outgoing’ network services that contain one or more vulnerabilities
  • Calculate the lowest trust value based on the source side of the firewall ACL (the source subnet(s) or zone(s))
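
Before the full code, the scoring idea from the list above can be reduced to a few lines of plain Python. This is a simplified sketch rather than the PoC itself: the edge tuples, the `total_weight` value and the `score_per_service` helper are hypothetical. The score per service is the summed weight of the incoming edges for that service, divided by the total weight in the graph and multiplied by the minimum trust value seen on the asset's incoming edges.

```python
from collections import defaultdict

# Hypothetical incoming edges for one asset: (service, weight, trust)
# weight = estimated number of source assets, trust = trust value of the source zone
in_edges = [
    ('tcp/3389', 100, 3),
    ('tcp/3389', 25, 2),
    ('tcp/8080', 5, 2),
]
total_weight = 500  # assumed sum of all source weights in the graph

def score_per_service(edges, total):
    """Return the trust-scaled share of source assets per service."""
    weight_by_service = defaultdict(int)
    for service, weight, _ in edges:
        weight_by_service[service] += weight
    min_trust = min(trust for _, _, trust in edges)
    return {s: round(w / total * min_trust, 4) for s, w in weight_by_service.items()}

scores = score_per_service(in_edges, total_weight)
print(scores)
```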
# Import required libraries
import networkx as nx
import pandas as pd
import matplotlib.pyplot as plt
import ipaddress
import uuid
import operator
# Firewall example data
fwdata = {
'srcintf': ['server', 'ics', 'server', 'voip', 'video', 'video', 'client', 'printer', 'ics', 'server', 'client', 'client', 'client', 'dmz', 'client'],
'dstintf': ['video', 'printer', 'dmz', 'client', 'printer', 'printer', 'server', 'server', 'server', 'dmz', 'printer', 'printer', 'server', 'server', 'video'],
'srcaddr': ['192.168.0.0/24', '10.0.2.76', '192.168.0.3', '192.168.0.3', '10.0.1.0/24', '10.0.1.0/24', '192.168.2.4', '10.0.0.1', '10.0.2.76', '192.168.0.7', '192.168.2.0/24', '192.168.2.0/24', '192.168.2.0/24', '172.16.50.2', '192.168.2.0/24'],
'dstaddr': ['10.0.1.0/24', '10.0.0.2', '172.16.50.2', '192.168.2.1', '10.0.0.1', '10.0.0.1', '172.16.0.0/24', '172.16.0.2', '172.16.0.2', '172.16.50.2', '10.0.0.0/24', '10.0.0.0/24', '172.16.0.7', '172.16.0.7', '10.0.1.0/24'],
'service': ['tcp/8080', 'udp/161', 'tcp/443', 'tcp/443', 'tcp/9100', 'tcp/80', 'tcp/8080', 'tcp/80', 'tcp/80', 'tcp/443', 'tcp/21', 'udp/161', 'tcp/3389', 'tcp/3389', 'tcp/8080'],
'firewall': ['fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02']
}

# Vulnerability and asset example data
vmdata = {
'name': ['Heartbleed', 'Default SNMP community string', 'Microsoft Remote Desktop Services Remote Code Execution (BlueKeep)', 'Apache Tomcat HTTP Trailer Header Parsing Vulnerability', 'Apache Tomcat AJP File Read/Inclusion (Ghostcat)', 'Nginx Buffer Overflow in FastCGI Module', 'Ivanti Connect Secure and Policy Secure Authentication Bypass Vulnerability', 'Writable FTP folder', 'MS SQL Server Remote Code Execution', 'Apache Struts Remote Code Execution'],
'hostname': ['client05' , 'printer21', 'server07', 'video16', 'video16', 'server10', 'dmz02', 'printer06', 'server03', 'server02'],
'ipaddr': ['192.168.2.5', '10.0.0.21', '172.16.0.7', '10.0.1.16', '10.0.1.16', '172.16.0.10', '172.16.50.2', '10.0.0.6', '172.16.0.3', '172.16.0.2'],
'cve': ['CVE-2014-0160', 'CVE-1999-0472', 'CVE-2019-0708', 'CVE-2023-46589', 'CVE-2020-1938', 'CVE-2019-11043', 'CVE-2023-46805', 'CVE-1999-0527', 'CVE-2020-0618', 'CVE-2017-5638'],
'cvss': ['9.4', '', '9.8', '7.5', '9.8', '9.8', '8.2', '', '9.8', '10'],
'detected_on_port': ['tcp/443', 'udp/161', 'tcp/3389', 'tcp/8080', 'tcp/8080', 'tcp/80', 'tcp/443', 'tcp/21', 'tcp/1433', 'tcp/443'],
'zone': ['client', 'printer', 'server', 'video', 'video', 'server', 'dmz', 'printer', 'server', 'server'],
'firewall' : ['fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02', 'fw02']
}

# Trust values for different zones/subnets
# Example mapping to quantify trust: high=1, medium=2, low=3, zero=4
trust_values = {
'server': 1,
'client': 3,
'printer': 2,
'video': 2,
'voip': 2,
'ics': 1,
'dev': 2,
'guest_wifi': 3,
'internet': 4,
'dmz': 3
}

# DHCP lease example data
dhcp_leases = {
'server': 85,
'client': 174,
'printer': 23,
'video': 13,
'voip': 150,
'ics': 5,
'dev': 32,
'guest_wifi': 98,
'dmz': 3
}

# Create the pandas DataFrames
firewall_df = pd.DataFrame(fwdata)
vuln_df = pd.DataFrame(vmdata)

# Create a NetworkX MultiDiGraph
G = nx.MultiDiGraph()

def get_or_create_zone_node(graph, intf, firewall, trust_values):
    # Look up existing zone and firewall nodes by name
    zone_node = next((node for node, attr in graph.nodes(data=True) if attr.get('type') == 'Zone' and attr.get('name') == intf), None)
    fw_node = next((node for node, attr in graph.nodes(data=True) if attr.get('type') == 'Firewall' and attr.get('name') == firewall), None)

    if zone_node is None:
        if fw_node is None:
            fw_node_id = str(uuid.uuid4())[:6]
            fw_node = (fw_node_id, {"type": "Firewall", "name": firewall})
            graph.add_nodes_from([fw_node])
        else:
            fw_node_id = fw_node

        zone_node_id = str(uuid.uuid4())[:6]
        zone_node = (zone_node_id, {"type": "Zone", "name": intf, "firewall": firewall, "trust": trust_values.get(intf, 'unknown')})
        graph.add_nodes_from([zone_node])
        # Use the graph passed in as argument instead of the global G
        graph.add_edge(fw_node_id, zone_node_id, label="ROUTES")
    else:
        zone_node_id = zone_node

    return zone_node_id

# Add host nodes from the vulnerability DataFrame
for index, row in vuln_df.iterrows():
    name = row['name']
    hostname = row['hostname']
    ipaddr = row['ipaddr']
    cve = row['cve']
    cvss = row['cvss']
    detected_on_port = row['detected_on_port']
    zone = row['zone']
    firewall = row['firewall'] # see byproduct section

    # Add nodes and edges for firewalls, hosts and vulnerability
    zone_node_id = get_or_create_zone_node(G, zone, firewall, trust_values)

    host_node = next((node for node, attr in G.nodes(data=True) if attr.get('type') == 'Host' and attr.get('ipaddr') == ipaddr), None)
    if host_node is None:
        host_node_id = str(uuid.uuid4())[:6]
        host_node = (host_node_id, {"type": "Host", "name": hostname, "ipaddr": ipaddr})
        G.add_nodes_from([host_node])
        G.add_edge(host_node_id, zone_node_id, label="PART_OF")
    else:
        host_node_id = host_node

    if cve is not None:
        vuln_node = next((node for node, attr in G.nodes(data=True) if attr.get('type') == 'Vulnerability' and attr.get('cve') == cve), None)
        if vuln_node is None:
            vuln_node_id = str(uuid.uuid4())[:6]
            vuln_node = (vuln_node_id, {"type": "Vulnerability", "name": name, "cve": cve, "cvss": cvss})
            G.add_nodes_from([vuln_node])
        else:
            vuln_node_id = vuln_node
        # Link the host to the vulnerability via the service it was detected on
        G.add_edge(host_node_id, vuln_node_id, service=detected_on_port)

# Add nodes and edges based on the firewall DataFrame
for index, row in firewall_df.iterrows():
    firewall = row['firewall']
    src_intf = row['srcintf']
    dst_intf = row['dstintf']
    src_addr = row['srcaddr']
    dst_addr = row['dstaddr']
    service = row['service']

    # Create or find source zone node
    src_zone_node_id = get_or_create_zone_node(G, src_intf, firewall, trust_values)

    # Create or find destination zone node
    dst_zone_node_id = get_or_create_zone_node(G, dst_intf, firewall, trust_values)

    src_subnet = ipaddress.IPv4Network(src_addr)
    dst_subnet = ipaddress.IPv4Network(dst_addr)

    src_host_node = next((node for node, attr in G.nodes(data=True) if attr.get('type') == 'Host' and attr.get('ipaddr') == src_addr), None)
    if src_host_node is None:
        num_hosts = src_subnet.num_addresses
        src_host_node_id = str(uuid.uuid4())[:6]
        src_host_node = (src_host_node_id, {"type": "Host", "ipaddr": src_addr})
        G.add_nodes_from([src_host_node])
        G.add_edge(src_host_node_id, src_zone_node_id, weight=dhcp_leases.get(src_intf, 'unknown'), label='PART_OF')
        G.add_edge(src_zone_node_id, dst_zone_node_id, label='CONNECTED_TO')
    else:
        src_host_node_id = src_host_node

    dst_host_node = next((node for node, attr in G.nodes(data=True) if attr.get('type') == 'Host' and attr.get('ipaddr') == dst_addr), None)
    if dst_host_node is None:
        dst_host_node_id = str(uuid.uuid4())[:6]
        dst_host_node = (dst_host_node_id, {"type": "Host", "ipaddr": dst_addr})
        G.add_nodes_from([dst_host_node])
        G.add_edge(dst_host_node_id, dst_zone_node_id, label='PART_OF')
    else:
        dst_host_node_id = dst_host_node

    if src_subnet.num_addresses == 1 and dst_subnet.num_addresses == 1:
        num_hosts = src_subnet.num_addresses
        G.add_edge(dst_zone_node_id, dst_host_node_id, service=service, weight=num_hosts, trust=trust_values.get(src_intf, 'unknown'))

    elif src_subnet.num_addresses > 1 and dst_subnet.num_addresses == 1:
        G.add_edge(dst_zone_node_id, dst_host_node_id, service=service, weight=dhcp_leases.get(src_intf, 'unknown'), trust=trust_values.get(src_intf, 'unknown'))

    elif src_subnet.num_addresses > 1 and dst_subnet.num_addresses > 1:
        for node, attr in G.nodes(data=True):
            if attr.get('type') == 'Host' and 'ipaddr' in attr:
                ip_address = attr['ipaddr']
                try:
                    ip = ipaddress.IPv4Address(ip_address)
                    if ip in dst_subnet:
                        G.add_edge(dst_zone_node_id, node, service=service, weight=dhcp_leases.get(src_intf, 'unknown'), trust=trust_values.get(src_intf, 'unknown'))
                except ipaddress.AddressValueError:
                    pass

    elif src_subnet.num_addresses == 1 and dst_subnet.num_addresses > 1:
        num_hosts = src_subnet.num_addresses
        for node, attr in G.nodes(data=True):
            if attr.get('type') == 'Host' and 'ipaddr' in attr:
                ip_address = attr['ipaddr']
                try:
                    ip = ipaddress.IPv4Address(ip_address)
                    if ip in dst_subnet:
                        G.add_edge(dst_zone_node_id, node, service=service, weight=num_hosts, trust=trust_values.get(src_intf, 'unknown'))
                except ipaddress.AddressValueError:
                    pass
# Calculate in-degree centrality for nodes with type='Host'
def weighted_in_degree_centrality_for_service(graph):
    in_degrees = dict(graph.in_degree(weight='weight'))
    host_nodes = [node for node in graph.nodes if graph.nodes[node].get("type") == 'Host' and in_degrees.get(node, 0) > 0]
    total_weight = sum(data['weight'] for node in graph.nodes() if graph.nodes[node].get('type') == 'Host' for _, _, data in graph.out_edges(node, data=True) if 'weight' in data)

    centrality_by_service = {}

    for host_node in host_nodes:
        host_in_edges = [(u, v, w) for u, v, w in graph.in_edges(host_node, data=True) if w.get('service') is not None]
        service_values = set([w['service'] for _, _, w in host_in_edges])
        min_trust = min(w['trust'] for _, _, w in host_in_edges)

        centrality_by_service[host_node] = {}

        for service_value in service_values:
            service_edges = [(u, v, w) for u, v, w in host_in_edges if w['service'] == service_value]

            # Calculate the weighted in-degree centrality for nodes involved in the specified service edges
            centrality = sum(w['weight'] for _, _, w in service_edges)
            normalized_centrality = (centrality / total_weight) * min_trust

            # Round the normalized centrality to 4 digits
            normalized_centrality = round(normalized_centrality, 4)

            centrality_by_service[host_node][service_value] = normalized_centrality

    return centrality_by_service

centrality_for_service = weighted_in_degree_centrality_for_service(G)
nx.set_node_attributes(G, centrality_for_service, "centrality")
# Map the service of each incoming edge to the service on which a vulnerability was detected, and determine the lowest trust value
in_degrees = {node: G.in_degree(node) for node, attr in G.nodes(data=True) if attr.get('type') == 'Host' and G.in_degree(node) > 0}

# Iterate through the edges of host nodes with in-degree > 0
for host_node in in_degrees:
    # Trust values per service, collected across all incoming edges of this host
    # (created once per host so that min() sees every edge, not only the current one)
    trust_values_dict = {}
    in_edges = G.in_edges(host_node, data=True)
    for u, v, in_data in in_edges:
        in_service = in_data.get('service')
        in_trust = in_data.get('trust')
        trust_values_dict.setdefault(in_service, []).append(in_trust)

        out_edges = [(x, y, data) for x, y, data in G.out_edges(v, data=True) if 'service' in data]
        if not out_edges: # Check if out_edges is empty
            continue
        for x, y, out_data in out_edges:
            if out_data.get('service') == in_service:
                min_trust_value = min(trust_values_dict[in_service])
                G.nodes[y]['lowest_trust'] = min_trust_value
                G.nodes[y]['reachability'] = True

                # Copy correct service value from centrality attribute to the node at the other end of the out-edge
                centrality_value = G.nodes[host_node].get('centrality', {}).get(in_service, 'unknown')
                if centrality_value is not None:
                    G.nodes[y]['centrality'] = centrality_value
            else:
                # Do not overwrite a vulnerability that another edge already marked as reachable
                G.nodes[y].setdefault('reachability', False)
# Adjust display options to make the first column wider
pd.set_option('display.max_colwidth', 100)

# Create a list to store vulnerability information
vulnerabilities_info = []

# Collect information for reachable vulnerabilities
for node, data in G.nodes(data=True):
    if 'type' in data and data['type'] == 'Vulnerability' and data.get('reachability') is True:
        cve = data.get('cve', 'unknown')
        cvss = data.get('cvss', 'unknown')
        centrality = data.get('centrality', 'unknown')

        # Append information to the list
        vulnerabilities_info.append({
            'Name': data.get('name', 'unknown'),
            'CVE': cve,
            'CVSS': cvss,
            'Lowest Trust': data.get('lowest_trust', 'unknown'),
            'Reachability score': centrality
        })

# Sort vulnerabilities by reachability score (trust-weighted in-degree centrality), highest first
vulnerabilities_info.sort(key=operator.itemgetter('Reachability score'), reverse=True)

# Create a DataFrame
df = pd.DataFrame(vulnerabilities_info)

# Display the DataFrame
df

The results

The code above produces the following table. In this example the BlueKeep vulnerability can be reached by 10.65% of observed assets.

Reachable vulnerabilities sorted by Reachability score

Reachability analysis only makes sense if your IT network is segmented on layer 2 (VLANs), layer 3 (routable subnets) and layer 4 (port-based firewall ACLs) of the OSI model. If it is not, implementing network segmentation in a least-privilege manner should be your top priority.

Reachability analysis allows you to pick the ‘right’ 10% of vulnerabilities your IT team is able to remediate every month. The following figure depicts an example decision tree using multiple prioritization metrics (thresholds are indicative):

Vulnerability remediation prioritization decision tree
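
A decision tree of this kind can also be expressed in code. The sketch below does not reproduce the tree from the figure: the `remediation_priority` function, its labels and all thresholds are hypothetical and only show how CVSS, reachability and source trust could be combined. It assumes the trust mapping used earlier (high=1, medium=2, low=3, zero=4).

```python
def remediation_priority(cvss, reachable, reachability_score, source_trust):
    """Return a coarse priority label; all thresholds are illustrative assumptions."""
    if not reachable:
        return 'scheduled'   # no firewall ACL exposes the vulnerable service
    if cvss >= 9.0 and reachability_score >= 0.10:
        return 'immediate'   # critical and reachable by a large share of assets
    if cvss >= 7.0 and source_trust >= 3:
        return 'high'        # severe and exposed to a low-trust or zero-trust zone
    return 'normal'

print(remediation_priority(9.8, True, 0.5, 3))
print(remediation_priority(7.5, True, 0.01, 1))
```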

I highly recommend reading the book Math for Security by Daniel Reilly to learn more about how to utilize graphs, geometry and spatial analysis to solve complex security challenges.

Byproduct

While massaging the data you might encounter remnant firewall ACLs, ACL misconfigurations and issues with naming conventions. Cleaning these up means this exercise might improve your overall network security as well.
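
One way to surface candidates for remnant ACLs is to look for rule destinations that do not contain any asset known to the vulnerability management solution. The sketch below is a heuristic under that assumption; the `destinations_without_assets` helper and the sample data are hypothetical, and a hit can also simply mean that an asset has never been scanned.

```python
import ipaddress

# Hypothetical sample data: asset IPs from the VM solution and ACL destinations
known_assets = ['172.16.0.7', '10.0.0.21']
acl_destinations = ['172.16.0.7', '10.0.0.0/24', '172.16.99.0/24']

def destinations_without_assets(destinations, assets):
    """Return ACL destinations that contain none of the known asset addresses."""
    asset_ips = [ipaddress.ip_address(a) for a in assets]
    unused = []
    for dst in destinations:
        network = ipaddress.ip_network(dst)
        if not any(ip in network for ip in asset_ips):
            unused.append(dst)
    return unused

candidates = destinations_without_assets(acl_destinations, known_assets)
print(candidates)
```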

Things to improve

In no particular order:

  • Take the order of allow and deny ACLs into account (1)
  • Take local vulnerabilities into account besides network based vulnerabilities
  • Take reachability from the same zone/subnet into account
  • Feed the reachability score back into the VM solution
  • Add support for IPv6 addresses
  • Adapt the code to determine the ‘blast radius’ of an exploited vulnerability
  • Adapt the code to handle IT environments with multiple firewalls at multiple locations
  • Refactor the code because I’m not an experienced (Python) developer :-)

  1. https://www.cse.msu.edu/~alexliu/publications/Reachability/reachability10ICDCS.pdf
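
Two of the items above, rule order and IPv6 support, can be sketched together. The example assumes the firewall evaluates rules top-down and stops at the first match, with an implicit deny at the end; that behaviour is an assumption, so check it against your vendor's documentation. The rule list and the `is_allowed` helper are hypothetical. `ipaddress.ip_address` and `ipaddress.ip_network` accept both IPv4 and IPv6 input, unlike the IPv4-only classes used in the PoC.

```python
import ipaddress

# Hypothetical ordered rule list: the first matching rule decides
rules = [
    {'action': 'deny',  'src': '192.168.2.4/32',  'dst': '172.16.0.0/24',      'service': 'tcp/3389'},
    {'action': 'allow', 'src': '192.168.2.0/24',  'dst': '172.16.0.7/32',      'service': 'tcp/3389'},
    {'action': 'allow', 'src': '2001:db8:2::/64', 'dst': '2001:db8:10::7/128', 'service': 'tcp/443'},
]

def is_allowed(rules, src_ip, dst_ip, service):
    """Evaluate the rules in order and return True if the first match allows the traffic."""
    src = ipaddress.ip_address(src_ip)
    dst = ipaddress.ip_address(dst_ip)
    for rule in rules:
        src_net = ipaddress.ip_network(rule['src'])
        dst_net = ipaddress.ip_network(rule['dst'])
        # Skip rules written for the other IP version
        if src.version != src_net.version or dst.version != dst_net.version:
            continue
        if src in src_net and dst in dst_net and service == rule['service']:
            return rule['action'] == 'allow'
    return False  # implicit deny when no rule matches

print(is_allowed(rules, '192.168.2.4', '172.16.0.7', 'tcp/3389'))
print(is_allowed(rules, '192.168.2.5', '172.16.0.7', 'tcp/3389'))
```

In the PoC this check would decide whether an edge between a zone and a host is added at all, instead of adding an edge for every ACL row.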
