Skip to content

Graph Algorithms

Praveen Kumar Anwla edited this page Dec 15, 2024 · 9 revisions

#Q1: Implement a topological sort algorithm.

Ans:

from collections import defaultdict

class Graph:
    """Directed graph stored as adjacency lists, with a DFS-based topological sort."""

    def __init__(self, number_of_vertices):
        """Initialize a graph with a given number of vertices.

        :param number_of_vertices: Declared vertex count. It is stored on the
                                   instance; the methods of this class do not read it.
        """
        self.graph = defaultdict(list)
        self.number_of_vertices = number_of_vertices

    def add_edge(self, from_vertex, to_vertex):
        """Add a directed edge from 'from_vertex' to 'to_vertex'."""
        self.graph[from_vertex].append(to_vertex)

    def _collect_postorder(self, vertex, visited, postorder):
        """Run a DFS from 'vertex' and append each vertex to 'postorder' once it is finished."""
        visited.add(vertex)

        # Use .get() rather than indexing: indexing a defaultdict with a vertex
        # that has no outgoing edges would insert a new key, which breaks any
        # iteration over self.graph that is in progress.
        for neighbor in self.graph.get(vertex, ()):
            if neighbor not in visited:
                self._collect_postorder(neighbor, visited, postorder)

        # A vertex is finished only after everything reachable from it is finished.
        postorder.append(vertex)

    def topological_sort_util(self, vertex, visited, stack):
        """A recursive DFS helper that finds the topological order below 'vertex'.

        :param vertex: The vertex to start the depth-first search from.
        :param visited: Set of already visited vertices; updated in place.
        :param stack: List that receives the newly finished vertices at its
                      front, so that every vertex ends up before its descendants.
        """
        postorder = []
        self._collect_postorder(vertex, visited, postorder)

        # Prepend in one slice assignment instead of one insert(0, ...) per vertex.
        stack[:0] = reversed(postorder)

    def topological_sort(self):
        """Print and return the topological ordering of the vertices.

        Only vertices that appear in at least one edge are part of the ordering.
        No cycle detection is performed, so the graph is expected to be acyclic.

        :return: A list of vertices in which every vertex appears before the
                 vertices it has edges to.
        """
        visited = set()
        postorder = []

        # Iterate over a snapshot of the keys so the loop never depends on the
        # live dictionary while the DFS is running.
        for vertex in list(self.graph):
            if vertex not in visited:
                self._collect_postorder(vertex, visited, postorder)

        # Vertices were appended as they finished, so the reverse of that
        # sequence is the topological order.
        postorder.reverse()
        print(postorder)
        return postorder


# Example usage: register the directed edges one by one, then ask the graph
# to print its topological ordering.
custom_graph = Graph(8)
for edge_start, edge_end in [
    ("A", "C"),
    ("C", "E"),
    ("E", "H"),
    ("E", "F"),
    ("F", "G"),
    ("B", "D"),
    ("B", "C"),
    ("D", "F"),
]:
    custom_graph.add_edge(edge_start, edge_end)

custom_graph.topological_sort()

#Q2: Single-source shortest path in an unweighted graph (using BFS). Ans:

from collections import deque

class Graph:
    """Graph backed by an adjacency dictionary that can be searched with BFS."""

    def __init__(self, adjacency_dict=None):
        """
        Create the graph.

        :param adjacency_dict: Mapping from each node to the list of its
                               neighbors. When omitted, the graph starts
                               with an empty dictionary.
        """
        if adjacency_dict is None:
            adjacency_dict = {}
        self.gdict = adjacency_dict

    def bfs_path(self, start, end):
        """
        Search breadth-first for a path leading from 'start' to 'end'.

        :param start: Node the search begins at.
        :param end: Node the search tries to reach.
        :return: The nodes along the path, 'start' first and 'end' last,
                 or None when 'end' cannot be reached.
        """
        # Private marker stored as the "predecessor" of the start node so the
        # backward walk below knows where to stop.
        root_marker = object()

        # Maps every discovered node to the node it was discovered from;
        # its keys double as the set of visited nodes.
        came_from = {start: root_marker}
        frontier = deque([start])

        while frontier:
            node = frontier.popleft()

            if node == end:
                # Walk the predecessor links back to the start, then flip the
                # collected nodes into start-to-end order.
                route = []
                step = node
                while step is not root_marker:
                    route.append(step)
                    step = came_from[step]
                route.reverse()
                return route

            for candidate in self.gdict.get(node, []):
                if candidate in came_from:
                    continue
                came_from[candidate] = node
                frontier.append(candidate)

        # The frontier ran dry without ever reaching 'end'.
        return None


# Example usage: describe the graph as an adjacency dictionary, then search it.
customDict = dict(
    a=["b", "c"],
    b=["d", "g"],
    c=["d", "e"],
    d=["f"],
    e=["f"],
    g=["f"],
)

graph = Graph(customDict)

# 'z' is not part of the graph, so no path can be found and None is printed.
result = graph.bfs_path("a", "z")
print(result)

#Q3: Implement Dijkstra's Algorithm. Ans:

# Implement Dijkstra's algorithm

import heapq
from collections import defaultdict
from typing import List, Dict, Tuple

class Solution:
    """Dijkstra's single-source shortest paths and shortest-path reconstruction."""

    def shortestPath(self, num_nodes: int, edge_list: List[List[int]], source: int) -> Tuple[Dict[int, int], Dict[int, int]]:
        """
        Implements Dijkstra's algorithm to find the shortest paths from the source node to all other nodes.

        Edge weights are expected to be non-negative, as Dijkstra's algorithm requires.
        The adjacency list is printed to standard output as a side effect.

        :param num_nodes: Number of nodes in the graph (nodes are labeled from 0 to num_nodes-1).
        :param edge_list: List of directed edges where each edge is represented as [from, to, weight].
        :param source: The source node from which to calculate shortest paths.
        :return: A tuple containing:
                  - shortest_distances: A dictionary mapping each node to its shortest distance from the source;
                    nodes that cannot be reached from the source are mapped to -1.
                  - predecessors: A dictionary mapping each reachable node other than the source to its
                    immediate predecessor on the shortest path.
        :raises KeyError: If an edge starts at, or the search reaches, a node outside 0..num_nodes-1.
        """

        # Adjacency list: each node maps to a list of (neighbor, weight) tuples
        adjacency_list = {node: [] for node in range(num_nodes)}
        for from_node, to_node, weight in edge_list:
            adjacency_list[from_node].append((to_node, weight))

        print("Adjacency List:", adjacency_list)

        # Final (settled) distances and the predecessor of each node on its best known path
        shortest_distances = {}
        predecessors = {}

        # Best distance found so far for nodes that are not settled yet. The
        # predecessor of a node may only change when this value improves;
        # otherwise a later, longer route would overwrite the correct predecessor.
        tentative_distances = {source: 0}

        # Priority queue of (tentative_distance, node); may contain stale entries
        priority_queue = [(0, source)]

        while priority_queue:
            current_distance, current_node = heapq.heappop(priority_queue)

            # A node is settled the first time it is popped; later entries are stale
            if current_node in shortest_distances:
                continue

            shortest_distances[current_node] = current_distance

            for neighbor, edge_weight in adjacency_list[current_node]:
                if neighbor in shortest_distances:
                    continue

                new_distance = current_distance + edge_weight

                # Relax the edge only if it yields a strictly shorter route
                if new_distance < tentative_distances.get(neighbor, float('inf')):
                    tentative_distances[neighbor] = new_distance
                    predecessors[neighbor] = current_node
                    heapq.heappush(priority_queue, (new_distance, neighbor))

        # Assign -1 to nodes that are not reachable from the source
        for node in range(num_nodes):
            if node not in shortest_distances:
                shortest_distances[node] = -1

        return shortest_distances, predecessors

    def reconstructPath(self, predecessors: Dict[int, int], source: int, destination: int) -> List[int]:
        """
        Reconstructs the shortest path from the source to the destination node.

        :param predecessors: A dictionary mapping each node to its immediate predecessor on the shortest path.
        :param source: The source node.
        :param destination: The destination node.
        :return: A list representing the shortest path from source to destination
                 ([source] when both are the same node), or an empty list if the
                 predecessor chain from the destination does not lead back to the source.
        """
        path = []
        current = destination

        # Follow predecessor links backwards from the destination to the source
        while current != source:
            path.append(current)
            if current in predecessors:
                current = predecessors[current]
            else:
                # No path exists
                return []

        path.append(source)
        path.reverse()
        return path

# Example usage: run Dijkstra on a small weighted graph and show two routes.
if __name__ == "__main__":
    # Seven nodes, labeled 0 through 6
    num_nodes = 7

    # Every entry is a directed edge written as [from, to, weight]
    edge_list = [
        [0, 1, 2],
        [0, 2, 5],
        [1, 2, 6],
        [1, 3, 1],
        [1, 4, 6],
        [2, 5, 8],
        [3, 4, 1],
        [4, 6, 9],
        [5, 6, 7],
    ]

    # All distances are measured from Node 0
    source_node = 0

    solution = Solution()
    shortest_distances, predecessors = solution.shortestPath(num_nodes, edge_list, source_node)

    # One line per node with its distance from the source
    print("\nShortest Distances from Node 0:")
    for destination in range(num_nodes):
        distance = shortest_distances[destination]
        print(f"0 -> {destination} = {distance}")

    # Route from Node 0 to Node 4
    destination_node = 4
    shortest_path_0_to_4 = solution.reconstructPath(predecessors, source_node, destination_node)
    print(f"\nShortest Path from Node 0 to Node {destination_node}:", shortest_path_0_to_4)

    # Route from Node 0 to Node 6
    destination_node = 6
    shortest_path_0_to_6 = solution.reconstructPath(predecessors, source_node, destination_node)
    print(f"Shortest Path from Node 0 to Node {destination_node}:", shortest_path_0_to_6)
Clone this wiki locally