import pandas as pd
import numpy as np
from collections import Counter


class my_DT:
    def __init__(self, criterion="gini", max_depth=8, min_impurity_decrease=0, min_samples_split=2):
        # criterion = {"gini", "entropy"}
        # Stop training once depth == max_depth.
        # Only split a node if the weighted impurity decrease after the split is >= min_impurity_decrease.
        # Weighted impurity decrease: impurity - (N_t_R / N * right_impurity + N_t_L / N * left_impurity)
        # Only split a node that holds >= min_samples_split samples.
        self.criterion = criterion
        self.max_depth = int(max_depth)
        self.min_impurity_decrease = min_impurity_decrease
        self.min_samples_split = int(min_samples_split)
    def impurity(self, labels):
        # Calculate impurity (unweighted).
        # Input: a list (or np.array) of labels.
        # Output: impurity score.
        stats = Counter(labels)
        N = float(len(labels))
        if self.criterion == "gini":
            # Gini impurity: 1 - sum_k p_k^2
            impure = 1 - sum((count / N) ** 2 for count in stats.values())
        elif self.criterion == "entropy":
            # Entropy: -sum_k p_k * log2(p_k)
            impure = -sum((count / N) * np.log2(count / N) for count in stats.values())
        else:
            raise Exception("Unknown criterion.")
        return impure
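
    # Worked example (illustrative, not from the original hint file): for labels
    # ["a", "a", "b", "b"], p_a = p_b = 0.5, so gini = 1 - (0.25 + 0.25) = 0.5
    # and entropy = -(0.5 * log2(0.5) + 0.5 * log2(0.5)) = 1.0.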

    def find_best_split(self, pop, X, labels):
        # Find the best split.
        # Inputs:
        #   pop: indices of data in the node
        #   X: independent variables of training data
        #   labels: dependent variables of training data
        # Output: tuple(best feature to split, weighted impurity score of best split, splitting point of the feature, [indices of data in left node, indices of data in right node], [weighted impurity score of left node, weighted impurity score of right node])
        ######################
        best_feature = None
        best_impure = np.inf
        for feature in X.keys():
            cans = np.array(X[feature][pop])
            # Candidate splitting points: midpoints between consecutive sorted unique values.
            values = np.unique(cans)
            for split in (values[:-1] + values[1:]) / 2.0:
                left, right = pop[cans < split], pop[cans >= split]
                # Weighted impurity = (# data in child node) * (unweighted impurity of child).
                left_impure = len(left) * self.impurity(labels[left])
                right_impure = len(right) * self.impurity(labels[right])
                if left_impure + right_impure < best_impure:
                    best_impure = left_impure + right_impure
                    best_feature = (feature, best_impure, split, [left, right], [left_impure, right_impure])
        return best_feature
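
    # For illustration (hypothetical values): a call that finds a perfect split
    # on a feature "x0" at 2.5 might return
    #   ("x0", 0.0, 2.5, [array([0, 1]), array([2, 3])], [0.0, 0.0])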

    def fit(self, X, y):
        # X: pd.DataFrame, independent variables, float
        # y: list, np.array or pd.Series, dependent variables, int or str
        self.classes_ = list(set(list(y)))
        labels = np.array(y)
        N = len(y)
        ##### A binary tree structure implemented in the form of a dictionary #####
        # 0 is the root node.
        # Node i has two children: left = i*2+1, right = i*2+2.
        # self.tree[i] = tuple(feature to split on, value of the splitting point) if it is not a leaf
        #              = Counter(labels of the training data in this leaf) if it is a leaf node
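        # Illustrative example (hypothetical): a depth-1 tree that splits on a
        # feature "x0" at 2.5 could look like
        #   {0: ("x0", 2.5), 1: Counter({"a": 2}), 2: Counter({"b": 2})}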
        self.tree = {}
        # population keeps the indices of the data points in each node.
        population = {0: np.array(range(N))}
        # impurity stores the weighted impurity score of each node (# data in node * unweighted impurity).
        # NOTE: for simplicity, we do not divide the weighted impurity score by N here.
        impurity = {0: self.impurity(labels[population[0]]) * N}
        #########################################################################
        level = 0
        nodes = [0]
        while level < self.max_depth and nodes:
            # Breadth-first search to split the nodes on the current level.
            next_nodes = []
            for node in nodes:
                current_pop = population[node]
                current_impure = impurity[node]
                if len(current_pop) < self.min_samples_split or current_impure == 0 or level + 1 == self.max_depth:
                    # The node is a leaf node.
                    self.tree[node] = Counter(labels[current_pop])
                else:
                    # Find the best split using the find_best_split function.
                    best_feature = self.find_best_split(current_pop, X, labels)
                    if best_feature is not None and (current_impure - best_feature[1]) > self.min_impurity_decrease * N:
                        # Split the node.
                        self.tree[node] = (best_feature[0], best_feature[2])
                        next_nodes.extend([node * 2 + 1, node * 2 + 2])
                        population[node * 2 + 1] = best_feature[3][0]
                        population[node * 2 + 2] = best_feature[3][1]
                        impurity[node * 2 + 1] = best_feature[4][0]
                        impurity[node * 2 + 2] = best_feature[4][1]
                    else:
                        # The node is a leaf node.
                        self.tree[node] = Counter(labels[current_pop])
            nodes = next_nodes
            level += 1
        return

    def predict(self, X):
        # X: pd.DataFrame, independent variables, float
        # Return predictions: list
        predictions = []
        for i in range(len(X)):
            node = 0
            while True:
                if isinstance(self.tree[node], Counter):
                    # Leaf node: predict the majority label of the training data in it.
                    label = self.tree[node].most_common(1)[0][0]
                    predictions.append(label)
                    break
                else:
                    # Internal node: go left if the feature value is below the splitting point.
                    if X[self.tree[node][0]].iloc[i] < self.tree[node][1]:
                        node = node * 2 + 1
                    else:
                        node = node * 2 + 2
        return predictions

    def predict_proba(self, X):
        # X: pd.DataFrame, independent variables, float
        # Example:
        #   self.classes_ = ["2", "1"]
        #   the reached leaf node for the test data point has Counter({"1": 2, "2": 1})
        #   then the prob for that data point is {"2": 1/3, "1": 2/3}
        # Return probs = pd.DataFrame(list of prob, columns = self.classes_)
        predictions = []
        for i in range(len(X)):
            node = 0
            while True:
                if isinstance(self.tree[node], Counter):
                    # Leaf node: the probability of each class is its share of the
                    # training data that reached this leaf (0 if absent from the leaf).
                    counts = self.tree[node]
                    total = float(sum(counts.values()))
                    prob = {label: counts[label] / total for label in self.classes_}
                    predictions.append(prob)
                    break
                else:
                    if X[self.tree[node][0]].iloc[i] < self.tree[node][1]:
                        node = node * 2 + 1
                    else:
                        node = node * 2 + 2
        probs = pd.DataFrame(predictions, columns=self.classes_)
        return probs
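

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only, not part of the original hint file;
# the toy features "x0"/"x1" and labels "a"/"b" are made up for the demo).
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    X_train = pd.DataFrame({"x0": [1.0, 2.0, 3.0, 4.0], "x1": [5.0, 1.0, 4.0, 2.0]})
    y_train = ["a", "a", "b", "b"]
    clf = my_DT(criterion="gini", max_depth=3)
    clf.fit(X_train, y_train)
    print(clf.predict(X_train))        # expected: ['a', 'a', 'b', 'b']
    print(clf.predict_proba(X_train))  # per-class probabilities, one row per sample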