1+ import numpy as np
2+ import copy
3+
# Implement Greedy Algorithm for split finding in a regression tree
class RegressionTree:
    """Single regression tree grown with the XGBoost exact greedy split search.

    Fit on residuals with squared-error loss, so per-sample gradient
    g_i = -residual_i and hessian h_i = 1.  A split is scored by the
    regularized gain

        gain = GL^2/(HL + lambda) + GR^2/(HR + lambda) - G^2/(H + lambda)

    and kept only when the best gain reaches ``prune_gamma``.  Leaf weight
    is -G/(H + lambda) = sum(residual)/(n + lambda).
    """

    def __init__(self, n_estimator=1, max_depth=3, reg_lambda=1.0, prune_gamma=0.0):
        self.n_estimator = n_estimator    # Kept for API compatibility; unused by this class
        self.max_depth = max_depth        # Maximum depth of the tree
        self.reg_lambda = reg_lambda      # L2 regularization constant (lambda)
        self.prune_gamma = prune_gamma    # Minimum gain required to keep a split (gamma)
        self.estimator1 = None            # Tree structure before assigning leaf values
        self.estimator2 = None            # Tree structure with leaf values
        self.feature = None               # Feature matrix (X)
        self.residual = None              # Residuals (y - prediction)
        self.base_score = None            # Initial prediction (mean of residuals)

    # Regularized learning objective:
    # Split a node into left and right to maximize gain
    def node_split(self, did):
        """Find the best split for the samples indexed by ``did``.

        Returns a dict {"fid", "split_point", "left", "right"} (children are
        index arrays) when the best gain is at least ``prune_gamma``;
        otherwise ``np.nan``.
        """
        r = self.reg_lambda
        max_gain = -np.inf
        d = self.feature.shape[1]  # Number of features

        # Gradient statistics of the parent node before the split.
        G = -self.residual[did].sum()   # Sum of gradients (g_i = -residual_i)
        H = did.shape[0]                # Sum of hessians (h_i = 1 each)
        p_score = (G ** 2) / (H + r)    # Score before the split

        best_split = None

        # Iterate over all features to find the best split
        for k in range(d):
            X_feat = self.feature[did, k]
            x_uniq = np.unique(X_feat)
            # Candidate split points: midpoints between consecutive unique values
            s_point = [(x_uniq[i - 1] + x_uniq[i]) / 2 for i in range(1, len(x_uniq))]

            for j in s_point:
                # BUG FIX: the left child must contain *every* sample with
                # X <= j.  The original restricted left to the window
                # (previous candidate, j], so GL/HL were computed on a
                # partial partition and every gain after the first
                # candidate was wrong.
                left = did[X_feat <= j]
                right = did[X_feat > j]

                if len(left) == 0 or len(right) == 0:
                    continue

                # Gradient/hessian sums for the two children
                GL = -self.residual[left].sum()
                HL = left.shape[0]
                GR = G - GL
                HR = H - HL

                # Regularized gain of this split
                gain = (GL ** 2) / (HL + r) + (GR ** 2) / (HR + r) - p_score

                if gain > max_gain:
                    max_gain = gain
                    best_split = {"fid": k, "split_point": j, "left": left, "right": right}

        # Only split if the best gain reaches the pruning threshold
        if max_gain >= self.prune_gamma:
            return best_split
        return np.nan  # No valid split found

    # Recursively split tree nodes until maximum depth is reached
    def recursive_split(self, node, curr_depth):
        """Grow the tree below ``node`` (in place) until ``max_depth``.

        BUG FIX: the original recursed on the stored index arrays without
        ever calling node_split, so the recursion was a no-op and the tree
        never grew past depth 1.  Children are now actually split and the
        resulting subtree dicts replace the index arrays in place.
        """
        if curr_depth >= self.max_depth or not isinstance(node, dict):
            return

        for key in ("left", "right"):
            child = self.node_split(node[key])
            if isinstance(child, dict):   # np.nan means "no useful split": keep as leaf
                node[key] = child
                self.recursive_split(child, curr_depth + 1)

    # Calculate output value for a leaf node (regularized)
    def output_value(self, did):
        """Optimal leaf weight -G/(H + lambda) = sum(residual)/(n + lambda)."""
        return np.sum(self.residual[did]) / (did.shape[0] + self.reg_lambda)

    # Assign output values to all leaf nodes in the tree
    def output_leaf(self, d):
        """Replace every leaf's index array with its output value (in place)."""
        if isinstance(d, dict):
            for key in ("left", "right"):
                val = d[key]
                if isinstance(val, dict):
                    self.output_leaf(val)
                else:
                    # Replace node indices with the actual leaf value
                    d[key] = self.output_value(val)

    # Fit the regression tree to feature matrix X and residuals y
    def fit(self, x, y):
        """Grow the tree on features ``x`` and residuals ``y``.

        Returns the fitted tree (nested dict), or None when no split passed
        the pruning threshold — predict() then falls back to base_score.
        """
        self.feature = x
        self.residual = y
        self.base_score = y.mean()  # Initial prediction (mean of residuals)

        # Build the tree from the root
        root = self.node_split(np.arange(x.shape[0]))
        if isinstance(root, dict):
            self.recursive_split(root, curr_depth=1)
            self.estimator1 = root
            self.estimator2 = copy.deepcopy(root)
            self.output_leaf(self.estimator2)  # Assign leaf values

        return self.estimator2

    # Predict output for a single sample
    def x_predict(self, p, x):
        """Route one sample ``x`` down tree ``p`` to its leaf value."""
        branch = p["left"] if x[p["fid"]] <= p["split_point"] else p["right"]
        if isinstance(branch, dict):
            return self.x_predict(branch, x)
        return branch

    # Predict outputs for multiple samples
    def predict(self, x_test):
        """Return one prediction per row of ``x_test``."""
        if self.estimator2 is None:
            # Empty tree: fall back to the mean of the training residuals
            return np.array([self.base_score] * x_test.shape[0])
        # Traverse the tree for each sample
        return np.array([self.x_predict(self.estimator2, x) for x in x_test])
# Build an XGBoost-style regressor from RegressionTree base learners
class MyXGBoostRegressor:
    """Gradient boosting for squared-error regression.

    Each round fits a RegressionTree to the current residuals (the negative
    gradient of the squared-error loss) and adds its prediction scaled by
    the learning rate to the running estimate.
    """

    def __init__(self, n_estimators=10, max_depth=3, reg_lambda=1.0, prune_gamma=0.0, learning_rate=0.1):
        self.n_estimators = n_estimators    # Number of boosting rounds
        self.max_depth = max_depth          # Depth limit for each tree
        self.reg_lambda = reg_lambda        # L2 regularization for leaf weights
        self.prune_gamma = prune_gamma      # Minimum split gain (pruning threshold)
        self.learning_rate = learning_rate  # Shrinkage applied to every tree
        self.trees = []                     # Fitted base learners, in boosting order
        self.base_score = None              # Initial constant prediction (mean of y)

    def fit(self, X, y):
        """Fit the boosted ensemble to features ``X`` and targets ``y``."""
        # BUG FIX: reset the ensemble so calling fit() a second time does not
        # stack the new trees on top of a previous fit's trees (predict sums
        # every stored tree, so stale trees corrupted refit predictions).
        self.trees = []

        # Initial prediction: mean of y
        self.base_score = y.mean()
        y_pred = np.full(X.shape[0], self.base_score)

        for _ in range(self.n_estimators):
            # Residuals = negative gradient of the squared-error loss
            residuals = y - y_pred
            tree = RegressionTree(max_depth=self.max_depth, reg_lambda=self.reg_lambda, prune_gamma=self.prune_gamma)
            tree.fit(X, residuals)
            # Shrunken update of the running prediction
            y_pred += self.learning_rate * tree.predict(X)
            self.trees.append(tree)

    def predict(self, X):
        """Predict by summing base_score and each tree's shrunken output."""
        y_pred = np.full(X.shape[0], self.base_score)
        for tree in self.trees:
            y_pred += self.learning_rate * tree.predict(X)
        return y_pred
# (removed web-scrape artifact: "0 commit comments")