Skip to content

Commit

Permalink
Merge pull request Stoffle#6 from predictive-analytics-lab/discrete_sampling
Browse files Browse the repository at this point in the history

Sampling, serialisation
  • Loading branch information
Stoffle authored Mar 31, 2020
2 parents 33253d4 + 510475d commit ed5c931
Show file tree
Hide file tree
Showing 7 changed files with 296 additions and 226 deletions.
125 changes: 89 additions & 36 deletions baynet/parameters.py
Original file line number Diff line number Diff line change
@@ -1,38 +1,49 @@
"""Parameter tables for Graph objects."""
from typing import List, Tuple, Union
from typing import List, Tuple, Optional, Dict, Any
import numpy as np
import igraph


class ConditionalProbabilityTable:
"""Conditional probability table for categorical data."""

def __init__(self, node: igraph.Vertex) -> None:
def __init__(self, node: Optional[igraph.Vertex] = None) -> None:
"""Initialise a conditional probability table."""
if node is None:
return
self._scaled = False
# sorted_parents = sorted(node.neighbors(mode="in"), key = lambda x: x['name'])
# print(sorted_parents)
parent_levels = [v['levels'] for v in node.neighbors(mode="in")]
self._n_parents = len(parent_levels)
self.parents = np.array([parent.index for parent in node.neighbors(mode="in")], dtype=int)
self.parent_names = [parent['name'] for parent in node.neighbors(mode="in")]
if any([pl is None for pl in parent_levels]):
self.parent_levels = [v['levels'] for v in node.neighbors(mode="in")]
if any([pl is None for pl in self.parent_levels]):
raise ValueError(f"Parent of {node['name']} missing attribute 'levels'")
self.n_parent_levels = [len(v['levels']) for v in node.neighbors(mode="in")]
self.parents = np.array([parent.index for parent in node.neighbors(mode="in")], dtype=int)

node_levels = node['levels']
if node_levels is None:
raise ValueError(f"Node {node['name']} missing attribute 'levels'")

self.array = np.zeros([*parent_levels, node_levels], dtype=float)
self._levels = node_levels

def __getitem__(self, indexer: Union[int, Tuple[int, ...]]) -> np.ndarray:
"""
Return CPT row corresponding to given indexer.
Wraps the stored array's __getitem___.
"""
return self.array[indexer]
self.levels = node_levels
self.n_levels = len(node_levels)

self.array = np.zeros([*self.n_parent_levels, len(node_levels)], dtype=float)
self.cumsum_array = np.zeros([*self.n_parent_levels, len(node_levels)], dtype=float)

@classmethod
def from_dict(cls, **kwargs: Dict[str, Any]) -> 'ConditionalProbabilityTable':
"""Instantiate from dict generated by `to_dict()`."""
cpd = cls()
kwargs['array'] = np.array(kwargs['array'])
kwargs['parents'] = np.array(kwargs['parents'])
kwargs['cumsum_array'] = np.array(kwargs['cumsum_array'])
cpd.__dict__.update(**kwargs)
return cpd

def to_dict(self) -> Dict[str, Any]:
"""Generate dictionary representation for serialisation."""
kwargs = self.__dict__.copy()
kwargs['array'] = self.array.tolist()
kwargs['parents'] = self.parents.tolist()
kwargs['cumsum_array'] = self.cumsum_array.tolist()
return kwargs

def rescale_probabilities(self) -> None:
"""
Expand All @@ -44,23 +55,35 @@ def rescale_probabilities(self) -> None:
"""
# Anywhere with sum(probs) == 0, we set to all 1 prior to scaling
self.array[self.array.sum(axis=-1) == 0] = 1
self.array = np.nan_to_num(self.array, nan=1e-8, posinf=1.0 - 1e-8)
# Rescale probabilities to sum to 1
self.array /= np.expand_dims(self.array.sum(axis=-1), axis=-1)
self.array = self.array.cumsum(axis=-1)
self.cumsum_array = self.array.cumsum(axis=-1)
self._scaled = True

def sample(self, incomplete_data: np.ndarray) -> np.ndarray:
"""Sample based on parent values."""
if not self._scaled:
raise ValueError("CPT not scaled; use .rescale_probabilities() before sampling")
parent_values = incomplete_data[:, self.parents]
random_vector = np.random.uniform(size=parent_values.shape[0])
parent_values: List[Tuple[int, ...]] = list(map(tuple, parent_values))
return _sample_cpt(self.array, parent_values, random_vector)
parent_values_array = incomplete_data[:, self.parents].astype(int)
random_vector = np.random.uniform(size=parent_values_array.shape[0])
parent_values: List[Tuple[int, ...]] = list(map(tuple, parent_values_array))
return _sample_cpt(self.cumsum_array, parent_values, random_vector)

def sample_parameters(self) -> None:
def sample_parameters(
self, alpha: Optional[float] = None, seed: Optional[int] = None
) -> np.ndarray:
"""Sample CPT from dirichlet distribution."""
raise NotImplementedError
if alpha is None:
alpha = 20.0
if seed is not None:
np.random.seed(seed)
parent_levels = int(np.prod(np.array(self.n_parent_levels, dtype=np.int64)))
alpha_norm: np.float64 = np.max(np.array([0.01, alpha / (parent_levels * self.n_levels)]))
self.array = np.random.dirichlet(
np.array([alpha_norm] * self.n_levels), parent_levels
).reshape(self.array.shape)
self.rescale_probabilities()


def _sample_cpt(
Expand All @@ -77,24 +100,54 @@ def _sample_cpt(
class ConditionalProbabilityDistribution:
"""Conditional probability distribution for continuous data."""

def __init__(self, node: igraph.Vertex, noise_scale: float = 1.0) -> None:
def __init__(
self,
node: Optional[igraph.Vertex] = None,
mean: Optional[float] = None,
std: Optional[float] = None,
) -> None:
"""Initialise a conditional probability table."""
self.noise_scale = noise_scale
if mean is None:
mean = 0.0
self.mean = mean
if std is None:
std = 1.0
self.std = std
if node is None:
return
self.parents = np.array([parent.index for parent in node.neighbors(mode="in")], dtype=int)
self.parent_names = [parent['name'] for parent in node.neighbors(mode="in")]
self._n_parents = len(self.parents)
self.array = np.zeros(self._n_parents, dtype=float)
self.array = np.zeros(len(self.parents), dtype=float)

@classmethod
def from_dict(cls, **kwargs: Dict[str, Any]) -> 'ConditionalProbabilityDistribution':
"""Instantiate from dict generated by `to_dict()`."""
cpd = cls()
kwargs['array'] = np.array(kwargs['array'])
kwargs['parents'] = np.array(kwargs['parents'])
cpd.__dict__.update(**kwargs)
return cpd

def to_dict(self) -> Dict[str, Any]:
"""Generate dictionary representation for serialisation."""
kwargs = self.__dict__.copy()
kwargs['array'] = self.array.tolist()
kwargs['parents'] = self.parents.tolist()
return kwargs

def sample_parameters(
self, weights: Union[List[float], Tuple[float, ...]] = (-2.0, -0.5, 0.5, 2.0)
self, weights: Optional[List[float]] = None, seed: Optional[int] = None
) -> None:
"""Sample parent weights uniformly from defined possible values."""
self.array = np.random.choice(weights, self._n_parents)
if seed is not None:
np.random.seed(seed)
if weights is None:
weights = [-2.0, -0.5, 0.5, 2.0]
self.array = np.random.choice(weights, len(self.parents))

def sample(self, incomplete_data: np.ndarray) -> np.ndarray:
"""Sample column based on parent columns in incomplete data matrix."""
noise = np.random.normal(loc=0.0, scale=self.noise_scale, size=incomplete_data.shape[0])
if self._n_parents == 0:
noise = np.random.normal(loc=self.mean, scale=self.std, size=incomplete_data.shape[0])
if len(self.parents) == 0:
return noise
parent_values = incomplete_data[:, self.parents]
return parent_values.dot(self.array) + noise
Loading

0 comments on commit ed5c931

Please sign in to comment.