 # TODO: REPLACE COLLECTIONS QUEUE WITH PYDATASTRUCTS QUEUE
 from collections import deque as Queue
 from pydatastructs.utils.misc_util import AdjacencyListGraphNode
+from concurrent.futures import ThreadPoolExecutor

 __all__ = [
     'breadth_first_search',
+    'breadth_first_search_parallel'
 ]

 def breadth_first_search(
@@ -90,3 +92,98 @@ def _breadth_first_search_adjacency_list(
     return None

 _breadth_first_search_adjacency_matrix = _breadth_first_search_adjacency_list
+
+def breadth_first_search_parallel(
+    graph, source_node, num_threads, operation, *args, **kwargs):
+    """
+    Parallel implementation of breadth first search on graphs.
+
+    Parameters
+    ==========
+
+    graph: Graph
+        The graph on which BFS is to be performed.
+    source_node: str
+        The name of the source node from where the BFS is
+        to be initiated.
+    num_threads: int
+        Number of threads to be used for computation.
+    operation: function
+        The function which is to be applied
+        on every node when it is visited.
+        The prototype which is to be followed is,
+        `function_name(curr_node, next_node,
+        arg_1, arg_2, . . ., arg_n)`.
+        Here, the first two arguments denote the
+        current node and the node adjacent to it.
+        The rest of the arguments are optional and can be
+        used to pass extra data to the operation.
+
+    Note
+    ====
+
+    All the extra arguments which your `operation` expects
+    should be passed after the operation function itself.
+
+    Examples
+    ========
+
+    >>> from pydatastructs import Graph, AdjacencyListGraphNode
+    >>> V1 = AdjacencyListGraphNode("V1")
+    >>> V2 = AdjacencyListGraphNode("V2")
+    >>> V3 = AdjacencyListGraphNode("V3")
+    >>> G = Graph(V1, V2, V3)
+    >>> from pydatastructs import breadth_first_search_parallel
+    >>> def f(curr_node, next_node, dest_node):
+    ...     return curr_node != dest_node
+    ...
+    >>> G.add_edge(V1.name, V2.name)
+    >>> G.add_edge(V2.name, V3.name)
+    >>> breadth_first_search_parallel(G, V1.name, 3, f, V3.name)
+    """
+    import pydatastructs.graphs.algorithms as algorithms
+    func = "_breadth_first_search_parallel_" + graph._impl
+    if not hasattr(algorithms, func):
+        raise NotImplementedError(
+            "Currently parallel breadth first search isn't implemented for "
+            "%s graphs." % (graph._impl))
+    return getattr(algorithms, func)(
+        graph, source_node, num_threads, operation, *args, **kwargs)
+
+def _generate_layer(**kwargs):
+    # Unpack the positional arguments packed by the caller: the first
+    # five are fixed, the rest are forwarded to `operation`.
+    _args, _kwargs = kwargs.get('args'), kwargs.get('kwargs')
+    (graph, curr_node, next_layer, visited, operation) = _args[0:5]
+    op_args, op_kwargs = _args[5:], _kwargs
+    next_nodes = graph.neighbors(curr_node)
+    status = True
+    if len(next_nodes) != 0:
+        for next_node in next_nodes:
+            # `visited` is keyed by node names, so check the neighbor's
+            # name rather than the node object itself.
+            if visited.get(next_node.name, False) is False:
+                status = status and operation(curr_node, next_node.name, *op_args, **op_kwargs)
+                next_layer.add(next_node.name)
+                visited[next_node.name] = True
+    else:
+        status = status and operation(curr_node, "", *op_args, **op_kwargs)
+    return status
+
+def _breadth_first_search_parallel_adjacency_list(
+    graph, source_node, num_threads, operation, *args, **kwargs):
+    visited, layers = dict(), dict()
+    layers[0] = set()
+    layers[0].add(source_node)
+    visited[source_node] = True
+    layer = 0
+    # Expand the graph layer by layer; nodes of the current layer are
+    # handed to a thread pool, which fills in the next layer.
+    while len(layers[layer]) != 0:
+        layers[layer+1] = set()
+        with ThreadPoolExecutor(max_workers=num_threads) as Executor:
+            for node in layers[layer]:
+                status = Executor.submit(
+                    _generate_layer,
+                    args=(graph, node, layers[layer+1], visited, operation, *args),
+                    kwargs=kwargs).result()
+        layer += 1
+        if not status:
+            return None
+
+_breadth_first_search_parallel_adjacency_matrix = _breadth_first_search_parallel_adjacency_list
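
For reference, the snippet below is a minimal usage sketch of the new entry point, written against the public names that already appear in the docstring example above (`Graph`, `AdjacencyListGraphNode`, `breadth_first_search_parallel`). The `collect_edge` operation and the `edges_seen` list are hypothetical names introduced only to illustrate the `operation(curr_node, next_node, arg_1, ..., arg_n)` prototype; they are not part of pydatastructs.

from pydatastructs import Graph, AdjacencyListGraphNode
from pydatastructs import breadth_first_search_parallel

# Build a small directed graph: V1 -> V2, V1 -> V3, V2 -> V4.
V1, V2, V3, V4 = [AdjacencyListGraphNode(name) for name in ("V1", "V2", "V3", "V4")]
G = Graph(V1, V2, V3, V4)
G.add_edge("V1", "V2")
G.add_edge("V1", "V3")
G.add_edge("V2", "V4")

# Hypothetical operation following the documented prototype: record every
# (current, next) pair and return a truthy value so the traversal continues.
# Nodes with no outgoing neighbors are reported with "" as next_node.
edges_seen = []

def collect_edge(curr_node, next_node, store):
    store.append((curr_node, next_node))
    return True

# Positional arguments given after the operation (here, edges_seen) are
# forwarded to it on every visit.
breadth_first_search_parallel(G, "V1", 2, collect_edge, edges_seen)
print(edges_seen)

Since the operation may be invoked from worker threads, any state it mutates (like `edges_seen` here) should be safe to share; appending to a list is fine under CPython, but more elaborate bookkeeping may call for a lock.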