Skip to content

Add normalize_chunksize and partition utility functions #47

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Feb 27, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ repos:
additional_dependencies: [tomli]
files: ^(graphblas_algorithms|docs)/
- repo: https://github.com/charliermarsh/ruff-pre-commit
rev: v0.0.249
rev: v0.0.252
hooks:
- id: ruff
- repo: https://github.com/pre-commit/pre-commit-hooks
Expand Down
127 changes: 127 additions & 0 deletions graphblas_algorithms/nxapi/_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from math import ceil
from numbers import Number

try:
    from itertools import pairwise  # Added in Python 3.10
except ImportError:

    def pairwise(iterable):
        """Backport of ``itertools.pairwise``: yield successive overlapping pairs.

        ``pairwise("ABCD")`` yields ``("A", "B"), ("B", "C"), ("C", "D")``.
        Empty and single-element iterables yield nothing.
        """
        it = iter(iterable)
        sentinel = object()  # distinguish "empty" from a literal None element
        prev = next(it, sentinel)
        if prev is sentinel:
            return
        for cur in it:
            yield (prev, cur)
            prev = cur


# Multipliers for byte-size suffixes (lowercased): "" and "b" are bytes,
# decimal units (kb..zb) scale by powers of 1000, binary units (kib..zib)
# by powers of 1024.
BYTES_UNITS = {"": 1, "b": 1}
for _i, _prefix in enumerate("kmgtpez", start=1):
    BYTES_UNITS[_prefix + "b"] = 1000**_i
for _i, _prefix in enumerate("kmgtpez", start=1):
    BYTES_UNITS[_prefix + "ib"] = 1024**_i
del _i, _prefix


def normalize_chunksize(chunksize, itemsize=1, N=None):
    """Normalize a chunksize specification to a positive int (or None for "no chunking").

    Parameters
    ----------
    chunksize : None, number, or str
        May be a plain number of items, a bytes string such as ``"256 MiB"``
        (divided by ``itemsize`` to get an item count), or ``"4 chunks"``
        (requires ``N``).  ``None``, ``""``, and ``"all"`` mean no chunking.
    itemsize : int
        Size in bytes of one item; used only for bytes-unit strings.
    N : int, optional
        Total number of items; any result <= 0 or >= N collapses to None.

    Returns
    -------
    int or None
    """
    if chunksize is None:
        return None
    if isinstance(chunksize, Number):
        size = int(chunksize)
        if size <= 0 or N is not None and size >= N:
            return None
        return size
    if not isinstance(chunksize, str):
        raise TypeError(f"chunksize must be a number or a string; got {type(chunksize)}")
    normalized = chunksize.replace(" ", "").replace("_", "").lower()
    if normalized in {"", "all"}:
        return None

    # Split into numeric prefix and unit suffix at the last digit;
    # a bare unit such as "mb" is treated as "1mb".
    index = None
    for i, c in enumerate(reversed(normalized)):
        if c.isdigit():
            index = len(normalized) - i
            break
    if index is None:
        normalized = f"1{normalized}"
        index = 1
    prefix = normalized[:index]
    suffix = normalized[index:]

    try:
        number = float(prefix)
    except ValueError as exc:
        raise ValueError(
            f"Bad chunksize: {chunksize!r}. Could not interpret {prefix!r} as a number."
        ) from exc

    if suffix in {"chunk", "chunks"}:
        if number <= 1:
            return None
        if N is None:
            raise TypeError(
                f"N argument is required to determine chunksize to split into {int(number)} chunks"
            )
        size = ceil(N / number)
    else:
        scale = BYTES_UNITS.get(suffix)
        if scale is None:
            raise ValueError(
                f"Bad chunksize: {chunksize!r}. Could not interpret {suffix!r} as a bytes unit."
            )
        number *= scale
        # A trailing "b" means the string was a bytes size: convert to item count.
        if normalized[-1] == "b":
            number = max(1, number / itemsize)
        size = int(round(number))
    if size <= 0 or N is not None and size >= N:
        return None
    return size


def partition(chunksize, L, *, evenly=True):
"""Partition a list into chunks"""
N = len(L)
if N == 0:
return
chunksize = int(chunksize)
if chunksize <= 0 or chunksize >= N:
yield L
return
if chunksize == 1:
yield from L
return
if evenly:
k = ceil(L / chunksize)
if k * chunksize != N:
yield from split_evenly(k, L)
return
for start, stop in pairwise(range(0, N + chunksize, chunksize)):
yield L[start:stop]


def split_evenly(k, L):
    """Split a list into ``k`` approximately-equal contiguous parts.

    Empty parts (possible when ``k > len(L)``) are skipped, so fewer than
    ``k`` chunks may be yielded.  ``k <= 1`` yields the whole list once;
    an empty list yields nothing.
    """
    N = len(L)
    if N == 0:
        return
    k = int(k)
    if k <= 1:
        yield L
        return
    # Cut positions after parts 1..k-1, rounded up so earlier parts are
    # never smaller; the final cut is always N so the tail is included.
    cuts = [(N * i + k - 1) // k for i in range(1, k)]
    cuts.append(N)
    start = 0
    for stop in cuts:
        if stop > start:  # skip empty slices
            yield L[start:stop]
            start = stop
30 changes: 14 additions & 16 deletions graphblas_algorithms/nxapi/cluster.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@
from graphblas_algorithms.classes.graph import to_undirected_graph
from graphblas_algorithms.utils import not_implemented_for

from ._utils import normalize_chunksize, partition

__all__ = [
"triangles",
"transitivity",
Expand Down Expand Up @@ -90,11 +92,11 @@ def _split(L, k):


# TODO: should this move into algorithms?
def _square_clustering_split(G, node_ids=None, *, nsplits):
def _square_clustering_split(G, node_ids=None, *, chunksize):
if node_ids is None:
node_ids, _ = G._A.reduce_rowwise(monoid.any).to_coo(values=False)
result = None
for chunk_ids in _split(node_ids, nsplits):
for chunk_ids in partition(chunksize, node_ids):
res = algorithms.square_clustering(G, chunk_ids)
if result is None:
result = res
Expand All @@ -103,36 +105,32 @@ def _square_clustering_split(G, node_ids=None, *, nsplits):
return result


def square_clustering(G, nodes=None, *, nsplits="auto"):
# `nsplits` is used to split the computation into chunks.
def square_clustering(G, nodes=None, *, chunksize="256 MiB"):
# `chunksize` is used to split the computation into chunks.
# square_clustering computes `A @ A`, which can get very large, even dense.
# The default `nsplits` is to choose the number so that `Asubset @ A`
# The default `chunksize` is to choose the number so that `Asubset @ A`
# will be about 256 MB if dense.
G = to_undirected_graph(G)
if len(G) == 0:
return {}
if nsplits == "auto":
# TODO: make a utility function for this that can be reused
# Also, should we use `chunksize` instead of `nsplits`?
targetsize = 256 * 1024 * 1024 # 256 MB
nsplits = len(G) ** 2 * G._A.dtype.np_type.itemsize // targetsize
if nsplits <= 1:
nsplits = None

chunksize = normalize_chunksize(chunksize, len(G) * G._A.dtype.np_type.itemsize, len(G))

if nodes is None:
# Should we use this one for subsets of nodes as well?
if nsplits is None:
if chunksize is None:
result = algorithms.square_clustering(G)
else:
result = _square_clustering_split(G, nsplits=nsplits)
result = _square_clustering_split(G, chunksize=chunksize)
return G.vector_to_nodemap(result, fill_value=0)
if nodes in G:
idx = G._key_to_id[nodes]
return algorithms.single_square_clustering(G, idx)
ids = G.list_to_ids(nodes)
if nsplits is None:
if chunksize is None:
result = algorithms.square_clustering(G, ids)
else:
result = _square_clustering_split(G, ids, nsplits=nsplits)
result = _square_clustering_split(G, ids, chunksize=chunksize)
return G.vector_to_nodemap(result)


Expand Down
18 changes: 5 additions & 13 deletions graphblas_algorithms/nxapi/shortest_paths/weighted.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from graphblas_algorithms import algorithms
from graphblas_algorithms.classes.digraph import to_graph

from .._utils import normalize_chunksize, partition
from ..exception import NetworkXUnbounded, NodeNotFound

__all__ = [
Expand All @@ -9,18 +10,14 @@
]


def all_pairs_bellman_ford_path_length(G, weight="weight", *, chunksize="auto"):
def all_pairs_bellman_ford_path_length(G, weight="weight", *, chunksize="10 MiB"):
# Larger chunksize offers more parallelism, but uses more memory.
# Chunksize indicates for how many source nodes to compute at one time.
# The default is to choose the number of rows so the result, if dense,
# will be about 10MB.
G = to_graph(G, weight=weight)
if chunksize == "auto":
# TODO: make a utility function for this that can be reused
targetsize = 10 * 1024 * 1024 # 10 MB
chunksize = max(1, targetsize // (len(G) * G._A.dtype.np_type.itemsize))

if chunksize is None or chunksize <= 0 or chunksize >= len(G):
chunksize = normalize_chunksize(chunksize, len(G) * G._A.dtype.np_type.itemsize, len(G))
if chunksize is None:
# All at once
try:
D = algorithms.bellman_ford_path_lengths(G)
Expand All @@ -35,12 +32,7 @@ def all_pairs_bellman_ford_path_length(G, weight="weight", *, chunksize="auto"):
raise NetworkXUnbounded(*e.args) from e
yield (source, G.vector_to_nodemap(d))
else:
# We should probably make a utility function for chunking
nodes = list(G)
for start, stop in zip(
range(0, len(nodes), chunksize), range(chunksize, len(nodes) + chunksize, chunksize)
):
cur_nodes = nodes[start:stop]
for cur_nodes in partition(chunksize, list(G)):
try:
D = algorithms.bellman_ford_path_lengths(G, cur_nodes)
except algorithms.exceptions.Unbounded as e:
Expand Down
33 changes: 33 additions & 0 deletions graphblas_algorithms/nxapi/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import pytest

from graphblas_algorithms.nxapi._utils import normalize_chunksize


def test_normalize_chunksize():
    """Exercise normalize_chunksize over None-like, numeric, bytes, and chunks inputs."""
    # Table of non-raising cases: (args, kwargs, expected result)
    cases = [
        ((None,), {}, None),
        (("all",), {}, None),
        (("",), {}, None),
        ((-1,), {}, None),
        (("-1",), {}, None),
        ((10,), {"N": 10}, None),
        (("1 MB",), {"N": 100}, None),
        (("1 chunk",), {}, None),
        (("2 chunks",), {"N": 20}, 10),
        ((10,), {}, 10),
        ((10.0,), {}, 10),
        (("10",), {}, 10),
        (("10.0",), {}, 10),
        (("1_0 B",), {}, 10),
        (("1e1",), {}, 10),
        (("1e-2 kb",), {}, 10),
        (("Mb",), {}, 1000**2),
        ((" mb",), {}, 1000**2),
        (("gib",), {}, 1024**3),
    ]
    for args, kwargs, expected in cases:
        assert normalize_chunksize(*args, **kwargs) == expected
    # Error cases
    with pytest.raises(TypeError, match="chunksize must be"):
        normalize_chunksize(object())
    with pytest.raises(ValueError, match="as a bytes"):
        normalize_chunksize("10 badbytes")
    with pytest.raises(ValueError, match="as a number"):
        normalize_chunksize("1bad0 TB")
    with pytest.raises(TypeError, match="N argument is required"):
        normalize_chunksize("10 chunks")
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,7 @@ ignore = [
"PLR0913", # Too many arguments to function call
"PLR0915", # Too many statements
"PLR2004", # Magic number used in comparison, consider replacing magic with a constant variable
"PLW2901", # Outer for loop variable ... overwritten by inner assignment target (Note: good advice, but too strict)
"RET502", # Do not implicitly `return None` in function able to return non-`None` value
"RET503", # Missing explicit `return` at the end of function able to return non-`None` value
"RET504", # Unnecessary variable assignment before `return` statement
Expand Down