-
Notifications
You must be signed in to change notification settings - Fork 2.9k
/
maxEntropy.py
299 lines (264 loc) · 12.4 KB
/
maxEntropy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
# coding=utf-8
# Author:Dodo
# Date:2018-11-30
# Email:lvtengchao@pku.edu.cn
# Blog:www.pkudodo.com
'''
数据集:Mnist
训练集数量:60000(实际使用:20000)
测试集数量:10000
------------------------------
运行结果:
正确率:96.9%
运行时长:8.8h
备注:对于mnist而言,李航的统计学习方法中有一些关键细节没有阐述,
建议先阅读我的个人博客,其中有详细阐述。阅读结束后再看该程序。
Blog:www.pkudodo.com
'''
import time
import numpy as np
from collections import defaultdict
def loadData(fileName):
'''
加载Mnist数据集
:param fileName:要加载的数据集路径
:return: list形式的数据集及标记
'''
# 存放数据及标记的list
dataList = []; labelList = []
# 打开文件
fr = open(fileName, 'r')
# 将文件按行读取
for line in fr.readlines():
# 对每一行数据按切割福','进行切割,返回字段列表
curLine = line.strip().split(',')
#二分类,list中放置标签
if int(curLine[0]) == 0:
labelList.append(1)
else:
labelList.append(0)
#二值化
dataList.append([int(int(num) > 128) for num in curLine[1:]])
#返回data和label
return dataList, labelList
class maxEnt:
'''
最大熵类
'''
def __init__(self, trainDataList, trainLabelList, testDataList, testLabelList):
'''
各参数初始化
'''
self.trainDataList = trainDataList #训练数据集
self.trainLabelList = trainLabelList #训练标签集
self.testDataList = testDataList #测试数据集
self.testLabelList = testLabelList #测试标签集
self.featureNum = len(trainDataList[0]) #特征数量
self.N = len(trainDataList) #总训练集长度
self.n = 0 #训练集中(xi,y)对数量
self.M = 10000 #
self.fixy = self.calc_fixy() #所有(x, y)对出现的次数
self.w = [0] * self.n #Pw(y|x)中的w
self.xy2idDict, self.id2xyDict = self.createSearchDict() #(x, y)->id和id->(x, y)的搜索字典
self.Ep_xy = self.calcEp_xy() #Ep_xy期望值
def calcEpxy(self):
'''
计算特征函数f(x, y)关于模型P(Y|X)与经验分布P_(X, Y)的期望值(P后带下划线“_”表示P上方的横线
程序中部分下划线表示“|”,部分表示上方横线,请根据具体公式自行判断,)
即“6.2.2 最大熵模型的定义”中第二个期望(83页最上方的期望)
:return:
'''
#初始化期望存放列表,对于每一个xy对都有一个期望
#这里的x是单个的特征,不是一个样本的全部特征。例如x={x1,x2,x3.....,xk},实际上是(x1,y),(x2,y),。。。
#但是在存放过程中需要将不同特诊的分开存放,李航的书可能是为了公式的泛化性高一点,所以没有对这部分提及
#具体可以看我的博客,里面有详细介绍 www.pkudodo.com
Epxy = [0] * self.n
#对于每一个样本进行遍历
for i in range(self.N):
#初始化公式中的P(y|x)列表
Pwxy = [0] * 2
#计算P(y = 0 } X)
#注:程序中X表示是一个样本的全部特征,x表示单个特征,这里是全部特征的一个样本
Pwxy[0] = self.calcPwy_x(self.trainDataList[i], 0)
#计算P(y = 1 } X)
Pwxy[1] = self.calcPwy_x(self.trainDataList[i], 1)
for feature in range(self.featureNum):
for y in range(2):
if (self.trainDataList[i][feature], y) in self.fixy[feature]:
id = self.xy2idDict[feature][(self.trainDataList[i][feature], y)]
Epxy[id] += (1 / self.N) * Pwxy[y]
return Epxy
def calcEp_xy(self):
'''
计算特征函数f(x, y)关于经验分布P_(x, y)的期望值(下划线表示P上方的横线,
同理Ep_xy中的“_”也表示p上方的横线)
即“6.2.2 最大熵的定义”中第一个期望(82页最下方那个式子)
:return: 计算得到的Ep_xy
'''
#初始化Ep_xy列表,长度为n
Ep_xy = [0] * self.n
#遍历每一个特征
for feature in range(self.featureNum):
#遍历每个特征中的(x, y)对
for (x, y) in self.fixy[feature]:
#获得其id
id = self.xy2idDict[feature][(x, y)]
#将计算得到的Ep_xy写入对应的位置中
#fixy中存放所有对在训练集中出现过的次数,处于训练集总长度N就是概率了
Ep_xy[id] = self.fixy[feature][(x, y)] / self.N
#返回期望
return Ep_xy
def createSearchDict(self):
'''
创建查询字典
xy2idDict:通过(x,y)对找到其id,所有出现过的xy对都有一个id
id2xyDict:通过id找到对应的(x,y)对
'''
#设置xy搜多id字典
#这里的x指的是单个的特征,而不是某个样本,因此将特征存入字典时也需要存入这是第几个特征
#这一信息,这是为了后续的方便,否则会乱套。
#比如说一个样本X = (0, 1, 1) label =(1)
#生成的标签对有(0, 1), (1, 1), (1, 1),三个(x,y)对并不能判断属于哪个特征的,后续就没法往下写
#不可能通过(1, 1)就能找到对应的id,因为对于(1, 1),字典中有多重映射
#所以在生成字典的时总共生成了特征数个字典,例如在mnist中样本有784维特征,所以生成784个字典,属于
#不同特征的xy存入不同特征内的字典中,使其不会混淆
xy2idDict = [{} for i in range(self.featureNum)]
#初始化id到xy对的字典。因为id与(x,y)的指向是唯一的,所以可以使用一个字典
id2xyDict = {}
#设置缩影,其实就是最后的id
index = 0
#对特征进行遍历
for feature in range(self.featureNum):
#对出现过的每一个(x, y)对进行遍历
#fixy:内部存放特征数目个字典,对于遍历的每一个特征,单独读取对应字典内的(x, y)对
for (x, y) in self.fixy[feature]:
#将该(x, y)对存入字典中,要注意存入时通过[feature]指定了存入哪个特征内部的字典
#同时将index作为该对的id号
xy2idDict[feature][(x, y)] = index
#同时在id->xy字典中写入id号,val为(x, y)对
id2xyDict[index] = (x, y)
#id加一
index += 1
#返回创建的两个字典
return xy2idDict, id2xyDict
def calc_fixy(self):
'''
计算(x, y)在训练集中出现过的次数
:return:
'''
#建立特征数目个字典,属于不同特征的(x, y)对存入不同的字典中,保证不被混淆
fixyDict = [defaultdict(int) for i in range(self.featureNum)]
#遍历训练集中所有样本
for i in range(len(self.trainDataList)):
#遍历样本中所有特征
for j in range(self.featureNum):
#将出现过的(x, y)对放入字典中并计数值加1
fixyDict[j][(self.trainDataList[i][j], self.trainLabelList[i])] += 1
#对整个大字典进行计数,判断去重后还有多少(x, y)对,写入n
for i in fixyDict:
self.n += len(i)
#返回大字典
return fixyDict
def calcPwy_x(self, X, y):
'''
计算“6.23 最大熵模型的学习” 式6.22
:param X: 要计算的样本X(一个包含全部特征的样本)
:param y: 该样本的标签
:return: 计算得到的Pw(Y|X)
'''
#分子
numerator = 0
#分母
Z = 0
#对每个特征进行遍历
for i in range(self.featureNum):
#如果该(xi,y)对在训练集中出现过
if (X[i], y) in self.xy2idDict[i]:
#在xy->id字典中指定当前特征i,以及(x, y)对:(X[i], y),读取其id
index = self.xy2idDict[i][(X[i], y)]
#分子是wi和fi(x,y)的连乘再求和,最后指数
#由于当(x, y)存在时fi(x,y)为1,因为xy对肯定存在,所以直接就是1
#对于分子来说,就是n个wi累加,最后再指数就可以了
#因为有n个w,所以通过id将w与xy绑定,前文的两个搜索字典中的id就是用在这里
numerator += self.w[index]
#同时计算其他一种标签y时候的分子,下面的z并不是全部的分母,再加上上式的分子以后
#才是完整的分母,即z = z + numerator
if (X[i], 1-y) in self.xy2idDict[i]:
#原理与上式相同
index = self.xy2idDict[i][(X[i], 1-y)]
Z += self.w[index]
#计算分子的指数
numerator = np.exp(numerator)
#计算分母的z
Z = np.exp(Z) + numerator
#返回Pw(y|x)
return numerator / Z
def maxEntropyTrain(self, iter = 500):
#设置迭代次数寻找最优解
for i in range(iter):
#单次迭代起始时间点
iterStart = time.time()
#计算“6.2.3 最大熵模型的学习”中的第二个期望(83页最上方哪个)
Epxy = self.calcEpxy()
#使用的是IIS,所以设置sigma列表
sigmaList = [0] * self.n
#对于所有的n进行一次遍历
for j in range(self.n):
#依据“6.3.1 改进的迭代尺度法” 式6.34计算
sigmaList[j] = (1 / self.M) * np.log(self.Ep_xy[j] / Epxy[j])
#按照算法6.1步骤二中的(b)更新w
self.w = [self.w[i] + sigmaList[i] for i in range(self.n)]
#单次迭代结束
iterEnd = time.time()
#打印运行时长信息
print('iter:%d:%d, time:%d'%(i, iter, iterStart - iterEnd))
def predict(self, X):
'''
预测标签
:param X:要预测的样本
:return: 预测值
'''
#因为y只有0和1,所有建立两个长度的概率列表
result = [0] * 2
#循环计算两个概率
for i in range(2):
#计算样本x的标签为i的概率
result[i] = self.calcPwy_x(X, i)
#返回标签
#max(result):找到result中最大的那个概率值
#result.index(max(result)):通过最大的那个概率值再找到其索引,索引是0就返回0,1就返回1
return result.index(max(result))
def test(self):
'''
对测试集进行测试
:return:
'''
#错误值计数
errorCnt = 0
#对测试集中所有样本进行遍历
for i in range(len(self.testDataList)):
#预测该样本对应的标签
result = self.predict(self.testDataList[i])
#如果错误,计数值加1
if result != self.testLabelList[i]: errorCnt += 1
#返回准确率
return 1 - errorCnt / len(self.testDataList)
if __name__ == '__main__':
start = time.time()
# 获取训练集及标签
print('start read transSet')
trainData, trainLabel = loadData('../Mnist/mnist_train.csv')
# 获取测试集及标签
print('start read testSet')
testData, testLabel = loadData('../Mnist/mnist_test.csv')
#初始化最大熵类
maxEnt = maxEnt(trainData[:20000], trainLabel[:20000], testData, testLabel)
#开始训练
print('start to train')
maxEnt.maxEntropyTrain()
#开始测试
print('start to test')
accuracy = maxEnt.test()
print('the accuracy is:', accuracy)
# 打印时间
print('time span:', time.time() - start)