-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathSVM(SMO算法).py
215 lines (195 loc) · 8.88 KB
/
SVM(SMO算法).py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
from numpy import *
# 读取数据
def loadDataSet(filename):
dataMat=[]
labelMat=[]
fr=open(filename)
for line in fr.readlines():
lineArr=line.strip().split(',')
dataMat.append([float(lineArr[0]),float(lineArr[1])])
labelMat.append(float(lineArr[2]))
return dataMat,labelMat # 返回数据特征和数据类别
def selectJrand(i,m): #在0-m中随机选择一个不是i的整数
j=i
while (j==i):
j=int(random.uniform(0,m))
return j
def clipAlpha(aj,H,L): #保证a在L和H范围内(L <= a <= H)
if aj>H:
aj=H
if L>aj:
aj=L
return aj
def kernelTrans(X, A, kTup):
'''
:param X: 支持向量的特征树
:param A: 某一行特征数据
:param kTup: ('lin',k1)核函数的类型和参数
:return:
'''
m, n = shape(X) # 获取域数据集的样本数目, 和特征维数
K = mat(zeros((m,1))) #
if kTup[0]=='lin': # 线性函数
K = X * A.T
elif kTup[0]=='rbf': # 径向基函数(radial bias function)
for j in range(m): # 遍历所有的样本
deltaRow = X[j,:] - A # 当前样本的减去A
K[j] = deltaRow*deltaRow.T #
# 高斯核函数
K = exp(K/(-1*kTup[1]**2)) # 返回生成的结果
else:
raise NameError('Houston We Have a Problem -- That Kernel is not recognized')
return K # 返回核函数
# 定义类,方便存储数据
class optStruct:
def __init__(self,dataMatIn, classLabels, C, toler, kTup): # 存储各类参数
self.X = dataMatIn # 数据集
self.labelMat = classLabels # 数据标签
self.C = C # 软间隔参数C,参数越大,非线性拟合能力越强
self.tol = toler # 停止阀值
self.m = shape(dataMatIn)[0] # 样本数目
self.alphas = mat(zeros((self.m, 1))) # 记录alpha值
self.b = 0 # 初始设为0
self.eCache = mat(zeros((self.m, 2))) # 缓存
self.K = mat(zeros((self.m, self.m))) # 核函数的计算结果
for i in range(self.m):
self.K[:, i] = kernelTrans(self.X, self.X[i, :], kTup) # 初始化核函数
def calcEk(oS, k): #计算Ek(参考《统计学习方法》p127公式7.105)
fXk = float(multiply(oS.alphas,oS.labelMat).T*oS.K[:,k] + oS.b)
Ek = fXk - float(oS.labelMat[k])
return Ek
#随机选取aj,并返回其E值
def selectJ(i, oS, Ei):
maxK = -1
maxDeltaE = 0
Ej = 0
oS.eCache[i] = [1,Ei]
validEcacheList = nonzero(oS.eCache[:,0].A)[0] # 返回矩阵中的非零位置的行数
if (len(validEcacheList)) > 1:
for k in validEcacheList:
if k == i:
continue
Ek = calcEk(oS, k)
deltaE = abs(Ei - Ek)
if (deltaE > maxDeltaE): # 返回步长最大的aj
maxK = k
maxDeltaE = deltaE
Ej = Ek
return maxK, Ej
else:
j = selectJrand(i, oS.m)
Ej = calcEk(oS, j)
return j, Ej
def updateEk(oS, k): #更新os数据
Ek = calcEk(oS, k)
oS.eCache[k] = [1,Ek]
# 首先检验ai是否满足KKT条件,如果不满足,随机选择aj进行优化,更新ai,aj,b值
def innerL(i, oS): # 输入参数i和所有参数数据
Ei = calcEk(oS, i) # 计算第i个样本的E值
if ((oS.labelMat[i]*Ei < -oS.tol) and (oS.alphas[i] < oS.C)) or ((oS.labelMat[i]*Ei > oS.tol) and (oS.alphas[i] > 0)): #检验这行数据是否符合KKT条件 参考《统计学习方法》p128公式7.111-113
j,Ej = selectJ(i, oS, Ei) # 随机选取aj,并返回其E值
alphaIold = oS.alphas[i].copy()
alphaJold = oS.alphas[j].copy()
if (oS.labelMat[i] != oS.labelMat[j]): #以下代码的公式参考《统计学习方法》p126
L = max(0, oS.alphas[j] - oS.alphas[i])
H = min(oS.C, oS.C + oS.alphas[j] - oS.alphas[i])
else:
L = max(0, oS.alphas[j] + oS.alphas[i] - oS.C)
H = min(oS.C, oS.alphas[j] + oS.alphas[i])
if L==H:
print("L==H")
return 0
eta = 2.0 * oS.K[i,j] - oS.K[i,i] - oS.K[j,j] #参考《统计学习方法》p127公式7.107
if eta >= 0:
print("eta>=0")
return 0
oS.alphas[j] -= oS.labelMat[j]*(Ei - Ej)/eta #参考《统计学习方法》p127公式7.106
oS.alphas[j] = clipAlpha(oS.alphas[j],H,L) #参考《统计学习方法》p127公式7.108
updateEk(oS, j)
if (abs(oS.alphas[j] - alphaJold) < oS.tol): #alpha变化大小阀值(自己设定)
print("j not moving enough")
return 0
oS.alphas[i] += oS.labelMat[j]*oS.labelMat[i]*(alphaJold - oS.alphas[j])#参考《统计学习方法》p127公式7.109
updateEk(oS, i) #更新数据
#以下求解b的过程,参考《统计学习方法》p129公式7.114-7.116
b1 = oS.b - Ei- oS.labelMat[i]*(oS.alphas[i]-alphaIold)*oS.K[i,i] - oS.labelMat[j]*(oS.alphas[j]-alphaJold)*oS.K[i,j]
b2 = oS.b - Ej- oS.labelMat[i]*(oS.alphas[i]-alphaIold)*oS.K[i,j]- oS.labelMat[j]*(oS.alphas[j]-alphaJold)*oS.K[j,j]
if (0 < oS.alphas[i]<oS.C):
oS.b = b1
elif (0 < oS.alphas[j]<oS.C):
oS.b = b2
else:
oS.b = (b1 + b2)/2.0
return 1
else:
return 0
# SMO函数,用于快速求解出alpha
def smoP(dataMatIn, classLabels, C, toler, maxIter,kTup=('lin', 0)):
'''
:param dataMatIn: 数据集
:param classLabels: 数据标记
:param C: 当C趋近于无穷大时:意味着分类严格不能有错误
当C趋近于很小的时:意味着可以有更大的错误容忍
:param toler: 阀值
:param maxIter: 最大迭代次数
:param kTup: 默认线性核
:return:
'''
oS = optStruct(mat(dataMatIn),mat(classLabels).transpose(),C,toler, kTup) # 创建SVM类
iter = 0 # 记录当前迭代次数
entireSet = True # 是否是全集
alphaPairsChanged = 0 # alpha对改变
while (iter < maxIter) and ((alphaPairsChanged > 0) or (entireSet)): # 判断迭代是否继续
alphaPairsChanged = 0 # 初始化alpha对
if entireSet: # 若果是全集
for i in range(oS.m): # 遍历所有数据
alphaPairsChanged += innerL(i, oS)
print("fullSet, iter: %d i:%d, pairs changed %d" % (iter,i,alphaPairsChanged)) #显示第多少次迭代,那行特征数据使alpha发生了改变,这次改变了多少次alpha
iter += 1
else:
nonBoundIs = nonzero((oS.alphas.A > 0) * (oS.alphas.A < C))[0]
for i in nonBoundIs: #遍历非边界的数据
alphaPairsChanged += innerL(i,oS)
print("non-bound, iter: %d i:%d, pairs changed %d" % (iter,i,alphaPairsChanged))
iter += 1
if entireSet:
entireSet = False
elif (alphaPairsChanged == 0):
entireSet = True
print("iteration number: %d" % iter)
return oS.b,oS.alphas
def testRbf(data_train,data_test):
dataArr,labelArr = loadDataSet(data_train) # 读取训练数据
b,alphas = smoP(dataArr, labelArr, 200, 0.0001, 10000, ('rbf', 1.3)) # 通过SMO算法得到b和alpha
datMat=mat(dataArr)
labelMat = mat(labelArr).transpose()
svInd=nonzero(alphas)[0] #选取不为0数据的行数(也就是支持向量)
sVs=datMat[svInd] #支持向量的特征数据
labelSV = labelMat[svInd] #支持向量的类别(1或-1)
print("there are %d Support Vectors" % shape(sVs)[0]) #打印出共有多少的支持向量
m,n = shape(datMat) #训练数据的行列数
errorCount = 0
for i in range(m):
kernelEval = kernelTrans(sVs,datMat[i,:],('rbf', 1.3)) #将支持向量转化为核函数
predict=kernelEval.T * multiply(labelSV,alphas[svInd]) + b #这一行的预测结果(代码来源于《统计学习方法》p133里面最后用于预测的公式)注意最后确定的分离平面只有那些支持向量决定。
if sign(predict)!=sign(labelArr[i]): #sign函数 -1 if x < 0, 0 if x==0, 1 if x > 0
errorCount += 1
print("the training error rate is: %f" % (float(errorCount)/m)) #打印出错误率
dataArr_test,labelArr_test = loadDataSet(data_test) #读取测试数据
errorCount_test = 0
datMat_test=mat(dataArr_test)
labelMat = mat(labelArr_test).transpose()
m,n = shape(datMat_test)
for i in range(m): #在测试数据上检验错误率
kernelEval = kernelTrans(sVs,datMat_test[i,:],('rbf', 1.3))
predict=kernelEval.T * multiply(labelSV,alphas[svInd]) + b
if sign(predict)!=sign(labelArr_test[i]):
errorCount_test += 1
print("the test error rate is: %f" % (float(errorCount_test)/m))
#主程序
def main():
filename_traindata='traindata.txt' # 训练数据集
filename_testdata='testdata.txt' # 测试数据集
testRbf(filename_traindata,filename_testdata)
if __name__=='__main__':
main()