diff --git a/dislib/regression/__init__.py b/dislib/regression/__init__.py index e3287a0b..4a222968 100644 --- a/dislib/regression/__init__.py +++ b/dislib/regression/__init__.py @@ -1,4 +1,5 @@ from dislib.regression.linear.base import LinearRegression from dislib.regression.lasso.base import Lasso +from dislib.regression.rf.forest import RandomForestRegressor -__all__ = ['LinearRegression', 'Lasso'] +__all__ = ["LinearRegression", "Lasso", "RandomForestRegressor"] diff --git a/dislib/regression/rf/decision_tree.py b/dislib/regression/rf/decision_tree.py index 43ecaf79..82730a5d 100644 --- a/dislib/regression/rf/decision_tree.py +++ b/dislib/regression/rf/decision_tree.py @@ -196,13 +196,13 @@ def predict(self, sample): if len(sample) > 0: return node_content.sk_tree.predict(sample) if isinstance(node_content, _InnerNodeInfo): - pred = np.empty((len(sample),), dtype=np.int64) + pred = np.empty((len(sample),), dtype=np.float64) left_mask = sample[:, node_content.index] <= node_content.value pred[left_mask] = self.left.predict(sample[left_mask]) pred[~left_mask] = self.right.predict(sample[~left_mask]) return pred assert len(sample) == 0, "Type not supported" - return np.empty((0,), dtype=np.int64) + return np.empty((0,), dtype=np.float64) class _InnerNodeInfo: @@ -220,7 +220,6 @@ def __init__(self, size=None, mean=None): class _SkTreeWrapper: def __init__(self, tree): self.sk_tree = tree - self.classes = tree.classes_ def _get_sample_attributes(samples_file, indices): @@ -260,8 +259,8 @@ def _feature_selection(untried_indices, m_try, random_state): def _get_groups(sample, y_s, features_mmap, index, value): if index is None: empty_sample = np.array([], dtype=np.int64) - empty_labels = np.array([], dtype=np.int8) - return sample, y_s, empty_sample, empty_labels + empty_target = np.array([], dtype=np.float64) + return sample, y_s, empty_sample, empty_target feature = features_mmap[index][sample] mask = feature < value left = sample[mask] @@ -351,7 +350,7 @@ def _compute_split( left_group = sample y_l = y_s right_group = np.array([], dtype=np.int64) - y_r = np.array([], dtype=np.int8) + y_r = np.array([], dtype=np.float64) return node_info, left_group, y_l, right_group, y_r @@ -558,7 +557,7 @@ def _merge_branches(n_classes, *predictions): dtype = np.float64 else: # predict_proba shape = (samples_len,) - dtype = np.int64 + dtype = np.float64 merged_prediction = np.empty(shape, dtype=dtype) for selected, prediction in predictions: merged_prediction[selected] = prediction diff --git a/dislib/regression/rf/forest.py b/dislib/regression/rf/forest.py index 8f6c0f2a..faae07c8 100644 --- a/dislib/regression/rf/forest.py +++ b/dislib/regression/rf/forest.py @@ -8,14 +8,14 @@ from sklearn.base import BaseEstimator from sklearn.utils import check_random_state -from dislib.classification.rf.decision_tree import DecisionTreeClassifier +from dislib.regression.rf.decision_tree import DecisionTreeRegressor from dislib.data.array import Array from dislib.utils.base import _paired_partition -from dislib.classification.rf._data import transform_to_rf_dataset +from dislib.regression.rf._data import transform_to_rf_dataset -class RandomForestClassifier(BaseEstimator): - """A distributed random forest classifier. +class RandomForestRegressor(BaseEstimator): + """A distributed random forest regressor. Parameters ---------- @@ -39,16 +39,11 @@ class RandomForestClassifier(BaseEstimator): distributed way. 
sklearn_max: int or float, optional (default=1e8) Maximum size (len(subsample)*n_features) of the arrays passed to - sklearn's DecisionTreeClassifier.fit(), which is called to fit subtrees - (subsamples) of our DecisionTreeClassifier. sklearn fit() is used + sklearn's DecisionTreeRegressor.fit(), which is called to fit subtrees + (subsamples) of our DecisionTreeRegressor. sklearn fit() is used because it's faster, but requires loading the data to memory, which can cause memory problems for large datasets. This parameter can be adjusted to fit the hardware capabilities. - hard_vote : bool, optional (default=False) - If True, it uses majority voting over the predict() result of the - decision tree predictions. If False, it takes the class with the higher - probability given by predict_proba(), which is an average of the - probabilities given by the decision trees. random_state : int, RandomState instance or None, optional (default=None) If int, random_state is the seed used by the random number generator; If RandomState instance, random_state is the random number generator; @@ -57,30 +52,28 @@ class RandomForestClassifier(BaseEstimator): Attributes ---------- - classes : None or ndarray - Array of distinct classes, set at fit(). - trees : list of DecisionTreeClassifier - List of the tree classifiers of this forest, populated at fit(). + trees : list of DecisionTreeRegressor + List of the tree regressors of this forest, populated at fit(). """ - def __init__(self, - n_estimators=10, - try_features='sqrt', - max_depth=np.inf, - distr_depth='auto', - sklearn_max=1e8, - hard_vote=False, - random_state=None): + def __init__( + self, + n_estimators=10, + try_features="sqrt", + max_depth=np.inf, + distr_depth="auto", + sklearn_max=1e8, + random_state=None, + ): self.n_estimators = n_estimators self.try_features = try_features self.max_depth = max_depth self.distr_depth = distr_depth self.sklearn_max = sklearn_max - self.hard_vote = hard_vote self.random_state = random_state def fit(self, x, y): - """Fits the RandomForestClassifier. + """Fits the RandomForestRegressor. Parameters ---------- @@ -92,10 +85,9 @@ def fit(self, x, y): Returns ------- - self : RandomForestClassifier + self : RandomForestRegressor """ - self.classes = None self.trees = [] dataset = transform_to_rf_dataset(x, y) @@ -104,20 +96,22 @@ def fit(self, x, y): try_features = _resolve_try_features(self.try_features, n_features) random_state = check_random_state(self.random_state) - self.classes = dataset.get_classes() - - if self.distr_depth == 'auto': + if self.distr_depth == "auto": dataset.n_samples = compss_wait_on(dataset.get_n_samples()) distr_depth = max(0, int(math.log10(dataset.n_samples)) - 4) distr_depth = min(distr_depth, self.max_depth) else: distr_depth = self.distr_depth - for i in range(self.n_estimators): - tree = DecisionTreeClassifier(try_features, self.max_depth, - distr_depth, self.sklearn_max, - bootstrap=True, - random_state=random_state) + for _ in range(self.n_estimators): + tree = DecisionTreeRegressor( + try_features, + self.max_depth, + distr_depth, + self.sklearn_max, + bootstrap=True, + random_state=random_state, + ) self.trees.append(tree) for tree in self.trees: @@ -125,44 +119,8 @@ def fit(self, x, y): return self - def predict_proba(self, x): - """Predicts class probabilities using a fitted forest. - - The probabilities are obtained as an average of the probabilities of - each decision tree. - - - Parameters - ---------- - x : ds-array, shape=(n_samples, n_features) - The input samples. 
-
-        Returns
-        -------
-        probabilities : ds-array, shape=(n_samples, n_classes)
-            Predicted probabilities for the samples to belong to each class.
-            The columns of the array correspond to the classes given at
-            self.classes.
-
-        """
-        assert self.trees is not None, 'The random forest is not fitted.'
-        prob_blocks = []
-        for x_row in x._iterator(axis=0):
-            tree_predictions = []
-            for tree in self.trees:
-                tree_predictions.append(tree.predict_proba(x_row))
-            prob_blocks.append([_join_predictions(*tree_predictions)])
-        self.classes = compss_wait_on(self.classes)
-        n_classes = len(self.classes)
-
-        probabilities = Array(blocks=prob_blocks,
-                              top_left_shape=(x._top_left_shape[0], n_classes),
-                              reg_shape=(x._reg_shape[0], n_classes),
-                              shape=(x.shape[0], n_classes), sparse=False)
-        return probabilities
-
     def predict(self, x):
-        """Predicts classes using a fitted forest.
+        """Predicts target values using a fitted forest.
 
         Parameters
         ----------
@@ -172,36 +130,40 @@ def predict(self, x):
         Returns
         -------
         y_pred : ds-array, shape=(n_samples, 1)
-            Predicted class labels for x.
+            Predicted target values for x.
 
         """
-        assert self.trees is not None, 'The random forest is not fitted.'
+        assert self.trees is not None, "The random forest is not fitted."
         pred_blocks = []
-        if self.hard_vote:
-            for x_row in x._iterator(axis=0):
-                tree_predictions = []
-                for tree in self.trees:
-                    tree_predictions.append(tree.predict(x_row))
-                pred_blocks.append(_hard_vote(self.classes, *tree_predictions))
-        else:
-            for x_row in x._iterator(axis=0):
-                tree_predictions = []
-                for tree in self.trees:
-                    tree_predictions.append(tree.predict_proba(x_row))
-                pred_blocks.append(_soft_vote(self.classes, *tree_predictions))
+        for x_row in x._iterator(axis=0):
+            tree_predictions = []
+            for tree in self.trees:
+                tree_predictions.append(tree.predict(x_row))
+            pred_blocks.append(_join_predictions(*tree_predictions))
 
-        y_pred = Array(blocks=[pred_blocks],
-                       top_left_shape=(x._top_left_shape[0], 1),
-                       reg_shape=(x._reg_shape[0], 1), shape=(x.shape[0], 1),
-                       sparse=False)
+        y_pred = Array(
+            blocks=[pred_blocks],
+            top_left_shape=(x._top_left_shape[0], 1),
+            reg_shape=(x._reg_shape[0], 1),
+            shape=(x.shape[0], 1),
+            sparse=False,
+        )
 
         return y_pred
 
     def score(self, x, y):
-        """Accuracy classification score.
-
-        Returns the mean accuracy on the given test data.
-
+        """$R^2$ regression score.
+
+        Returns the coefficient of determination $R^2$ of
+        the prediction.
+        The coefficient $R^2$ is defined as $(1-u/v)$, where $u$
+        is the residual sum of squares `((y_true - y_pred) ** 2).sum()` and
+        $v$ is the total sum of squares
+        `((y_true - y_true.mean()) ** 2).sum()`.
+        The best possible score is 1.0 and it can be negative
+        (because the model can be arbitrarily worse).
+        A constant model that always predicts the expected value of y,
+        disregarding the input features, would get an $R^2$ score of 0.0.
 
         Parameters
         ----------
@@ -213,27 +175,17 @@ def score(self, x, y):
         Returns
         -------
         score : float (as future object)
-            Fraction of correctly classified samples.
+            Coefficient of determination $R^2$.
 
         """
-        assert self.trees is not None, 'The random forest is not fitted.'
+        assert self.trees is not None, "The random forest is not fitted."
partial_scores = [] - if self.hard_vote: - for x_row, y_row in _paired_partition(x, y): - tree_predictions = [] - for tree in self.trees: - tree_predictions.append(tree.predict(x_row)) - subset_score = _hard_vote_score(y_row._blocks, self.classes, - *tree_predictions) - partial_scores.append(subset_score) - else: - for x_row, y_row in _paired_partition(x, y): - tree_predictions = [] - for tree in self.trees: - tree_predictions.append(tree.predict_proba(x_row)) - subset_score = _soft_vote_score(y_row._blocks, self.classes, - *tree_predictions) - partial_scores.append(subset_score) + for x_row, y_row in _paired_partition(x, y): + tree_predictions = [] + for tree in self.trees: + tree_predictions.append(tree.predict(x_row)) + subset_score = _partial_score(y_row._blocks, *tree_predictions) + partial_scores.append(subset_score) return _merge_scores(*partial_scores) @@ -242,9 +194,9 @@ def score(self, x, y): def _resolve_try_features(try_features, n_features): if try_features is None: return n_features - elif try_features == 'sqrt': + elif try_features == "sqrt": return int(math.sqrt(n_features)) - elif try_features == 'third': + elif try_features == "third": return max(1, n_features // 3) else: return int(try_features) @@ -255,52 +207,30 @@ def _join_predictions(*predictions): aggregate = predictions[0] for p in predictions[1:]: aggregate += p - labels = aggregate / len(predictions) - return labels - - -@task(returns=1) -def _soft_vote(classes, *predictions): - aggregate = predictions[0] - for p in predictions[1:]: - aggregate += p - labels = classes[np.argmax(aggregate, axis=1)] - return labels - - -@task(returns=1) -def _hard_vote(classes, *predictions): - mode = np.empty((len(predictions[0]),), dtype=int) - for sample_i, votes in enumerate(zip(*predictions)): - mode[sample_i] = Counter(votes).most_common(1)[0][0] - labels = classes[mode] - return labels + target = aggregate / len(predictions) + return target @task(y_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) -def _soft_vote_score(y_blocks, classes, *predictions): - real_labels = Array._merge_blocks(y_blocks).flatten() - aggregate = predictions[0] - for p in predictions[1:]: - aggregate += p - predicted_labels = classes[np.argmax(aggregate, axis=1)] - correct = np.count_nonzero(predicted_labels == real_labels) - return correct, len(real_labels) - - -@task(y_blocks={Type: COLLECTION_IN, Depth: 2}, returns=1) -def _hard_vote_score(y_blocks, classes, *predictions): - real_labels = Array._merge_blocks(y_blocks).flatten() - mode = np.empty((len(predictions[0]),), dtype=int) - for sample_i, votes in enumerate(zip(*predictions)): - mode[sample_i] = Counter(votes).most_common(1)[0][0] - predicted_labels = classes[mode] - correct = np.count_nonzero(predicted_labels == real_labels) - return correct, len(real_labels) +def _partial_score(y_blocks, *predictions): + y_true = Array._merge_blocks(y_blocks).flatten() + y_pred = np.mean(predictions, axis=0) + n_samples = y_true.shape[0] + y_avg = np.mean(y_true) + u_partial = np.sum(np.square(y_true - y_pred), axis=0) + v_partial = np.sum(np.square(y_true - y_avg), axis=0) + return u_partial, v_partial, y_avg, n_samples @task(returns=1) def _merge_scores(*partial_scores): - correct = sum(subset_score[0] for subset_score in partial_scores) - total = sum(subset_score[1] for subset_score in partial_scores) - return correct / total + u = v = avg = n = 0 + for u_p, v_p, avg_p, n_p in partial_scores: + u += u_p + + delta = avg_p - avg + avg += delta * n_p / (n + n_p) + v += v_p + delta ** 2 * n * n_p / (n + 
n_p)
+        n += n_p
+
+    return 1 - u / v
diff --git a/tests/test_rf_regressor.py b/tests/test_rf_regressor.py
new file mode 100644
index 00000000..2d82dbeb
--- /dev/null
+++ b/tests/test_rf_regressor.py
@@ -0,0 +1,105 @@
+import unittest
+
+import numpy as np
+from pycompss.api.api import compss_wait_on
+from sklearn.datasets import make_regression
+
+import dislib as ds
+from dislib.regression import RandomForestRegressor
+
+
+def _determination_coefficient(y_true, y_pred):
+    u = np.sum(np.square(y_true - y_pred))
+    v = np.sum(np.square(y_true - np.mean(y_true)))
+    return 1 - u / v
+
+
+class RandomForestRegressorTest(unittest.TestCase):
+    def test_make_regression(self):
+        """Tests RandomForestRegressor fit and score with default params."""
+        x, y = make_regression(
+            n_samples=3000,
+            n_features=10,
+            n_informative=4,
+            shuffle=True,
+            random_state=0,
+        )
+        x_train = ds.array(x[: len(x) // 2], (300, 10))
+        y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1))
+        x_test = ds.array(x[len(x) // 2 :], (300, 10))
+        y_test = ds.array(y[len(y) // 2 :][:, np.newaxis], (300, 1))
+
+        rf = RandomForestRegressor(random_state=0)
+
+        rf.fit(x_train, y_train)
+        score1 = compss_wait_on(rf.score(x_test, y_test))
+
+        y_pred = rf.predict(x_test).collect()
+        y_true = y[len(y) // 2 :]
+        score2 = _determination_coefficient(y_true, y_pred)
+
+        self.assertGreater(score1, 0.85)
+        self.assertGreater(score2, 0.85)
+        self.assertAlmostEqual(score1, score2)
+
+    def test_make_regression_predict_and_distr_depth(self):
+        """Tests RandomForestRegressor fit and predict with a distr_depth."""
+        x, y = make_regression(
+            n_samples=3000,
+            n_features=10,
+            n_informative=4,
+            shuffle=True,
+            random_state=0,
+        )
+        x_train = ds.array(x[: len(x) // 2], (300, 10))
+        y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1))
+        x_test = ds.array(x[len(x) // 2 :], (300, 10))
+        y_test = ds.array(y[len(y) // 2 :][:, np.newaxis], (300, 1))
+
+        rf = RandomForestRegressor(distr_depth=2, random_state=0)
+
+        rf.fit(x_train, y_train)
+        score1 = compss_wait_on(rf.score(x_test, y_test))
+
+        y_pred = rf.predict(x_test).collect()
+        y_true = y[len(y) // 2 :]
+        score2 = _determination_coefficient(y_true, y_pred)
+
+        self.assertGreater(score1, 0.85)
+        self.assertGreater(score2, 0.85)
+        self.assertAlmostEqual(score1, score2)
+
+    def test_make_regression_sklearn_max_predict(self):
+        """Tests RandomForestRegressor predict with sklearn_max."""
+        x, y = make_regression(
+            n_samples=3000,
+            n_features=10,
+            n_informative=4,
+            shuffle=True,
+            random_state=0,
+        )
+        x_train = ds.array(x[: len(x) // 2], (300, 10))
+        y_train = ds.array(y[: len(y) // 2][:, np.newaxis], (300, 1))
+        x_test = ds.array(x[len(x) // 2 :], (300, 10))
+        y_test = ds.array(y[len(y) // 2 :][:, np.newaxis], (300, 1))
+
+        rf = RandomForestRegressor(random_state=0, sklearn_max=10)
+
+        rf.fit(x_train, y_train)
+        score1 = compss_wait_on(rf.score(x_test, y_test))
+
+        y_pred = rf.predict(x_test).collect()
+        y_true = y[len(y) // 2 :]
+        score2 = _determination_coefficient(y_true, y_pred)
+
+        self.assertGreater(score1, 0.85)
+        self.assertGreater(score2, 0.85)
+        self.assertAlmostEqual(score1, score2)
+
+
+def main():
+    unittest.main()
+
+
+if __name__ == "__main__":
+    main()
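
A few notes on the numerical behaviour introduced by this patch. The forest
prediction in predict() is simply the per-sample mean of the individual trees'
predict() outputs, computed by _join_predictions; it replaces the classifier's
soft/hard voting. A minimal standalone sketch of that averaging (plain NumPy,
made-up values, no PyCOMPSs tasks):

    import numpy as np

    # Per-tree predictions for the same block of four samples
    # (made-up values, for illustration only).
    tree_predictions = [
        np.array([1.0, 2.0, 3.0, 4.0]),
        np.array([1.2, 1.8, 3.3, 3.9]),
        np.array([0.8, 2.1, 2.9, 4.2]),
    ]

    # _join_predictions accumulates the arrays and divides by the number
    # of trees (the copy avoids mutating the first input in this sketch)...
    aggregate = tree_predictions[0].copy()
    for p in tree_predictions[1:]:
        aggregate += p
    target = aggregate / len(tree_predictions)

    # ...which is exactly the column-wise mean that _partial_score
    # applies on the scoring side.
    assert np.allclose(target, np.mean(tree_predictions, axis=0))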
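The score is computed in two stages: _partial_score reduces each row block to
its residual sum of squares u_p, its block-local total sum of squares v_p, its
mean and its size, and _merge_scores combines the v_p terms with the standard
pairwise update for sums of squared deviations (the parallel-variance formula
of Chan et al.), so the merged result equals the plain R^2 over the
concatenated blocks. A self-contained check of that equivalence, with
hypothetical helper names mirroring the two tasks:

    import numpy as np

    def partial_score(y_true, y_pred):
        # Mirrors _partial_score: block statistics only.
        u_p = np.sum(np.square(y_true - y_pred))         # residual sum of squares
        v_p = np.sum(np.square(y_true - y_true.mean()))  # local total sum of squares
        return u_p, v_p, y_true.mean(), y_true.shape[0]

    def merge_scores(*partial_scores):
        # Mirrors _merge_scores: pairwise merge of the block statistics.
        u = v = avg = n = 0
        for u_p, v_p, avg_p, n_p in partial_scores:
            u += u_p
            delta = avg_p - avg
            avg += delta * n_p / (n + n_p)               # running global mean
            v += v_p + delta ** 2 * n * n_p / (n + n_p)  # combined sum of squares
            n += n_p
        return 1 - u / v

    rng = np.random.RandomState(0)
    y_true = rng.normal(size=1000)
    y_pred = y_true + rng.normal(scale=0.5, size=1000)

    # Score each of four unequal blocks, then merge.
    blocks = zip(np.array_split(y_true, [100, 350, 700]),
                 np.array_split(y_pred, [100, 350, 700]))
    merged = merge_scores(*(partial_score(t, p) for t, p in blocks))

    # Direct R^2 on the full arrays.
    u = np.sum(np.square(y_true - y_pred))
    v = np.sum(np.square(y_true - y_true.mean()))
    assert np.isclose(merged, 1 - u / v)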
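Finally, note how the distr_depth='auto' heuristic in fit() (unchanged by this
patch) interacts with the tests above: with the 1500 training samples used
there, max(0, int(math.log10(n_samples)) - 4) is clamped to 0, so 'auto' adds
no distributed levels to the tree construction; presumably that is why one
test pins distr_depth=2 explicitly. The arithmetic:

    import math

    # 3000 generated samples, half used for training.
    n_samples = 1500
    assert max(0, int(math.log10(n_samples)) - 4) == 0  # int(3.17) - 4 == -1

    # Roughly one distributed level per decade beyond 10**4 samples:
    assert max(0, int(math.log10(10 ** 6)) - 4) == 2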