Ant Colony Optimization
Finding the shortest path in a graph using Ant Colony Optimization
This article walks through my implementation of the Ant Colony Optimization algorithm for finding the shortest path between two nodes in a graph. The Python package is published on PyPI and can be installed using pip install aco_routing.
What is Ant Colony Optimization? 🐜
The Ant Colony Optimization algorithm is a probabilistic technique for solving computational problems by modeling the behavior of ants and their colonies. This algorithm was introduced by Marco Dorigo (publication) and was intended to solve the traveling salesman problem. However, recent advances have also led to this approach being used for complex optimization tasks.
More information here.
The Search Space — NetworkX Graph
For this problem statement, our search space is a network of interconnected nodes and edges. The network is represented as a digraph (directed graph) and implemented using the Python package NetworkX. Each edge of the graph contains a heuristic value (travel cost) and a pheromone value (initially set to 1.0 for all edges).
For the sake of simplicity, only the necessary methods from NetworkX are exposed and consumed through a GraphApi in the package.
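To make the later snippets easier to follow, here is a minimal sketch of what such a wrapper could look like. The method names are the ones used throughout this article, but the bodies are my assumption of a straightforward implementation, not a copy of the package's code. The edge attribute keys "cost" and "pheromones" match the ones used elsewhere in this article.

```python
from dataclasses import dataclass
from typing import List

import networkx as nx


@dataclass
class GraphApi:
    """Illustrative thin wrapper around a NetworkX DiGraph (a sketch, not the package source)."""

    graph: nx.DiGraph
    evaporation_rate: float

    def set_edge_pheromones(self, u: str, v: str, pheromone_value: float) -> None:
        # Edge attributes live in a plain dict reachable via graph[u][v]
        self.graph[u][v]["pheromones"] = pheromone_value

    def get_edge_pheromones(self, u: str, v: str) -> float:
        return self.graph[u][v]["pheromones"]

    def get_edge_cost(self, u: str, v: str) -> float:
        return self.graph[u][v]["cost"]

    def get_all_nodes(self) -> List[str]:
        return list(self.graph.nodes)

    def get_neighbors(self, node: str) -> List[str]:
        # For a DiGraph, neighbors() yields successors, i.e. outgoing edges only
        return list(self.graph.neighbors(node))


G = nx.DiGraph()
G.add_edge("A", "B", cost=2)
G.add_edge("A", "H", cost=2)

api = GraphApi(G, evaporation_rate=0.1)
api.set_edge_pheromones("A", "B", 1.0)

print(api.get_neighbors("A"))
print(api.get_edge_cost("A", "B"), api.get_edge_pheromones("A", "B"))
```

Keeping the graph access behind a small class like this means the Ant and ACO classes never touch NetworkX directly.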
Implementing the ACO algorithm
Let’s start by creating a core ACO class to run and control the algorithm. It exposes several hyper-parameters that can be tweaked:
from dataclasses import dataclass, field
from typing import List, Tuple

import networkx as nx

from aco_routing.ant import Ant
from aco_routing.graph_api import GraphApi


@dataclass
class ACO:
    graph: nx.DiGraph
    # Maximum number of steps an ant is allowed to take in order to reach the destination
    ant_max_steps: int
    # Number of cycles/waves of search ants to be deployed
    num_iterations: int
    # Indicates if the search ants should spawn at random nodes in the graph
    ant_random_spawn: bool = True
    # Evaporation rate (rho)
    evaporation_rate: float = 0.1
    # Pheromone bias
    alpha: float = 0.7
    # Edge cost bias
    beta: float = 0.3
    # Search ants
    search_ants: List[Ant] = field(default_factory=list)

    def __post_init__(self):
        # Initialize the Graph API
        self.graph_api = GraphApi(self.graph, self.evaporation_rate)
        # Initialize all edges of the graph with a pheromone value of 1.0
        for edge in self.graph.edges:
            self.graph_api.set_edge_pheromones(edge[0], edge[1], 1.0)

    def find_shortest_path(
        self,
        source: str,
        destination: str,
        num_ants: int,
    ) -> Tuple[List[str], float]:
        """Finds the shortest path from the source to the destination in the graph

        Args:
            source (str): The source node in the graph
            destination (str): The destination node in the graph
            num_ants (int): The number of search ants to be deployed

        Returns:
            List[str]: The shortest path found by the ants
            float: The cost of the computed shortest path
        """
        self._deploy_search_ants(
            source,
            destination,
            num_ants,
        )
        solution_ant = self._deploy_solution_ant(source, destination)
        return solution_ant.path, solution_ant.path_cost
Now that the search space has been instantiated, we can create an Ant class with some relevant properties. These ants will be deployed in the search space.
Ant-specific hyper-parameters can also be tweaked in the ACO class.
from dataclasses import dataclass, field
from typing import Dict, List, Set, Union

from aco_routing.graph_api import GraphApi


@dataclass
class Ant:
    graph_api: GraphApi
    source: str
    destination: str
    # Pheromone bias
    alpha: float = 0.7
    # Edge cost bias
    beta: float = 0.3
    # Set of nodes that have been visited by the ant
    visited_nodes: Set = field(default_factory=set)
    # Path taken by the ant so far
    path: List[str] = field(default_factory=list)
    # Cost of the path taken by the ant so far
    path_cost: float = 0.0
    # Indicates if the ant has reached the destination (fit) or not (unfit)
    is_fit: bool = False
    # Indicates if the ant is the pheromone-greedy solution ant
    is_solution_ant: bool = False

    def __post_init__(self) -> None:
        # Set the spawn node as the current and first node
        self.current_node = self.source
        self.path.append(self.source)
The ACO algorithm is split into 3 distinct phases that are thoroughly explained below. However, a prerequisite to finding the shortest path is to initialize and spawn ants in the graph at random nodes:
# module-level import required by this method
import random

# class ACO
def _deploy_search_ants(
    self,
    source: str,
    destination: str,
    num_ants: int,
) -> None:
    """Deploy search ants that traverse the graph to find the shortest path

    Args:
        source (str): The source node in the graph
        destination (str): The destination node in the graph
        num_ants (int): The number of ants to be spawned
    """
    for _ in range(self.num_iterations):
        # Start every iteration with a fresh wave of ants
        self.search_ants.clear()

        for _ in range(num_ants):
            spawn_point = (
                random.choice(self.graph_api.get_all_nodes())
                if self.ant_random_spawn
                else source
            )
            ant = Ant(
                self.graph_api,
                spawn_point,
                destination,
                alpha=self.alpha,
                beta=self.beta,
            )
            self.search_ants.append(ant)

        self._deploy_forward_search_ants()
        self._deploy_backward_search_ants()
Phase 1 — Forward Search Ants
All ants are tasked with reaching the destination node in a finite number of steps. If an ant reaches the destination within the allowed number of steps (ant_max_steps), it is marked as fit. For all subsequent stages within an iteration, our focus lies on fit ants.
# class ACO
def _deploy_forward_search_ants(self) -> None:
    """Deploy forward search ants in the graph"""
    for ant in self.search_ants:
        for _ in range(self.ant_max_steps):
            if ant.reached_destination():
                ant.is_fit = True
                break
            ant.take_step()

# class Ant
def take_step(self) -> None:
    """Compute and update the ant position"""
    # Mark the current node as visited
    self.visited_nodes.add(self.current_node)
    # Pick the next node of the ant
    next_node = self._choose_next_node()
    # Check if ant is stuck at current node
    if not next_node:
        return
    self.path.append(next_node)
    self.path_cost += self.graph_api.get_edge_cost(self.current_node, next_node)
    self.current_node = next_node

def reached_destination(self) -> bool:
    """Returns if the ant has reached the destination node in the graph

    Returns:
        bool: returns True if the ant has reached the destination
    """
    return self.current_node == self.destination
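To see the step cap and the fit/unfit distinction in isolation, here is a small standalone sketch that mirrors the loop above on a plain adjacency dictionary. It is a simplification under stated assumptions: the forward_walk function and the adjacency data are hypothetical, and the next node is picked uniformly at random instead of using pheromones.

```python
import random
from typing import Dict, List, Tuple


def forward_walk(
    adjacency: Dict[str, List[str]],
    source: str,
    destination: str,
    max_steps: int,
    rng: random.Random,
) -> Tuple[List[str], bool]:
    """Walk from source without revisiting nodes; return (path, is_fit)."""
    path = [source]
    visited = {source}
    current = source
    for _ in range(max_steps):
        # Same order as the loop above: check for arrival first, then step
        if current == destination:
            return path, True
        candidates = [n for n in adjacency.get(current, []) if n not in visited]
        if not candidates:
            break  # the ant is stuck and stays unfit
        # Assumption: uniform choice, standing in for the pheromone-based policy
        current = rng.choice(candidates)
        visited.add(current)
        path.append(current)
    return path, False


adjacency = {
    "A": ["B", "H"],
    "B": ["C"],
    "C": ["F", "D"],
    "F": ["G", "C"],
    "G": ["F", "E"],
    "H": ["G"],
    "E": ["D"],
}
rng = random.Random(0)
walks = [forward_walk(adjacency, "A", "D", max_steps=10, rng=rng) for _ in range(50)]
fit_paths = [path for path, is_fit in walks if is_fit]
print(f"{len(fit_paths)} of {len(walks)} ants are fit")
```

Because arrival is checked before each step, an ant needs at least one more iteration than the number of edges on its path in order to be marked fit.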
At each step, the ant determines its next move using the edge Transition Probability Policy, i.e. the probability of transitioning from the current node i to another node j:

$$p_{ij} = \frac{\tau_{ij}^{\alpha} \, \eta_{ij}^{\beta}}{\sum_{k \in \text{allowed}} \tau_{ik}^{\alpha} \, \eta_{ik}^{\beta}}$$

where,
- τ_ij is the amount of pheromone deposited on the edge i→j
- η_ij is the heuristic value of the edge i→j, based on its travel cost
- α controls the influence of the pheromone value (α ≤ 1)
- β controls the influence of the heuristic value (β ≤ 1)
The denominator of the equation sums over all edges that the ant is allowed to travel along, i.e. those leading to unvisited nodes.
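As a worked example of the transition probability, the sketch below computes it for an ant with two allowed edges. One assumption to flag loudly: the heuristic is taken as the inverse of the travel cost (the usual choice in ACO, so that cheaper edges are more desirable). The function names and the sample values here are illustrative only.

```python
from typing import Dict, Tuple


def edge_desirability(pheromone: float, cost: float, alpha: float, beta: float) -> float:
    # Assumption: heuristic = 1 / cost, so a lower cost gives a higher desirability
    return (pheromone ** alpha) * ((1.0 / cost) ** beta)


def transition_probabilities(
    edges: Dict[str, Tuple[float, float]], alpha: float, beta: float
) -> Dict[str, float]:
    """edges maps each allowed neighbor to its (pheromone, cost) pair."""
    desirability = {
        node: edge_desirability(pheromone, cost, alpha, beta)
        for node, (pheromone, cost) in edges.items()
    }
    # Denominator of the formula: summed desirability over all allowed edges
    total = sum(desirability.values())
    return {node: value / total for node, value in desirability.items()}


# Two outgoing edges with equal pheromone (1.0) but costs of 1 and 2
probs = transition_probabilities(
    {"F": (1.0, 1.0), "E": (1.0, 2.0)}, alpha=0.7, beta=0.3
)
print(probs)
```

With equal pheromone levels, the cheaper edge ends up with the larger share of the probability mass, and the values always sum to 1.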
Based on the above transition formula, let’s implement the necessary methods in the Ant class to compute all edge probabilities and select the next node to transition to:
# class Ant
def _choose_next_node(self) -> Union[str, None]:
    """Choose the next node to be visited by the ant

    Returns:
        [str, None]: The computed next node to be visited by the ant or None if no possible moves
    """
    unvisited_neighbors = self._get_unvisited_neighbors()
    # Check if ant has no possible nodes to move to
    if len(unvisited_neighbors) == 0:
        return None
    probabilities = self._calculate_edge_probabilities(unvisited_neighbors)
    # Pick the next node based on the roulette wheel selection technique
    return utils.roulette_wheel_selection(probabilities)
Get all the allowed nodes (unvisited) for the ant to move to:
# class Ant
def _get_unvisited_neighbors(self) -> List[str]:
    """Returns a subset of the neighbors of the node which are unvisited

    Returns:
        List[str]: A list of all the unvisited neighbors
    """
    return [
        node
        for node in self.graph_api.get_neighbors(self.current_node)
        if node not in self.visited_nodes
    ]
Compute the edge probabilities:
# class Ant
def _calculate_edge_probabilities(
    self, unvisited_neighbors: List[str]
) -> Dict[str, float]:
    """Computes the transition probabilities of all the edges from the current node

    Args:
        unvisited_neighbors (List[str]): A list of unvisited neighbors of the current node

    Returns:
        Dict[str, float]: A dictionary mapping nodes to their transition probabilities
    """
    probabilities: Dict[str, float] = {}
    all_edges_desirability = self._compute_all_edges_desirability(
        unvisited_neighbors
    )
    for neighbor in unvisited_neighbors:
        edge_pheromones = self.graph_api.get_edge_pheromones(
            self.current_node, neighbor
        )
        edge_cost = self.graph_api.get_edge_cost(self.current_node, neighbor)
        current_edge_desirability = utils.compute_edge_desirability(
            edge_pheromones, edge_cost, self.alpha, self.beta
        )
        probabilities[neighbor] = current_edge_desirability / all_edges_desirability
    return probabilities

def _compute_all_edges_desirability(
    self,
    unvisited_neighbors: List[str],
) -> float:
    """Computes the denominator of the transition probability equation for the ant

    Args:
        unvisited_neighbors (List[str]): All unvisited neighbors of the current node

    Returns:
        float: The summation of all the outgoing edges (to unvisited nodes) from the current node
    """
    total = 0.0
    for neighbor in unvisited_neighbors:
        edge_pheromones = self.graph_api.get_edge_pheromones(
            self.current_node, neighbor
        )
        edge_cost = self.graph_api.get_edge_cost(self.current_node, neighbor)
        total += utils.compute_edge_desirability(
            edge_pheromones, edge_cost, self.alpha, self.beta
        )
    return total
Now that the edge probabilities have been computed, the ant selects the next node based on the Roulette Wheel Selection technique:
# utils
import random
from typing import Dict


def roulette_wheel_selection(probabilities: Dict[str, float]) -> str:
    """source: https://en.wikipedia.org/wiki/Fitness_proportionate_selection"""
    # Order the candidates from the most to the least probable
    sorted_probabilities = {
        k: v for k, v in sorted(probabilities.items(), key=lambda item: -item[1])
    }
    # Spin the wheel: walk the cumulative distribution until it passes the pick
    pick = random.random()
    current = 0.0
    for node, fitness in sorted_probabilities.items():
        current += fitness
        if current > pick:
            return node
    raise Exception("Edge case for roulette wheel selection")
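A quick way to convince yourself that roulette wheel selection respects the given probabilities is to sample many times and compare the observed frequencies. The sketch below uses a simplified standalone variant of the function (it takes a seeded random.Random instance for reproducibility and falls back to the last node instead of raising); both of those are my choices for this illustration, not how the package does it.

```python
import random
from collections import Counter
from typing import Dict


def roulette_wheel_selection(probabilities: Dict[str, float], rng: random.Random) -> str:
    pick = rng.random()
    cumulative = 0.0
    for node, probability in probabilities.items():
        cumulative += probability
        if cumulative > pick:
            return node
    # Floating-point rounding can leave the cumulative sum slightly below 1.0;
    # in that case fall back to the last candidate
    return list(probabilities)[-1]


rng = random.Random(42)
probabilities = {"F": 0.55, "E": 0.45}
num_samples = 10_000

counts = Counter(roulette_wheel_selection(probabilities, rng) for _ in range(num_samples))
frequencies = {node: counts[node] / num_samples for node in probabilities}
print(frequencies)
```

The observed frequencies should land close to 0.55 and 0.45. The standard library's random.choices(population, weights=...) performs the same kind of weighted draw.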
Phase 2 — Backward Search Ants
Each fit ant travels backward from the destination to the spawn point and deposits pheromones on this path.
# class ACO
def _deploy_backward_search_ants(self) -> None:
    """Deploy fit search ants back towards their source node while dropping pheromones on the path"""
    for ant in self.search_ants:
        if ant.is_fit:
            ant.deposit_pheromones_on_path()
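The amount of pheromone deposited by each ant on an edge is computed by the Pheromone Deposit Policy:

$$\tau_{ij} \leftarrow (1 - \rho)\,\tau_{ij} + \sum_{k=1}^{m} \Delta\tau_{ij}^{k}$$

where,
- ρ is the coefficient of evaporation; this ensures that the pheromone value of an edge is always finite
- τ_ij is the amount of pheromone deposited on the edge i→j
- m is the total number of ants
- Δτ_ij^k is the amount of pheromone deposited on the edge i→j by ant k (1 / path_cost in the code below)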
Let’s add this to the Ant class and a corresponding method in the GraphApi:
# class Ant
def deposit_pheromones_on_path(self) -> None:
    """Updates the pheromones along all the edges in the path"""
    for i in range(len(self.path) - 1):
        u, v = self.path[i], self.path[i + 1]
        # Every edge on the path receives the inverse of the total path cost
        new_pheromone_value = 1 / self.path_cost
        self.graph_api.deposit_pheromones(u, v, new_pheromone_value)

# GraphApi
def deposit_pheromones(self, u: str, v: str, pheromone_amount: float) -> None:
    # Note: this increments the current value by (1 - evaporation_rate) + pheromone_amount;
    # the existing pheromone is not scaled by (1 - evaporation_rate)
    self.graph[u][v]["pheromones"] += max(
        (1 - self.evaporation_rate) + pheromone_amount, 1e-13
    )
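For comparison, here is a small worked example of the textbook evaporate-then-deposit update, where the existing pheromone is first scaled by (1 − ρ) and each fit ant then adds 1 / path_cost to every edge on its path. This is a standalone sketch on a plain dictionary and is not identical to the deposit_pheromones method shown above; the update_pheromones function and the sample paths are hypothetical.

```python
from typing import Dict, List, Tuple

Edge = Tuple[str, str]


def update_pheromones(
    pheromones: Dict[Edge, float],
    fit_paths: List[Tuple[List[str], float]],
    evaporation_rate: float,
) -> Dict[Edge, float]:
    """Apply one iteration of evaporation followed by deposits from all fit ants."""
    # Evaporation: every edge loses a fraction rho of its pheromone
    updated = {
        edge: (1 - evaporation_rate) * value for edge, value in pheromones.items()
    }
    # Deposit: each fit ant adds 1 / path_cost to every edge on its path
    for path, path_cost in fit_paths:
        for u, v in zip(path, path[1:]):
            updated[(u, v)] += 1 / path_cost
    return updated


pheromones = {("A", "H"): 1.0, ("H", "G"): 1.0, ("A", "B"): 1.0}
# Two fit ants took A -> H -> G with a total cost of 4
fit_paths = [(["A", "H", "G"], 4.0), (["A", "H", "G"], 4.0)]

updated = update_pheromones(pheromones, fit_paths, evaporation_rate=0.1)
print(updated)
```

Edges on the shared path rise to 0.9 + 0.25 + 0.25 = 1.4, while the unused edge A→B decays to 0.9, which is the reinforcement effect the algorithm relies on.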
Phase 3 — Solution Ant
At this stage, we have deployed several waves (epochs) of forward and backward ants in the graph. The current assumption is that the pheromone levels along the edges are sufficiently high to accurately depict the shortest route from any node to the target destination.
To generate the solution, i.e. the shortest path from the source to the destination, we deploy a solution ant at the source node.
# class ACO
def _deploy_solution_ant(self, source: str, destination: str) -> Ant:
    """Deploy the pheromone-greedy solution to find the shortest path

    Args:
        source (str): The source node in the graph
        destination (str): The destination node in the graph

    Returns:
        Ant: The solution ant with the computed shortest path and cost
    """
    ant = Ant(
        self.graph_api,
        source,
        destination,
        is_solution_ant=True,
    )
    while not ant.reached_destination():
        ant.take_step()
    return ant
The unique characteristic of the solution ant is its pheromone-greediness (α=1 and β=0), i.e. it chooses its next step based on the edge with the highest pheromone value, ignoring the heuristic value.
Let’s update the Ant._choose_next_node() method to account for this special type of ant by checking the is_solution_ant property:
# class Ant
def _choose_next_node(self) -> Union[str, None]:
    """Choose the next node to be visited by the ant

    Returns:
        [str, None]: The computed next node to be visited by the ant or None if no possible moves
    """
    unvisited_neighbors = self._get_unvisited_neighbors()

    if self.is_solution_ant:
        if len(unvisited_neighbors) == 0:
            raise Exception(
                f"No path found from {self.source} to {self.destination}"
            )
        # The final/solution ant greedily chooses the next node with the highest pheromone value
        return max(
            unvisited_neighbors,
            key=lambda neighbor: self.graph_api.get_edge_pheromones(
                self.current_node, neighbor
            ),
        )

    # Check if ant has no possible nodes to move to
    if len(unvisited_neighbors) == 0:
        return None
    probabilities = self._calculate_edge_probabilities(unvisited_neighbors)
    # Pick the next node based on the roulette wheel selection technique
    return utils.roulette_wheel_selection(probabilities)
Once the solution ant has reached its destination, we can access the ant’s path and path_cost properties to get the shortest path and path cost respectively, from the source to the destination node.
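The greedy behaviour of the solution ant can also be illustrated without any of the classes above. The following standalone sketch follows the strongest pheromone trail through a nested dictionary; the follow_strongest_trail function and the pheromone values are made up for illustration and are not produced by an actual run.

```python
from typing import Dict, List


def follow_strongest_trail(
    pheromones: Dict[str, Dict[str, float]], source: str, destination: str
) -> List[str]:
    """Greedily follow the highest-pheromone edge to an unvisited node at every step."""
    path = [source]
    visited = {source}
    current = source
    while current != destination:
        edges = pheromones.get(current, {})
        candidates = [node for node in edges if node not in visited]
        if not candidates:
            raise ValueError(f"No path found from {source} to {destination}")
        # Pick the neighbor reached via the edge with the most pheromone
        current = max(candidates, key=edges.get)
        visited.add(current)
        path.append(current)
    return path


# Hypothetical pheromone levels after several iterations of search ants
pheromones = {
    "A": {"B": 1.2, "H": 3.5},
    "B": {"C": 1.1},
    "H": {"G": 3.1},
    "G": {"F": 1.4, "E": 2.9},
    "E": {"D": 2.7},
}
path = follow_strongest_trail(pheromones, "A", "D")
print(" -> ".join(path))
```

Since the choice is purely greedy, the quality of the result depends entirely on how well the search ants have reinforced the good edges beforehand.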
Finally, we will consolidate all the information covered regarding the ACO implementation and find the shortest path in a NetworkX graph:
import networkx as nx
from aco_routing import ACO
G = nx.DiGraph()
G.add_edge("A", "B", cost=2)
G.add_edge("B", "C", cost=2)
G.add_edge("A", "H", cost=2)
G.add_edge("H", "G", cost=2)
G.add_edge("C", "F", cost=1)
G.add_edge("F", "G", cost=1)
G.add_edge("G", "F", cost=1)
G.add_edge("F", "C", cost=1)
G.add_edge("C", "D", cost=10)
G.add_edge("E", "D", cost=2)
G.add_edge("G", "E", cost=2)
aco = ACO(G, ant_max_steps=100, num_iterations=100, ant_random_spawn=True)
aco_path, aco_cost = aco.find_shortest_path(
    source="A",
    destination="D",
    num_ants=100,
)
print(f"ACO path: {aco_path}")
print(f"ACO path cost: {aco_cost}")
ACO returns the optimal path and cost 🎉
ACO path: A -> H -> G -> E -> D
ACO path cost: 8.0
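To double-check this result independently, we can run Dijkstra's algorithm on the same graph using NetworkX's built-in dijkstra_path and dijkstra_path_length functions, passing weight="cost" so that the edge attribute from this example is used. The snippet below is only a cross-check and does not depend on the aco_routing package.

```python
import networkx as nx

G = nx.DiGraph()
G.add_edge("A", "B", cost=2)
G.add_edge("B", "C", cost=2)
G.add_edge("A", "H", cost=2)
G.add_edge("H", "G", cost=2)
G.add_edge("C", "F", cost=1)
G.add_edge("F", "G", cost=1)
G.add_edge("G", "F", cost=1)
G.add_edge("F", "C", cost=1)
G.add_edge("C", "D", cost=10)
G.add_edge("E", "D", cost=2)
G.add_edge("G", "E", cost=2)

# Deterministic reference solution using the "cost" edge attribute as the weight
dijkstra_path = nx.dijkstra_path(G, "A", "D", weight="cost")
dijkstra_cost = nx.dijkstra_path_length(G, "A", "D", weight="cost")

print(f"Dijkstra path: {' -> '.join(dijkstra_path)}")
print(f"Dijkstra path cost: {dijkstra_cost}")
```

Dijkstra's algorithm also yields A -> H -> G -> E -> D with a cost of 8, matching the path found by the ants.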
ACO Key Learnings
- Search space exploration: Strikes a great balance between exploration and exploitation and leads to several diverse solutions, providing a broader perspective on the problem.
- Optimal solutions: Capable of finding near-optimal solutions across a range of problems, like uncovering the same shortest path as Dijkstra’s algorithm.
- Scalability: Effective strategy for handling large-scale problems as it allows ants to be deployed in parallel.
- Adaptability: Excels in dynamic environments since the ants can effectively update pheromone trails to find the optimal path.
GitHub Repository and Simulation
You can find the source code of the aco_routing package here: hasnainroopawalla/ant-colony-optimization
An interactive 2D simulation of this algorithm using React and p5.js can be found here: hasnainroopawalla/ant-colony-simulation
Feel free to contribute or raise any issues 🚀
Thanks for reading!