diff --git a/.gitignore b/.gitignore index 66a5171a..4b75fb4c 100644 --- a/.gitignore +++ b/.gitignore @@ -112,6 +112,7 @@ target/ *compss*.out *compss*.err + # ========== C & C++ ignores ================= # Prerequisites *.d diff --git a/Dockerfile b/Dockerfile index e8a72019..7b1ed215 100644 --- a/Dockerfile +++ b/Dockerfile @@ -4,6 +4,8 @@ MAINTAINER COMPSs Support COPY . dislib/ ENV PYTHONPATH=$PYTHONPATH:/dislib +ENV LC_ALL=C.UTF-8 +RUN pip3 install -r /dislib/requirements.txt # Expose SSH port and run SSHD EXPOSE 22 diff --git a/Jenkinsfile b/Jenkinsfile index 681857e0..eaf042e9 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -11,7 +11,7 @@ def setGithubCommitStatus(state, description) { pipeline { options { - timeout(time: 4, unit: 'HOURS') + timeout(time: 3, unit: 'HOURS') } agent { node { diff --git a/dislib/classification/__init__.py b/dislib/classification/__init__.py index 55bc2877..695dd571 100644 --- a/dislib/classification/__init__.py +++ b/dislib/classification/__init__.py @@ -1,4 +1,4 @@ from dislib.classification.csvm.base import CascadeSVM -from dislib.classification.rf.forest import RandomForestClassifier +from dislib.commons.rf.forest import RandomForestClassifier -__all__ = ['CascadeSVM', 'RandomForestClassifier'] +__all__ = ["CascadeSVM", "RandomForestClassifier"] diff --git a/dislib/classification/csvm/base.py b/dislib/classification/csvm/base.py index 8a052e0d..16e30741 100644 --- a/dislib/classification/csvm/base.py +++ b/dislib/classification/csvm/base.py @@ -192,7 +192,7 @@ def score(self, x, y, collect=False): Test samples. y : ds-array, shape=(n_samples, 1) True labels for x. - collect : bool + collect : bool, optional (default=False) When True, a synchronized result is returned. Returns diff --git a/dislib/classification/rf/_data.py b/dislib/classification/rf/_data.py deleted file mode 100644 index 9bd178b5..00000000 --- a/dislib/classification/rf/_data.py +++ /dev/null @@ -1,304 +0,0 @@ -import tempfile - -import numpy as np -from numpy.lib import format -from pycompss.api.parameter import FILE_IN, FILE_INOUT, COLLECTION_IN, Depth, \ - Type -from pycompss.api.task import task - -from dislib.data.array import Array - - -class RfDataset(object): - """Dataset format used by the fit() of the RandomForestClassifier. - - The RfDataset contains a file path for the samples and another one for the - labels. Optionally, a path can be provided for a transposed version of the - samples matrix, i.e., the features. - - Note: For a representation of a dataset distributed in multiple files, use - dislib.data.Dataset instead. - - Parameters - ---------- - samples_path : str - Path of the .npy file containing the 2-d array of samples. It can be a - pycompss.runtime.Future object. If so, self.n_samples and - self.n_features must be set manually (they can also be - pycompss.runtime.Future objects). - labels_path : str - Path of the .dat file containing the 1-d array of labels. It can be a - pycompss.runtime.Future object. - features_path : str, optional (default=None) - Path of the .npy file containing the 2-d array of samples transposed. - The array must be C-ordered. Providing this array may improve the - performance as it allows sequential access to the features. - - Attributes - ---------- - n_samples : int - The number of samples of the dataset. It can be a - pycompss.runtime.Future object. - n_features : int - The number of features of the dataset. It can be a - pycompss.runtime.Future object. - y_codes : ndarray - The codified array of labels for this RfDataset. 
The values are indices - of the array of classes, which contains the corresponding labels. The - dtype is np.int8. It can be a pycompss.runtime.Future object. - y_categories : ndarray - The array of classes for this RfDataset. The values are unique. It can - be a pycompss.runtime.Future object. - n_classes : int - The number of classes of this RfDataset. It can be a - pycompss.runtime.Future object. - - """ - - def __init__(self, samples_path, labels_path, features_path=None): - self.samples_path = samples_path - self.labels_path = labels_path - self.features_path = features_path - self.n_samples = None - self.n_features = None - - self.y_codes = None - self.y_categories = None - self.n_classes = None - - def get_n_samples(self): - """Gets the number of samples obtained from the samples file. - - Returns - ------- - n_samples : int - - Raises - ------ - AssertionError - If self.n_samples is None and self.samples_path is not a string. - ValueError - If invalid content is encountered in the samples file. - - """ - if self.n_samples is None: - assert isinstance(self.samples_path, str), \ - 'self.n_samples must be set manually if self.samples_path ' \ - 'is a pycompss.runtime.Future object' - shape = _NpyFile(self.samples_path).get_shape() - if len(shape) != 2: - raise ValueError('Cannot read 2D array from the samples file.') - self.n_samples, self.n_features = shape - return self.n_samples - - def get_n_features(self): - """Gets the number of features obtained from the samples file. - - Returns - ------- - n_features : int - - Raises - ------ - AssertionError - If self.n_features is None and self.samples_path is not a string. - ValueError - If invalid content is encountered in the samples file. - - """ - if self.n_features is None: - assert isinstance(self.samples_path, str), \ - 'self.n_features must be set manually if self.samples_path ' \ - 'is a pycompss.runtime.Future object' - shape = _NpyFile(self.samples_path).get_shape() - if len(shape) != 2: - raise ValueError('Cannot read 2D array from the samples file.') - self.n_samples, self.n_features = shape - return self.n_features - - def get_y_codes(self): - """Obtains the codified array of labels. - - Returns - ------- - y_codes : ndarray - - """ - if self.y_codes is None: - labels = _get_labels(self.labels_path) - self.y_codes, self.y_categories, self.n_classes = labels - return self.y_codes - - def get_classes(self): - """Obtains the array of label categories. - - Returns - ------- - y_categories : ndarray - - """ - if self.y_categories is None: - labels = _get_labels(self.labels_path) - self.y_codes, self.y_categories, self.n_classes = labels - return self.y_categories - - def get_n_classes(self): - """Obtains the number of classes. - - Returns - ------- - n_classes : int - - """ - if self.n_classes is None: - labels = _get_labels(self.labels_path) - self.y_codes, self.y_categories, self.n_classes = labels - return self.n_classes - - def validate_features_file(self): - """Validates the features file header information. - - Raises - ------ - ValueError - If the shape of the array in the features_file doesn't match this - class n_samples and n_features or if the array is in fortran order. 
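Note: the _NpyFile helper used here reads only the .npy header, so the shape and memory order are known without loading the samples array. A minimal standalone sketch of that header-only read, relying on the same private numpy.lib.format helpers this module already imports:

import numpy as np
from numpy.lib import format as npy_format

def read_npy_header(path):
    # Parse only the .npy header and return (shape, fortran_order, dtype)
    # without loading or memory-mapping the array data itself.
    with open(path, "rb") as fp:
        version = npy_format.read_magic(fp)
        npy_format._check_version(version)
        return npy_format._read_array_header(fp, version)

np.save("samples.npy", np.zeros((100, 5), dtype="float32"))
print(read_npy_header("samples.npy"))  # ((100, 5), False, dtype('float32'))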
- - """ - features_npy_file = _NpyFile(self.features_path) - shape = features_npy_file.get_shape() - fortran_order = features_npy_file.get_fortran_order() - if len(shape) != 2: - raise ValueError('Cannot read 2D array from features_file.') - if (self.get_n_features(), self.get_n_samples()) != shape: - raise ValueError('Invalid dimensions for the features_file.') - if fortran_order: - raise ValueError('Fortran order not supported for features array.') - - -def transform_to_rf_dataset(x: Array, y: Array) -> RfDataset: - """Creates a RfDataset object from samples x and labels y. - - This function creates a dislib.classification.rf.data.RfDataset by saving - x and y in files. - - Parameters - ---------- - x : ds-array, shape = (n_samples, n_features) - The training input samples. - y : ds-array, shape = (n_samples,) or (n_samples, n_outputs) - The target values. - - Returns - ------- - rf_dataset : dislib.classification.rf._data.RfDataset - - """ - n_samples = x.shape[0] - n_features = x.shape[1] - - samples_file = tempfile.NamedTemporaryFile(mode='wb', - prefix='tmp_rf_samples_', - delete=False) - samples_path = samples_file.name - samples_file.close() - _allocate_samples_file(samples_path, n_samples, n_features) - - start_idx = 0 - row_blocks_iterator = x._iterator(axis=0) - top_row = next(row_blocks_iterator) - _fill_samples_file(samples_path, top_row._blocks, start_idx) - start_idx += x._top_left_shape[0] - for x_row in row_blocks_iterator: - _fill_samples_file(samples_path, x_row._blocks, start_idx) - start_idx += x._reg_shape[0] - - labels_file = tempfile.NamedTemporaryFile(mode='w', - prefix='tmp_rf_labels_', - delete=False) - labels_path = labels_file.name - labels_file.close() - for y_row in y._iterator(axis=0): - _fill_labels_file(labels_path, y_row._blocks) - - rf_dataset = RfDataset(samples_path, labels_path) - rf_dataset.n_samples = n_samples - rf_dataset.n_features = n_features - return rf_dataset - - -class _NpyFile(object): - def __init__(self, path): - self.path = path - - self.shape = None - self.fortran_order = None - self.dtype = None - - def get_shape(self): - if self.shape is None: - self._read_header() - return self.shape - - def get_fortran_order(self): - if self.fortran_order is None: - self._read_header() - return self.fortran_order - - def get_dtype(self): - if self.dtype is None: - self._read_header() - return self.dtype - - def _read_header(self): - with open(self.path, 'rb') as fp: - version = format.read_magic(fp) - try: - format._check_version(version) - except ValueError: - raise ValueError('Invalid file format.') - header_data = format._read_array_header(fp, version) - self.shape, self.fortran_order, self.dtype = header_data - - -@task(labels_path=FILE_IN, returns=3) -def _get_labels(labels_path): - y = np.genfromtxt(labels_path, dtype=None, encoding='utf-8') - categories, codes = np.unique(y, return_inverse=True) - return codes.astype(np.int8), categories, len(categories) - - -@task(returns=1) -def _get_samples_shape(subset): - return subset.samples.shape - - -@task(returns=3) -def _merge_shapes(*samples_shapes): - n_samples = 0 - n_features = samples_shapes[0][1] - for shape in samples_shapes: - n_samples += shape[0] - assert shape[1] == n_features, 'Subsamples with different n_features.' 
- return samples_shapes, n_samples, n_features - - -@task(samples_path=FILE_INOUT) -def _allocate_samples_file(samples_path, n_samples, n_features): - np.lib.format.open_memmap(samples_path, mode='w+', dtype='float32', - shape=(int(n_samples), int(n_features))) - - -@task(samples_path=FILE_INOUT, row_blocks={Type: COLLECTION_IN, Depth: 2}) -def _fill_samples_file(samples_path, row_blocks, start_idx): - rows_samples = Array._merge_blocks(row_blocks) - rows_samples = rows_samples.astype(dtype='float32', casting='same_kind') - samples = np.lib.format.open_memmap(samples_path, mode='r+') - samples[start_idx: start_idx + rows_samples.shape[0]] = rows_samples - - -@task(labels_path=FILE_INOUT, row_blocks={Type: COLLECTION_IN, Depth: 2}) -def _fill_labels_file(labels_path, row_blocks): - rows_labels = Array._merge_blocks(row_blocks) - with open(labels_path, 'at') as f: - np.savetxt(f, rows_labels, fmt='%s', encoding='utf-8') diff --git a/dislib/classification/rf/forest.py b/dislib/classification/rf/forest.py deleted file mode 100644 index dd78b9e0..00000000 --- a/dislib/classification/rf/forest.py +++ /dev/null @@ -1,310 +0,0 @@ -import math -from collections import Counter - -import numpy as np -from pycompss.api.api import compss_wait_on -from pycompss.api.parameter import Type, COLLECTION_IN, Depth -from pycompss.api.task import task -from sklearn.base import BaseEstimator -from sklearn.utils import check_random_state - -from dislib.classification.rf.decision_tree import DecisionTreeClassifier -from dislib.data.array import Array -from dislib.utils.base import _paired_partition -from dislib.classification.rf._data import transform_to_rf_dataset - - -class RandomForestClassifier(BaseEstimator): - """A distributed random forest classifier. - - Parameters - ---------- - n_estimators : int, optional (default=10) - Number of trees to fit. - try_features : int, str or None, optional (default='sqrt') - The number of features to consider when looking for the best split: - - - If "sqrt", then `try_features=sqrt(n_features)`. - - If "third", then `try_features=n_features // 3`. - - If None, then `try_features=n_features`. - - Note: the search for a split does not stop until at least one - valid partition of the node samples is found, even if it requires - to effectively inspect more than ``try_features`` features. - max_depth : int or np.inf, optional (default=np.inf) - The maximum depth of the tree. If np.inf, then nodes are expanded - until all leaves are pure. - distr_depth : int or str, optional (default='auto') - Number of levels of the tree in which the nodes are split in a - distributed way. - sklearn_max: int or float, optional (default=1e8) - Maximum size (len(subsample)*n_features) of the arrays passed to - sklearn's DecisionTreeClassifier.fit(), which is called to fit subtrees - (subsamples) of our DecisionTreeClassifier. sklearn fit() is used - because it's faster, but requires loading the data to memory, which can - cause memory problems for large datasets. This parameter can be - adjusted to fit the hardware capabilities. - hard_vote : bool, optional (default=False) - If True, it uses majority voting over the predict() result of the - decision tree predictions. If False, it takes the class with the higher - probability given by predict_proba(), which is an average of the - probabilities given by the decision trees. 
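Note: the two voting modes can be illustrated with plain NumPy; the distributed counterparts are the _soft_vote and _hard_vote tasks defined at the end of this module. Toy data below, not taken from the diff:

import numpy as np
from collections import Counter

classes = np.array(["a", "b", "c"])

# hard_vote=True: majority vote over the per-tree class codes.
tree_preds = [np.array([0, 1]), np.array([0, 2]), np.array([1, 1])]
hard = np.array([Counter(v).most_common(1)[0][0] for v in zip(*tree_preds)])
print(classes[hard])  # ['a' 'b']

# hard_vote=False: average the per-tree probabilities and take the argmax.
tree_probas = [np.array([[0.6, 0.3, 0.1], [0.2, 0.5, 0.3]]) for _ in range(3)]
soft = np.argmax(sum(tree_probas) / len(tree_probas), axis=1)
print(classes[soft])  # ['a' 'b']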
- random_state : int, RandomState instance or None, optional (default=None) - If int, random_state is the seed used by the random number generator; - If RandomState instance, random_state is the random number generator; - If None, the random number generator is the RandomState instance used - by `np.random`. - - Attributes - ---------- - classes : None or ndarray - Array of distinct classes, set at fit(). - trees : list of DecisionTreeClassifier - List of the tree classifiers of this forest, populated at fit(). - """ - - def __init__(self, - n_estimators=10, - try_features='sqrt', - max_depth=np.inf, - distr_depth='auto', - sklearn_max=1e8, - hard_vote=False, - random_state=None): - self.n_estimators = n_estimators - self.try_features = try_features - self.max_depth = max_depth - self.distr_depth = distr_depth - self.sklearn_max = sklearn_max - self.hard_vote = hard_vote - self.random_state = random_state - - def fit(self, x, y): - """Fits the RandomForestClassifier. - - Parameters - ---------- - x : ds-array, shape=(n_samples, n_features) - The training input samples. Internally, its dtype will be converted - to ``dtype=np.float32``. - y : ds-array, shape=(n_samples, 1) - The target values. - - Returns - ------- - self : RandomForestClassifier - - """ - self.classes = None - self.trees = [] - - dataset = transform_to_rf_dataset(x, y) - - n_features = dataset.get_n_features() - try_features = _resolve_try_features(self.try_features, n_features) - random_state = check_random_state(self.random_state) - - self.classes = dataset.get_classes() - - if self.distr_depth == 'auto': - dataset.n_samples = compss_wait_on(dataset.get_n_samples()) - distr_depth = max(0, int(math.log10(dataset.n_samples)) - 4) - distr_depth = min(distr_depth, self.max_depth) - else: - distr_depth = self.distr_depth - - for i in range(self.n_estimators): - tree = DecisionTreeClassifier(try_features, self.max_depth, - distr_depth, self.sklearn_max, - bootstrap=True, - random_state=random_state) - self.trees.append(tree) - - for tree in self.trees: - tree.fit(dataset) - - return self - - def predict_proba(self, x): - """Predicts class probabilities using a fitted forest. - - The probabilities are obtained as an average of the probabilities of - each decision tree. - - - Parameters - ---------- - x : ds-array, shape=(n_samples, n_features) - The input samples. - - Returns - ------- - probabilities : ds-array, shape=(n_samples, n_classes) - Predicted probabilities for the samples to belong to each class. - The columns of the array correspond to the classes given at - self.classes. - - """ - assert self.trees is not None, 'The random forest is not fitted.' - prob_blocks = [] - for x_row in x._iterator(axis=0): - tree_predictions = [] - for tree in self.trees: - tree_predictions.append(tree.predict_proba(x_row)) - prob_blocks.append([_join_predictions(*tree_predictions)]) - self.classes = compss_wait_on(self.classes) - n_classes = len(self.classes) - - probabilities = Array(blocks=prob_blocks, - top_left_shape=(x._top_left_shape[0], n_classes), - reg_shape=(x._reg_shape[0], n_classes), - shape=(x.shape[0], n_classes), sparse=False) - return probabilities - - def predict(self, x): - """Predicts classes using a fitted forest. - - Parameters - ---------- - x : ds-array, shape=(n_samples, n_features) - The input samples. - - Returns - ------- - y_pred : ds-array, shape=(n_samples, 1) - Predicted class labels for x. - - """ - assert self.trees is not None, 'The random forest is not fitted.' 
- pred_blocks = [] - if self.hard_vote: - for x_row in x._iterator(axis=0): - tree_predictions = [] - for tree in self.trees: - tree_predictions.append(tree.predict(x_row)) - pred_blocks.append(_hard_vote(self.classes, *tree_predictions)) - else: - for x_row in x._iterator(axis=0): - tree_predictions = [] - for tree in self.trees: - tree_predictions.append(tree.predict_proba(x_row)) - pred_blocks.append(_soft_vote(self.classes, *tree_predictions)) - - y_pred = Array(blocks=[pred_blocks], - top_left_shape=(x._top_left_shape[0], 1), - reg_shape=(x._reg_shape[0], 1), shape=(x.shape[0], 1), - sparse=False) - - return y_pred - - def score(self, x, y, collect=False): - """Accuracy classification score. - - Returns the mean accuracy on the given test data. - - - Parameters - ---------- - x : ds-array, shape=(n_samples, n_features) - The training input samples. - y : ds-array, shape (n_samples, 1) - The true labels. - collect : bool - When True, a synchronized result is returned. - - Returns - ------- - score : float (as future object) - Fraction of correctly classified samples. - - """ - assert self.trees is not None, 'The random forest is not fitted.' - partial_scores = [] - if self.hard_vote: - for x_row, y_row in _paired_partition(x, y): - tree_predictions = [] - for tree in self.trees: - tree_predictions.append(tree.predict(x_row)) - subset_score = _hard_vote_score(y_row._blocks, self.classes, - *tree_predictions) - partial_scores.append(subset_score) - else: - for x_row, y_row in _paired_partition(x, y): - tree_predictions = [] - for tree in self.trees: - tree_predictions.append(tree.predict_proba(x_row)) - subset_score = _soft_vote_score(y_row._blocks, self.classes, - *tree_predictions) - partial_scores.append(subset_score) - - score = _merge_scores(*partial_scores) - - return compss_wait_on(score) if collect else score - - -@task(returns=1) -def _resolve_try_features(try_features, n_features): - if try_features is None: - return n_features - elif try_features == 'sqrt': - return int(math.sqrt(n_features)) - elif try_features == 'third': - return max(1, n_features // 3) - else: - return int(try_features) - - -@task(returns=1) -def _join_predictions(*predictions): - aggregate = predictions[0] - for p in predictions[1:]: - aggregate += p - labels = aggregate / len(predictions) - return labels - - -@task(returns=1) -def _soft_vote(classes, *predictions): - aggregate = predictions[0] - for p in predictions[1:]: - aggregate += p - labels = classes[np.argmax(aggregate, axis=1)] - return labels - - -@task(returns=1) -def _hard_vote(classes, *predictions): - mode = np.empty((len(predictions[0]),), dtype=int) - for sample_i, votes in enumerate(zip(*predictions)): - mode[sample_i] = Counter(votes).most_common(1)[0][0] - labels = classes[mode] - return labels - - -@task(y_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) -def _soft_vote_score(y_blocks, classes, *predictions): - real_labels = Array._merge_blocks(y_blocks).flatten() - aggregate = predictions[0] - for p in predictions[1:]: - aggregate += p - predicted_labels = classes[np.argmax(aggregate, axis=1)] - correct = np.count_nonzero(predicted_labels == real_labels) - return correct, len(real_labels) - - -@task(y_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) -def _hard_vote_score(y_blocks, classes, *predictions): - real_labels = Array._merge_blocks(y_blocks).flatten() - mode = np.empty((len(predictions[0]),), dtype=int) - for sample_i, votes in enumerate(zip(*predictions)): - mode[sample_i] = Counter(votes).most_common(1)[0][0] - 
predicted_labels = classes[mode] - correct = np.count_nonzero(predicted_labels == real_labels) - return correct, len(real_labels) - - -@task(returns=1) -def _merge_scores(*partial_scores): - correct = sum(subset_score[0] for subset_score in partial_scores) - total = sum(subset_score[1] for subset_score in partial_scores) - return correct / total diff --git a/dislib/classification/rf/test_split.py b/dislib/classification/rf/test_split.py deleted file mode 100644 index 70922783..00000000 --- a/dislib/classification/rf/test_split.py +++ /dev/null @@ -1,50 +0,0 @@ -from sys import float_info - -import numpy as np - - -def gini_criteria_proxy(l_weight, l_length, r_weight, r_length, not_repeated): - """ - Maximizing the Gini gain is equivalent to minimizing this proxy function. - - """ - return -(l_weight / l_length + r_weight / r_length) * not_repeated - - -def test_split(sample, y_s, feature, n_classes): - size = y_s.shape[0] - if size == 0: - return float_info.max, np.float64(np.inf) - - f = feature[sample] - sort_indices = np.argsort(f) - y_sorted = y_s[sort_indices] - f_sorted = f[sort_indices] - - not_repeated = np.empty(size, dtype=np.bool_) - not_repeated[0: size - 1] = (f_sorted[1:] != f_sorted[:-1]) - not_repeated[size - 1] = True - - l_freq = np.zeros((n_classes, size), dtype=np.int64) - l_freq[y_sorted, np.arange(size)] = 1 - - r_freq = np.zeros((n_classes, size), dtype=np.int64) - r_freq[:, 1:] = l_freq[:, :0:-1] - - l_weight = np.sum(np.square(np.cumsum(l_freq, axis=-1)), axis=0) - r_weight = np.sum(np.square(np.cumsum(r_freq, axis=-1)), axis=0)[::-1] - - l_length = np.arange(1, size + 1, dtype=np.int32) - r_length = np.arange(size - 1, -1, -1, dtype=np.int32) - r_length[size - 1] = 1 # Avoid div by zero, the right score is 0 anyways - - scores = gini_criteria_proxy(l_weight, l_length, r_weight, r_length, - not_repeated) - - min_index = size - np.argmin(scores[::-1]) - 1 - - if min_index + 1 == size: - b_value = np.float64(np.inf) - else: - b_value = (f_sorted[min_index] + f_sorted[min_index + 1]) / 2 - return scores[min_index], b_value diff --git a/dislib/classification/rf/__init__.py b/dislib/commons/rf/__init__.py similarity index 100% rename from dislib/classification/rf/__init__.py rename to dislib/commons/rf/__init__.py diff --git a/dislib/commons/rf/data.py b/dislib/commons/rf/data.py new file mode 100644 index 00000000..e5155bdc --- /dev/null +++ b/dislib/commons/rf/data.py @@ -0,0 +1,430 @@ +import tempfile + +import numpy as np +from numpy.lib import format +from pycompss.api.parameter import ( + FILE_IN, + FILE_INOUT, + COLLECTION_IN, + Depth, + Type, +) +from pycompss.api.task import task + +from dislib.data.array import Array + + +class RfBaseDataset: + """Base class for Dataset format used by the fit() of the + RandomForestRegressor and RandomForestClassifier. + + Warning: This class should not be used directly. Use derived classes + instead. + """ + + def __init__(self, samples_path, targets_path, features_path=None): + self.samples_path = samples_path + self.targets_path = targets_path + self.features_path = features_path + self.n_samples = None + self.n_features = None + self.y_targets = None + + def get_n_samples(self): + """Gets the number of samples obtained from the samples file. + + Returns + ------- + n_samples: int + + Raises + ------ + AssertionError + If self.n_samples is None and self.samples_path is not a string. + ValueError + If invalid content is encountered in the samples file. 
+ + """ + if self.n_samples is None: + if not isinstance(self.samples_path, str): + raise TypeError( + "self.n_samples must be set manually if self.samples_path " + "is a pycompss.runtime.Future object" + ) + shape = _NpyFile(self.samples_path).get_shape() + self.n_samples, self.n_features = shape + return self.n_samples + + def get_n_features(self): + """Gets the number of features obtained from the samples file. + + Returns + ------- + n_features: int + + Raises + ------ + AssertionError + If self.n_features is None and self.samples_path is not a string. + ValueError + If invalid content is encountered in the samples file. + + """ + if self.n_features is None: + if not isinstance(self.samples_path, str): + raise TypeError( + "self.n_features must be set manually if self.samples_path" + " is a pycompss.runtime.Future object" + ) + shape = _NpyFile(self.samples_path).get_shape() + self.n_samples, self.n_features = shape + return self.n_features + + def validate_features_file(self): + """Validates the features file header information. + + Raises + ------ + ValueError + If the shape of the array in the features_file doesn't match this + class n_samples and n_features or if the array is in fortran order. + + """ + features_npy_file = _NpyFile(self.features_path) + shape = features_npy_file.get_shape() + fortran_order = features_npy_file.get_fortran_order() + if (self.get_n_features(), self.get_n_samples()) != shape: + raise ValueError("Invalid dimensions for the features_file.") + if fortran_order: + raise ValueError("Fortran order not supported for features array.") + + +class RfClassifierDataset(RfBaseDataset): + """Dataset format used by the fit() of the RandomForestClassifier. + + The RfDataset contains a file path for the samples and another one for the + labels. Optionally, a path can be provided for a transposed version of the + samples matrix, i.e., the features. + + Note: For a representation of a dataset distributed in multiple files, use + dislib.data.Dataset instead. + + Parameters + ---------- + samples_path: str + Path of the .npy file containing the 2-d array of samples. It can be a + pycompss.runtime.Future object. If so, self.n_samples and + self.n_features must be set manually (they can also be + pycompss.runtime.Future objects). + targets_path: str + Path of the .dat file containing the 1-d array of target labels. + It can be a pycompss.runtime.Future object. + features_path: str, optional (default=None) + Path of the .npy file containing the 2-d array of samples transposed. + The array must be C-ordered. Providing this array may improve the + performance as it allows sequential access to the features. + + Attributes + ---------- + n_samples: int + The number of samples of the dataset. It can be a + pycompss.runtime.Future object. + n_features: int + The number of features of the dataset. It can be a + pycompss.runtime.Future object. + y_targets: ndarray + The codified array of labels for this RfDataset. The values are indices + of the array of classes, which contains the corresponding labels. The + dtype is np.int8. It can be a pycompss.runtime.Future object. + y_categories: ndarray + The array of classes for this RfDataset. The values are unique. It can + be a pycompss.runtime.Future object. + n_classes: int + The number of classes of this RfDataset. It can be a + pycompss.runtime.Future object. 
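Note: the codes/categories split described above is a plain np.unique decomposition; the distributed version is the _get_labels task defined later in this file. A small illustration with made-up labels:

import numpy as np

y = np.array(["cat", "dog", "cat", "bird"])
categories, codes = np.unique(y, return_inverse=True)
# categories -> ['bird' 'cat' 'dog'], codes -> [1 2 1 0]
y_targets = codes.astype(np.int8)
n_classes = len(categories)  # 3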
+ + """ + + def __init__(self, samples_path, targets_path, features_path=None): + super().__init__(samples_path, targets_path, features_path) + self.y_categories = None + self.n_classes = None + + def get_y_targets(self): + """Obtains the codified array of target labels. + + Returns + ------- + y_targets: ndarray + + """ + if self.y_targets is None: + labels = _get_labels(self.targets_path) + self.y_targets, self.y_categories, self.n_classes = labels + return self.y_targets + + def get_classes(self): + """Obtains the array of label categories. + + Returns + ------- + y_categories: ndarray + + """ + if self.y_categories is None: + labels = _get_labels(self.targets_path) + self.y_targets, self.y_categories, self.n_classes = labels + return self.y_categories + + def get_n_classes(self): + """Obtains the number of classes. + + Returns + ------- + n_classes: int + + """ + if self.n_classes is None: + labels = _get_labels(self.targets_path) + self.y_targets, self.y_categories, self.n_classes = labels + return self.n_classes + + +class RfRegressorDataset(RfBaseDataset): + """Dataset format used by the fit() of the RandomForestRegressor. + + The RfDataset contains a file path for the samples and another one for the + targets. Optionally, a path can be provided for a transposed version of the + samples matrix, i.e., the features. + + Note: For a representation of a dataset distributed in multiple files, use + dislib.data.Dataset instead. + + Parameters + ---------- + samples_path: str + Path of the .npy file containing the 2-d array of samples. It can be a + pycompss.runtime.Future object. If so, self.n_samples and + self.n_features must be set manually (they can also be + pycompss.runtime.Future objects). + targets_path: str + Path of the .dat file containing the 1-d array of target values. + It can be a pycompss.runtime.Future object. + features_path: str, optional (default=None) + Path of the .npy file containing the 2-d array of samples transposed. + The array must be C-ordered. Providing this array may improve the + performance as it allows sequential access to the features. + + Attributes + ---------- + n_samples: int + The number of samples of the dataset. It can be a + pycompss.runtime.Future object. + n_features: int + The number of features of the dataset. It can be a + pycompss.runtime.Future object. + y_targets: ndarray + The array of targets for this RfDataset. It can be a + pycompss.runtime.Future object. + + """ + + def __init__(self, samples_path, targets_path, features_path=None): + super().__init__(samples_path, targets_path, features_path) + + def get_y_targets(self): + """Obtains the array of target values. + + Returns + ------- + y_targets: ndarray + + """ + if self.y_targets is None: + targets = _get_values(self.targets_path) + self.y_targets = targets + return self.y_targets + + def get_n_classes(self): + return None + + def get_classes(self): + return None + + +def transform_to_rf_dataset( + x: Array, y: Array, task: str, features_file=False +) -> RfRegressorDataset or RfClassifierDataset: + """Creates a RfDataset object from samples x and targets y. + + This function creates a dislib.commons.rf.data.RfDataset by saving + x and y in files. + + Parameters + ---------- + x: ds-array, shape = (n_samples, n_features) + The training input samples. + y: ds-array, shape = (n_samples,) or (n_samples, n_outputs) + The target values. + task: {"classification", "regression"} + Task of the Random Forest. 
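Note: a hypothetical call to this helper, assuming small in-memory ds-arrays built with dislib.array (the block sizes and data are illustrative only):

import numpy as np
import dislib as ds
from dislib.commons.rf.data import transform_to_rf_dataset

x = ds.array(np.array([[1.0, 2.0], [3.0, 4.0], [5.0, 6.0], [7.0, 8.0]]),
             block_size=(2, 2))
y = ds.array(np.array([[0], [1], [0], [1]]), block_size=(2, 1))

dataset = transform_to_rf_dataset(x, y, task="classification")
# dataset is an RfClassifierDataset whose samples and targets have been
# written to temporary files readable by every task.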
+ + Returns + ------- + rf_dataset: dislib.regression.rf._data.RfDataset + + """ + n_samples = x.shape[0] + n_features = x.shape[1] + + # Samples + samples_file = tempfile.NamedTemporaryFile( + mode="wb", prefix="tmp_rf_samples_", delete=False + ) + samples_path = samples_file.name + samples_file.close() + _allocate_samples_file(samples_path, n_samples, n_features) + + start_idx = 0 + row_blocks_iterator = x._iterator(axis=0) + top_row = next(row_blocks_iterator) + _fill_samples_file(samples_path, top_row._blocks, start_idx) + start_idx += x._top_left_shape[0] + for x_row in row_blocks_iterator: + _fill_samples_file(samples_path, x_row._blocks, start_idx) + start_idx += x._reg_shape[0] + + # Targets + targets_file = tempfile.NamedTemporaryFile( + mode="w", prefix="tmp_rf_targets_", delete=False + ) + targets_path = targets_file.name + targets_file.close() + for y_row in y._iterator(axis=0): + _fill_targets_file(targets_path, y_row._blocks) + + # Features + if features_file: + features_file = tempfile.NamedTemporaryFile( + mode="wb", prefix="tmp_rf_features_", delete=False + ) + features_path = features_file.name + features_file.close() + _allocate_features_file(features_path, n_samples, n_features) + + start_idx = 0 + row_blocks_iterator = x._iterator(axis=0) + top_row = next(row_blocks_iterator) + _fill_features_file(features_path, top_row._blocks, start_idx) + start_idx += x._top_left_shape[0] + for x_row in row_blocks_iterator: + _fill_features_file(features_path, x_row._blocks, start_idx) + start_idx += x._reg_shape[0] + else: + features_path = None + + if task == "classification": + rf_dataset = RfClassifierDataset( + samples_path, targets_path, features_path + ) + elif task == "regression": + rf_dataset = RfRegressorDataset( + samples_path, targets_path, features_path + ) + else: + raise ValueError("task must be either classification or regression.") + rf_dataset.n_samples = n_samples + rf_dataset.n_features = n_features + return rf_dataset + + +class _NpyFile(object): + def __init__(self, path): + self.path = path + + self.shape = None + self.fortran_order = None + self.dtype = None + + def get_shape(self): + if self.shape is None: + self._read_header() + return self.shape + + def get_fortran_order(self): + if self.fortran_order is None: + self._read_header() + return self.fortran_order + + def get_dtype(self): + if self.dtype is None: + self._read_header() + return self.dtype + + def _read_header(self): + with open(self.path, "rb") as fp: + version = format.read_magic(fp) + try: + format._check_version(version) + except ValueError: + raise ValueError("Invalid file format.") + header_data = format._read_array_header(fp, version) + self.shape, self.fortran_order, self.dtype = header_data + + +@task(targets_path=FILE_IN, returns=3) +def _get_labels(targets_path): + # Classification + y = np.genfromtxt(targets_path, dtype=None, encoding="utf-8") + categories, codes = np.unique(y, return_inverse=True) + return codes.astype(np.int8), categories, len(categories) + + +@task(targets_path=FILE_IN, returns=1) +def _get_values(targets_path): + # Regression + y = np.genfromtxt(targets_path, dtype=None, encoding="utf-8") + return y.astype(np.float64) + + +@task(samples_path=FILE_INOUT) +def _allocate_samples_file(samples_path, n_samples, n_features): + np.lib.format.open_memmap( + samples_path, + mode="w+", + dtype="float32", + shape=(int(n_samples), int(n_features)), + ) + + +@task(samples_path=FILE_INOUT) +def _allocate_features_file(samples_path, n_samples, n_features): + 
np.lib.format.open_memmap( + samples_path, + mode="w+", + dtype="float32", + shape=(int(n_features), int(n_samples)), + ) + + +@task(samples_path=FILE_INOUT, row_blocks={Type: COLLECTION_IN, Depth: 2}) +def _fill_samples_file(samples_path, row_blocks, start_idx): + rows_samples = Array._merge_blocks(row_blocks) + rows_samples = rows_samples.astype(dtype="float32", casting="same_kind") + samples = np.lib.format.open_memmap(samples_path, mode="r+") + samples[start_idx: start_idx + rows_samples.shape[0]] = rows_samples + + +@task(samples_path=FILE_INOUT, row_blocks={Type: COLLECTION_IN, Depth: 2}) +def _fill_features_file(samples_path, row_blocks, start_idx): + rows_samples = Array._merge_blocks(row_blocks) + rows_samples = rows_samples.astype(dtype="float32", casting="same_kind") + samples = np.lib.format.open_memmap(samples_path, mode="r+") + samples[:, start_idx: start_idx + rows_samples.shape[0]] = rows_samples.T + + +@task(targets_path=FILE_INOUT, row_blocks={Type: COLLECTION_IN, Depth: 2}) +def _fill_targets_file(targets_path, row_blocks): + rows_targets = Array._merge_blocks(row_blocks) + with open(targets_path, "at") as f: + np.savetxt(f, rows_targets, fmt="%s", encoding="utf-8") diff --git a/dislib/classification/rf/decision_tree.py b/dislib/commons/rf/decision_tree.py similarity index 52% rename from dislib/classification/rf/decision_tree.py rename to dislib/commons/rf/decision_tree.py index 0725fcfa..751983d4 100644 --- a/dislib/classification/rf/decision_tree.py +++ b/dislib/commons/rf/decision_tree.py @@ -6,66 +6,28 @@ from pycompss.api.parameter import FILE_IN, Type, COLLECTION_IN, Depth from pycompss.api.task import task from sklearn.tree import DecisionTreeClassifier as SklearnDTClassifier +from sklearn.tree import DecisionTreeRegressor as SklearnDTRegressor -from dislib.classification.rf.test_split import test_split +from dislib.commons.rf.test_split import test_split from dislib.data.array import Array -class DecisionTreeClassifier: - """A distributed decision tree classifier. - - Parameters - ---------- - try_features : int - The number of features to consider when looking for the best split. - - Note: the search for a split does not stop until at least one - valid partition of the node samples is found, even if it requires - to effectively inspect more than ``try_features`` features. - max_depth : int - The maximum depth of the tree. If np.inf, then nodes are expanded - until all leaves are pure. - distr_depth : int - Number of levels of the tree in which the nodes are split in a - distributed way. - bootstrap : bool - Randomly select n_instances samples with repetition (used in random - forests). - random_state : RandomState instance - The random number generator. - - Attributes - ---------- - n_features : int - The number of features of the dataset. It can be a - pycompss.runtime.Future object. - n_classes : int - The number of classes of this RfDataset. It can be a - pycompss.runtime.Future object. - tree : None or _Node - The root node of the tree after the tree is fitted. - nodes_info : None or list of _InnerNodeInfo and _LeafInfo - List of the node information for the nodes of the tree in the same - order as obtained in the fit() method, up to ``distr_depth`` depth. - After fit(), it is a pycompss.runtime.Future object. - subtrees : None or list of _Node - List of subtrees of the tree at ``distr_depth`` depth obtained in the - fit() method. After fit(), it is a list of pycompss.runtime.Future - objects. 
- - Methods - ------- - fit(dataset) - Fits the DecisionTreeClassifier. - predict(x_row) - Predicts classes for the given samples using a fitted tree. - predict_proba(x_row) - Predicts class probabilities for the given smaples using a fitted tree. +class BaseDecisionTree: + """Base class for distributed decision trees. + Warning: This class should not be used directly. + Use derived classes instead. """ - def __init__(self, try_features, max_depth, distr_depth, sklearn_max, - bootstrap, random_state): + def __init__( + self, + try_features, + max_depth, + distr_depth, + sklearn_max, + bootstrap, + random_state, + ): self.try_features = try_features self.max_depth = max_depth self.distr_depth = distr_depth @@ -81,7 +43,7 @@ def __init__(self, try_features, max_depth, distr_depth, sklearn_max, self.subtrees = None def fit(self, dataset): - """Fits the DecisionTreeClassifier. + """Fits the DecisionTree. Parameters ---------- @@ -94,43 +56,55 @@ def fit(self, dataset): samples_path = dataset.samples_path features_path = dataset.features_path n_samples = dataset.get_n_samples() - y_codes = dataset.get_y_codes() + y_targets = dataset.get_y_targets() seed = self.random_state.randint(np.iinfo(np.int32).max) - sample, y_s = _sample_selection(n_samples, y_codes, self.bootstrap, - seed) + sample, y_s = _sample_selection( + n_samples, y_targets, self.bootstrap, seed + ) + Node = _ClassificationNode if self.n_classes else _RegressionNode - self.tree = _Node() + self.tree = Node() self.nodes_info = [] self.subtrees = [] tree_traversal = [(self.tree, sample, y_s, 0)] while tree_traversal: node, sample, y_s, depth = tree_traversal.pop() if depth < self.distr_depth: - split = _split_node_wrapper(sample, self.n_features, y_s, - self.n_classes, self.try_features, - self.random_state, - samples_file=samples_path, - features_file=features_path) + split = _split_node_wrapper( + sample, + self.n_features, + y_s, + self.n_classes, + self.try_features, + self.random_state, + samples_file=samples_path, + features_file=features_path, + ) node_info, left_group, y_l, right_group, y_r = split compss_delete_object(sample) compss_delete_object(y_s) node.content = len(self.nodes_info) self.nodes_info.append(node_info) - node.left = _Node() - node.right = _Node() + node.left = Node() + node.right = Node() depth = depth + 1 tree_traversal.append((node.right, right_group, y_r, depth)) tree_traversal.append((node.left, left_group, y_l, depth)) else: - subtree = _build_subtree_wrapper(sample, y_s, self.n_features, - self.max_depth - depth, - self.n_classes, - self.try_features, - self.sklearn_max, - self.random_state, - samples_path, features_path) + subtree = _build_subtree_wrapper( + sample, + y_s, + self.n_features, + self.max_depth - depth, + self.n_classes, + self.try_features, + self.sklearn_max, + self.random_state, + samples_path, + features_path, + ) node.content = len(self.subtrees) self.subtrees.append(subtree) compss_delete_object(sample) @@ -138,7 +112,8 @@ def fit(self, dataset): self.nodes_info = _merge(*self.nodes_info) def predict(self, x_row): - """Predicts classes for the given samples using a fitted tree. + """Predicts target values or classes for the given samples using + a fitted tree. Parameters ---------- @@ -148,21 +123,100 @@ def predict(self, x_row): Returns ------- predicted : ndarray - An array with the predicted classes for the given samples. The - values are codes of the fitted + An array with the predicted classes or values for the given + samples. 
For classification, the values are codes of the fitted dislib.classification.rf.data.RfDataset. The returned object can be a pycompss.runtime.Future object. - """ - assert self.tree is not None, 'The decision tree is not fitted.' + assert self.tree is not None, "The decision tree is not fitted." branch_predictions = [] for i, subtree in enumerate(self.subtrees): - pred = _predict_branch(x_row._blocks, self.tree, self.nodes_info, - i, subtree, self.distr_depth) + pred = _predict_branch( + x_row._blocks, + self.tree, + self.nodes_info, + i, + subtree, + self.distr_depth, + ) branch_predictions.append(pred) - return _merge_branches(None, *branch_predictions) + return _merge_branches( + None, *branch_predictions, classification=self.n_classes is None + ) + + +class DecisionTreeClassifier(BaseDecisionTree): + """A distributed decision tree classifier. + + Parameters + ---------- + try_features : int + The number of features to consider when looking for the best split. + + Note: the search for a split does not stop until at least one + valid partition of the node samples is found, even if it requires + to effectively inspect more than ``try_features`` features. + max_depth : int + The maximum depth of the tree. If np.inf, then nodes are expanded + until all leaves are pure. + distr_depth : int + Number of levels of the tree in which the nodes are split in a + distributed way. + bootstrap : bool + Randomly select n_instances samples with repetition (used in random + forests). + random_state : RandomState instance + The random number generator. + + Attributes + ---------- + n_features : int + The number of features of the dataset. It can be a + pycompss.runtime.Future object. + n_classes : int + The number of classes of this RfDataset. It can be a + pycompss.runtime.Future object. + tree : None or _Node + The root node of the tree after the tree is fitted. + nodes_info : None or list of _InnerNodeInfo and _LeafInfo + List of the node information for the nodes of the tree in the same + order as obtained in the fit() method, up to ``distr_depth`` depth. + After fit(), it is a pycompss.runtime.Future object. + subtrees : None or list of _Node + List of subtrees of the tree at ``distr_depth`` depth obtained in the + fit() method. After fit(), it is a list of pycompss.runtime.Future + objects. + + Methods + ------- + fit(dataset) + Fits the DecisionTreeClassifier. + predict(x_row) + Predicts classes for the given samples using a fitted tree. + predict_proba(x_row) + Predicts class probabilities for the given smaples using a fitted tree. + + """ + + def __init__( + self, + try_features, + max_depth, + distr_depth, + sklearn_max, + bootstrap, + random_state, + ): + super().__init__( + try_features, + max_depth, + distr_depth, + sklearn_max, + bootstrap, + random_state, + ) def predict_proba(self, x_row): """Predicts class probabilities for a row block using a fitted tree. @@ -183,39 +237,121 @@ def predict_proba(self, x_row): """ - assert self.tree is not None, 'The decision tree is not fitted.' + assert self.tree is not None, "The decision tree is not fitted." 
branch_predictions = [] for i, subtree in enumerate(self.subtrees): - pred = _predict_branch_proba(x_row._blocks, self.tree, - self.nodes_info, i, subtree, - self.distr_depth, self.n_classes) + pred = _predict_branch_proba( + x_row._blocks, + self.tree, + self.nodes_info, + i, + subtree, + self.distr_depth, + self.n_classes, + ) branch_predictions.append(pred) - return _merge_branches(self.n_classes, *branch_predictions) + return _merge_branches( + self.n_classes, *branch_predictions, classification=True + ) + + +class DecisionTreeRegressor(BaseDecisionTree): + """A distributed decision tree regressor. + + Parameters + ---------- + try_features : int + The number of features to consider when looking for the best split. + + Note: the search for a split does not stop until at least one + valid partition of the node samples is found, even if it requires + to effectively inspect more than ``try_features`` features. + max_depth : int + The maximum depth of the tree. If np.inf, then nodes are expanded + until all leaves are pure. + distr_depth : int + Number of levels of the tree in which the nodes are split in a + distributed way. + bootstrap : bool + Randomly select n_instances samples with repetition (used in random + forests). + random_state : RandomState instance + The random number generator. + + Attributes + ---------- + n_features : int + The number of features of the dataset. It can be a + pycompss.runtime.Future object. + tree : None or _Node + The root node of the tree after the tree is fitted. + nodes_info : None or list of _InnerNodeInfo and _LeafInfo + List of the node information for the nodes of the tree in the same + order as obtained in the fit() method, up to ``distr_depth`` depth. + After fit(), it is a pycompss.runtime.Future object. + subtrees : None or list of _Node + List of subtrees of the tree at ``distr_depth`` depth obtained in the + fit() method. After fit(), it is a list of pycompss.runtime.Future + objects. + + Methods + ------- + fit(dataset) + Fits the DecisionTreeRegressor. + predict(x_row) + Predicts target values for the given samples using a fitted tree. 
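Note: the main behavioural difference between the two trees is what a leaf stores; _compute_leaf_info below keeps the class frequencies and the modal class for classification, and the mean target for regression. In plain NumPy:

import numpy as np

# Classification leaf: class frequencies plus the modal class.
y_class = np.array([0, 0, 1], dtype=np.int8)
frequencies = np.bincount(y_class, minlength=2)  # [2 1]
class_leaf_target = np.argmax(frequencies)       # 0

# Regression leaf: the mean of the targets.
y_reg = np.array([1.5, 2.0, 2.5])
reg_leaf_target = np.mean(y_reg)                 # 2.0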
+ """ + + def __init__( + self, + try_features, + max_depth, + distr_depth, + sklearn_max, + bootstrap, + random_state, + ): + super().__init__( + try_features, + max_depth, + distr_depth, + sklearn_max, + bootstrap, + random_state, + ) class _Node: + """Base class for tree nodes""" - def __init__(self): + def __init__(self, is_classifier): self.content = None self.left = None self.right = None + self.is_classifier = is_classifier + self.predict_dtype = np.int64 if is_classifier else np.float64 def predict(self, sample): node_content = self.content if isinstance(node_content, _LeafInfo): - return np.full((len(sample),), node_content.mode) + return np.full((len(sample),), node_content.target) if isinstance(node_content, _SkTreeWrapper): if len(sample) > 0: return node_content.sk_tree.predict(sample) if isinstance(node_content, _InnerNodeInfo): - pred = np.empty((len(sample),), dtype=np.int64) + pred = np.empty((len(sample),), dtype=self.predict_dtype) left_mask = sample[:, node_content.index] <= node_content.value pred[left_mask] = self.left.predict(sample[left_mask]) pred[~left_mask] = self.right.predict(sample[~left_mask]) return pred - assert len(sample) == 0, 'Type not supported' - return np.empty((0,), dtype=np.int64) + assert len(sample) == 0, "Type not supported" + return np.empty((0,), dtype=self.predict_dtype) + + +class _ClassificationNode(_Node): + def __init__(self): + super().__init__(is_classifier=True) def predict_proba(self, sample, n_classes): node_content = self.content @@ -234,10 +370,15 @@ def predict_proba(self, sample, n_classes): pred[l_msk] = self.left.predict_proba(sample[l_msk], n_classes) pred[~l_msk] = self.right.predict_proba(sample[~l_msk], n_classes) return pred - assert len(sample) == 0, 'Type not supported' + assert len(sample) == 0, "Type not supported" return np.empty((0, n_classes), dtype=np.float64) +class _RegressionNode(_Node): + def __init__(self): + super().__init__(is_classifier=False) + + class _InnerNodeInfo: def __init__(self, index=None, value=None): self.index = index @@ -245,55 +386,48 @@ def __init__(self, index=None, value=None): class _LeafInfo: - def __init__(self, size=None, frequencies=None, mode=None): + def __init__(self, size=None, frequencies=None, target=None): self.size = size self.frequencies = frequencies - self.mode = mode + self.target = target class _SkTreeWrapper: def __init__(self, tree): self.sk_tree = tree - self.classes = tree.classes_ def _get_sample_attributes(samples_file, indices): - samples_mmap = np.load(samples_file, mmap_mode='r', allow_pickle=False) + samples_mmap = np.load(samples_file, mmap_mode="r", allow_pickle=False) x = samples_mmap[indices] return x -def _get_feature_mmap(features_file, i): - return _get_features_mmap(features_file)[i] - - -def _get_features_mmap(features_file): - return np.load(features_file, mmap_mode='r', allow_pickle=False) - - @task(priority=True, returns=2) -def _sample_selection(n_samples, y_codes, bootstrap, seed): +def _sample_selection(n_samples, y_targets, bootstrap, seed): if bootstrap: random_state = RandomState(seed) - selection = random_state.choice(n_samples, size=n_samples, - replace=True) + selection = random_state.choice( + n_samples, size=n_samples, replace=True + ) selection.sort() - return selection, y_codes[selection] + return selection, y_targets[selection] else: - return np.arange(n_samples), y_codes + return np.arange(n_samples), y_targets def _feature_selection(untried_indices, m_try, random_state): selection_len = min(m_try, len(untried_indices)) - return 
random_state.choice(untried_indices, size=selection_len, - replace=False) + return random_state.choice( + untried_indices, size=selection_len, replace=False + ) def _get_groups(sample, y_s, features_mmap, index, value): if index is None: empty_sample = np.array([], dtype=np.int64) - empty_labels = np.array([], dtype=np.int8) - return sample, y_s, empty_sample, empty_labels + empty_target = np.array([], dtype=y_s.dtype) + return sample, y_s, empty_sample, empty_target feature = features_mmap[index][sample] mask = feature < value left = sample[mask] @@ -304,52 +438,72 @@ def _get_groups(sample, y_s, features_mmap, index, value): def _compute_leaf_info(y_s, n_classes): - frequencies = np.bincount(y_s, minlength=n_classes) - mode = np.argmax(frequencies) - return _LeafInfo(len(y_s), frequencies, mode) - - -def _split_node_wrapper(sample, n_features, y_s, n_classes, m_try, - random_state, samples_file=None, features_file=None): + if n_classes is not None: + frequencies = np.bincount(y_s, minlength=n_classes) + mode = np.argmax(frequencies) + return _LeafInfo(len(y_s), frequencies, mode) + else: + return _LeafInfo(len(y_s), None, np.mean(y_s)) + + +def _split_node_wrapper( + sample, + n_features, + y_s, + n_classes, + m_try, + random_state, + samples_file=None, + features_file=None, +): seed = random_state.randint(np.iinfo(np.int32).max) if features_file is not None: - return _split_node_using_features(sample, n_features, y_s, n_classes, - m_try, features_file, seed) + return _split_node_using_features( + sample, n_features, y_s, n_classes, m_try, features_file, seed + ) elif samples_file is not None: - return _split_node(sample, n_features, y_s, n_classes, m_try, - samples_file, seed) + return _split_node( + sample, n_features, y_s, n_classes, m_try, samples_file, seed + ) else: - raise ValueError('Invalid combination of arguments. samples_file is ' - 'None and features_file is None.') + raise ValueError( + "Invalid combination of arguments. samples_file is " + "None and features_file is None." 
+ ) @task(features_file=FILE_IN, returns=(object, list, list, list, list)) -def _split_node_using_features(sample, n_features, y_s, n_classes, m_try, - features_file, seed): - features_mmap = np.load(features_file, mmap_mode='r', allow_pickle=False) +def _split_node_using_features( + sample, n_features, y_s, n_classes, m_try, features_file, seed +): + features_mmap = np.load(features_file, mmap_mode="r", allow_pickle=False) random_state = RandomState(seed) - return _compute_split(sample, n_features, y_s, n_classes, m_try, - features_mmap, random_state) + return _compute_split( + sample, n_features, y_s, n_classes, m_try, features_mmap, random_state + ) @task(samples_file=FILE_IN, returns=(object, list, list, list, list)) def _split_node(sample, n_features, y_s, n_classes, m_try, samples_file, seed): - features_mmap = np.load(samples_file, mmap_mode='r', allow_pickle=False).T + features_mmap = np.load(samples_file, mmap_mode="r", allow_pickle=False).T random_state = RandomState(seed) - return _compute_split(sample, n_features, y_s, n_classes, m_try, - features_mmap, random_state) + return _compute_split( + sample, n_features, y_s, n_classes, m_try, features_mmap, random_state + ) -def _compute_split(sample, n_features, y_s, n_classes, m_try, features_mmap, - random_state): +def _compute_split( + sample, n_features, y_s, n_classes, m_try, features_mmap, random_state +): node_info = left_group = y_l = right_group = y_r = None split_ended = False tried_indices = [] while not split_ended: untried_indices = np.setdiff1d(np.arange(n_features), tried_indices) - index_selection = _feature_selection(untried_indices, m_try, - random_state) + index_selection = _feature_selection( + untried_indices, m_try, random_state + ) b_score = float_info.max b_index = None b_value = None @@ -371,54 +525,127 @@ def _compute_split(sample, n_features, y_s, n_classes, m_try, features_mmap, left_group = sample y_l = y_s right_group = np.array([], dtype=np.int64) - y_r = np.array([], dtype=np.int8) + y_r = np.array([], dtype=y_s.dtype) return node_info, left_group, y_l, right_group, y_r -def _build_subtree_wrapper(sample, y_s, n_features, max_depth, n_classes, - m_try, sklearn_max, random_state, samples_file, - features_file): +def _build_subtree_wrapper( + sample, + y_s, + n_features, + max_depth, + n_classes, + m_try, + sklearn_max, + random_state, + samples_file, + features_file, +): seed = random_state.randint(np.iinfo(np.int32).max) if features_file is not None: - return _build_subtree_using_features(sample, y_s, n_features, - max_depth, n_classes, m_try, - sklearn_max, seed, samples_file, - features_file) + return _build_subtree_using_features( + sample, + y_s, + n_features, + max_depth, + n_classes, + m_try, + sklearn_max, + seed, + samples_file, + features_file, + ) else: - return _build_subtree(sample, y_s, n_features, max_depth, n_classes, - m_try, sklearn_max, seed, samples_file) + return _build_subtree( + sample, + y_s, + n_features, + max_depth, + n_classes, + m_try, + sklearn_max, + seed, + samples_file, + ) @task(samples_file=FILE_IN, features_file=FILE_IN, returns=_Node) -def _build_subtree_using_features(sample, y_s, n_features, max_depth, - n_classes, m_try, sklearn_max, seed, - samples_file, features_file): +def _build_subtree_using_features( + sample, + y_s, + n_features, + max_depth, + n_classes, + m_try, + sklearn_max, + seed, + samples_file, + features_file, +): random_state = RandomState(seed) - return _compute_build_subtree(sample, y_s, n_features, max_depth, - n_classes, m_try, sklearn_max, 
random_state, - samples_file, features_file=features_file) + return _compute_build_subtree( + sample, + y_s, + n_features, + max_depth, + n_classes, + m_try, + sklearn_max, + random_state, + samples_file, + features_file=features_file, + ) @task(samples_file=FILE_IN, returns=_Node) -def _build_subtree(sample, y_s, n_features, max_depth, n_classes, m_try, - sklearn_max, seed, samples_file): +def _build_subtree( + sample, + y_s, + n_features, + max_depth, + n_classes, + m_try, + sklearn_max, + seed, + samples_file, +): random_state = RandomState(seed) - return _compute_build_subtree(sample, y_s, n_features, max_depth, - n_classes, m_try, sklearn_max, random_state, - samples_file) - - -def _compute_build_subtree(sample, y_s, n_features, max_depth, n_classes, - m_try, sklearn_max, random_state, samples_file, - features_file=None, use_sklearn=True): + return _compute_build_subtree( + sample, + y_s, + n_features, + max_depth, + n_classes, + m_try, + sklearn_max, + random_state, + samples_file, + ) + + +def _compute_build_subtree( + sample, + y_s, + n_features, + max_depth, + n_classes, + m_try, + sklearn_max, + random_state, + samples_file, + features_file=None, + use_sklearn=True, +): + Node = _ClassificationNode if n_classes else _RegressionNode + SklearnDT = SklearnDTClassifier if n_classes else SklearnDTRegressor if not sample.size: - return _Node() + return Node() if features_file is not None: - mmap = np.load(features_file, mmap_mode='r', allow_pickle=False) + mmap = np.load(features_file, mmap_mode="r", allow_pickle=False) else: - mmap = np.load(samples_file, mmap_mode='r', allow_pickle=False).T - subtree = _Node() + mmap = np.load(samples_file, mmap_mode="r", allow_pickle=False).T + subtree = Node() tree_traversal = [(subtree, sample, y_s, 0)] while tree_traversal: node, sample, y_s, depth = tree_traversal.pop() @@ -428,28 +655,40 @@ def _compute_build_subtree(sample, y_s, n_features, max_depth, n_classes, sklearn_max_depth = None else: sklearn_max_depth = max_depth - depth - dt = SklearnDTClassifier(max_features=m_try, - max_depth=sklearn_max_depth, - random_state=random_state) - unique = np.unique(sample, return_index=True, - return_counts=True) + dt = SklearnDT( + max_features=m_try, + max_depth=sklearn_max_depth, + random_state=random_state, + ) + unique = np.unique( + sample, return_index=True, return_counts=True + ) sample, new_indices, sample_weight = unique x = _get_sample_attributes(samples_file, sample) y_s = y_s[new_indices] dt.fit(x, y_s, sample_weight=sample_weight, check_input=False) node.content = _SkTreeWrapper(dt) else: - split = _compute_split(sample, n_features, y_s, n_classes, - m_try, mmap, random_state) + split = _compute_split( + sample, + n_features, + y_s, + n_classes, + m_try, + mmap, + random_state, + ) node_info, left_group, y_l, right_group, y_r = split node.content = node_info if isinstance(node_info, _InnerNodeInfo): - node.left = _Node() - node.right = _Node() - tree_traversal.append((node.right, right_group, y_r, - depth + 1)) - tree_traversal.append((node.left, left_group, y_l, - depth + 1)) + node.left = Node() + node.right = Node() + tree_traversal.append( + (node.right, right_group, y_r, depth + 1) + ) + tree_traversal.append( + (node.left, left_group, y_l, depth + 1) + ) else: node.content = _compute_leaf_info(y_s, n_classes) return subtree @@ -462,7 +701,7 @@ def _merge(*object_list): def _get_subtree_path(subtree_index, distr_depth): if distr_depth == 0: - return '' + return "" return bin(subtree_index)[2:].zfill(distr_depth) @@ -471,12 +710,12 @@ 
def _get_predicted_indices(samples, tree, nodes_info, path): for direction in path: node_info = nodes_info[tree.content] if isinstance(node_info, _LeafInfo): - if direction == '1': + if direction == "1": idx_mask[:] = 0 else: col = node_info.index value = node_info.value - if direction == '0': + if direction == "0": idx_mask[idx_mask] = samples[idx_mask, col] <= value tree = tree.left else: @@ -486,8 +725,9 @@ def _get_predicted_indices(samples, tree, nodes_info, path): @task(row_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) -def _predict_branch(row_blocks, tree, nodes_info, subtree_index, subtree, - distr_depth): +def _predict_branch( + row_blocks, tree, nodes_info, subtree_index, subtree, distr_depth +): samples = Array._merge_blocks(row_blocks) path = _get_subtree_path(subtree_index, distr_depth) indices_mask = _get_predicted_indices(samples, tree, nodes_info, path) @@ -496,8 +736,15 @@ def _predict_branch(row_blocks, tree, nodes_info, subtree_index, subtree, @task(row_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) -def _predict_branch_proba(row_blocks, tree, nodes_info, subtree_index, subtree, - distr_depth, n_classes): +def _predict_branch_proba( + row_blocks, + tree, + nodes_info, + subtree_index, + subtree, + distr_depth, + n_classes, +): samples = Array._merge_blocks(row_blocks) path = _get_subtree_path(subtree_index, distr_depth) indices_mask = _get_predicted_indices(samples, tree, nodes_info, path) @@ -506,14 +753,19 @@ def _predict_branch_proba(row_blocks, tree, nodes_info, subtree_index, subtree, @task(returns=list) -def _merge_branches(n_classes, *predictions): +def _merge_branches(n_classes, *predictions, classification): samples_len = len(predictions[0][0]) - if n_classes is not None: # predict - shape = (samples_len, n_classes) - dtype = np.float64 - else: # predict_proba + if classification: + if n_classes is not None: # predict class + shape = (samples_len, n_classes) + dtype = np.float64 + else: # predict_proba + shape = (samples_len,) + dtype = np.int64 + else: # predict value shape = (samples_len,) - dtype = np.int64 + dtype = np.float64 + merged_prediction = np.empty(shape, dtype=dtype) for selected, prediction in predictions: merged_prediction[selected] = prediction diff --git a/dislib/commons/rf/forest.py b/dislib/commons/rf/forest.py new file mode 100644 index 00000000..be2e668c --- /dev/null +++ b/dislib/commons/rf/forest.py @@ -0,0 +1,489 @@ +import math +from collections import Counter + +import numpy as np +from pycompss.api.api import compss_wait_on +from pycompss.api.parameter import Type, COLLECTION_IN, Depth +from pycompss.api.task import task +from sklearn.base import BaseEstimator +from sklearn.utils import check_random_state + +from dislib.commons.rf.decision_tree import ( + DecisionTreeClassifier, + DecisionTreeRegressor, +) +from dislib.data.array import Array +from dislib.utils.base import _paired_partition +from dislib.commons.rf.data import transform_to_rf_dataset + + +class BaseRandomForest(BaseEstimator): + """Base class for distributed random forests. + + Warning: This class should not be used directly. + Use derived classes instead. 
+ """ + + def __init__( + self, + n_estimators, + try_features, + max_depth, + distr_depth, + sklearn_max, + hard_vote, + random_state, + ): + self.n_estimators = n_estimators + self.try_features = try_features + self.max_depth = max_depth + self.distr_depth = distr_depth + self.sklearn_max = sklearn_max + self.hard_vote = hard_vote + self.random_state = random_state + + def fit(self, x, y): + """Fits the RandomForest. + + Parameters + ---------- + x : ds-array, shape=(n_samples, n_features) + The training input samples. Internally, its dtype will be converted + to ``dtype=np.float32``. + y : ds-array, shape=(n_samples, 1) + The target values. + + Returns + ------- + self : RandomForest + + """ + self.classes = None + self.trees = [] + + if self.hard_vote is not None: + # Classification + task = "classification" + Tree = DecisionTreeClassifier + else: + # Regression + task = "regression" + Tree = DecisionTreeRegressor + + dataset = transform_to_rf_dataset(x, y, task) + + n_features = dataset.get_n_features() + try_features = _resolve_try_features(self.try_features, n_features) + random_state = check_random_state(self.random_state) + + self.classes = dataset.get_classes() + + if self.distr_depth == "auto": + dataset.n_samples = compss_wait_on(dataset.get_n_samples()) + distr_depth = max(0, int(math.log10(dataset.n_samples)) - 4) + distr_depth = min(distr_depth, self.max_depth) + else: + distr_depth = self.distr_depth + + for i in range(self.n_estimators): + tree = Tree( + try_features, + self.max_depth, + distr_depth, + self.sklearn_max, + bootstrap=True, + random_state=random_state, + ) + self.trees.append(tree) + + for tree in self.trees: + tree.fit(dataset) + + return self + + def predict(self, x): + """Predicts target classes or values using a fitted forest. + + Parameters + ---------- + x : ds-array, shape=(n_samples, n_features) + The input samples. + + Returns + ------- + y_pred : ds-array, shape=(n_samples, 1) + Predicted class labels or values for x. + + """ + assert self.trees is not None, "The random forest is not fitted." + pred_blocks = [] + if self.hard_vote is not None: + # Classification + if self.hard_vote: + for x_row in x._iterator(axis=0): + tree_predictions = [] + for tree in self.trees: + tree_predictions.append(tree.predict(x_row)) + pred_blocks.append( + _hard_vote(self.classes, *tree_predictions) + ) + else: + for x_row in x._iterator(axis=0): + tree_predictions = [] + for tree in self.trees: + tree_predictions.append(tree.predict_proba(x_row)) + pred_blocks.append( + _soft_vote(self.classes, *tree_predictions) + ) + else: + # Regression + for x_row in x._iterator(axis=0): + tree_predictions = [] + for tree in self.trees: + tree_predictions.append(tree.predict(x_row)) + pred_blocks.append(_join_predictions(*tree_predictions)) + + y_pred = Array( + blocks=[pred_blocks], + top_left_shape=(x._top_left_shape[0], 1), + reg_shape=(x._reg_shape[0], 1), + shape=(x.shape[0], 1), + sparse=False, + ) + + return y_pred + + def score(self, x, y, collect=False): + """Accuracy classification score. + + For classification returns the mean accuracy on the given test data. + + For regression returns the coefficient of determination $R^2$ of + the prediction. + The coefficient $R^2$ is defined as $(1-u/v)$, where $u$ + is the residual sum of squares `((y_true - y_pred) ** 2).sum()` and + $v$ is the total sum of squares + `((y_true - y_true.mean()) ** 2).sum()`. + The best possible score is 1.0 and it can be negative + if the model is arbitrarily worse. 
+ A constant model that always predicts the expected value of y, + disregarding the input features, would get a $R^2$ score of 0.0. + + Parameters + ---------- + x : ds-array, shape=(n_samples, n_features) + The training input samples. + y : ds-array, shape (n_samples, 1) + The true labels. + collect : bool, optional (default=False) + When True, a synchronized result is returned. + + + Returns + ------- + score : float (as future object) + Fraction of correctly classified samples for classification + or coefficient of determination $R^2$ for regression. + + """ + assert self.trees is not None, "The random forest is not fitted." + partial_scores = [] + if self.hard_vote is not None: + # Classification + if self.hard_vote: + for x_row, y_row in _paired_partition(x, y): + tree_predictions = [] + for tree in self.trees: + tree_predictions.append(tree.predict(x_row)) + subset_score = _hard_vote_score( + y_row._blocks, self.classes, *tree_predictions + ) + partial_scores.append(subset_score) + else: + for x_row, y_row in _paired_partition(x, y): + tree_predictions = [] + for tree in self.trees: + tree_predictions.append(tree.predict_proba(x_row)) + subset_score = _soft_vote_score( + y_row._blocks, self.classes, *tree_predictions + ) + partial_scores.append(subset_score) + score = _merge_classification_scores(*partial_scores) + else: + # Regression + for x_row, y_row in _paired_partition(x, y): + tree_predictions = [] + for tree in self.trees: + tree_predictions.append(tree.predict(x_row)) + subset_score = _regression_score( + y_row._blocks, *tree_predictions + ) + partial_scores.append(subset_score) + score = _merge_regression_scores(*partial_scores) + + return compss_wait_on(score) if collect else score + + +class RandomForestClassifier(BaseRandomForest): + """A distributed random forest classifier. + + Parameters + ---------- + n_estimators : int, optional (default=10) + Number of trees to fit. + try_features : int, str or None, optional (default='sqrt') + The number of features to consider when looking for the best split: + + - If "sqrt", then `try_features=sqrt(n_features)`. + - If "third", then `try_features=n_features // 3`. + - If None, then `try_features=n_features`. + + Note: the search for a split does not stop until at least one + valid partition of the node samples is found, even if it requires + to effectively inspect more than ``try_features`` features. + max_depth : int or np.inf, optional (default=np.inf) + The maximum depth of the tree. If np.inf, then nodes are expanded + until all leaves are pure. + distr_depth : int or str, optional (default='auto') + Number of levels of the tree in which the nodes are split in a + distributed way. + sklearn_max: int or float, optional (default=1e8) + Maximum size (len(subsample)*n_features) of the arrays passed to + sklearn's DecisionTreeClassifier.fit(), which is called to fit subtrees + (subsamples) of our DecisionTreeClassifier. sklearn fit() is used + because it's faster, but requires loading the data to memory, which can + cause memory problems for large datasets. This parameter can be + adjusted to fit the hardware capabilities. + hard_vote : bool, optional (default=False) + If True, it uses majority voting over the predict() result of the + decision tree predictions. If False, it takes the class with the higher + probability given by predict_proba(), which is an average of the + probabilities given by the decision trees. 
+ random_state : int, RandomState instance or None, optional (default=None) + If int, random_state is the seed used by the random number generator; + If RandomState instance, random_state is the random number generator; + If None, the random number generator is the RandomState instance used + by `np.random`. + + Attributes + ---------- + classes : None or ndarray + Array of distinct classes, set at fit(). + trees : list of DecisionTreeClassifier + List of the tree classifiers of this forest, populated at fit(). + """ + + def __init__( + self, + n_estimators=10, + try_features="sqrt", + max_depth=np.inf, + distr_depth="auto", + sklearn_max=1e8, + hard_vote=False, + random_state=None, + ): + super().__init__( + n_estimators, + try_features, + max_depth, + distr_depth, + sklearn_max, + hard_vote, + random_state, + ) + + def predict_proba(self, x): + """Predicts class probabilities using a fitted forest. + + The probabilities are obtained as an average of the probabilities of + each decision tree. + + + Parameters + ---------- + x : ds-array, shape=(n_samples, n_features) + The input samples. + + Returns + ------- + probabilities : ds-array, shape=(n_samples, n_classes) + Predicted probabilities for the samples to belong to each class. + The columns of the array correspond to the classes given at + self.classes. + + """ + assert self.trees is not None, "The random forest is not fitted." + prob_blocks = [] + for x_row in x._iterator(axis=0): + tree_predictions = [] + for tree in self.trees: + tree_predictions.append(tree.predict_proba(x_row)) + prob_blocks.append([_join_predictions(*tree_predictions)]) + self.classes = compss_wait_on(self.classes) + n_classes = len(self.classes) + + probabilities = Array( + blocks=prob_blocks, + top_left_shape=(x._top_left_shape[0], n_classes), + reg_shape=(x._reg_shape[0], n_classes), + shape=(x.shape[0], n_classes), + sparse=False, + ) + return probabilities + + +class RandomForestRegressor(BaseRandomForest): + """A distributed random forest regressor. + + Parameters + ---------- + n_estimators : int, optional (default=10) + Number of trees to fit. + try_features : int, str or None, optional (default='sqrt') + The number of features to consider when looking for the best split: + + - If "sqrt", then `try_features=sqrt(n_features)`. + - If "third", then `try_features=n_features // 3`. + - If None, then `try_features=n_features`. + + Note: the search for a split does not stop until at least one + valid partition of the node samples is found, even if it requires + to effectively inspect more than ``try_features`` features. + max_depth : int or np.inf, optional (default=np.inf) + The maximum depth of the tree. If np.inf, then nodes are expanded + until all leaves are pure. + distr_depth : int or str, optional (default='auto') + Number of levels of the tree in which the nodes are split in a + distributed way. + sklearn_max: int or float, optional (default=1e8) + Maximum size (len(subsample)*n_features) of the arrays passed to + sklearn's DecisionTreeRegressor.fit(), which is called to fit subtrees + (subsamples) of our DecisionTreeRegressor. sklearn fit() is used + because it's faster, but requires loading the data to memory, which can + cause memory problems for large datasets. This parameter can be + adjusted to fit the hardware capabilities. 
+ random_state : int, RandomState instance or None, optional (default=None) + If int, random_state is the seed used by the random number generator; + If RandomState instance, random_state is the random number generator; + If None, the random number generator is the RandomState instance used + by `np.random`. + + Attributes + ---------- + trees : list of DecisionTreeRegressor + List of the tree regressors of this forest, populated at fit(). + """ + + def __init__( + self, + n_estimators=10, + try_features="sqrt", + max_depth=np.inf, + distr_depth="auto", + sklearn_max=1e8, + random_state=None, + ): + hard_vote = None + super().__init__( + n_estimators, + try_features, + max_depth, + distr_depth, + sklearn_max, + hard_vote, + random_state, + ) + + +@task(returns=1) +def _resolve_try_features(try_features, n_features): + if try_features is None: + return n_features + elif try_features == "sqrt": + return int(math.sqrt(n_features)) + elif try_features == "third": + return max(1, n_features // 3) + else: + return int(try_features) + + +@task(returns=1) +def _join_predictions(*predictions): + aggregate = predictions[0] + for p in predictions[1:]: + aggregate += p + labels = aggregate / len(predictions) + return labels + + +@task(returns=1) +def _soft_vote(classes, *predictions): + aggregate = predictions[0] + for p in predictions[1:]: + aggregate += p + labels = classes[np.argmax(aggregate, axis=1)] + return labels + + +@task(returns=1) +def _hard_vote(classes, *predictions): + mode = np.empty((len(predictions[0]),), dtype=int) + for sample_i, votes in enumerate(zip(*predictions)): + mode[sample_i] = Counter(votes).most_common(1)[0][0] + labels = classes[mode] + return labels + + +@task(y_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) +def _soft_vote_score(y_blocks, classes, *predictions): + real_labels = Array._merge_blocks(y_blocks).flatten() + aggregate = predictions[0] + for p in predictions[1:]: + aggregate += p + predicted_labels = classes[np.argmax(aggregate, axis=1)] + correct = np.count_nonzero(predicted_labels == real_labels) + return correct, len(real_labels) + + +@task(y_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) +def _hard_vote_score(y_blocks, classes, *predictions): + real_labels = Array._merge_blocks(y_blocks).flatten() + mode = np.empty((len(predictions[0]),), dtype=int) + for sample_i, votes in enumerate(zip(*predictions)): + mode[sample_i] = Counter(votes).most_common(1)[0][0] + predicted_labels = classes[mode] + correct = np.count_nonzero(predicted_labels == real_labels) + return correct, len(real_labels) + + +@task(y_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) +def _regression_score(y_blocks, *predictions): + y_true = Array._merge_blocks(y_blocks).flatten() + y_pred = np.mean(predictions, axis=0) + n_samples = y_true.shape[0] + y_avg = np.mean(y_true) + u_partial = np.sum(np.square(y_true - y_pred), axis=0) + v_partial = np.sum(np.square(y_true - y_avg), axis=0) + return u_partial, v_partial, y_avg, n_samples + + +@task(returns=1) +def _merge_classification_scores(*partial_scores): + correct = sum(subset_score[0] for subset_score in partial_scores) + total = sum(subset_score[1] for subset_score in partial_scores) + return correct / total + + +@task(returns=1) +def _merge_regression_scores(*partial_scores): + u = v = avg = n = 0 + for u_p, v_p, avg_p, n_p in partial_scores: + u += u_p + + delta = avg_p - avg + avg += delta * n_p / (n + n_p) + v += v_p + delta ** 2 * n * n_p / (n + n_p) + n += n_p + + return 1 - u / v diff --git 
a/dislib/commons/rf/test_split.py b/dislib/commons/rf/test_split.py new file mode 100644 index 00000000..428fbc88 --- /dev/null +++ b/dislib/commons/rf/test_split.py @@ -0,0 +1,59 @@ +from sys import float_info + +import numpy as np + + +def criteria_proxy(l_weight, l_length, r_weight, r_length, not_repeated): + """ + Maximizing the MSE or Gini gain is equivalent to minimizing + this proxy function. + """ + return -(l_weight / l_length + r_weight / r_length) * not_repeated + + +def test_split(sample, y_s, feature, n_classes): + size = y_s.shape[0] + if size == 0: + return float_info.max, np.float64(np.inf) + + f = feature[sample] + sort_indices = np.argsort(f) + y_sorted = y_s[sort_indices] + f_sorted = f[sort_indices] + + # Threshold value must not be that value of a sample + not_repeated = np.empty(size, dtype=np.bool_) + not_repeated[0: size - 1] = f_sorted[1:] != f_sorted[:-1] + not_repeated[size - 1] = True + + if n_classes is not None: # Classification + l_freq = np.zeros((n_classes, size), dtype=np.int64) + l_freq[y_sorted, np.arange(size)] = 1 + + r_freq = np.zeros((n_classes, size), dtype=np.int64) + r_freq[:, 1:] = l_freq[:, :0:-1] + + l_weight = np.sum(np.square(np.cumsum(l_freq, axis=-1)), axis=0) + r_weight = np.sum(np.square(np.cumsum(r_freq, axis=-1)), axis=0)[::-1] + + else: # Regression + # Square of the sum of the y values of each branch + r_weight = np.zeros(size) + l_weight = np.square(np.cumsum(y_sorted, axis=-1)) + r_weight[:-1] = np.square(np.cumsum(y_sorted[::-1], axis=-1)[-2::-1]) + + # Number of samples of each branch + l_length = np.arange(1, size + 1, dtype=np.int32) + r_length = np.arange(size - 1, -1, -1, dtype=np.int32) + r_length[size - 1] = 1 # Avoid div by zero, the right score is 0 + + scores = criteria_proxy( + l_weight, l_length, r_weight, r_length, not_repeated + ) + + min_index = size - np.argmin(scores[::-1]) - 1 + if min_index + 1 == size: + b_value = np.float64(np.inf) + else: + b_value = (f_sorted[min_index] + f_sorted[min_index + 1]) / 2 + return scores[min_index], b_value diff --git a/dislib/regression/__init__.py b/dislib/regression/__init__.py index e3287a0b..a47cd17d 100644 --- a/dislib/regression/__init__.py +++ b/dislib/regression/__init__.py @@ -1,4 +1,5 @@ from dislib.regression.linear.base import LinearRegression from dislib.regression.lasso.base import Lasso +from dislib.commons.rf.forest import RandomForestRegressor -__all__ = ['LinearRegression', 'Lasso'] +__all__ = ["LinearRegression", "Lasso", "RandomForestRegressor"] diff --git a/dislib/utils/__init__.py b/dislib/utils/__init__.py index 34b84166..299601a7 100644 --- a/dislib/utils/__init__.py +++ b/dislib/utils/__init__.py @@ -1,3 +1,4 @@ from dislib.utils.base import shuffle +from dislib.utils.saving import save_model, load_model -__all__ = ['shuffle'] +__all__ = ["shuffle", "save_model", "load_model"] diff --git a/dislib/utils/saving.py b/dislib/utils/saving.py new file mode 100644 index 00000000..02ecfb8a --- /dev/null +++ b/dislib/utils/saving.py @@ -0,0 +1,360 @@ +import json +import os +import numpy as np + +from pycompss.api.api import compss_wait_on + +from sklearn.svm import SVC as SklearnSVC +from sklearn.tree import DecisionTreeClassifier as SklearnDTClassifier +from sklearn.tree import DecisionTreeRegressor as SklearnDTRegressor +from sklearn.tree._tree import Tree as SklearnTree +from scipy.sparse import csr_matrix + +import dislib as ds +import dislib.classification +import dislib.cluster +import dislib.recommendation +import dislib.regression +from dislib.data.array 
import Array +from dislib.commons.rf.decision_tree import ( + DecisionTreeClassifier, + DecisionTreeRegressor, + _Node, + _ClassificationNode, + _RegressionNode, + _InnerNodeInfo, + _LeafInfo, + _SkTreeWrapper, +) + +try: + import cbor2 +except ImportError: + cbor2 = None + +# Dislib models with saving tested (model: str -> module: str) +IMPLEMENTED_MODELS = { + "KMeans": "cluster", + "GaussianMixture": "cluster", + "CascadeSVM": "classification", + "RandomForestClassifier": "classification", + "RandomForestRegressor": "regression", + "ALS": "recommendation", + "LinearRegression": "regression", + "Lasso": "regression", +} + +# Classes used by models +DISLIB_CLASSES = { + "KMeans": dislib.cluster.KMeans, + "DecisionTreeClassifier": DecisionTreeClassifier, + "DecisionTreeRegressor": DecisionTreeRegressor, + "_Node": _Node, + "_ClassificationNode": _ClassificationNode, + "_RegressionNode": _RegressionNode, + "_InnerNodeInfo": _InnerNodeInfo, + "_LeafInfo": _LeafInfo, + "_SkTreeWrapper": _SkTreeWrapper, +} + +SKLEARN_CLASSES = { + "SVC": SklearnSVC, + "DecisionTreeClassifier": SklearnDTClassifier, + "DecisionTreeRegressor": SklearnDTRegressor, +} + + +def save_model(model, filepath, overwrite=True, save_format="json"): + """Saves a model to a file. + + The model is synchronized before saving and can be reinstantiated in the + exact same state, without any of the code used for model definition or + fitting. + + Parameters + ---------- + model : dislib model. + Dislib model to serialize and save. + filepath : str + Path where to save the model + overwrite : bool, optional (default=True) + Whether any existing model at the target + location should be overwritten. + save_format : str, optional (default='json) + Format used to save the models. + + Examples + -------- + >>> from dislib.cluster import KMeans + >>> from dislib.utils import save_model, load_model + >>> import numpy as np + >>> import dislib as ds + >>> x = np.array([[1, 2], [1, 4], [1, 0], [4, 2], [4, 4], [4, 0]]) + >>> x_train = ds.array(x, (2, 2)) + >>> model = KMeans(n_clusters=2, random_state=0) + >>> model.fit(x_train) + >>> save_model(model, '/tmp/model') + >>> loaded_model = load_model('/tmp/model') + >>> x_test = ds.array(np.array([[0, 0], [4, 4]]), (2, 2)) + >>> model_pred = model.predict(x_test) + >>> loaded_model_pred = loaded_model.predict(x_test) + >>> assert np.allclose(model_pred.collect(), loaded_model_pred.collect()) + """ + + # Check overwrite + if not overwrite and os.path.isfile(filepath): + return + + # Check for dislib model + model_name = model.__class__.__name__ + if model_name not in IMPLEMENTED_MODELS.keys(): + raise NotImplementedError( + "Saving has only been implemented for the following models:\n%s" + % IMPLEMENTED_MODELS.keys() + ) + + # Synchronize model + if model_name in ("RandomForestClassifier", "RandomForestRegressor"): + _sync_rf(model) + + _sync_obj(model.__dict__) + model_metadata = model.__dict__.copy() + model_metadata["model_name"] = model_name + + # Save model + if save_format == "json": + with open(filepath, "w") as f: + json.dump(model_metadata, f, default=_encode_helper) + elif save_format == "cbor": + if cbor2 is None: + raise ModuleNotFoundError("No module named 'cbor2'") + with open(filepath, "wb") as f: + cbor2.dump(model_metadata, f, default=_encode_helper_cbor) + else: + raise ValueError("Wrong save format.") + + +def load_model(filepath, load_format="json"): + """Loads a model from a file. 
+ + The model is reinstantiated in the exact same state in which it was saved, + without any of the code used for model definition or fitting. + + Parameters + ---------- + filepath : str + Path of the saved the model + load_format : str, optional (default='json') + Format used to load the model. + + Examples + -------- + >>> from dislib.cluster import KMeans + >>> from dislib.utils import save_model, load_model + >>> import numpy as np + >>> import dislib as ds + >>> x = np.array([[1, 2], [1, 4], [1, 0], [4, 2], [4, 4], [4, 0]]) + >>> x_train = ds.array(x, (2, 2)) + >>> model = KMeans(n_clusters=2, random_state=0) + >>> model.fit(x_train) + >>> save_model(model, '/tmp/model') + >>> loaded_model = load_model('/tmp/model') + >>> x_test = ds.array(np.array([[0, 0], [4, 4]]), (2, 2)) + >>> model_pred = model.predict(x_test) + >>> loaded_model_pred = loaded_model.predict(x_test) + >>> assert np.allclose(model_pred.collect(), loaded_model_pred.collect()) + """ + # Load model + if load_format == "json": + with open(filepath, "r") as f: + model_metadata = json.load(f, object_hook=_decode_helper) + elif load_format == "cbor": + if cbor2 is None: + raise ModuleNotFoundError("No module named 'cbor2'") + with open(filepath, "rb") as f: + model_metadata = cbor2.load(f, object_hook=_decode_helper_cbor) + else: + raise ValueError("Wrong load format.") + + # Check for dislib model + model_name = model_metadata["model_name"] + if model_name not in IMPLEMENTED_MODELS.keys(): + raise NotImplementedError( + "Saving has only been implemented for the following models:\n%s" + % IMPLEMENTED_MODELS.keys() + ) + del model_metadata["model_name"] + + # Create model + model_module = getattr(ds, IMPLEMENTED_MODELS[model_name]) + model_class = getattr(model_module, model_name) + model = model_class() + for key, val in model_metadata.items(): + setattr(model, key, val) + + return model + + +def _encode_helper_cbor(encoder, obj): + """Special encoder wrapper for dislib using cbor2.""" + encoder.encode(_encode_helper(obj)) + + +def _decode_helper_cbor(decoder, obj): + """Special decoder wrapper for dislib using cbor2.""" + return _decode_helper(obj) + + +def _encode_helper(obj): + """Special encoder for dislib that serializes the different objectes + and stores their state for future loading. 
+ """ + if isinstance(obj, np.generic): + return obj.item() + elif isinstance(obj, csr_matrix): + return { + "class_name": "csr_matrix", + **obj.__dict__, + } + elif isinstance(obj, np.ndarray): + return { + "class_name": "ndarray", + "dtype_list": len(obj.dtype.descr) > 1, + "dtype": str(obj.dtype), + "items": obj.tolist(), + } + elif isinstance(obj, Array): + return {"class_name": "dsarray", **obj.__dict__} + elif isinstance(obj, np.random.RandomState): + return {"class_name": "RandomState", "items": obj.get_state()} + elif callable(obj): + return { + "class_name": "callable", + "module": obj.__module__, + "name": obj.__name__, + } + elif isinstance(obj, SklearnTree): + return { + "class_name": obj.__class__.__name__, + "n_features": obj.n_features, + "n_classes": obj.n_classes, + "n_outputs": obj.n_outputs, + "items": obj.__getstate__(), + } + elif isinstance( + obj, tuple(DISLIB_CLASSES.values()) + tuple(SKLEARN_CLASSES.values()) + ): + return { + "class_name": obj.__class__.__name__, + "module_name": obj.__module__, + "items": obj.__dict__, + } + raise TypeError("Not JSON Serializable:", obj) + + +def _decode_helper(obj): + """Special decoder for dislib that instantiates the different objects + and updates their attributes to recover the saved state. + """ + if isinstance(obj, dict) and "class_name" in obj: + + class_name = obj["class_name"] + if class_name == "ndarray": + if obj["dtype_list"]: + items = list(map(tuple, obj["items"])) + return np.rec.fromrecords(items, dtype=eval(obj["dtype"])) + else: + return np.array(obj["items"], dtype=obj["dtype"]) + elif class_name == "csr_matrix": + return csr_matrix( + (obj["data"], obj["indices"], obj["indptr"]), + shape=obj["_shape"], + ) + elif class_name == "dsarray": + return Array( + blocks=obj["_blocks"], + top_left_shape=obj["_top_left_shape"], + reg_shape=obj["_reg_shape"], + shape=obj["_shape"], + sparse=obj["_sparse"], + delete=obj["_delete"], + ) + elif class_name == "RandomState": + random_state = np.random.RandomState() + random_state.set_state(_decode_helper(obj["items"])) + return random_state + elif class_name == "Tree": + dict_ = _decode_helper(obj["items"]) + model = SklearnTree( + obj["n_features"], obj["n_classes"], obj["n_outputs"] + ) + model.__setstate__(dict_) + return model + elif ( + class_name in DISLIB_CLASSES.keys() + and "dislib" in obj["module_name"] + ): + dict_ = _decode_helper(obj["items"]) + if class_name in ( + "DecisionTreeClassifier", "DecisionTreeRegressor" + ): + model = DISLIB_CLASSES[obj["class_name"]]( + try_features=dict_.pop("try_features"), + max_depth=dict_.pop("max_depth"), + distr_depth=dict_.pop("distr_depth"), + sklearn_max=dict_.pop("sklearn_max"), + bootstrap=dict_.pop("bootstrap"), + random_state=dict_.pop("random_state"), + ) + elif class_name == "_SkTreeWrapper": + sk_tree = _decode_helper(dict_.pop("sk_tree")) + model = DISLIB_CLASSES[obj["class_name"]](sk_tree) + else: + model = DISLIB_CLASSES[obj["class_name"]]() + model.__dict__.update(dict_) + return model + elif ( + class_name in SKLEARN_CLASSES.keys() + and "sklearn" in obj["module_name"] + ): + dict_ = _decode_helper(obj["items"]) + model = SKLEARN_CLASSES[obj["class_name"]]() + model.__dict__.update(dict_) + return model + elif class_name == "callable": + if obj["module"] == "numpy": + return getattr(np, obj["name"]) + return None + + return obj + + +def _sync_obj(obj): + """Recursively synchronizes the Future objects of a list or dictionary + by using `compss_wait_on(obj)`. 
+ """ + if isinstance(obj, dict): + iterator = iter(obj.items()) + elif isinstance(obj, list): + iterator = iter(enumerate(obj)) + else: + raise TypeError("Expected dict or list and received %s." % type(obj)) + + for key, val in iterator: + if isinstance(val, (dict, list)): + _sync_obj(obj[key]) + else: + obj[key] = compss_wait_on(val) + if isinstance(getattr(obj[key], "__dict__", None), dict): + _sync_obj(obj[key].__dict__) + + +def _sync_rf(rf): + """Sync the `try_features` and `n_classes` attribute of the different trees + since they cannot be synced recursively. + """ + try_features = compss_wait_on(rf.trees[0].try_features) + n_classes = compss_wait_on(rf.trees[0].n_classes) + for tree in rf.trees: + tree.try_features = try_features + tree.n_classes = n_classes diff --git a/docs/source/user-guide.rst b/docs/source/user-guide.rst index 8e91e7e2..3fb02dc1 100644 --- a/docs/source/user-guide.rst +++ b/docs/source/user-guide.rst @@ -294,7 +294,7 @@ scalability of the estimator is limited by the reduction phase of the cascade. Random forest classifier ........................ -:class:`RandomForestClassifier ` +:class:`RandomForestClassifier ` is a classifier that uses an ensemble of decision trees and aggregates their predictions. The process of building each decision tree includes some randomization in order to make them different. The accuracy of the joint @@ -565,6 +565,76 @@ shape ``(n_features, n_features)`` and process it as a single block. this with a distributed implementation of a method for solving a system of linear equations.) + +Random forest regressor +........................ + +:class:`RandomForestRegressor ` +is a regressor that uses an ensemble of decision trees and aggregates their +predictions. The process of building each decision tree includes some +randomization in order to make them different. The accuracy of the joint +prediction can be greater than that of individual decision trees. One advantage +of Random Forests is that you cannot overfit by increasing the number of +trees. Several variations of random forests have been proposed and implemented. +A fundamental paper that has been cited extensively is [Bre01]_, which +describes a method for classification problems that can be adapted to regression +problems: + + For building each tree, the original sample set is replaced by a set of the + same size, obtained by drawing with replacement (this method is called + bootstrap aggregating or bagging). At each tree node, a certain number of + random features is selected (random feature selection). The sample set + is splitted in two according to the values of these features, and a + metric called 'Mean Squared Error' is computed for every split. The MSE + measures the squared residuals with respect to the average value of the + target variables, which could be interpreted as a measure of the sample + variance. The split with the lowest MSE value is selected, and + the subsamples are propagated to the children nodes. The trees grown are + not pruned. + +Ensemble estimators can be implemented in an embarrassingly parallel pattern. +You can do this with scikit-learn's RandomForestClassifier using a +``joblib.parallel_backend`` and setting the ``n_jobs`` parameter. However, you +need to be able to load your data into memory for each processor or to use +memory mapped arrays, which can be tricky specially with a distributed backend. 
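+
+Before going into the implementation details, the sketch below shows a typical
+workflow (the data, variable names and block sizes are illustrative, not part
+of the API)::
+
+    import numpy as np
+    import dislib as ds
+    from dislib.regression import RandomForestRegressor
+
+    # Toy regression data: 600 samples, 10 features, split into row blocks.
+    x_np = np.random.rand(600, 10)
+    y_np = x_np[:, 0] + 2 * x_np[:, 1]
+    x = ds.array(x_np, block_size=(100, 10))
+    y = ds.array(y_np[:, np.newaxis], block_size=(100, 1))
+
+    # distr_depth controls how many distributed tasks build each tree.
+    forest = RandomForestRegressor(n_estimators=10, distr_depth=2,
+                                   random_state=0)
+    forest.fit(x, y)
+
+    # predict() returns a ds-array; collect() synchronizes the result.
+    y_pred = forest.predict(x).collect()
+
+    # score() gives the coefficient of determination; collect=True returns
+    # a synchronized float instead of a future object.
+    r2 = forest.score(x, y, collect=True)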
+ +In our implementation, the samples as a whole are written into a binary file +and accessed using memory maps (the COMPSs runtime manages the transfers to +other nodes when needed). We used this approach because the performance penalty +of using distributed data was too large. Storing the samples file and saving +the decision trees introduces a big load to the disk storage of all nodes. If +your execution fails because you reach your disk storage limits, you can try +reducing the number of trees or reducing their size by setting the +``max_depth`` parameter. If this is not enough, you may consider reducing +the samples. + +In order to get further parallelism, each decision tree is not necessarily +built in a single task: there are tasks for building just a subtree, just a +node or even just part of a node. You can use the ``distr_depth`` parameter to +control the number of tasks used for each tree. However, be aware that the +number of tasks grows exponentially when you increase ``distr_depth``, and that +the task loads become very unbalanced. The fitted decision trees are not +synchronized, so the prediction is equally distributed. + +The results of the RandomForestRegressor can vary in every execution, due to +its random nature. To get reproducible results, a RandomState (pseudorandom +number generator) or an int can be provided to the ``random_state`` +parameter of the constructor. This works by passing a seed (generated by the +master's RandomState) to each task that uses randomness, and creating a new +RandomState inside the task. + +.. topic:: References: + + .. [Chan79] `Updating Formulae and a Pairwise Algorithm for Computing Sample Variances. + `_ + T. F. Chan, G. H. Golub, R. J. LeVeque, 1979 + Technical Report STAN-CS-79-773, Department of Computer Science, Stanford University. + .. [Tor99] `Inductive Learning of Tree-based Regression Models + `_ + L. 
Torgo, 1999 + Chapter 3, PhD Thesis, Faculdade de Ciè‚ncias da Universidade do Porto + + Decomposition ------------- diff --git a/requirements.txt b/requirements.txt index 4100177f..3fc50ee3 100644 --- a/requirements.txt +++ b/requirements.txt @@ -3,3 +3,4 @@ scipy>=1.3.0 numpy>=1.18.1, <=1.19.5 numpydoc>=0.8.0 cvxpy>=1.1.5 +cbor2>=5.4.0 \ No newline at end of file diff --git a/tests/test_decision_tree.py b/tests/test_decision_tree.py new file mode 100644 index 00000000..e935dc56 --- /dev/null +++ b/tests/test_decision_tree.py @@ -0,0 +1,171 @@ +import unittest + +import numpy as np +from pycompss.api.api import compss_wait_on + +import dislib as ds +import dislib.commons.rf.decision_tree as dt +import dislib.commons.rf.data as data + + +class DecisionTreeTest(unittest.TestCase): + def test_decision_tree(self): + x1 = np.array( + [ + [0.3, -0.3], + [0.4, -0.5], + [0.5, -0.4], + [0.3, 0.3], + [0.4, 0.5], + [0.5, 0.4], + [-0.3, -0.3], + [-0.4, -0.5], + [-0.5, -0.4], + ] + ) + x2 = np.array([[0.4, -0.3], [0.4, 0.3], [-0.4, -0.3]]) + y1 = np.array([0, 0, 0, 1, 1, 1, 2, 2, 2]) + y2 = np.array([0, 1, 2]) + + x1_ds = ds.array(x1, (3, 2)) + x2_ds = ds.array(x2, (3, 2)) + y1_ds = ds.array(y1[:, np.newaxis], (3, 1)) + + data1 = data.transform_to_rf_dataset( + x1_ds, y1_ds, "classification", features_file=True + ) + + # Model + try_features = 2 + max_depth = np.inf + distr_depth = 2 + sklearn_max = 1e8 + bootstrap = True + seed = 0 + random_state = np.random.RandomState(seed) + n_samples, n_features = x1.shape + n_classes = np.bincount(y1).shape[0] + features_mmap = x1.T + + # Test bootstrap + sample1, y_s1 = compss_wait_on( + dt._sample_selection(n_samples, y1, True, seed) + ) + sample2, y_s2 = compss_wait_on( + dt._sample_selection(n_samples, y1, False, seed) + ) + self.assertTrue( + np.array_equal(sample1, np.array([0, 2, 3, 3, 3, 4, 5, 5, 7])) + ) + self.assertTrue( + np.array_equal(sample2, np.array([0, 1, 2, 3, 4, 5, 6, 7, 8])) + ) + self.assertTrue( + np.array_equal(y_s1, np.array([0, 0, 1, 1, 1, 1, 1, 1, 2])) + ) + self.assertTrue( + np.array_equal(y_s2, np.array([0, 0, 0, 1, 1, 1, 2, 2, 2])) + ) + + # Assert split wrapper + sample, y_s = sample2, y_s2 + with self.assertRaises(ValueError): + dt._split_node_wrapper( + sample, + n_features, + y_s, + n_classes, + try_features, + random_state, + samples_file=None, + features_file=None, + ) + + split = dt._split_node_wrapper( + sample, + n_features, + y_s, + n_classes, + try_features, + random_state, + samples_file=data1.samples_path, + features_file=data1.features_path, + ) + split = compss_wait_on(split) + node_info, left_group, y_l, right_group, y_r = split + self.assertTrue(node_info.index in (0, 1)) + if node_info.index == 0: + self.assertTrue(np.array_equal(left_group, np.array([6, 7, 8]))) + self.assertTrue(np.array_equal(y_l, np.array([2, 2, 2]))) + self.assertTrue( + np.array_equal(right_group, np.array([0, 1, 2, 3, 4, 5])) + ) + self.assertTrue(np.array_equal(y_r, np.array([0, 0, 0, 1, 1, 1]))) + self.assertAlmostEqual(node_info.value, 0.0) + split_l = dt._compute_split( + left_group, + n_features, + y_l, + n_classes, + try_features, + features_mmap, + random_state, + ) + node_info, left_group, y_l, right_group, y_r = split_l + self.assertTrue(np.array_equal(left_group, np.array([6, 7, 8]))) + self.assertTrue(np.array_equal(y_l, np.array([2, 2, 2]))) + self.assertTrue(np.array_equal(right_group, np.array([]))) + self.assertTrue(np.array_equal(y_r, np.array([]))) + self.assertTrue( + np.array_equal(node_info.frequencies, np.array([0, 0, 
3])) + ) + self.assertEqual(node_info.size, 3) + self.assertEqual(node_info.target, 2) + elif node_info.index == 1: + self.assertTrue( + np.array_equal(left_group, np.array([0, 1, 2, 6, 7, 8])) + ) + self.assertTrue(np.array_equal(y_l, np.array([0, 0, 0, 2, 2, 2]))) + self.assertTrue(np.array_equal(right_group, np.array([3, 4, 5]))) + self.assertTrue(np.array_equal(y_r, np.array([1, 1, 1]))) + self.assertAlmostEqual(node_info.value, 0.0) + split_r = dt._compute_split( + right_group, + n_features, + y_r, + n_classes, + try_features, + features_mmap, + random_state, + ) + node_info, left_group, y_l, right_group, y_r = split_r + self.assertTrue(np.array_equal(left_group, np.array([3, 4, 5]))) + self.assertTrue(np.array_equal(y_l, np.array([1, 1, 1]))) + self.assertTrue(np.array_equal(right_group, np.array([]))) + self.assertTrue(np.array_equal(y_r, np.array([]))) + self.assertTrue( + np.array_equal(node_info.frequencies, np.array([0, 3, 0])) + ) + self.assertEqual(node_info.size, 3) + self.assertEqual(node_info.target, 1) + + # Test tree + tree = dt.DecisionTreeClassifier( + try_features, + max_depth, + distr_depth, + sklearn_max, + bootstrap, + random_state, + ) + tree.fit(data1) + y_pred = compss_wait_on(tree.predict(x2_ds)) + self.assertTrue(np.array_equal(y_pred, y2)) + + +def main(): + unittest.main() + + +if __name__ == "__main__": + main() diff --git a/tests/test_rf.py b/tests/test_rf_classifier.py similarity index 100% rename from tests/test_rf.py rename to tests/test_rf_classifier.py diff --git a/tests/test_rf_dataset.py b/tests/test_rf_dataset.py new file mode 100644 index 00000000..de55fc76 --- /dev/null +++ b/tests/test_rf_dataset.py @@ -0,0 +1,224 @@ +import unittest + +import os +import shutil +from sklearn.datasets import make_classification +import dislib as ds +from dislib.commons.rf import data +from dislib.commons.rf import test_split +from dislib.data.array import Array +import numpy as np +from sys import float_info +from pycompss.api.api import compss_wait_on + +DIRPATH = "tests/files/saving" + + +class RFDatasetTest(unittest.TestCase): + def setUp(self) -> None: + os.makedirs(DIRPATH, exist_ok=True) + return super().setUp() + + def tearDown(self) -> None: + shutil.rmtree(DIRPATH) + return super().tearDown() + + def test_rf_dataset(self): + # Save samples and features + x, y = make_classification( + n_samples=900, + n_features=10, + n_classes=3, + n_informative=4, + random_state=0, + ) + x_ds_1 = ds.array(x, (300, 10)) + x_ds_2 = ds.array(x[:600], (300, 10)) + y_ds_1 = ds.array(y[:, np.newaxis], (300, 1)) + y_ds_2 = ds.array(y[:600][:, np.newaxis], (300, 1)) + samples_path_1 = os.path.join(DIRPATH, "feats_1") + samples_path_2 = os.path.join(DIRPATH, "feats_2") + targets_path_1 = os.path.join(DIRPATH, "targets_1") + targets_path_2 = os.path.join(DIRPATH, "targets_2") + features_path_f = os.path.join(DIRPATH, "targets_f") + save_samples(x_ds_1, samples_path_1, False) + save_samples(x_ds_2, samples_path_2, False) + save_targets(y_ds_1, targets_path_1) + save_targets(y_ds_2, targets_path_2) + save_features(x_ds_2, features_path_f, True) + + # Regression and classification datatser + rf_regr = data.RfRegressorDataset(samples_path_1, targets_path_1) + rf_class = data.RfClassifierDataset(samples_path_1, targets_path_1) + + # Test get number of samples and features + self.assertEqual(rf_regr.get_n_samples(), 900) + self.assertEqual(rf_class.get_n_samples(), 900) + self.assertEqual(rf_regr.get_n_features(), 10) + self.assertEqual(rf_class.get_n_features(), 10) + + # Test get y 
targets + y_regr = compss_wait_on(rf_regr.get_y_targets()) + y_class = compss_wait_on(rf_class.get_y_targets()) + self.assertTrue(np.all(y_regr == y_ds_1.collect())) + self.assertTrue(np.all(y_class == y_ds_1.collect())) + + # Test get number of classes and classes + n_class = compss_wait_on(rf_regr.get_n_classes()) + classes = compss_wait_on(rf_regr.get_classes()) + self.assertTrue(n_class is None) + self.assertTrue(classes is None) + + rf_class.n_classes = None + n_class = compss_wait_on(rf_class.get_n_classes()) + rf_class.y_categories = None + classes = compss_wait_on(rf_class.get_classes()) + self.assertEqual(n_class, 3) + self.assertTrue(np.all(classes == [0, 1, 2])) + + # Sample and feature paths must be str + rf_dataset = data.RfBaseDataset(None, None) + with self.assertRaises(TypeError): + rf_dataset.get_n_samples() + with self.assertRaises(TypeError): + rf_dataset.get_n_features() + + # Task must be classification or regression + with self.assertRaises(ValueError): + rf_dataset = data.transform_to_rf_dataset(x_ds_1, y_ds_1, "aaa") + + # Validate dimension + rf_dataset = data.RfBaseDataset( + samples_path_1, targets_path_1, features_path_f + ) + rf_dataset.samples_path = samples_path_2 + with self.assertRaises(ValueError): + rf_dataset.validate_features_file() + + # Validate Fortran order + rf_dataset = data.RfBaseDataset( + samples_path_1, targets_path_1, features_path_f + ) + with self.assertRaises(ValueError): + rf_dataset.validate_features_file() + + # Dataset creation + rf_regr = data.transform_to_rf_dataset( + x_ds_1, y_ds_1, "regression", features_file=True + ) + rf_class = data.transform_to_rf_dataset( + x_ds_1, y_ds_1, "classification", features_file=True + ) + self.assertEquals(compss_wait_on(rf_regr.get_n_samples()), 900) + self.assertEquals(compss_wait_on(rf_regr.get_n_features()), 10) + self.assertEquals(compss_wait_on(rf_class.get_n_samples()), 900) + self.assertEquals(compss_wait_on(rf_class.get_n_features()), 10) + + # Npy files + file = data._NpyFile(features_path_f) + file.shape = None + self.assertEqual(file.get_shape(), (10, 600)) + file.fortran_order = None + self.assertTrue(file.get_fortran_order()) + file.dtype = None + self.assertEqual(file.get_dtype().name, "float32") + + file = data._NpyFile(samples_path_2) + file.shape = None + self.assertEqual(file.get_shape(), (600, 10)) + file.fortran_order = None + self.assertFalse(file.get_fortran_order()) + file.dtype = None + self.assertEqual(file.get_dtype().name, "float32") + + # Test returns for empty size + score, value = test_split.test_split(None, np.array([]), None, None) + self.assertEqual(score, float_info.max) + self.assertEqual(value, np.float64(np.inf)) + + +def _fill_samples_file(samples_path, row_blocks, start_idx, fortran_order): + rows_samples = Array._merge_blocks(row_blocks) + rows_samples = rows_samples.astype(dtype="float32", casting="same_kind") + samples = np.lib.format.open_memmap( + samples_path, mode="r+", fortran_order=fortran_order + ) + samples[start_idx: start_idx + rows_samples.shape[0]] = rows_samples + + +def _fill_features_file(samples_path, row_blocks, start_idx, fortran_order): + rows_samples = Array._merge_blocks(row_blocks) + rows_samples = rows_samples.astype(dtype="float32", casting="same_kind") + samples = np.lib.format.open_memmap( + samples_path, mode="r+", fortran_order=fortran_order + ) + samples[:, start_idx: start_idx + rows_samples.shape[0]] = rows_samples.T + + +def _fill_targets_file(targets_path, row_blocks): + rows_targets = Array._merge_blocks(row_blocks) + with 
open(targets_path, "at") as f: + np.savetxt(f, rows_targets, fmt="%s", encoding="utf-8") + + +def save_samples(x, samples_path, fortran_order): + n_samples = x.shape[0] + n_features = x.shape[1] + + open(samples_path, "w").close() + np.lib.format.open_memmap( + samples_path, + mode="w+", + dtype="float32", + fortran_order=fortran_order, + shape=(int(n_samples), int(n_features)), + ) + start_idx = 0 + row_blocks_iterator = x._iterator(axis=0) + top_row = next(row_blocks_iterator) + _fill_samples_file(samples_path, top_row._blocks, start_idx, fortran_order) + start_idx += x._top_left_shape[0] + for x_row in row_blocks_iterator: + _fill_samples_file( + samples_path, x_row._blocks, start_idx, fortran_order + ) + start_idx += x._reg_shape[0] + + +def save_targets(y, targets_path): + open(targets_path, "w").close() + for y_row in y._iterator(axis=0): + _fill_targets_file(targets_path, y_row._blocks) + + +def save_features(x, features_path, fortran_order): + n_samples = x.shape[0] + n_features = x.shape[1] + + np.lib.format.open_memmap( + features_path, + mode="w+", + dtype="float32", + fortran_order=fortran_order, + shape=(int(n_features), int(n_samples)), + ) + start_idx = 0 + row_blocks_iterator = x._iterator(axis=0) + top_row = next(row_blocks_iterator) + _fill_features_file( + features_path, top_row._blocks, start_idx, fortran_order + ) + start_idx += x._top_left_shape[0] + for x_row in row_blocks_iterator: + _fill_features_file( + features_path, x_row._blocks, start_idx, fortran_order + ) + start_idx += x._reg_shape[0] + + +def main(): + unittest.main() + + +if __name__ == "__main__": + main() diff --git a/tests/test_rf_regressor.py b/tests/test_rf_regressor.py new file mode 100644 index 00000000..36da50f7 --- /dev/null +++ b/tests/test_rf_regressor.py @@ -0,0 +1,105 @@ +import unittest + +import numpy as np +from pycompss.api.api import compss_wait_on +from sklearn.datasets import make_regression + +import dislib as ds +from dislib.regression import RandomForestRegressor + + +def _determination_coefficient(y_true, y_pred): + u = np.sum(np.square(y_true - y_pred)) + v = np.sum(np.square(y_true - np.mean(y_true))) + return 1 - u / v + + +class RandomForestRegressorTest(unittest.TestCase): + def test_make_regression(self): + """Tests RandomForestRegressor fit and score with default params.""" + x, y = make_regression( + n_samples=3000, + n_features=10, + n_informative=4, + shuffle=True, + random_state=0, + ) + x_train = ds.array(x[: len(x) // 2], (300, 10)) + y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1)) + x_test = ds.array(x[len(x) // 2:], (300, 10)) + y_test = ds.array(y[len(y) // 2:][:, np.newaxis], (300, 1)) + + rf = RandomForestRegressor(random_state=0) + + rf.fit(x_train, y_train) + accuracy1 = compss_wait_on(rf.score(x_test, y_test)) + + y_pred = rf.predict(x_test).collect() + y_true = y[len(y) // 2:] + accuracy2 = _determination_coefficient(y_true, y_pred) + + self.assertGreater(accuracy1, 0.85) + self.assertGreater(accuracy2, 0.85) + self.assertAlmostEqual(accuracy1, accuracy2) + + def test_make_regression_predict_and_distr_depth(self): + """Tests RandomForestRegressor fit and predict with a distr_depth.""" + x, y = make_regression( + n_samples=3000, + n_features=10, + n_informative=4, + shuffle=True, + random_state=0, + ) + x_train = ds.array(x[: len(x) // 2], (300, 10)) + y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1)) + x_test = ds.array(x[len(x) // 2:], (300, 10)) + y_test = ds.array(y[len(y) // 2:][:, np.newaxis], (300, 1)) + + rf = 
RandomForestRegressor(distr_depth=2, random_state=0) + + rf.fit(x_train, y_train) + accuracy1 = compss_wait_on(rf.score(x_test, y_test)) + + y_pred = rf.predict(x_test).collect() + y_true = y[len(y) // 2:] + accuracy2 = _determination_coefficient(y_true, y_pred) + + self.assertGreater(accuracy1, 0.85) + self.assertGreater(accuracy2, 0.85) + self.assertAlmostEqual(accuracy1, accuracy2) + + def test_make_regression_sklearn_max_predict(self): + """Tests RandomForestRegressor predict with sklearn_max.""" + x, y = make_regression( + n_samples=3000, + n_features=10, + n_informative=4, + shuffle=True, + random_state=0, + ) + x_train = ds.array(x[: len(x) // 2], (300, 10)) + y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1)) + x_test = ds.array(x[len(x) // 2:], (300, 10)) + y_test = ds.array(y[len(y) // 2:][:, np.newaxis], (300, 1)) + + rf = RandomForestRegressor(random_state=0, sklearn_max=10) + + rf.fit(x_train, y_train) + accuracy1 = compss_wait_on(rf.score(x_test, y_test)) + + y_pred = rf.predict(x_test).collect() + y_true = y[len(y) // 2:] + accuracy2 = _determination_coefficient(y_true, y_pred) + + self.assertGreater(accuracy1, 0.85) + self.assertGreater(accuracy2, 0.85) + self.assertAlmostEqual(accuracy1, accuracy2) + + +def main(): + unittest.main() + + +if __name__ == "__main__": + main() diff --git a/tests/test_saving.py b/tests/test_saving.py new file mode 100644 index 00000000..523ed5cc --- /dev/null +++ b/tests/test_saving.py @@ -0,0 +1,83 @@ +import unittest +import json +import os +import shutil +from dislib.cluster import KMeans +from dislib.cluster import DBSCAN +import dislib.utils.saving as saving + +DIRPATH = "tests/files/saving" + + +class SavingTest(unittest.TestCase): + def setUp(self) -> None: + os.makedirs(DIRPATH, exist_ok=True) + return super().setUp() + + def tearDown(self) -> None: + shutil.rmtree(DIRPATH) + return super().tearDown() + + def test_errors(self): + filepath = os.path.join(DIRPATH, "model.json") + + # Models + km = KMeans(n_clusters=2) + km2 = KMeans(n_clusters=10) + dbscan = DBSCAN() + + # Import error + cbor2_module = saving.cbor2 + saving.cbor2 = None + with self.assertRaises(ModuleNotFoundError): + saving.save_model(km, filepath, save_format="cbor") + with self.assertRaises(ModuleNotFoundError): + saving.load_model(filepath, load_format="cbor") + saving.cbor2 = cbor2_module + + # Saving model not implemented + with self.assertRaises(NotImplementedError): + saving.save_model(dbscan, filepath) + + # Wrong save format + with self.assertRaises(ValueError): + saving.save_model(km, filepath, save_format="xxxx") + + # Overwrite + saving.save_model(km, filepath, save_format="json") + with open(filepath, "r") as f: + json_str = f.read() + saving.save_model( + km2, filepath, overwrite=False, save_format="json" + ) + with open(filepath, "r") as f: + json_str2 = f.read() + self.assertEqual(json_str, json_str2) + + # Wrong load format + with self.assertRaises(ValueError): + saving.load_model(filepath, load_format="xxxx") + + # Load model not implemented + model_data = {"model_name": "dbscan"} + with open(filepath, "w") as f: + json.dump(model_data, f) + with self.assertRaises(NotImplementedError): + saving.load_model(filepath, load_format="json") + + # Not JSON serializable + setattr(km, "n_clusters", dbscan) + with self.assertRaises(TypeError): + saving.save_model(km, filepath, save_format="json") + + # Not dict or list + with self.assertRaises(TypeError): + saving._sync_obj(km) + + +def main(): + unittest.main() + + +if __name__ == "__main__": + main() 
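Both the CBOR and the JSON saving tests below exercise the same round-trip
pattern. As a minimal sketch (assuming a fitted dislib estimator `model` and a
writable path; the file name is illustrative):

    from dislib.utils import save_model, load_model

    # save_model() synchronizes the model's futures before serializing, so it
    # waits for any pending fit tasks to finish.
    save_model(model, "/tmp/model.cbor", save_format="cbor")

    # load_model() rebuilds the estimator in the saved state, so it can be
    # used for prediction without fitting again.
    restored = load_model("/tmp/model.cbor", load_format="cbor")
    assert type(restored) is type(model)

Only the estimators listed in IMPLEMENTED_MODELS are supported; saving any
other model raises NotImplementedError, and the "cbor" format requires the
cbor2 package.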
diff --git a/tests/test_saving_cbor.py b/tests/test_saving_cbor.py new file mode 100644 index 00000000..5a0ef438 --- /dev/null +++ b/tests/test_saving_cbor.py @@ -0,0 +1,349 @@ +import unittest +import os +import shutil +import numpy as np +from scipy.sparse import csr_matrix +from sklearn.metrics import r2_score +from sklearn.datasets import make_classification, make_regression + +import dislib as ds +from dislib.cluster import KMeans +from dislib.cluster import GaussianMixture +from dislib.classification import CascadeSVM +from dislib.classification import RandomForestClassifier +from dislib.regression import RandomForestRegressor +from dislib.regression import Lasso +from dislib.regression import LinearRegression +from dislib.recommendation import ALS +from dislib.utils import save_model, load_model + +from pycompss.api.api import compss_wait_on + +DIRPATH = "tests/files/saving" + + +class CBORSavingTest(unittest.TestCase): + def setUp(self) -> None: + os.makedirs(DIRPATH, exist_ok=True) + return super().setUp() + + def tearDown(self) -> None: + shutil.rmtree(DIRPATH) + return super().tearDown() + + def test_saving_kmeans(self): + file_ = "tests/files/libsvm/2" + filepath = os.path.join(DIRPATH, "kmeans.cbor") + + x_sp, _ = ds.load_svmlight_file(file_, (10, 300), 780, True) + x_ds, _ = ds.load_svmlight_file(file_, (10, 300), 780, False) + + kmeans = KMeans(random_state=170) + kmeans.fit(x_sp) + + save_model(kmeans, filepath, save_format="cbor") + kmeans2 = load_model(filepath, load_format="cbor") + + y_sparse = kmeans.predict(x_sp).collect() + y_sparse2 = kmeans2.predict(x_sp).collect() + + sparse_c = kmeans.centers.toarray() + sparse_c2 = kmeans2.centers.toarray() + + kmeans = KMeans(random_state=170) + + y_dense = kmeans.fit_predict(x_ds).collect() + dense_c = kmeans.centers + + self.assertTrue(np.allclose(sparse_c, dense_c)) + self.assertTrue(np.allclose(sparse_c2, dense_c)) + self.assertTrue(np.array_equal(y_sparse, y_dense)) + self.assertTrue(np.array_equal(y_sparse2, y_dense)) + + def test_saving_gm(self): + file_ = "tests/files/libsvm/2" + filepath = os.path.join(DIRPATH, "gm.cbor") + + x_sparse, _ = ds.load_svmlight_file(file_, (10, 780), 780, True) + x_dense, _ = ds.load_svmlight_file(file_, (10, 780), 780, False) + + covariance_types = "full", "tied", "diag", "spherical" + + for cov_type in covariance_types: + gm = GaussianMixture( + n_components=4, random_state=0, covariance_type=cov_type + ) + gm.fit(x_sparse) + save_model(gm, filepath, save_format="cbor") + gm2 = load_model(filepath, load_format="cbor") + labels_sparse = gm.predict(x_sparse).collect() + labels_sparse2 = gm2.predict(x_sparse).collect() + + gm = GaussianMixture( + n_components=4, random_state=0, covariance_type=cov_type + ) + gm.fit(x_dense) + save_model(gm, filepath, save_format="cbor") + gm2 = load_model(filepath, load_format="cbor") + labels_dense = gm.predict(x_dense).collect() + labels_dense2 = gm2.predict(x_dense).collect() + + self.assertTrue(np.array_equal(labels_sparse, labels_sparse2)) + self.assertTrue(np.array_equal(labels_sparse, labels_dense)) + self.assertTrue(np.array_equal(labels_sparse2, labels_dense2)) + + def test_saving_csvm(self): + seed = 666 + train = "tests/files/libsvm/3" + filepath = os.path.join(DIRPATH, "csvm.cbor") + + x_sp, y_sp = ds.load_svmlight_file(train, (10, 300), 780, True) + x_d, y_d = ds.load_svmlight_file(train, (10, 300), 780, False) + + csvm_sp = CascadeSVM(random_state=seed) + csvm_sp.fit(x_sp, y_sp) + save_model(csvm_sp, filepath, save_format="cbor") + csvm_sp2 = 
load_model(filepath, load_format="cbor") + + csvm_d = CascadeSVM(random_state=seed) + csvm_d.fit(x_d, y_d) + save_model(csvm_d, filepath, save_format="cbor") + csvm_d2 = load_model(filepath, load_format="cbor") + + sv_d = csvm_d._clf.support_vectors_ + sv_sp = csvm_sp._clf.support_vectors_.toarray() + sv_d2 = csvm_d2._clf.support_vectors_ + sv_sp2 = csvm_sp2._clf.support_vectors_.toarray() + + self.assertTrue(np.array_equal(sv_d, sv_sp)) + self.assertTrue(np.array_equal(sv_d2, sv_sp2)) + self.assertTrue(np.array_equal(sv_d, sv_d2)) + + coef_d = csvm_d._clf.dual_coef_ + coef_sp = csvm_sp._clf.dual_coef_.toarray() + coef_d2 = csvm_d2._clf.dual_coef_ + coef_sp2 = csvm_sp2._clf.dual_coef_.toarray() + + self.assertTrue(np.array_equal(coef_d, coef_sp)) + self.assertTrue(np.array_equal(coef_d2, coef_sp2)) + self.assertTrue(np.array_equal(coef_d, coef_d2)) + + def test_saving_rf_class(self): + filepath = os.path.join(DIRPATH, "rf_class.cbor") + x, y = make_classification( + n_samples=3000, + n_features=10, + n_classes=3, + n_informative=4, + n_redundant=2, + n_repeated=1, + n_clusters_per_class=2, + shuffle=True, + random_state=0, + ) + x_train = ds.array(x[: len(x) // 2], (300, 10)) + y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1)) + x_test = ds.array(x[len(x) // 2:], (300, 10)) + y_test = y[len(y) // 2:] + + rf = RandomForestClassifier(random_state=0, sklearn_max=10) + rf.fit(x_train, y_train) + save_model(rf, filepath, save_format="cbor") + rf2 = load_model(filepath, load_format="cbor") + + probabilities = rf.predict_proba(x_test).collect() + probabilities2 = rf2.predict_proba(x_test).collect() + rf.classes = compss_wait_on(rf.classes) + rf2.classes = compss_wait_on(rf2.classes) + y_pred = rf.classes[np.argmax(probabilities, axis=1)] + y_pred2 = rf2.classes[np.argmax(probabilities2, axis=1)] + accuracy = np.count_nonzero(y_pred == y_test) / len(y_test) + accuracy2 = np.count_nonzero(y_pred2 == y_test) / len(y_test) + self.assertGreater(accuracy, 0.7) + self.assertGreater(accuracy2, 0.7) + + def test_saving_rf_regr(self): + filepath = os.path.join(DIRPATH, "rf_regr.cbor") + + def determination_coefficient(y_true, y_pred): + u = np.sum(np.square(y_true - y_pred)) + v = np.sum(np.square(y_true - np.mean(y_true))) + return 1 - u / v + + x, y = make_regression( + n_samples=3000, + n_features=10, + n_informative=4, + shuffle=True, + random_state=0, + ) + x_train = ds.array(x[: len(x) // 2], (300, 10)) + y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1)) + x_test = ds.array(x[len(x) // 2:], (300, 10)) + y_test = ds.array(y[len(y) // 2:][:, np.newaxis], (300, 1)) + + rf = RandomForestRegressor(random_state=0, sklearn_max=10) + + rf.fit(x_train, y_train) + save_model(rf, filepath, save_format="cbor") + rf2 = load_model(filepath, load_format="cbor") + + accuracy1 = compss_wait_on(rf.score(x_test, y_test)) + accuracy2 = compss_wait_on(rf2.score(x_test, y_test)) + y_pred = rf.predict(x_test).collect() + y_true = y[len(y) // 2:] + y_pred2 = rf2.predict(x_test).collect() + y_true2 = y[len(y) // 2:] + coef1 = determination_coefficient(y_true, y_pred) + coef2 = determination_coefficient(y_true2, y_pred2) + + self.assertGreater(accuracy1, 0.85) + self.assertGreater(accuracy2, 0.85) + self.assertGreater(coef1, 0.85) + self.assertGreater(coef2, 0.85) + self.assertAlmostEqual(accuracy1, accuracy2) + self.assertAlmostEqual(coef1, coef2) + + def test_saving_lasso(self): + filepath = os.path.join(DIRPATH, "lasso.cbor") + np.random.seed(42) + + n_samples, n_features = 50, 100 + X = 
np.random.randn(n_samples, n_features) + + # Decreasing coef w. alternated signs for visualization + idx = np.arange(n_features) + coef = (-1) ** idx * np.exp(-idx / 10) + coef[10:] = 0 # sparsify coef + y = np.dot(X, coef) + + # Add noise + y += 0.01 * np.random.normal(size=n_samples) + + n_samples = X.shape[0] + X_train, y_train = X[: n_samples // 2], y[: n_samples // 2] + X_test, y_test = X[n_samples // 2:], y[n_samples // 2:] + + lasso = Lasso(lmbd=0.1, max_iter=50) + + lasso.fit(ds.array(X_train, (5, 100)), ds.array(y_train, (5, 1))) + save_model(lasso, filepath, save_format="cbor") + lasso2 = load_model(filepath, load_format="cbor") + + y_pred_lasso = lasso.predict(ds.array(X_test, (25, 100))) + r2_score_lasso = r2_score(y_test, y_pred_lasso.collect()) + y_pred_lasso2 = lasso2.predict(ds.array(X_test, (25, 100))) + r2_score_lasso2 = r2_score(y_test, y_pred_lasso2.collect()) + + self.assertAlmostEqual(r2_score_lasso, 0.9481746925431124) + self.assertAlmostEqual(r2_score_lasso2, 0.9481746925431124) + + def test_saving_linear(self): + filepath = os.path.join(DIRPATH, "linear_regression.cbor") + + x_data = np.array([[1, 2], [2, 0], [3, 1], [4, 4], [5, 3]]) + y_data = np.array([2, 1, 1, 2, 4.5]) + + bn, bm = 2, 2 + + x = ds.array(x=x_data, block_size=(bn, bm)) + y = ds.array(x=y_data, block_size=(bn, 1)) + + reg = LinearRegression() + reg.fit(x, y) + save_model(reg, filepath, save_format="cbor") + reg2 = load_model(filepath, load_format="cbor") + + self.assertTrue(np.allclose(reg.coef_.collect(), [0.421875, 0.296875])) + self.assertTrue( + np.allclose(reg2.coef_.collect(), [0.421875, 0.296875]) + ) + self.assertTrue(np.allclose(reg.intercept_.collect(), 0.240625)) + self.assertTrue(np.allclose(reg2.intercept_.collect(), 0.240625)) + + # Predict one sample + x_test = np.array([3, 2]) + test_data = ds.array(x=x_test, block_size=(1, bm)) + pred = reg.predict(test_data).collect() + pred2 = reg2.predict(test_data).collect() + self.assertTrue(np.allclose(pred, 2.1)) + self.assertTrue(np.allclose(pred2, 2.1)) + + # Predict multiple samples + x_test = np.array([[3, 2], [4, 4], [1, 3]]) + test_data = ds.array(x=x_test, block_size=(bn, bm)) + pred = reg.predict(test_data).collect() + self.assertTrue(np.allclose(pred, [2.1, 3.115625, 1.553125])) + + def test_saving_als(self): + filepath = os.path.join(DIRPATH, "als.cbor") + + data = np.array([[0, 0, 5], [3, 0, 5], [3, 1, 2]]) + ratings = csr_matrix(data) + train = ds.array(x=ratings, block_size=(1, 1)) + als = ALS(tol=0.01, random_state=666, n_f=5, verbose=False) + als.fit(train) + save_model(als, filepath, save_format="cbor") + als2 = load_model(filepath, load_format="cbor") + + predictions = als.predict_user(user_id=0) + predictions2 = als2.predict_user(user_id=0) + + # Check that the ratings for user 0 are similar to user 1 because they + # share preferences (third movie), thus it is expected that user 0 + # will rate movie 1 similarly to user 1. 
+ self.assertTrue( + 2.75 < predictions[0] < 3.25 + and predictions[1] < 1 + and predictions[2] > 4.5 + ) + self.assertTrue( + 2.75 < predictions2[0] < 3.25 + and predictions2[1] < 1 + and predictions2[2] > 4.5 + ) + self.assertTrue( + np.array_equal(predictions, predictions2, equal_nan=True) + ) + + +def load_movielens(train_ratio=0.9): + file = "tests/files/sample_movielens_ratings.csv" + + # 'user_id', 'movie_id', 'rating', 'timestamp' + + data = np.genfromtxt(file, dtype="int", delimiter=",", usecols=range(3)) + + # just in case there are movies/user without rating + # movie_id + n_m = max(len(np.unique(data[:, 1])), max(data[:, 1]) + 1) + # user_id + n_u = max(len(np.unique(data[:, 0])), max(data[:, 0]) + 1) + + idx = int(data.shape[0] * train_ratio) + + train_data = data[:idx] + test_data = data[idx:] + + train = csr_matrix( + (train_data[:, 2], (train_data[:, 0], train_data[:, 1])), + shape=(n_u, n_m), + ) + + test = csr_matrix((test_data[:, 2], (test_data[:, 0], test_data[:, 1]))) + + x_size, y_size = train.shape[0] // 4, train.shape[1] // 4 + train_arr = ds.array(train, block_size=(x_size, y_size)) + + x_size, y_size = test.shape[0] // 4, test.shape[1] // 4 + test_arr = ds.array(test, block_size=(x_size, y_size)) + + return train_arr, test_arr + + +def main(): + unittest.main() + + +if __name__ == "__main__": + main() diff --git a/tests/test_saving_json.py b/tests/test_saving_json.py new file mode 100644 index 00000000..783f9f31 --- /dev/null +++ b/tests/test_saving_json.py @@ -0,0 +1,349 @@ +import unittest +import os +import shutil +import numpy as np +from scipy.sparse import csr_matrix +from sklearn.metrics import r2_score +from sklearn.datasets import make_classification, make_regression + +import dislib as ds +from dislib.cluster import KMeans +from dislib.cluster import GaussianMixture +from dislib.classification import CascadeSVM +from dislib.classification import RandomForestClassifier +from dislib.regression import RandomForestRegressor +from dislib.regression import Lasso +from dislib.regression import LinearRegression +from dislib.recommendation import ALS +from dislib.utils import save_model, load_model + +from pycompss.api.api import compss_wait_on + +DIRPATH = "tests/files/saving" + + +class JSONSavingTest(unittest.TestCase): + def setUp(self) -> None: + os.makedirs(DIRPATH, exist_ok=True) + return super().setUp() + + def tearDown(self) -> None: + shutil.rmtree(DIRPATH) + return super().tearDown() + + def test_saving_kmeans(self): + file_ = "tests/files/libsvm/2" + filepath = os.path.join(DIRPATH, "kmeans.json") + + x_sp, _ = ds.load_svmlight_file(file_, (10, 300), 780, True) + x_ds, _ = ds.load_svmlight_file(file_, (10, 300), 780, False) + + kmeans = KMeans(random_state=170) + kmeans.fit(x_sp) + + save_model(kmeans, filepath, save_format="json") + kmeans2 = load_model(filepath, load_format="json") + + y_sparse = kmeans.predict(x_sp).collect() + y_sparse2 = kmeans2.predict(x_sp).collect() + + sparse_c = kmeans.centers.toarray() + sparse_c2 = kmeans2.centers.toarray() + + kmeans = KMeans(random_state=170) + + y_dense = kmeans.fit_predict(x_ds).collect() + dense_c = kmeans.centers + + self.assertTrue(np.allclose(sparse_c, dense_c)) + self.assertTrue(np.allclose(sparse_c2, dense_c)) + self.assertTrue(np.array_equal(y_sparse, y_dense)) + self.assertTrue(np.array_equal(y_sparse2, y_dense)) + + def test_saving_gm(self): + file_ = "tests/files/libsvm/2" + filepath = os.path.join(DIRPATH, "gm.json") + + x_sparse, _ = ds.load_svmlight_file(file_, (10, 780), 780, True) + 
x_dense, _ = ds.load_svmlight_file(file_, (10, 780), 780, False) + + covariance_types = "full", "tied", "diag", "spherical" + + for cov_type in covariance_types: + gm = GaussianMixture( + n_components=4, random_state=0, covariance_type=cov_type + ) + gm.fit(x_sparse) + save_model(gm, filepath, save_format="json") + gm2 = load_model(filepath, load_format="json") + labels_sparse = gm.predict(x_sparse).collect() + labels_sparse2 = gm2.predict(x_sparse).collect() + + gm = GaussianMixture( + n_components=4, random_state=0, covariance_type=cov_type + ) + gm.fit(x_dense) + save_model(gm, filepath, save_format="json") + gm2 = load_model(filepath, load_format="json") + labels_dense = gm.predict(x_dense).collect() + labels_dense2 = gm2.predict(x_dense).collect() + + self.assertTrue(np.array_equal(labels_sparse, labels_sparse2)) + self.assertTrue(np.array_equal(labels_sparse, labels_dense)) + self.assertTrue(np.array_equal(labels_sparse2, labels_dense2)) + + def test_saving_csvm(self): + seed = 666 + train = "tests/files/libsvm/3" + filepath = os.path.join(DIRPATH, "csvm.json") + + x_sp, y_sp = ds.load_svmlight_file(train, (10, 300), 780, True) + x_d, y_d = ds.load_svmlight_file(train, (10, 300), 780, False) + + csvm_sp = CascadeSVM(random_state=seed) + csvm_sp.fit(x_sp, y_sp) + save_model(csvm_sp, filepath, save_format="json") + csvm_sp2 = load_model(filepath, load_format="json") + + csvm_d = CascadeSVM(random_state=seed) + csvm_d.fit(x_d, y_d) + save_model(csvm_d, filepath, save_format="json") + csvm_d2 = load_model(filepath, load_format="json") + + sv_d = csvm_d._clf.support_vectors_ + sv_sp = csvm_sp._clf.support_vectors_.toarray() + sv_d2 = csvm_d2._clf.support_vectors_ + sv_sp2 = csvm_sp2._clf.support_vectors_.toarray() + + self.assertTrue(np.array_equal(sv_d, sv_sp)) + self.assertTrue(np.array_equal(sv_d2, sv_sp2)) + self.assertTrue(np.array_equal(sv_d, sv_d2)) + + coef_d = csvm_d._clf.dual_coef_ + coef_sp = csvm_sp._clf.dual_coef_.toarray() + coef_d2 = csvm_d2._clf.dual_coef_ + coef_sp2 = csvm_sp2._clf.dual_coef_.toarray() + + self.assertTrue(np.array_equal(coef_d, coef_sp)) + self.assertTrue(np.array_equal(coef_d2, coef_sp2)) + self.assertTrue(np.array_equal(coef_d, coef_d2)) + + def test_saving_rf_class(self): + filepath = os.path.join(DIRPATH, "rf_class.json") + x, y = make_classification( + n_samples=3000, + n_features=10, + n_classes=3, + n_informative=4, + n_redundant=2, + n_repeated=1, + n_clusters_per_class=2, + shuffle=True, + random_state=0, + ) + x_train = ds.array(x[: len(x) // 2], (300, 10)) + y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1)) + x_test = ds.array(x[len(x) // 2:], (300, 10)) + y_test = y[len(y) // 2:] + + rf = RandomForestClassifier(random_state=0, sklearn_max=10) + rf.fit(x_train, y_train) + save_model(rf, filepath, save_format="json") + rf2 = load_model(filepath, load_format="json") + + probabilities = rf.predict_proba(x_test).collect() + probabilities2 = rf2.predict_proba(x_test).collect() + rf.classes = compss_wait_on(rf.classes) + rf2.classes = compss_wait_on(rf2.classes) + y_pred = rf.classes[np.argmax(probabilities, axis=1)] + y_pred2 = rf2.classes[np.argmax(probabilities2, axis=1)] + accuracy = np.count_nonzero(y_pred == y_test) / len(y_test) + accuracy2 = np.count_nonzero(y_pred2 == y_test) / len(y_test) + self.assertGreater(accuracy, 0.7) + self.assertGreater(accuracy2, 0.7) + + def test_saving_rf_regr(self): + filepath = os.path.join(DIRPATH, "rf_regr.json") + + def determination_coefficient(y_true, y_pred): + u = np.sum(np.square(y_true - 
y_pred)) + v = np.sum(np.square(y_true - np.mean(y_true))) + return 1 - u / v + + x, y = make_regression( + n_samples=3000, + n_features=10, + n_informative=4, + shuffle=True, + random_state=0, + ) + x_train = ds.array(x[: len(x) // 2], (300, 10)) + y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1)) + x_test = ds.array(x[len(x) // 2:], (300, 10)) + y_test = ds.array(y[len(y) // 2:][:, np.newaxis], (300, 1)) + + rf = RandomForestRegressor(random_state=0, sklearn_max=10) + + rf.fit(x_train, y_train) + save_model(rf, filepath, save_format="json") + rf2 = load_model(filepath, load_format="json") + + accuracy1 = compss_wait_on(rf.score(x_test, y_test)) + accuracy2 = compss_wait_on(rf2.score(x_test, y_test)) + y_pred = rf.predict(x_test).collect() + y_true = y[len(y) // 2:] + y_pred2 = rf2.predict(x_test).collect() + y_true2 = y[len(y) // 2:] + coef1 = determination_coefficient(y_true, y_pred) + coef2 = determination_coefficient(y_true2, y_pred2) + + self.assertGreater(accuracy1, 0.85) + self.assertGreater(accuracy2, 0.85) + self.assertGreater(coef1, 0.85) + self.assertGreater(coef2, 0.85) + self.assertAlmostEqual(accuracy1, accuracy2) + self.assertAlmostEqual(coef1, coef2) + + def test_saving_lasso(self): + filepath = os.path.join(DIRPATH, "lasso.json") + np.random.seed(42) + + n_samples, n_features = 50, 100 + X = np.random.randn(n_samples, n_features) + + # Decreasing coef w. alternated signs for visualization + idx = np.arange(n_features) + coef = (-1) ** idx * np.exp(-idx / 10) + coef[10:] = 0 # sparsify coef + y = np.dot(X, coef) + + # Add noise + y += 0.01 * np.random.normal(size=n_samples) + + n_samples = X.shape[0] + X_train, y_train = X[: n_samples // 2], y[: n_samples // 2] + X_test, y_test = X[n_samples // 2:], y[n_samples // 2:] + + lasso = Lasso(lmbd=0.1, max_iter=50) + + lasso.fit(ds.array(X_train, (5, 100)), ds.array(y_train, (5, 1))) + save_model(lasso, filepath, save_format="json") + lasso2 = load_model(filepath, load_format="json") + + y_pred_lasso = lasso.predict(ds.array(X_test, (25, 100))) + r2_score_lasso = r2_score(y_test, y_pred_lasso.collect()) + y_pred_lasso2 = lasso2.predict(ds.array(X_test, (25, 100))) + r2_score_lasso2 = r2_score(y_test, y_pred_lasso2.collect()) + + self.assertAlmostEqual(r2_score_lasso, 0.9481746925431124) + self.assertAlmostEqual(r2_score_lasso2, 0.9481746925431124) + + def test_saving_linear(self): + filepath = os.path.join(DIRPATH, "linear_regression.json") + + x_data = np.array([[1, 2], [2, 0], [3, 1], [4, 4], [5, 3]]) + y_data = np.array([2, 1, 1, 2, 4.5]) + + bn, bm = 2, 2 + + x = ds.array(x=x_data, block_size=(bn, bm)) + y = ds.array(x=y_data, block_size=(bn, 1)) + + reg = LinearRegression() + reg.fit(x, y) + save_model(reg, filepath, save_format="json") + reg2 = load_model(filepath, load_format="json") + + self.assertTrue(np.allclose(reg.coef_.collect(), [0.421875, 0.296875])) + self.assertTrue( + np.allclose(reg2.coef_.collect(), [0.421875, 0.296875]) + ) + self.assertTrue(np.allclose(reg.intercept_.collect(), 0.240625)) + self.assertTrue(np.allclose(reg2.intercept_.collect(), 0.240625)) + + # Predict one sample + x_test = np.array([3, 2]) + test_data = ds.array(x=x_test, block_size=(1, bm)) + pred = reg.predict(test_data).collect() + pred2 = reg2.predict(test_data).collect() + self.assertTrue(np.allclose(pred, 2.1)) + self.assertTrue(np.allclose(pred2, 2.1)) + + # Predict multiple samples + x_test = np.array([[3, 2], [4, 4], [1, 3]]) + test_data = ds.array(x=x_test, block_size=(bn, bm)) + pred = reg.predict(test_data).collect() 
+ self.assertTrue(np.allclose(pred, [2.1, 3.115625, 1.553125])) + + def test_saving_als(self): + filepath = os.path.join(DIRPATH, "als.json") + + data = np.array([[0, 0, 5], [3, 0, 5], [3, 1, 2]]) + ratings = csr_matrix(data) + train = ds.array(x=ratings, block_size=(1, 1)) + als = ALS(tol=0.01, random_state=666, n_f=5, verbose=False) + als.fit(train) + save_model(als, filepath, save_format="json") + als2 = load_model(filepath, load_format="json") + + predictions = als.predict_user(user_id=0) + predictions2 = als2.predict_user(user_id=0) + + # Check that the ratings for user 0 are similar to user 1 because they + # share preferences (third movie), thus it is expected that user 0 + # will rate movie 1 similarly to user 1. + self.assertTrue( + 2.75 < predictions[0] < 3.25 + and predictions[1] < 1 + and predictions[2] > 4.5 + ) + self.assertTrue( + 2.75 < predictions2[0] < 3.25 + and predictions2[1] < 1 + and predictions2[2] > 4.5 + ) + self.assertTrue( + np.array_equal(predictions, predictions2, equal_nan=True) + ) + + +def load_movielens(train_ratio=0.9): + file = "tests/files/sample_movielens_ratings.csv" + + # 'user_id', 'movie_id', 'rating', 'timestamp' + + data = np.genfromtxt(file, dtype="int", delimiter=",", usecols=range(3)) + + # just in case there are movies/user without rating + # movie_id + n_m = max(len(np.unique(data[:, 1])), max(data[:, 1]) + 1) + # user_id + n_u = max(len(np.unique(data[:, 0])), max(data[:, 0]) + 1) + + idx = int(data.shape[0] * train_ratio) + + train_data = data[:idx] + test_data = data[idx:] + + train = csr_matrix( + (train_data[:, 2], (train_data[:, 0], train_data[:, 1])), + shape=(n_u, n_m), + ) + + test = csr_matrix((test_data[:, 2], (test_data[:, 0], test_data[:, 1]))) + + x_size, y_size = train.shape[0] // 4, train.shape[1] // 4 + train_arr = ds.array(train, block_size=(x_size, y_size)) + + x_size, y_size = test.shape[0] // 4, test.shape[1] // 4 + test_arr = ds.array(test, block_size=(x_size, y_size)) + + return train_arr, test_arr + + +def main(): + unittest.main() + + +if __name__ == "__main__": + main()
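
Note: the two new test modules above exercise the same save/load round trip and differ only in the serialization format string passed to save_model/load_model. For readers who want the API at a glance, the following is a minimal sketch of that round trip, using only the call signatures exercised in these tests (ds.array, KMeans, save_model(obj, path, save_format=...), load_model(path, load_format=...)); the output path and the toy data are illustrative assumptions, not part of the patch.

    import numpy as np

    import dislib as ds
    from dislib.cluster import KMeans
    from dislib.utils import save_model, load_model

    # Illustrative data: a small dense ds-array (shape and block size are arbitrary).
    x = ds.array(np.random.random((40, 4)), block_size=(10, 4))

    # Fit any supported estimator, e.g. KMeans as in the tests above.
    kmeans = KMeans(random_state=170)
    kmeans.fit(x)

    # Round trip through one of the formats covered by the new tests ("json" or "cbor").
    path = "/tmp/kmeans.json"  # hypothetical location, chosen for the example
    save_model(kmeans, path, save_format="json")
    restored = load_model(path, load_format="json")

    # The restored model should predict exactly like the original one.
    assert np.array_equal(kmeans.predict(x).collect(),
                          restored.predict(x).collect())

Since the CBOR and JSON suites are otherwise identical, a natural follow-up would be to derive both from a single test base class parameterized over the format string, which would halve the maintenance surface of these tests.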