Skip to content

Graph Algorithms

Praveen Kumar Anwla edited this page Dec 15, 2024 · 9 revisions

#Q1: Implement a Topological sort algorithm.

Ans:

from collections import defaultdict

class Graph:
    def __init__(self, number_of_vertices):
        """Initialize a graph with a given number of vertices."""
        self.graph = defaultdict(list)
        self.number_of_vertices = number_of_vertices
    
    def add_edge(self, from_vertex, to_vertex):
        """Add a directed edge from 'from_vertex' to 'to_vertex'."""
        self.graph[from_vertex].append(to_vertex)
    
    def topological_sort_util(self, vertex, visited, stack):
        """A recursive helper function to perform DFS and find topological order."""
        visited.add(vertex)
        
        for neighbor in self.graph[vertex]:
            if neighbor not in visited:
                self.topological_sort_util(neighbor, visited, stack)
        
        # Add the vertex to the stack after exploring all its neighbors
        stack.insert(0, vertex)
    
    def topological_sort(self):
        """Print the topological ordering of the vertices."""
        visited = set()
        stack = []
        
        # Call the recursive helper for every vertex not yet visited
        for vertex in self.graph:
            if vertex not in visited:
                self.topological_sort_util(vertex, visited, stack)
        
        # Reverse the stack (stack.reverse() ) to get the correct topological order
        # if you're using stack.append(vertex) instead.
        print(stack)


# Example usage:
custom_graph = Graph(8)
custom_graph.add_edge("A", "C")
custom_graph.add_edge("C", "E")
custom_graph.add_edge("E", "H")
custom_graph.add_edge("E", "F")
custom_graph.add_edge("F", "G")
custom_graph.add_edge("B", "D")
custom_graph.add_edge("B", "C")
custom_graph.add_edge("D", "F")

custom_graph.topological_sort()

#Q2: Single source shortest path. Ans:

from collections import deque

class Graph:
    def __init__(self, adjacency_dict=None):
        """
        Initialize the graph.
        
        :param adjacency_dict: A dictionary representing the graph, 
                               where each key is a node and the value
                               is a list of its neighbors.
        """
        # If no adjacency dictionary is given, use an empty dict
        self.gdict = adjacency_dict if adjacency_dict is not None else {}
    
    def bfs_path(self, start, end):
        """
        Find a path from 'start' to 'end' using Breadth-First Search (BFS).
        
        :param start: The starting node.
        :param end: The target node we want to reach.
        :return: A list of nodes representing the path from start to end,
                 or None if no path exists.
        """
        
        # A queue to store paths we need to explore.
        # Each element in the queue will be a list representing a path.
        queue = deque()
        
        # Initially, the only path we know is from 'start' to itself.
        queue.append([start])
        
        # Keep track of visited nodes to avoid cycles and redundant searches
        visited = set([start])
        
        # Loop until we have no more paths to explore
        while queue:
            # Get the next path from the queue
            current_path = queue.popleft()
            
            # The last node in the path is the one we're currently exploring
            current_node = current_path[-1]
            
            # If this node is the target, we found a path!
            if current_node == end:
                return current_path
            
            # Otherwise, explore all neighbors of the current node
            # and add new paths for each neighbor to the queue.
            for neighbor in self.gdict.get(current_node, []):
                if neighbor not in visited:
                    visited.add(neighbor)
                    # Create a new path by extending the current path
                    new_path = current_path + [neighbor]
                    queue.append(new_path)
        
        # If we exit the loop, it means we didn't find a path to 'end'
        return None


# Example usage:
customDict = { 
    "a": ["b", "c"],
    "b": ["d", "g"],
    "c": ["d", "e"],
    "d": ["f"],
    "e": ["f"],
    "g": ["f"]
}

graph = Graph(customDict)
result = graph.bfs_path("a", "z")
print(result)  # This will print None because there is no path to 'z'

#Q3: Implement Dijkastra's Algorithm. Ans:

# Implement Dijkstra's algorithm

import heapq
from collections import defaultdict
from typing import List, Dict, Tuple

class Solution:
    def shortestPath(self, num_nodes: int, edge_list: List[List[int]], source: int) -> Tuple[Dict[int, int], Dict[int, int]]:
        """
        Implements Dijkstra's algorithm to find the shortest paths from the source node to all other nodes.
        
        :param num_nodes: Number of nodes in the graph (nodes are labeled from 0 to num_nodes-1).
        :param edge_list: List of edges in the graph where each edge is represented as [from, to, weight].
        :param source: The source node from which to calculate shortest paths.
        :return: A tuple containing:
                  - shortest_distances: A dictionary mapping each node to its shortest distance from the source.
                  - predecessors: A dictionary mapping each node to its immediate predecessor on the shortest path.
        """
        
        # Initialize adjacency list as a dictionary mapping each node to a list of (neighbor, weight) tuples
        adjacency_list = {node: [] for node in range(num_nodes)}
        
        # Populate the adjacency list with edges
        for from_node, to_node, weight in edge_list:
            adjacency_list[from_node].append((to_node, weight))
        
        print("Adjacency List:", adjacency_list)
        
        # Dictionaries to store the shortest distance to each node and its predecessor
        shortest_distances = {}
        predecessors = {}
        
        # Priority queue to select the next node with the smallest tentative distance
        # Each element is a tuple: (current_distance, node)
        priority_queue = [(0, source)]
        
        while priority_queue:
            current_distance, current_node = heapq.heappop(priority_queue)
            
            # If the node has already been processed, skip it
            if current_node in shortest_distances:
                continue
            
            # Record the shortest distance from source node to the current node
            shortest_distances[current_node] = current_distance
            
            # Iterate through all neighbors of the current node
            for neighbor, edge_weight in adjacency_list[current_node]:
                # If the neighbor hasn't been processed yet
                if neighbor not in shortest_distances:
                    new_distance = current_distance + edge_weight
                    heapq.heappush(priority_queue, (new_distance, neighbor))
                    
                    # Update the predecessor if this path is better
                    if neighbor not in predecessors or new_distance < shortest_distances.get(neighbor, float('inf')):
                        predecessors[neighbor] = current_node
        
        # Assign -1 to nodes that are not reachable from the source
        for node in range(num_nodes):
            if node not in shortest_distances:
                shortest_distances[node] = -1
        
        return shortest_distances, predecessors

    def reconstructPath(self, predecessors: Dict[int, int], source: int, destination: int) -> List[int]:
        """
        Reconstructs the shortest path from the source to the destination node.
        
        :param predecessors: A dictionary mapping each node to its immediate predecessor on the shortest path.
        :param source: The source node.
        :param destination: The destination node.
        :return: A list representing the shortest path from source to destination.
        """
        path = []
        current = destination
        while current != source:
            path.append(current)
            if current in predecessors:
                current = predecessors[current] #backtrack to source node one node at a time
            else:
                # No path exists
                return []
        path.append(source)  #complete the path
        path.reverse() #Reverse the path to get source to destination instead of other way around.
        return path

# Example Usage:
if __name__ == "__main__":
    solution = Solution()
    num_nodes = 7  # Nodes labeled from 0 to 6
    edge_list = [
        [0, 1, 2],  # Node 0 → Node 1 with weight 2
        [0, 2, 5],  # Node 0 → Node 2 with weight 5
        [1, 2, 6],  # Node 1 → Node 2 with weight 6
        [1, 3, 1],  # Node 1 → Node 3 with weight 1
        [1, 4, 6],  # Node 1 → Node 4 with weight 6
        [2, 5, 8],  # Node 2 → Node 5 with weight 8
        [3, 4, 1],  # Node 3 → Node 4 with weight 1
        [4, 6, 9],  # Node 4 → Node 6 with weight 9
        [5, 6, 7]   # Node 5 → Node 6 with weight 7
    ]
    source_node = 0  # Starting node is Node 0
    
    # Execute Dijkstra's algorithm
    shortest_distances, predecessors = solution.shortestPath(num_nodes, edge_list, source_node)
    
    # Display shortest distances from the source
    print("\nShortest Distances from Node 0:")
    for node in range(num_nodes):
        destination = node
        distance = shortest_distances[node]
        print(f"0 -> {destination} = {distance}")
    
    # Reconstruct and display the shortest path from Node 0 to Node 4
    destination_node = 4
    shortest_path_0_to_4 = solution.reconstructPath(predecessors, source_node, destination_node)
    print(f"\nShortest Path from Node 0 to Node {destination_node}:", shortest_path_0_to_4)
    
    # Reconstruct and display the shortest path from Node 0 to Node 6
    destination_node = 6
    shortest_path_0_to_6 = solution.reconstructPath(predecessors, source_node, destination_node)
    print(f"Shortest Path from Node 0 to Node {destination_node}:", shortest_path_0_to_6)

### **5. Implement the Main Loop of Dijkstra's Algorithm**

This loop processes nodes in order of their current shortest known distance, updating distances and predecessors as it explores the graph.

**Steps:**

1. **Loop Condition:** Continue while the priority queue is not empty.
   
2. **Extract Node with Smallest Distance:**
   - Pop the node with the smallest tentative distance from the priority queue.
   
3. **Check if Node is Already Processed:**
   - If the node has already been processed (i.e., its shortest distance is recorded), skip it to avoid redundant work.
   
4. **Record Shortest Distance:**
   - Assign the current distance to the node in the `shortest_distances` dictionary.
   
5. **Explore Neighbors:**
   - Iterate through each neighbor of the current node from the adjacency list.
   
6. **Calculate New Tentative Distance:**
   - For each neighbor, calculate the new tentative distance (`current_distance + edge_weight`).
   
7. **Update Priority Queue and Predecessors:**
   - If the neighbor hasn't been processed:
     - Push the neighbor and its new tentative distance onto the priority queue.
     - If this new distance is better than any previously recorded distance to the neighbor, update the predecessor.

Pseudocode:

while priority_queue is not empty:
    current_distance, current_node = pop node with smallest distance from priority_queue
    
    if current_node is already in shortest_distances:
        continue  # Skip already processed nodes
    
    shortest_distances[current_node] = current_distance
    
    for each neighbor, edge_weight in adjacency_list[current_node]:
        if neighbor not in shortest_distances:
            new_distance = current_distance + edge_weight
            push (new_distance, neighbor) to priority_queue
            
            if neighbor not in predecessors or new_distance < shortest_distances.get(neighbor, ∞):
                predecessors[neighbor] = current_node

7. Reconstruct the Shortest Path

Using the predecessors dictionary, reconstruct the actual shortest path from the source to any destination node.

Steps:

  1. Initialize Path List: Start with the destination node.
  2. Backtrack Using Predecessors:
    • Set current_node to the destination node.
    • While current_node is not the source:
      • Append the current_node to the path list.
      • Update current_node to its predecessor.
      • If at any point current_node is not found in predecessors, return an empty list (no path exists).
  3. Append Source and Reverse Path:
    • Append the source node to the path list.
    • Reverse the path list to get the correct order from source to destination.

Q: Implement Bellman Ford's algorithm.

Ans:

class Graph:
    def __init__(self, vertices):
        self.V = vertices   
        self.graph = []     
        self.nodes = []

    def add_edge(self, src, dest, weight):
        self.graph.append([src, dest, weight])
    
    def addNode(self,value):
        self.nodes.append(value)

    def print_solution(self, dist):
        print("Vertex Distance from Source")
        for key, value in dist.items():
            print('  ' + key, ' :    ', value)
    
    def bellmanFord(self, src):
        dist = {i : float("Inf") for i in self.nodes}
        dist[src] = 0

        for _ in range(self.V-1):
            for src, dest, weight in self.graph:
                if dist[src] != float("Inf") and dist[src] + weight < dist[dest]:
                    dist[dest] = dist[src] + weight
        
        for src, dest, weight in self.graph:
            if dist[src] != float("Inf") and dist[src] + weight < dist[dest]:
                print("Graph contains negative cycle")
                return
        

        self.print_solution(dist)

g = Graph(5)
g.addNode("A")
g.addNode("B")
g.addNode("C")
g.addNode("D")
g.addNode("E")
g.add_edge("A", "C", 6)
g.add_edge("A", "D", 6)
g.add_edge("B", "A", 3)
g.add_edge("C", "D", 1)
g.add_edge("D", "C", 2)
g.add_edge("D", "B", 1)
g.add_edge("E", "B", 4)
g.add_edge("E", "D", 2)
g.bellmanFord("E")
Clone this wiki locally