From 89683d3c738d44afc0392285da54d25cb813a76f Mon Sep 17 00:00:00 2001 From: Nils Husung Date: Sat, 25 May 2024 10:18:26 +0200 Subject: [PATCH] Add Python bindings for substitute Switch back to protocols such that we can require multiple protocols (as long as there are no [intersection types](https://github.com/python/typing/issues/213) and `TypeVar`s cannot have multiple bounds). --- bindings/python/oxidd/__init__.py | 4 +- bindings/python/oxidd/bcdd.py | 47 +++++- bindings/python/oxidd/bdd.py | 47 +++++- .../python/oxidd/{abc.py => protocols.py} | 157 +++++++++++------- .../oxidd/tests/test_boolean_function.py | 102 +++++++++--- bindings/python/oxidd/zbdd.py | 6 +- 6 files changed, 270 insertions(+), 93 deletions(-) rename bindings/python/oxidd/{abc.py => protocols.py} (82%) diff --git a/bindings/python/oxidd/__init__.py b/bindings/python/oxidd/__init__.py index 4fe7e95..651907b 100644 --- a/bindings/python/oxidd/__init__.py +++ b/bindings/python/oxidd/__init__.py @@ -5,7 +5,7 @@ import importlib.metadata -__all__ = ["bcdd", "bdd", "abc", "zbdd"] +__all__ = ["bcdd", "bdd", "protocols", "zbdd"] __version__ = importlib.metadata.version("oxidd") -from . import abc, bcdd, bdd, zbdd +from . import bcdd, bdd, protocols, zbdd diff --git a/bindings/python/oxidd/bcdd.py b/bindings/python/oxidd/bcdd.py index 8c5e426..f85e651 100644 --- a/bindings/python/oxidd/bcdd.py +++ b/bindings/python/oxidd/bcdd.py @@ -2,17 +2,17 @@ __all__ = ["BCDDManager", "BCDDFunction"] -from collections.abc import Collection +import collections.abc from typing import Optional from _oxidd import ffi as _ffi from _oxidd import lib as _lib from typing_extensions import Never, Self, override -from . import abc, util +from . import protocols, util -class BCDDManager(abc.BooleanFunctionManager): +class BCDDManager(protocols.BooleanFunctionManager["BCDDFunction"]): """Manager for binary decision diagrams with complement edges""" _mgr: ... #: Wrapped FFI object @@ -64,7 +64,31 @@ def num_inner_nodes(self) -> int: return _lib.oxidd_bcdd_num_inner_nodes(self._mgr) -class BCDDFunction(abc.BooleanFunctionQuant[BCDDManager]): +class BCDDSubstitution: + """Substitution mapping variables to replacement functions""" + + _subst: ... #: Wrapped FFI object (``oxidd_bcdd_substitution_t *``) + + def __init__( + self, pairs: collections.abc.Iterable[tuple["BCDDFunction", "BCDDFunction"]] + ): + """Create a new substitution object for BCDDs + + See :meth:`abc.FunctionSubst.make_substitution` fore more details. + """ + self._subst = _lib.oxidd_bcdd_substitution_new( + len(pairs) if isinstance(pairs, collections.abc.Sized) else 0 + ) + for v, r in pairs: + _lib.oxidd_bcdd_substitution_add_pair(self._subst, v._func, r._func) + + def __del__(self): + _lib.oxidd_bcdd_substitution_free(self._subst) + + +class BCDDFunction( + protocols.BooleanFunctionQuant, protocols.FunctionSubst[BCDDSubstitution] +): """Boolean function represented as a binary decision diagram with complement edges (BCDD) @@ -233,6 +257,19 @@ def ite(self, t: Self, e: Self) -> Self: _lib.oxidd_bcdd_ite(self._func, t._func, e._func) ) + @override + @classmethod + def make_substitution( + cls, pairs: collections.abc.Iterable[tuple[Self, Self]] + ) -> BCDDSubstitution: + return BCDDSubstitution(pairs) + + @override + def substitute(self, substitution: BCDDSubstitution) -> Self: + return self.__class__._from_raw( + _lib.oxidd_bcdd_substitute(self._func, substitution._subst) + ) + @override def forall(self, vars: Self) -> Self: assert ( @@ -276,7 +313,7 @@ def pick_cube(self) -> Optional[util.Assignment]: return util.Assignment._from_raw(raw) if raw.len > 0 else None @override - def eval(self, args: Collection[tuple[Self, bool]]) -> bool: + def eval(self, args: collections.abc.Collection[tuple[Self, bool]]) -> bool: n = len(args) c_args = util._alloc("oxidd_bcdd_bool_pair_t[]", n) for c_arg, (f, v) in zip(c_args, args): diff --git a/bindings/python/oxidd/bdd.py b/bindings/python/oxidd/bdd.py index 520a196..2c1a9e9 100644 --- a/bindings/python/oxidd/bdd.py +++ b/bindings/python/oxidd/bdd.py @@ -2,17 +2,17 @@ __all__ = ["BDDManager", "BDDFunction"] -from collections.abc import Collection +import collections.abc from typing import Optional from _oxidd import ffi as _ffi from _oxidd import lib as _lib from typing_extensions import Never, Self, override -from . import abc, util +from . import protocols, util -class BDDManager(abc.BooleanFunctionManager): +class BDDManager(protocols.BooleanFunctionManager["BDDFunction"]): """Manager for binary decision diagrams (without complement edges)""" _mgr: ... #: Wrapped FFI object @@ -64,7 +64,31 @@ def num_inner_nodes(self) -> int: return _lib.oxidd_bdd_num_inner_nodes(self._mgr) -class BDDFunction(abc.BooleanFunctionQuant[BDDManager]): +class BDDSubstitution: + """Substitution mapping variables to replacement functions""" + + _subst: ... #: Wrapped FFI object (``oxidd_bdd_substitution_t *``) + + def __init__( + self, pairs: collections.abc.Iterable[tuple["BDDFunction", "BDDFunction"]] + ): + """Create a new substitution object for BDDs + + See :meth:`abc.FunctionSubst.make_substitution` fore more details. + """ + self._subst = _lib.oxidd_bdd_substitution_new( + len(pairs) if isinstance(pairs, collections.abc.Sized) else 0 + ) + for v, r in pairs: + _lib.oxidd_bdd_substitution_add_pair(self._subst, v._func, r._func) + + def __del__(self): + _lib.oxidd_bdd_substitution_free(self._subst) + + +class BDDFunction( + protocols.BooleanFunctionQuant, protocols.FunctionSubst[BDDSubstitution] +): """Boolean function represented as a simple binary decision diagram (BDD) All operations constructing BDDs may throw a :exc:`MemoryError` in case they @@ -231,6 +255,19 @@ def ite(self, t: Self, e: Self) -> Self: _lib.oxidd_bdd_ite(self._func, t._func, e._func) ) + @override + @classmethod + def make_substitution( + cls, pairs: collections.abc.Iterable[tuple[Self, Self]] + ) -> BDDSubstitution: + return BDDSubstitution(pairs) + + @override + def substitute(self, substitution: BDDSubstitution) -> Self: + return self.__class__._from_raw( + _lib.oxidd_bdd_substitute(self._func, substitution._subst) + ) + @override def forall(self, vars: Self) -> Self: assert ( @@ -274,7 +311,7 @@ def pick_cube(self) -> Optional[util.Assignment]: return util.Assignment._from_raw(raw) if raw.len > 0 else None @override - def eval(self, args: Collection[tuple[Self, bool]]) -> bool: + def eval(self, args: collections.abc.Collection[tuple[Self, bool]]) -> bool: n = len(args) c_args = util._alloc("oxidd_bdd_bool_pair_t[]", n) for c_arg, (f, v) in zip(c_args, args): diff --git a/bindings/python/oxidd/abc.py b/bindings/python/oxidd/protocols.py similarity index 82% rename from bindings/python/oxidd/abc.py rename to bindings/python/oxidd/protocols.py index 9287d2e..ebdd26f 100644 --- a/bindings/python/oxidd/abc.py +++ b/bindings/python/oxidd/protocols.py @@ -1,5 +1,5 @@ """ -The abstract base classes declared in this module allow abstracting away the +The protocols classes declared in this module allow abstracting away the concrete decision diagram kind in a type-safe fashion. """ @@ -7,97 +7,87 @@ "Manager", "BooleanFunctionManager", "Function", + "FunctionSubst", "BooleanFunction", "BooleanFunctionQuant", ] -from abc import ABC, abstractmethod -from collections.abc import Collection -from typing import Generic, Optional, TypeVar +import collections.abc +from abc import abstractmethod +from typing import Generic, Optional, Protocol, TypeVar from typing_extensions import Self from .util import Assignment -class Manager(ABC): - """Manager storing nodes and ensuring their uniqueness - - A manager is the data structure responsible for storing nodes and ensuring - their uniqueness. It also defines the variable order. +class Function(Protocol): + """Function represented as decision diagram - Implementations supporting concurrency have an internal read/write lock. - Many operations acquire this lock for reading (shared) or writing - (exclusive). + A function is the combination of a reference to a :class:`Manager` and a + (possibly tagged) edge pointing to a node. Obtaining the manager reference + is possible via the :attr:`manager` property. """ + @property @abstractmethod - def num_inner_nodes(self) -> int: - """Get the number of inner nodes - - Acquires the manager's lock for shared access. - """ - raise NotImplementedError - - -class BooleanFunctionManager(Manager): - """Manager whose nodes represent Boolean functions""" - - @abstractmethod - def new_var(self) -> "BooleanFunction[Self]": - """Get a fresh variable, i.e., a function that is true if and only if - the variable is true. This adds a new level to a decision diagram. - - Acquires the manager's lock for exclusive access. - """ + def manager(self) -> "Manager[Self]": + """The associated manager""" raise NotImplementedError @abstractmethod - def true(self) -> "BooleanFunction[Self]": - """Get the constant true Boolean function ``⊤`` - - Acquires the manager's lock for shared access. - """ - raise NotImplementedError + def node_count(self) -> int: + """Get the number of descendant nodes - @abstractmethod - def false(self) -> "BooleanFunction[Self]": - """Get the constant false Boolean function ``⊥`` + The returned number includes the root node itself and terminal nodes. Acquires the manager's lock for shared access. """ raise NotImplementedError -M = TypeVar("M", covariant=True, bound=Manager) - +S = TypeVar("S") -class Function(ABC, Generic[M]): - """Function represented as decision diagram - A function is the combination of a reference to a :class:`Manager` and a - (possibly tagged) edge pointing to a node. Obtaining the manager reference - is possible via the :attr:`manager` property. - """ +class FunctionSubst(Function, Generic[S], Protocol): + """Substitution extension for :class:`Function`""" - @property + @classmethod @abstractmethod - def manager(self) -> M: - """The associated manager""" + def make_substitution(cls, pairs: collections.abc.Iterable[tuple[Self, Self]]) -> S: + """Create a new substitution object from a collection of pairs + ``(var, replacement)`` + + The intent behind substitution objects is to optimize the case where the + same substitution is applied multiple times. We would like to re-use + apply cache entries across these operations, and therefore, we need a + compact identifier for the substitution (provided by the returned + substitution object). + + All variables of must be distinct. Furthermore, variables must be + handles for the respective decision diagram levels, e.g., the respective + Boolean function for B(C)DDs, and a singleton set for ZBDDs. The order + of the pairs is irrelevant. + """ raise NotImplementedError @abstractmethod - def node_count(self) -> int: - """Get the number of descendant nodes + def substitute(self, substitution: S) -> Self: + """Substitute variables in ``self`` according to ``substitution``, which + can be created using :meth:`make_substitution` - The returned number includes the root node itself and terminal nodes. + The substitution is performed in a parallel fashion, e.g.: + ``(¬x ∧ ¬y)[x ↦ ¬x ∧ ¬y, y ↦ ⊥] = ¬(¬x ∧ ¬y) ∧ ¬⊥ = x ∨ y`` Acquires the manager's lock for shared access. + + ``self`` and all functions in ``substitution`` must belong to the same + manager. """ raise NotImplementedError -class BooleanFunction(Function[M]): +class BooleanFunction(Function, Protocol): """Boolean function represented as decision diagram""" @abstractmethod @@ -292,7 +282,7 @@ def pick_cube(self) -> Optional[Assignment]: raise NotImplementedError @abstractmethod - def eval(self, args: Collection[tuple[Self, bool]]) -> bool: + def eval(self, args: collections.abc.Collection[tuple[Self, bool]]) -> bool: """Evaluate this Boolean function with arguments ``args`` ``args`` determines the valuation for all variables. Missing values are @@ -306,7 +296,7 @@ def eval(self, args: Collection[tuple[Self, bool]]) -> bool: raise NotImplementedError -class BooleanFunctionQuant(BooleanFunction[M]): +class BooleanFunctionQuant(BooleanFunction, Protocol): """Quantification extension for :class:`BooleanFunction`""" @abstractmethod @@ -360,3 +350,58 @@ def unique(self, vars: Self) -> Self: Acquires the manager's lock for shared access. """ # noqa: E501 raise NotImplementedError + + +F = TypeVar("F", bound=Function, covariant=True) + + +class Manager(Generic[F], Protocol): + """Manager storing nodes and ensuring their uniqueness + + A manager is the data structure responsible for storing nodes and ensuring + their uniqueness. It also defines the variable order. + + Implementations supporting concurrency have an internal read/write lock. + Many operations acquire this lock for reading (shared) or writing + (exclusive). + """ + + @abstractmethod + def num_inner_nodes(self) -> int: + """Get the number of inner nodes + + Acquires the manager's lock for shared access. + """ + raise NotImplementedError + + +BF = TypeVar("BF", bound=BooleanFunction, covariant=True) + + +class BooleanFunctionManager(Manager[BF], Protocol): + """Manager whose nodes represent Boolean functions""" + + @abstractmethod + def new_var(self) -> BF: + """Get a fresh variable, i.e., a function that is true if and only if + the variable is true. This adds a new level to a decision diagram. + + Acquires the manager's lock for exclusive access. + """ + raise NotImplementedError + + @abstractmethod + def true(self) -> BF: + """Get the constant true Boolean function ``⊤`` + + Acquires the manager's lock for shared access. + """ + raise NotImplementedError + + @abstractmethod + def false(self) -> BF: + """Get the constant false Boolean function ``⊥`` + + Acquires the manager's lock for shared access. + """ + raise NotImplementedError diff --git a/bindings/python/oxidd/tests/test_boolean_function.py b/bindings/python/oxidd/tests/test_boolean_function.py index 810a1ac..911ac27 100644 --- a/bindings/python/oxidd/tests/test_boolean_function.py +++ b/bindings/python/oxidd/tests/test_boolean_function.py @@ -1,13 +1,23 @@ from collections.abc import Sequence -from typing import Generic, TypeVar +from typing import Generic, Optional, Protocol, TypeVar import oxidd -from oxidd.abc import BooleanFunction, BooleanFunctionManager, BooleanFunctionQuant +from oxidd.protocols import ( + BooleanFunction, + BooleanFunctionManager, + BooleanFunctionQuant, + FunctionSubst, +) -# spell-checker:ignore nvars +# spell-checker:ignore nvars,BFQS -M = TypeVar("M", bound=BooleanFunctionManager) -T = TypeVar("T") + +class BooleanFunctionQuantSubst(BooleanFunctionQuant, FunctionSubst, Protocol): + pass + + +BF = TypeVar("BF", bound=BooleanFunction) +BFQS = TypeVar("BFQS", bound=BooleanFunctionQuantSubst) def bit_count(x: int) -> int: @@ -22,26 +32,32 @@ def bit_count(x: int) -> int: return b -class AllBooleanFunctions(Generic[M]): +class AllBooleanFunctions(Generic[BF]): """Python translation of ``TestAllBooleanFunctions`` from ``crates/oxidd/tests/boolean_function.rs``""" - _mgr: M - _vars: Sequence[BooleanFunction[M]] + _mgr: BooleanFunctionManager[BF] + _vars: Sequence[BF] + _var_handles: Sequence[BF] #: stores all possible Boolean functions with `len(vars)` variables - _boolean_functions: list[BooleanFunction[M]] - _dd_to_boolean_func: dict[BooleanFunction[M], int] + _boolean_functions: list[BF] + _dd_to_boolean_func: dict[BF, int] def __init__( self, - manager: M, - vars: Sequence[BooleanFunction[M]], - var_handles: Sequence[BooleanFunction[M]], + manager: BooleanFunctionManager[BF], + vars: Sequence[BF], + var_handles: Sequence[BF], ): + """Initialize the test, generating DDs for all Boolean functions for the + given variable set. ``vars`` are the Boolean functions representing the + variables identified by ``var_handles``. For BDDs, the two coincide, but + not for ZBDDs.""" assert len(vars) == len(var_handles) self._mgr = manager self._vars = vars + self._var_handles = var_handles self._boolean_functions = [] self._dd_to_boolean_func = {} @@ -82,7 +98,7 @@ def __init__( self._dd_to_boolean_func[f] = explicit_f def basic(self): - """Test basic operations on all Boolean function with the given variable set""" + """Test basic operations on all Boolean function""" nvars = len(self._vars) num_assignments = 1 << nvars @@ -157,9 +173,54 @@ def basic(self): actual = self._dd_to_boolean_func[f.ite(g, h)] assert actual == expected + +class AllBooleanFunctionsQuantSubst(AllBooleanFunctions[BFQS]): + def _subst_rec(self, replacements: list[Optional[int]], current_var: int): + assert len(replacements) == len(self._vars) + if current_var < len(self._vars): + replacements[current_var] = None + self._subst_rec(replacements, current_var + 1) + for f in range(0, len(self._boolean_functions)): + replacements[current_var] = f + self._subst_rec(replacements, current_var + 1) + else: + nvars = len(self._vars) + num_assignments = 1 << nvars + + subst = self._vars[0].make_substitution( + ( + (self._var_handles[i], self._boolean_functions[repl]) + for i, repl in enumerate(replacements) + if repl is not None + ) + ) + + for f_explicit, f in enumerate(self._boolean_functions): + expected = 0 + # To compute the expected truth table, we first compute a + # mapped assignment that we look up in the truth table for `f` + for assignment in range(num_assignments): + mapped_assignment = 0 + for var, repl in enumerate(replacements): + val = ( + # replacement function evaluated for `assignment` + repl >> assignment + if repl is not None + # `var` is set in `assignment`? + else assignment >> var + ) & 1 + mapped_assignment |= val << var + expected |= ((f_explicit >> mapped_assignment) & 1) << assignment + + actual = self._dd_to_boolean_func[f.substitute(subst)] + assert actual == expected + + def subst(self): + """Test all possible substitutions""" + self._subst_rec([None] * len(self._vars), 0) + def quant(self): - """Test quantification operations on all Boolean function with the given - variable set""" + """Test quantification operations on all Boolean function""" nvars = len(self._vars) num_assignments = 1 << nvars @@ -188,20 +249,17 @@ def assignment_to_mask(assignment: int, var_set: int) -> int: # quantification for var_set in range(num_assignments): dd_var_set = self._mgr.true() + for i in range(nvars): if (var_set & (1 << i)) != 0: dd_var_set &= self._vars[i] - assert isinstance(dd_var_set, BooleanFunctionQuant) - # precompute `assignment_to_mask` assignment_to_mask_pc = [ assignment_to_mask(a, var_set) for a in range(num_assignments) ] for f_explicit, f in enumerate(self._boolean_functions): - assert isinstance(f, BooleanFunctionQuant) - exist_expected = 0 forall_expected = 0 unique_expected = 0 @@ -232,7 +290,7 @@ def assignment_to_mask(assignment: int, var_set: int) -> int: def test_bdd_all_boolean_functions_2vars_t1(): mgr = oxidd.bdd.BDDManager(1024, 1024, 1) vars = [mgr.new_var() for _ in range(2)] - test = AllBooleanFunctions(mgr, vars, vars) + test = AllBooleanFunctionsQuantSubst(mgr, vars, vars) test.basic() test.quant() @@ -240,7 +298,7 @@ def test_bdd_all_boolean_functions_2vars_t1(): def test_bcdd_all_boolean_functions_2vars_t1(): mgr = oxidd.bcdd.BCDDManager(1024, 1024, 1) vars = [mgr.new_var() for _ in range(2)] - test = AllBooleanFunctions(mgr, vars, vars) + test = AllBooleanFunctionsQuantSubst(mgr, vars, vars) test.basic() test.quant() diff --git a/bindings/python/oxidd/zbdd.py b/bindings/python/oxidd/zbdd.py index 95740d6..83cdc7e 100644 --- a/bindings/python/oxidd/zbdd.py +++ b/bindings/python/oxidd/zbdd.py @@ -9,10 +9,10 @@ from _oxidd import lib as _lib from typing_extensions import Never, Self, override -from . import abc, util +from . import protocols, util -class ZBDDManager(abc.BooleanFunctionManager): +class ZBDDManager(protocols.BooleanFunctionManager["ZBDDFunction"]): """Manager for zero-suppressed binary decision diagrams""" _mgr: ... #: Wrapped FFI object @@ -87,7 +87,7 @@ def num_inner_nodes(self) -> int: return _lib.oxidd_zbdd_num_inner_nodes(self._mgr) -class ZBDDFunction(abc.BooleanFunction[ZBDDManager]): +class ZBDDFunction(protocols.BooleanFunction): """Boolean function 𝔹ⁿ → 𝔹 (or set of Boolean vectors 𝔹ⁿ) represented as zero-suppressed binary decision diagram