diff --git a/baynet/parameters.py b/baynet/parameters.py
index 652a802..097b31a 100644
--- a/baynet/parameters.py
+++ b/baynet/parameters.py
@@ -1,5 +1,5 @@
 """Parameter tables for Graph objects."""
-from typing import List, Tuple, Union
+from typing import List, Tuple, Optional, Dict, Any
 import numpy as np
 import igraph
 
@@ -7,32 +7,43 @@
 class ConditionalProbabilityTable:
     """Conditional probability table for categorical data."""
 
-    def __init__(self, node: igraph.Vertex) -> None:
+    def __init__(self, node: Optional[igraph.Vertex] = None) -> None:
         """Initialise a conditional probability table."""
+        if node is None:
+            return
         self._scaled = False
-        # sorted_parents = sorted(node.neighbors(mode="in"), key = lambda x: x['name'])
-        # print(sorted_parents)
-        parent_levels = [v['levels'] for v in node.neighbors(mode="in")]
-        self._n_parents = len(parent_levels)
-        self.parents = np.array([parent.index for parent in node.neighbors(mode="in")], dtype=int)
-        self.parent_names = [parent['name'] for parent in node.neighbors(mode="in")]
-        if any([pl is None for pl in parent_levels]):
+        self.parent_levels = [v['levels'] for v in node.neighbors(mode="in")]
+        if any([pl is None for pl in self.parent_levels]):
             raise ValueError(f"Parent of {node['name']} missing attribute 'levels'")
+        self.n_parent_levels = [len(v['levels']) for v in node.neighbors(mode="in")]
+        self.parents = np.array([parent.index for parent in node.neighbors(mode="in")], dtype=int)
         node_levels = node['levels']
         if node_levels is None:
             raise ValueError(f"Node {node['name']} missing attribute 'levels'")
-
-        self.array = np.zeros([*parent_levels, node_levels], dtype=float)
-        self._levels = node_levels
-
-    def __getitem__(self, indexer: Union[int, Tuple[int, ...]]) -> np.ndarray:
-        """
-        Return CPT row corresponding to given indexer.
-
-        Wraps the stored array's __getitem___.
- """ - return self.array[indexer] + self.levels = node_levels + self.n_levels = len(node_levels) + + self.array = np.zeros([*self.n_parent_levels, len(node_levels)], dtype=float) + self.cumsum_array = np.zeros([*self.n_parent_levels, len(node_levels)], dtype=float) + + @classmethod + def from_dict(cls, **kwargs: Dict[str, Any]) -> 'ConditionalProbabilityTable': + """Instantiate from dict generated by `to_dict()`.""" + cpd = cls() + kwargs['array'] = np.array(kwargs['array']) + kwargs['parents'] = np.array(kwargs['parents']) + kwargs['cumsum_array'] = np.array(kwargs['cumsum_array']) + cpd.__dict__.update(**kwargs) + return cpd + + def to_dict(self) -> Dict[str, Any]: + """Generate dictionary representation for serialisation.""" + kwargs = self.__dict__.copy() + kwargs['array'] = self.array.tolist() + kwargs['parents'] = self.parents.tolist() + kwargs['cumsum_array'] = self.cumsum_array.tolist() + return kwargs def rescale_probabilities(self) -> None: """ @@ -44,23 +55,35 @@ def rescale_probabilities(self) -> None: """ # Anywhere with sum(probs) == 0, we set to all 1 prior to scaling self.array[self.array.sum(axis=-1) == 0] = 1 + self.array = np.nan_to_num(self.array, nan=1e-8, posinf=1.0 - 1e-8) # Rescale probabilities to sum to 1 self.array /= np.expand_dims(self.array.sum(axis=-1), axis=-1) - self.array = self.array.cumsum(axis=-1) + self.cumsum_array = self.array.cumsum(axis=-1) self._scaled = True def sample(self, incomplete_data: np.ndarray) -> np.ndarray: """Sample based on parent values.""" if not self._scaled: raise ValueError("CPT not scaled; use .rescale_probabilities() before sampling") - parent_values = incomplete_data[:, self.parents] - random_vector = np.random.uniform(size=parent_values.shape[0]) - parent_values: List[Tuple[int, ...]] = list(map(tuple, parent_values)) - return _sample_cpt(self.array, parent_values, random_vector) + parent_values_array = incomplete_data[:, self.parents].astype(int) + random_vector = np.random.uniform(size=parent_values_array.shape[0]) + parent_values: List[Tuple[int, ...]] = list(map(tuple, parent_values_array)) + return _sample_cpt(self.cumsum_array, parent_values, random_vector) - def sample_parameters(self) -> None: + def sample_parameters( + self, alpha: Optional[float] = None, seed: Optional[int] = None + ) -> np.ndarray: """Sample CPT from dirichlet distribution.""" - raise NotImplementedError + if alpha is None: + alpha = 20.0 + if seed is not None: + np.random.seed(seed) + parent_levels = int(np.prod(np.array(self.n_parent_levels, dtype=np.int64))) + alpha_norm: np.float64 = np.max(np.array([0.01, alpha / (parent_levels * self.n_levels)])) + self.array = np.random.dirichlet( + np.array([alpha_norm] * self.n_levels), parent_levels + ).reshape(self.array.shape) + self.rescale_probabilities() def _sample_cpt( @@ -77,24 +100,54 @@ def _sample_cpt( class ConditionalProbabilityDistribution: """Conditional probability distribution for continuous data.""" - def __init__(self, node: igraph.Vertex, noise_scale: float = 1.0) -> None: + def __init__( + self, + node: Optional[igraph.Vertex] = None, + mean: Optional[float] = None, + std: Optional[float] = None, + ) -> None: """Initialise a conditional probability table.""" - self.noise_scale = noise_scale + if mean is None: + mean = 0.0 + self.mean = mean + if std is None: + std = 1.0 + self.std = std + if node is None: + return self.parents = np.array([parent.index for parent in node.neighbors(mode="in")], dtype=int) - self.parent_names = [parent['name'] for parent in node.neighbors(mode="in")] - 
-        self._n_parents = len(self.parents)
-        self.array = np.zeros(self._n_parents, dtype=float)
+        self.array = np.zeros(len(self.parents), dtype=float)
+
+    @classmethod
+    def from_dict(cls, **kwargs: Dict[str, Any]) -> 'ConditionalProbabilityDistribution':
+        """Instantiate from dict generated by `to_dict()`."""
+        cpd = cls()
+        kwargs['array'] = np.array(kwargs['array'])
+        kwargs['parents'] = np.array(kwargs['parents'])
+        cpd.__dict__.update(**kwargs)
+        return cpd
+
+    def to_dict(self) -> Dict[str, Any]:
+        """Generate dictionary representation for serialisation."""
+        kwargs = self.__dict__.copy()
+        kwargs['array'] = self.array.tolist()
+        kwargs['parents'] = self.parents.tolist()
+        return kwargs
 
     def sample_parameters(
-        self, weights: Union[List[float], Tuple[float, ...]] = (-2.0, -0.5, 0.5, 2.0)
+        self, weights: Optional[List[float]] = None, seed: Optional[int] = None
    ) -> None:
         """Sample parent weights uniformly from defined possible values."""
-        self.array = np.random.choice(weights, self._n_parents)
+        if seed is not None:
+            np.random.seed(seed)
+        if weights is None:
+            weights = [-2.0, -0.5, 0.5, 2.0]
+        self.array = np.random.choice(weights, len(self.parents))
 
     def sample(self, incomplete_data: np.ndarray) -> np.ndarray:
         """Sample column based on parent columns in incomplete data matrix."""
-        noise = np.random.normal(loc=0.0, scale=self.noise_scale, size=incomplete_data.shape[0])
-        if self._n_parents == 0:
+        noise = np.random.normal(loc=self.mean, scale=self.std, size=incomplete_data.shape[0])
+        if len(self.parents) == 0:
             return noise
         parent_values = incomplete_data[:, self.parents]
         return parent_values.dot(self.array) + noise
diff --git a/baynet/structure.py b/baynet/structure.py
index 22056cc..49d9fa0 100644
--- a/baynet/structure.py
+++ b/baynet/structure.py
@@ -2,13 +2,14 @@
 from __future__ import annotations
 from itertools import combinations
 from typing import List, Union, Tuple, Set, Any, Dict, Optional
-from string import Template
 from pathlib import Path
 
 import igraph
 import numpy as np
+from yaml import safe_dump, safe_load
 
-from .parameters import ConditionalProbabilityDistribution
+from . import parameters
+from .parameters import ConditionalProbabilityDistribution, ConditionalProbabilityTable
 
 
 def _nodes_sorted(nodes: Union[List[int], List[str], List[object]]) -> List[str]:
@@ -43,7 +44,7 @@ class DAG(igraph.Graph):
     def __init__(self, *args: None, **kwargs: Any) -> None:
         """Create a graph object."""
         # Grab *args and **kwargs because pickle/igraph do weird things here
-        super().__init__(directed=True, vertex_attrs={'CPD': None, 'levels': None})
+        super().__init__(directed=True, vertex_attrs={'CPD': None})
         if 'name' in kwargs.keys():
             self.name = kwargs['name']
         else:
@@ -52,23 +53,44 @@ def __init__(self, *args: None, **kwargs: Any) -> None:
     @property
     def __dict__(self) -> Dict:
         """Return dict of attributes needed for pickling."""
-        return {'nodes': list(self.nodes), 'edges': list(self.edges)}
+        if self.vs['CPD'] == [None for _ in self.vs]:
+            return {
+                'name': self.name,
+                'vs': [{'name': v['name']} for v in self.vs],
+                'edges': list(self.edges),
+            }
+        return {
+            'name': self.name,
+            'vs': [
+                {'name': v['name'], 'CPD': v['CPD'].to_dict(), 'type': type(v['CPD']).__name__}
+                for v in self.vs
+            ],
+            'edges': list(self.edges),
+        }
 
     def __setstate__(self, state: Dict[str, Any]) -> None:
-        """Set new instance's state from a dict, used by pickle."""
-        self.add_vertices(_nodes_sorted(state['nodes']))
-        self.add_edges(state['edges'])
+        """Set new instance's state from a dict."""
+        for vertex in state['vs']:
+            if 'CPD' in vertex.keys():
+                cpd = getattr(parameters, vertex['type']).from_dict(**vertex['CPD'])
+                self.add_vertex(name=vertex['name'], CPD=cpd)
+            else:
+                self.add_vertex(name=vertex['name'])
+        self.add_edges([(node_from, node_to) for node_from, node_to in state.get('edges', [])])
+        self.name = state['name']
 
     @classmethod
-    def from_modelstring(cls, modelstring: str) -> DAG:
+    def from_modelstring(cls, modelstring: str, **kwargs: Dict[str, Any]) -> 'DAG':
         """Instantiate a Graph object from a modelstring."""
-        dag = cls()
+        dag = cls(**kwargs)
         dag.add_vertices(_nodes_from_modelstring(modelstring))
         dag.add_edges(_edges_from_modelstring(modelstring))
         return dag
 
     @classmethod
-    def from_amat(cls, amat: Union[np.ndarray, List[List[int]]], colnames: List[str]) -> DAG:
+    def from_amat(
+        cls, amat: Union[np.ndarray, List[List[int]]], colnames: List[str], **kwargs: Dict[str, Any]
+    ) -> 'DAG':
         """Instantiate a Graph object from an adjacency matrix."""
         if isinstance(amat, np.ndarray):
             amat = amat.tolist()
@@ -78,14 +100,14 @@ def from_amat(cls, amat: Union[np.ndarray, List[List[int]]], colnames: List[str]
             raise ValueError(
                 f"Graph.from_amat() expected `colnames` of type list, but got {type(colnames)}"
             )
-        dag = cls.Adjacency(amat)
+        dag = cls.Adjacency(amat, **kwargs)
         dag.vs['name'] = colnames
         return dag
 
     @classmethod
-    def from_other(cls, other_graph: Any) -> DAG:
+    def from_other(cls, other_graph: Any, **kwargs: Dict[str, Any]) -> 'DAG':
         """Attempt to create a Graph from an existing graph object (nx.DiGraph etc.)."""
-        graph = cls()
+        graph = cls(**kwargs)
         graph.add_vertices(_nodes_sorted(other_graph.nodes))
         graph.add_edges(other_graph.edges)
         return graph
@@ -141,8 +163,8 @@ def add_edges(self, edges: List[Tuple[str, str]]) -> None:
         for source, target in edges:
             if (source, target) in self.edges:
                 raise ValueError(f"Edge {source}->{target} already exists in Graph")
-            if len(edges) != len(set(edges)):
-                raise ValueError("Edges list contains duplicates")
+        if len(edges) != len(set(edges)):
+            raise ValueError("Edges list contains duplicates")
         super().add_edges(edges)
         assert self.is_dag()
@@ -203,25 +225,51 @@ def get_v_structures(self, include_shielded: bool = False) -> Set[Tuple[str, str
             v_structures += node_v_structures
         return set(v_structures)
 
-    def generate_parameters(
+    def generate_continuous_parameters(
+        self,
+        possible_weights: Optional[List[float]] = None,
+        mean: Optional[float] = None,
+        std: Optional[float] = None,
+        seed: Optional[int] = None,
+    ) -> None:
+        """Populate continuous conditional distributions for each node."""
+        for vertex in self.vs:
+            vertex['CPD'] = ConditionalProbabilityDistribution(vertex, mean=mean, std=std)
+            vertex['CPD'].sample_parameters(weights=possible_weights, seed=seed)
+
+    def generate_levels(
         self,
-        data_type: str,
-        possible_weights: Optional[Union[List[float], Tuple[float]]] = None,
-        noise_scale: float = 1.0,
+        cardinality_min: Optional[int] = None,
+        cardinality_max: Optional[int] = None,
         seed: Optional[int] = None,
     ) -> None:
-        """Populate parameters for each node."""
+        """Set number of levels in each node, for generating discrete data."""
         if seed is not None:
             np.random.seed(seed)
-        if data_type in ['cont', 'continuous']:
-            for vertex in self.vs:
-                vertex['CPD'] = ConditionalProbabilityDistribution(vertex, noise_scale)
-                if possible_weights is not None:
-                    vertex['CPD'].sample_parameters(weights=possible_weights)
-                else:
-                    vertex['CPD'].sample_parameters()
-        else:
-            raise NotImplementedError("Graph.generate_parameters() only supports 'continuous'")
+        if cardinality_min is None:
+            cardinality_min = 2
+        if cardinality_max is None:
+            cardinality_max = 3
+        assert cardinality_max >= cardinality_min >= 2
+        for vertex in self.vs:
+            n_levels = np.random.randint(cardinality_min, cardinality_max + 1)
+            vertex['levels'] = list(map(str, range(n_levels)))
+
+    def generate_discrete_parameters(
+        self,
+        alpha: Optional[float] = None,
+        cardinality_min: Optional[int] = None,
+        cardinality_max: Optional[int] = None,
+        seed: Optional[int] = None,
+    ) -> None:
+        """Populate discrete conditional parameter tables for each node."""
+        try:
+            self.vs['levels']
+        except KeyError:
+            self.generate_levels(cardinality_min, cardinality_max, seed)
+        for vertex in self.vs:
+            vertex['CPD'] = ConditionalProbabilityTable(vertex)
+            vertex['CPD'].sample_parameters(alpha=alpha, seed=seed)
 
     def sample(self, n_samples: int, seed: Optional[int] = None) -> np.ndarray:
         """Sample n_samples rows of data from the graph."""
@@ -233,34 +281,22 @@ def sample(self, n_samples: int, seed: Optional[int] = None) -> np.ndarray:
             data[:, node_idx] = self.vs[node_idx]['CPD'].sample(data)
         return data
 
-    def to_bif(self, filepath: Optional[Path] = None) -> str:
-        """Represent DAG as a BIF file, optionally saving to file."""
-        network_template = Template("network $name {\n}\n")
-        continuous_variable_template = Template(
-            """variable $name {\n  type continuous;\n  $properties}\n"""
-        )
-        continuous_probability_template = Template(
-            """probability ( $node | $parents ) {\n  table $values ;\n  }\n"""
-        )
-        bif_string = network_template.safe_substitute(name=self.name)
-
-        for vertex in self.vs:
-            bif_string += continuous_variable_template.safe_substitute(
-                name=vertex['name'], properties=""
-            )
+    def save(self, yaml_path: Optional[Path] = None) -> Optional[str]:
+        """Save DAG as yaml file, or string if no path is specified."""
+        if yaml_path is None:
+            return safe_dump(self.__dict__)
+        with yaml_path.open('w') as stream:
+            return safe_dump(self.__dict__, stream=stream)
 
-        for vertex in self.vs:
-            if vertex['CPD'] is not None and vertex['CPD'].array.size > 0:
-                bif_string += continuous_probability_template.safe_substitute(
-                    node=vertex['name'],
-                    parents=', '.join(vertex['CPD'].parent_names),
-                    values=', '.join(list(vertex['CPD'].array.astype(str))),
-                )
-        if filepath is not None:
-            if filepath.is_dir():
-                filepath = filepath / 'graph.bif'
-            filepath.resolve()
-            assert filepath.suffix == '.bif'
-            filepath.write_text(bif_string)
-
-        return bif_string
+    @classmethod
+    def load(cls, yaml: Union[Path, str]) -> DAG:
+        """Load DAG from yaml file or string."""
+        if isinstance(yaml, Path):
+            with yaml.open('r') as stream:
+                yaml_str = stream.read()
+        else:
+            yaml_str = yaml
+        state = safe_load(yaml_str)
+        dag = cls()
+        dag.__setstate__(state)
+        return dag
diff --git a/setup.py b/setup.py
index bcc525b..5e1990a 100644
--- a/setup.py
+++ b/setup.py
@@ -14,7 +14,7 @@
     url="https://github.com/Stoffle/BayNet",
     packages=find_packages(exclude=("tests",)),
     python_requires=">=3.7",
-    install_requires=["python-igraph < 0.8.0", "numpy >= 1.17.2", "pandas >= 0.25",],
+    install_requires=["python-igraph < 0.8.0", "numpy >= 1.17.2", "pandas >= 0.25", "pyyaml"],
     extras_require={
         "dev": [
             "black",
diff --git a/tests/test_metrics.py b/tests/test_metrics.py
index c86d214..9bbaa78 100644
--- a/tests/test_metrics.py
+++ b/tests/test_metrics.py
@@ -6,7 +6,7 @@
 def test_check_args():
     dag1 = test_dag()
     dag1.to_undirected()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     assert metrics._check_args(dag2, dag2, False)
     assert metrics._check_args(dag1, dag2, True)
     with pytest.raises(ValueError):
@@ -21,7 +21,7 @@ def test_check_args():
 
 def test_false_positive_edges():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     assert metrics.false_positive_edges(dag1, dag1, True) == set()
     assert metrics.false_positive_edges(dag1, dag1, False) == set()
     assert metrics.false_positive_edges(dag1, dag2, True) == set()
@@ -30,7 +30,7 @@ def test_false_positive_edges():
 
 def test_true_positive_edges():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     assert metrics.true_positive_edges(dag1, dag1, True) == dag1.edges | dag1.reversed_edges
     assert metrics.true_positive_edges(dag1, dag1, False) == dag1.edges
     assert metrics.true_positive_edges(dag1, dag2, True) == dag1.edges | dag1.reversed_edges
@@ -39,7 +39,7 @@ def test_true_positive_edges():
 
 def test_precision():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
     assert metrics.precision(dag1, dag1, True) == 1.0
     assert metrics.precision(dag1, dag1, False) == 1.0
@@ -51,7 +51,7 @@ def test_precision():
 
 def test_recall():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
     assert metrics.recall(dag1, dag1, True) == 1.0
     assert metrics.recall(dag1, dag1, False) == 1.0
@@ -63,7 +63,7 @@ def test_recall():
 
 def test_f1_score():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
     assert metrics.f1_score(dag1, dag1, True) == 1.0
     assert metrics.f1_score(dag1, dag2, True) == 1.0
@@ -73,7 +73,7 @@ def test_f1_score():
 
 def test_dag_shd():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
 
     assert metrics.shd(dag1, dag1, False) == 0
@@ -83,7 +83,7 @@ def test_dag_shd():
 
 def test_skeleton_shd():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
 
     assert metrics.shd(dag1, dag1, True) == 0
@@ -93,7 +93,7 @@
 def test_false_positive_v_structures():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
 
     assert metrics.false_positive_v_structures(dag1, dag2) == set()
@@ -103,7 +103,7 @@ def test_false_positive_v_structures():
 
 def test_true_positive_v_structures():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
 
     assert metrics.true_positive_v_structures(dag1, dag2) == set()
@@ -114,7 +114,7 @@ def test_true_positive_v_structures():
 
 def test_false_negative_v_structures():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
 
     assert metrics.false_negative_v_structures(dag1, dag2) == set()
@@ -124,7 +124,7 @@
 def test_v_precision():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
 
     assert metrics.v_precision(dag1, dag2) == 0.0
@@ -134,7 +134,7 @@ def test_v_precision():
 
 def test_v_recall():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
 
     assert metrics.v_recall(dag1, dag2) == 0.0
@@ -143,7 +143,7 @@ def test_v_recall():
 
 def test_v_f1():
     dag1 = test_dag()
-    dag2 = test_dag(reversed=True)
+    dag2 = test_dag(reverse=True)
     dag3 = partial_dag()
 
     assert metrics.v_f1(dag1, dag2) == 0.0
diff --git a/tests/test_parameters.py b/tests/test_parameters.py
index 8aa370a..43f277a 100644
--- a/tests/test_parameters.py
+++ b/tests/test_parameters.py
@@ -13,7 +13,7 @@
 def test_CPT_init():
     dag = test_dag()
-    dag.vs['levels'] = 2
+    dag.vs['levels'] = [["0", "1"] for v in dag.vs]
     cpt = ConditionalProbabilityTable(dag.vs[1])
     assert cpt.array.shape == (2, 2, 2)
     assert np.allclose(cpt.array, 0)
@@ -30,25 +30,27 @@ def test_CPT_init():
 
 def test_CPT_rescale():
     dag = test_dag()
     for n_levels in [1, 2, 3, 4]:
-        dag.vs['levels'] = n_levels
+        dag.vs['levels'] = [list(map(str, range(n_levels))) for v in dag.vs]
         cpt = ConditionalProbabilityTable(dag.vs[1])
         cpt.rescale_probabilities()
         # Check cumsum is working properly
-        for i in range(cpt._levels):
-            assert np.allclose(cpt.array[:, :, i], (i + 1) / cpt._levels)
+        for i in range(cpt.n_levels):
+            assert np.allclose(cpt.cumsum_array[:, :, i], (i + 1) / cpt.n_levels)
     cpt.array = np.random.uniform(size=(3, 3, 3))
     cpt.rescale_probabilities()
     for i in range(3):
         for j in range(3):
             # Check last value in each CPT 'row' is 1 (double checking cumsum with random init)
-            assert np.isclose(np.sum(cpt[i, j, -1]), 1)
+            assert np.isclose(np.sum(cpt.cumsum_array[i, j, -1]), 1)
             # and each value is larger than the previous
-            assert cpt[i, j, 0] <= cpt[i, j, 1] <= cpt[i, j, 2]
+            assert (
+                cpt.cumsum_array[i, j, 0] <= cpt.cumsum_array[i, j, 1] <= cpt.cumsum_array[i, j, 2]
+            )
 
 
 def test_CPT_sample_exceptions():
     dag = test_dag()
-    dag.vs['levels'] = 2
+    dag.vs['levels'] = [["0", "1"] for v in dag.vs]
     cpt = ConditionalProbabilityTable(dag.vs[1])
     with pytest.raises(ValueError):
         cpt.sample(None)
@@ -58,15 +60,16 @@ def test_CPT_sample_exceptions():
 
 def test_CPT_sample_parameters():
     dag = test_dag()
-    dag.vs['levels'] = 2
+    dag.vs['levels'] = [["0", "1"] for v in dag.vs]
     cpt = ConditionalProbabilityTable(dag.vs[1])
-    with pytest.raises(NotImplementedError):
-        cpt.sample_parameters()
+    cpt_shape = cpt.array.shape
+    cpt.sample_parameters(seed=0)
+    assert cpt.array.shape == cpt_shape
 
 
 def test_sample_cpt():
     dag = test_dag()
-    dag.vs['levels'] = 2
+    dag.vs['levels'] = [["0", "1"] for v in dag.vs]
     cpt = ConditionalProbabilityTable(dag.vs[1])
     cpt.array[0, 0, :] = [0.5, 0.5]
     cpt.array[0, 1, :] = [1.0, 0.0]
@@ -80,31 +83,15 @@ def test_sample_cpt():
     expected_output = np.array([0, 1, 0, 0, 1, 1, 0, 1])
 
-    assert np.all(_sample_cpt(cpt.array, parent_values_tuples, random_vector) == expected_output)
+    assert np.all(
+        _sample_cpt(cpt.cumsum_array, parent_values_tuples, random_vector) == expected_output
+    )
 
     np.random.seed(0)  # TODO: replace with mocking np.random.normal
     data = np.zeros((8, 4), dtype=int)
     data[:, cpt.parents] = parent_values
     assert np.all(cpt.sample(data) == [1.0, 1.0, 0.0, 0.0, 1.0, 1.0, 0.0, 1.0])
 
 
-def time_sample_cpt():
-    n = 1_000_000
-    levels = 3
-
-    dag = DAG.from_modelstring("[A|B:C:D:E:F:G:H:I:J:K][B][C][D][E][F][G][H][I][J][K]")
-    dag.vs['levels'] = levels
-    cpt = ConditionalProbabilityTable(dag.vs[0])
-    cpt.rescale_probabilities()
-    parent_values = np.random.randint(0, levels, size=(n, 2))
-    parent_values = list(map(tuple, parent_values))
-    random_vector = np.random.uniform(size=(n))
-
-    start = time()
-    _sample_cpt(cpt.array, parent_values, random_vector)
-    end = time()
-    print(end - start)
-
-
 def test_cpd_init():
     dag = test_dag()
     cpd = ConditionalProbabilityDistribution(dag.vs[1])
@@ -115,13 +102,13 @@ def test_cpd_init():
 def test_cpd_sample_params():
     dag = test_dag()
     cpd = ConditionalProbabilityDistribution(dag.vs[1])
-    cpd.sample_parameters(weights=[1])
+    cpd.sample_parameters(weights=[1], seed=0)
     assert np.allclose(cpd.array, 1)
 
 
 def test_cpd_sample():
     dag = test_dag()
-    cpd = ConditionalProbabilityDistribution(dag.vs[1], noise_scale=0)
+    cpd = ConditionalProbabilityDistribution(dag.vs[1], std=0)
     cpd.sample_parameters(weights=[1])
     assert np.allclose(cpd.sample(np.ones((10, 4))), 2)
     with pytest.raises(TypeError):
@@ -129,6 +116,6 @@ def test_cpd_sample():
     with pytest.raises(IndexError):
         cpd.sample(np.ones((10, 1)))
 
-    cpd_no_parents = ConditionalProbabilityDistribution(dag.vs[0], noise_scale=0)
+    cpd_no_parents = ConditionalProbabilityDistribution(dag.vs[0], std=0)
     cpd_no_parents.sample_parameters(weights=[1])
     assert np.allclose(cpd_no_parents.sample(np.ones((10, 4))), 0)
diff --git a/tests/test_structure.py b/tests/test_structure.py
index 7e381d8..c2b6cd0 100644
--- a/tests/test_structure.py
+++ b/tests/test_structure.py
@@ -6,9 +6,10 @@
 import networkx as nx
 import numpy as np
 from igraph import VertexSeq
+import yaml
 
 from baynet.structure import DAG, _nodes_sorted, _nodes_from_modelstring, _edges_from_modelstring
-from .utils import TEST_MODELSTRING, REVERSED_MODELSTRING, test_dag, partial_dag
+from .utils import TEST_MODELSTRING, REVERSED_MODELSTRING, test_dag, partial_dag, temp_out
 
 
 def test_nodes_sorted():
@@ -107,7 +108,7 @@ def test_DAG_get_numpy_adjacency():
 
 def test_DAG_get_modelstring():
     assert test_dag().get_modelstring() == TEST_MODELSTRING
-    assert test_dag(reversed=True).get_modelstring() == REVERSED_MODELSTRING
+    assert test_dag(reverse=True).get_modelstring() == REVERSED_MODELSTRING
 
 
 def test_DAG_get_ancestors():
@@ -158,7 +159,6 @@ def test_DAG_are_neighbours():
 
 def test_DAG_get_v_structures():
     dag = test_dag()
-    part_dag = partial_dag()
     reversed_dag = test_dag(True)
     assert partial_dag().get_v_structures() == {("C", "B", "D")}
     assert dag.get_v_structures() == set()
@@ -168,101 +168,83 @@ def test_DAG_get_v_structures():
 
 def test_DAG_pickling():
     dag = test_dag()
-    state = dag.__dict__
-    dag_from_state = DAG()
-    dag_from_state.__setstate__(state)
     p = pickle.dumps(dag)
     unpickled_dag = pickle.loads(p)
-    assert dag.nodes == dag_from_state.nodes
-    assert dag.edges == dag_from_state.edges == dag_from_state.directed_edges
     assert dag.nodes == unpickled_dag.nodes
     assert dag.edges == unpickled_dag.edges == unpickled_dag.directed_edges
 
 
+def test_DAG_yaml_continuous_file(temp_out):
+    dag_path = temp_out / 'cont.yml'
+    dag = test_dag()
+    dag.generate_continuous_parameters()
+    dag.save(dag_path)
+    dag2 = DAG.load(dag_path)
+    assert dag.nodes == dag2.nodes
+    assert dag.edges == dag2.edges
+    assert dag.__dict__['vs'] == dag2.__dict__['vs']
+
+
+def test_DAG_yaml_continuous_str():
+    dag = test_dag()
+    dag.generate_continuous_parameters()
+    dag_string = dag.save()
+    dag2 = DAG.load(dag_string)
+    assert dag.nodes == dag2.nodes
+    assert dag.edges == dag2.edges
+    assert dag.__dict__['vs'] == dag2.__dict__['vs']
+
+
+def test_DAG_yaml_discrete_file(temp_out):
+    dag_path = temp_out / 'cont.yml'
+    dag = test_dag()
+    dag.generate_discrete_parameters(seed=0)
+    dag.save(dag_path)
+    dag2 = DAG.load(dag_path)
+    assert dag.nodes == dag2.nodes
+    assert dag.edges == dag2.edges
+    assert dag.__dict__['vs'] == dag2.__dict__['vs']
+
+
+def test_DAG_yaml_discrete_str():
+    dag = test_dag()
+    dag.generate_discrete_parameters(seed=0)
+    dag_string = dag.save()
+    dag2 = DAG.load(dag_string)
+    assert dag.nodes == dag2.nodes
+    assert dag.edges == dag2.edges
+    assert dag.__dict__['vs'] == dag2.__dict__['vs']
+
+
 def test_DAG_generate_parameters():
     dag = test_dag()
-    dag.generate_parameters(data_type='cont', possible_weights=[1], noise_scale=0.0)
+    dag.generate_continuous_parameters(possible_weights=[1], std=0.0)
     for v in dag.vs:
         assert np.allclose(v['CPD'].array, 1)
-    with pytest.raises(NotImplementedError):
-        dag.generate_parameters(data_type='disc')
+    for levels in [["0", "1"], ["0", "1", "2"]]:
+        dag.vs['levels'] = [levels for v in dag.vs]
+        dag.generate_discrete_parameters()
+        assert dag.vs[0]['CPD'].array.shape == (len(levels),)
+        assert dag.vs[1]['CPD'].array.shape == (len(levels), len(levels), len(levels))
+        assert dag.vs[2]['CPD'].array.shape == (len(levels), len(levels))
+        assert dag.vs[3]['CPD'].array.shape == (len(levels),)
 
 
-def test_DAG_sample():
+def test_DAG_sample_continuous():
     dag = test_dag()
-    dag.generate_parameters(data_type='cont', noise_scale=0.0)
+    dag.generate_continuous_parameters(std=0.0)
     assert np.allclose(dag.sample(10), 0)
-    dag.generate_parameters(data_type='cont', noise_scale=1.0)
+    dag.generate_continuous_parameters(std=1.0)
     assert not np.allclose(dag.sample(10, seed=1), 0)
 
 
-def test_DAG_to_bif():
-    dag = test_dag()
-    assert (
-        dag.to_bif()
-        == """network unnamed {
-}
-variable A {
-  type continuous;
-  }
-variable B {
-  type continuous;
-  }
-variable C {
-  type continuous;
-  }
-variable D {
-  type continuous;
-  }
-"""
-    )
-
+def test_DAG_sample_discrete():
     dag = test_dag()
-    dag.generate_parameters(data_type='cont', possible_weights=[2], noise_scale=0.0, seed=1)
-    dag.name = 'test_dag'
-    assert (
-        dag.to_bif()
-        == """network test_dag {
-}
-variable A {
-  type continuous;
-  }
-variable B {
-  type continuous;
-  }
-variable C {
-  type continuous;
-  }
-variable D {
-  type continuous;
-  }
-probability ( B | C, D ) {
-  table 2, 2 ;
-  }
-probability ( C | D ) {
-  table 2 ;
-  }
-"""
-    )
-
-    test_path = Path(__file__).parent.resolve()
-    dag.to_bif(filepath=test_path)
-    filepath = test_path / 'graph.bif'
-    assert filepath.read_text() == dag.to_bif()
-    filepath.unlink()
-
-    with pytest.raises(NotImplementedError):
-        dag = test_dag()
-        dag.generate_parameters(data_type='discrete')
-    assert (
-        dag.to_bif()
-        == """
-
-        """
-    )
+    dag.generate_discrete_parameters()
+    assert not np.allclose(dag.sample(10, seed=1), 0)
 
 
 def test_Graph():
diff --git a/tests/utils.py b/tests/utils.py
index b0dea5f..6957f2f 100644
--- a/tests/utils.py
+++ b/tests/utils.py
@@ -1,20 +1,32 @@
+import tempfile
+from pathlib import Path
 import numpy as np
+import pytest
 
 from baynet.structure import DAG
 
 TEST_MODELSTRING = "[A][B|C:D][C|D][D]"
 REVERSED_MODELSTRING = "[A][B][C|B][D|B:C]"
 
-def test_dag(reversed: bool = False) -> DAG:
-    if not reversed:
-        return DAG.from_modelstring(TEST_MODELSTRING)
+def test_dag(reverse: bool = False) -> DAG:
+    if not reverse:
+        return DAG.from_modelstring(TEST_MODELSTRING, name='test_dag')
     else:
-        return DAG.from_modelstring(REVERSED_MODELSTRING)
+        return DAG.from_modelstring(REVERSED_MODELSTRING, name='test_dag')
 
 
 def partial_dag() -> DAG:
-    return DAG.from_modelstring("[A][B|C:D][C][D]")
+    return DAG.from_modelstring("[A][B|C:D][C][D]", name='partial_dag')
 
 
 def empty_dag() -> DAG:
-    return DAG.from_amat(np.zeros((4, 4)), list("ABCD"))
+    return DAG.from_amat(np.zeros((4, 4)), list("ABCD"), name='empty_dag')
+
+
+@pytest.fixture(scope="function")
+def temp_out():
+    """
+    Create temporary directory for storing test outputs.
+    """
+    with tempfile.TemporaryDirectory() as tmpdir:
+        yield Path(tmpdir).resolve()
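
Usage note (illustrative, not part of the patch): the sketch below shows how the pieces introduced above fit together — level generation, Dirichlet-sampled CPTs, the YAML round trip, and sampling. It assumes this branch is installed and importable as `baynet`; the modelstring, `name`, and output path are made up for the example.

    from pathlib import Path

    from baynet.structure import DAG

    # Build a DAG from a modelstring and attach discrete CPTs. Levels are
    # generated first (cardinality 2-3 by default); each CPT row is then
    # drawn from a Dirichlet whose concentration is derived from `alpha`.
    dag = DAG.from_modelstring("[A][B|C:D][C|D][D]", name="demo")
    dag.generate_discrete_parameters(alpha=20.0, seed=0)

    # Round-trip through YAML: save() returns a string when given no path
    # and writes a file when handed a Path; load() accepts either form.
    yaml_str = dag.save()
    dag2 = DAG.load(yaml_str)
    assert dag.nodes == dag2.nodes and dag.edges == dag2.edges

    dag.save(Path("demo.yml"))
    dag3 = DAG.load(Path("demo.yml"))

    # Sample rows of discrete data: one column per vertex, in vertex order.
    data = dag2.sample(n_samples=10, seed=1)
    print(data.shape)  # (10, 4)

Because `save()` serialises via the overridden `__dict__` property, parameters survive the round trip only through each CPD's `to_dict()`/`from_dict()` pair — which is why both parameter classes gain those methods in this change.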