Skip to content

Added parallel Prim's algorithm #219

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Mar 26, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 53 additions & 0 deletions pydatastructs/graphs/algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from collections import deque as Queue
from concurrent.futures import ThreadPoolExecutor
from pydatastructs.utils import GraphEdge
from pydatastructs.utils.misc_util import _comp
from pydatastructs.miscellaneous_data_structures import (
DisjointSetForest, PriorityQueue)
from pydatastructs.graphs.graph import Graph
Expand Down Expand Up @@ -327,6 +328,58 @@ def _minimum_spanning_tree_parallel_kruskal_adjacency_list(graph, num_threads):
_minimum_spanning_tree_parallel_kruskal_adjacency_matrix = \
_minimum_spanning_tree_parallel_kruskal_adjacency_list

def _find_min(q, v, i):
if not q.is_empty:
v[i] = q.peek
else:
v[i] = None

def _minimum_spanning_tree_parallel_prim_adjacency_list(graph, num_threads):
q = [PriorityQueue(implementation='binomial_heap') for _ in range(num_threads)]
e = [dict() for _ in range(num_threads)]
v2q = dict()
mst = Graph(implementation='adjacency_list')

itr = iter(graph.vertices)
for i in range(len(graph.vertices)):
v2q[next(itr)] = i%len(q)
q[0].push(next(iter(graph.vertices)), 0)

while True:

_vs = [None for _ in range(num_threads)]
with ThreadPoolExecutor(max_workers=num_threads) as Executor:
for i in range(num_threads):
Executor.submit(_find_min, q[i], _vs, i).result()
v = None

for i in range(num_threads):
if _comp(_vs[i], v, lambda u, v: u.key < v.key):
v = _vs[i]
if v is None:
break
v = v.data
idx = v2q[v]
q[idx].pop()

if not hasattr(mst, v):
mst.add_vertex(graph.__getattribute__(v))
if e[idx].get(v, None) is not None:
edge = e[idx][v]
mst.add_vertex(edge.target)
mst.add_edge(edge.source.name, edge.target.name, edge.value)
mst.add_edge(edge.target.name, edge.source.name, edge.value)
for w_node in graph.neighbors(v):
w = w_node.name
vw = graph.edge_weights[v + '_' + w]
j = v2q[w]
q[j].push(w, vw.value)
if e[j].get(w, None) is None or \
e[j][w].value > vw.value:
e[j][w] = vw

return mst

def minimum_spanning_tree_parallel(graph, algorithm, num_threads):
"""
Computes a minimum spanning tree for the given
Expand Down
42 changes: 10 additions & 32 deletions pydatastructs/graphs/tests/test_algorithms.py
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ def bfs_tree(curr_node, next_node, parent):

def test_minimum_spanning_tree():

def _test_minimum_spanning_tree(ds, algorithm):
def _test_minimum_spanning_tree(func, ds, algorithm, *args):
import pydatastructs.utils.misc_util as utils
GraphNode = getattr(utils, "Adjacency" + ds + "GraphNode")
a, b, c, d, e = [GraphNode(x) for x in [0, 1, 2, 3, 4]]
Expand All @@ -140,40 +140,18 @@ def _test_minimum_spanning_tree(ds, algorithm):
graph.add_edge(b.name, d.name, 32)
graph.add_edge(d.name, e.name, 23)
graph.add_edge(e.name, d.name, 23)
mst = minimum_spanning_tree(graph, algorithm=algorithm)
mst = func(graph, algorithm, *args)
expected_mst = [('0_3', 7), ('2_3', 9), ('3_4', 23), ('3_1', 32),
('3_0', 7), ('3_2', 9), ('4_3', 23), ('1_3', 32)]
assert len(expected_mst) == len(mst.edge_weights.items())
for k, v in mst.edge_weights.items():
assert (k, v.value) in expected_mst

_test_minimum_spanning_tree("List", "kruskal")
_test_minimum_spanning_tree("Matrix", "kruskal")
_test_minimum_spanning_tree("List", "prim")

def test_minimum_spanning_tree_parallel():

def _test_minimum_spanning_tree_parallel(ds, algorithm):
import pydatastructs.utils.misc_util as utils
GraphNode = getattr(utils, "Adjacency" + ds + "GraphNode")
a, b, c, d, e = [GraphNode(x) for x in [0, 1, 2, 3, 4]]
graph = Graph(a, b, c, d, e)
graph.add_edge(a.name, c.name, 10)
graph.add_edge(c.name, a.name, 10)
graph.add_edge(a.name, d.name, 7)
graph.add_edge(d.name, a.name, 7)
graph.add_edge(c.name, d.name, 9)
graph.add_edge(d.name, c.name, 9)
graph.add_edge(d.name, b.name, 32)
graph.add_edge(b.name, d.name, 32)
graph.add_edge(d.name, e.name, 23)
graph.add_edge(e.name, d.name, 23)
mst = minimum_spanning_tree_parallel(graph, algorithm, 3)
expected_mst = [('0_3', 7), ('2_3', 9), ('3_4', 23), ('3_1', 32),
('3_0', 7), ('3_2', 9), ('4_3', 23), ('1_3', 32)]
assert len(expected_mst) == len(mst.edge_weights.items())
for k, v in mst.edge_weights.items():
assert (k, v.value) in expected_mst

_test_minimum_spanning_tree_parallel("List", "kruskal")
_test_minimum_spanning_tree_parallel("Matrix", "kruskal")
fmst = minimum_spanning_tree
fmstp = minimum_spanning_tree_parallel
_test_minimum_spanning_tree(fmst, "List", "kruskal")
_test_minimum_spanning_tree(fmst, "Matrix", "kruskal")
_test_minimum_spanning_tree(fmst, "List", "prim")
_test_minimum_spanning_tree(fmstp, "List", "kruskal", 3)
_test_minimum_spanning_tree(fmstp, "Matrix", "kruskal", 3)
_test_minimum_spanning_tree(fmstp, "List", "prim", 3)
41 changes: 35 additions & 6 deletions pydatastructs/miscellaneous_data_structures/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,15 @@ def pop(self):
raise NotImplementedError(
"This is an abstract method.")

@property
def peek(self):
"""
Returns the pointer to the value which will be
popped out by `pop` method.
"""
raise NotImplementedError(
"This is an abstract method.")

@property
def is_empty(self):
"""
Expand All @@ -273,22 +282,32 @@ def __new__(cls, comp):
return obj

def push(self, value, priority):
self.items.append(value, priority)
self.items.append(priority, value)

def pop(self):
_, max_i = self._find_peek(return_index=True)
pop_val = self.items.extract(max_i)
return pop_val.data

def _find_peek(self, return_index=False):
if self.is_empty:
raise IndexError("Priority queue is empty.")

walk = self.items.head
i, max_i, max_p = 0, 0, walk.data
i, max_i, max_p = 0, 0, walk
while walk is not None:
if self.comp(walk.data, max_p):
if self.comp(walk.key, max_p.key):
max_i = i
max_p = walk.data
max_p = walk
i += 1
walk = walk.next
pop_val = self.items.extract(max_i)
return pop_val.key
if return_index:
return max_p, max_i
return max_p

@property
def peek(self):
return self._find_peek()

@property
def is_empty(self):
Expand All @@ -311,6 +330,12 @@ def pop(self):
node = self.items.extract()
return node.data

@property
def peek(self):
if self.items.is_empty:
raise IndexError("Priority queue is empty.")
return self.items.heap[0]

@property
def is_empty(self):
return self.items.is_empty
Expand All @@ -332,6 +357,10 @@ def pop(self):
self.items.delete_minimum()
return node.data

@property
def peek(self):
return self.items.find_minimum()

@property
def is_empty(self):
return self.items.is_empty
13 changes: 8 additions & 5 deletions pydatastructs/miscellaneous_data_structures/tests/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,11 +69,14 @@ def test_ImplementationPriorityQueue():
impls = ['linked_list', 'binomial_heap', 'binary_heap']
for impl in impls:
pq1 = PriorityQueue(implementation=impl)
pq1.push(1, 2)
pq1.push(1, 4)
pq1.push(2, 3)
pq1.push(3, 4)
assert pq1.pop() == 1
assert pq1.pop() == 2
pq1.push(3, 2)
assert pq1.peek.data == 3
assert pq1.pop() == 3
assert pq1.peek.data == 2
assert pq1.pop() == 2
assert pq1.peek.data == 1
assert pq1.pop() == 1
assert pq1.is_empty is True
assert raises(IndexError, lambda: pq1.pop())
assert raises(IndexError, lambda: pq1.peek)