-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathutils.py
362 lines (262 loc) · 10.2 KB
/
utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
import numpy as np
import h5py as h5
def LoadDataset(file_name, test=False):
    """Load an experiment dataset from an HDF5 file.

    :param file_name: path to an HDF5 file containing the datasets
        "dataMatrix", "trainDataset", "trainKnn", "testDataset", "testKnn"
    :param test: when True, return the first 100 test queries and their
        ground-truth neighbours instead of the full training split
    :return: (dataMatrix, queries, knn) as numpy arrays
    """
    with h5.File(file_name, "r") as f:
        # Materialize arrays while the file is open: h5py Dataset handles
        # become invalid once the file is closed.  Only the split actually
        # returned is read (the original converted all five datasets).
        dataMatrix = np.asarray(f["dataMatrix"])
        if test:
            result = (dataMatrix,
                      np.asarray(f["testDataset"])[:100],
                      np.asarray(f["testKnn"])[:100])
        else:
            result = (dataMatrix,
                      np.asarray(f["trainDataset"]),
                      np.asarray(f["trainKnn"]))
    return result
def Compute_Euclidean(v1, v2):
    """Return the Euclidean distance between two points.

    :param v1: first point in Euclidean space
    :param v2: second point in Euclidean space
    :return: distance between point 1 and point 2
    """
    diff = np.asarray(v1) - np.asarray(v2)
    return np.sqrt((diff * diff).sum())
def Compute_P(Tpointset_index, pointset_index):
    """Compute the precision of a bucket.

    :param Tpointset_index: indices of the points truly similar to the query
    :param pointset_index: indices of the points assigned to the bucket
    :return: precision, TP / (TP + FP)
    """
    true_set = set(Tpointset_index)
    pred_set = set(pointset_index)
    retrieved = len(pred_set)
    if not retrieved:
        # No predictions at all -> precision defined as 0 here.
        print("没有预测结果")
        return 0
    return len(true_set & pred_set) / retrieved
def Compute_R(Tpointset_index, pointset_index):
    """Compute the recall of a bucket.

    :param Tpointset_index: indices of the points truly similar to the query
    :param pointset_index: indices of the points assigned to the bucket
    :return: recall, TP / (TP + FN)
    """
    true_set = set(Tpointset_index)
    pred_set = set(pointset_index)
    relevant = len(true_set)
    if not relevant:
        # No ground-truth neighbours -> recall defined as 0 here.
        print("没有正确结果")
        return 0
    return len(true_set & pred_set) / relevant
def calc_ratio(query, t_index, p_index, data, K=10):
    """Mean distance ratio between predicted and true K nearest neighbours.

    :param query: query point
    :param t_index: candidate indices containing the true neighbours
    :param p_index: candidate indices produced by the index structure
    :param data: coordinates of all points
    :param K: number of neighbours to compare (default 10)
    :return: mean of predicted/true distance ratios
    """
    _, true_dis = Knn(K, query, t_index, data)
    _, pred_dis = Knn(K, query, p_index, data)
    true_dis.sort()
    pred_dis.sort()
    # Index 0 is skipped — presumably the nearest point is the query itself
    # (distance 0, which would break the division); verify against callers.
    return np.mean(np.divide(pred_dis[1:], true_dis[1:]))
def calc_recall(t_index, p_index):
    """Compute the recall of a predicted neighbour set.

    :param t_index: ground-truth indices
    :param p_index: predicted indices
    :return: recall |t ∩ p| / |t|, or 0 when there is no ground truth
    """
    t_set = set(t_index)
    p_set = set(p_index)
    # BUG FIX: the original tested `tp_fn == 0` where tp_fn was a *set*,
    # which is never true, so an empty ground-truth set crashed with
    # ZeroDivisionError instead of taking the guarded branch.
    if not t_set:
        print("没有正确结果")
        return 0
    return len(t_set & p_set) / len(t_set)
def CentralPointOfCluster(pointsIndex, matrix):
    """Compute the centroid of a cluster.

    :param pointsIndex: indices of the points belonging to the cluster
    :param matrix: coordinates of all points, one row per point
    :return: (number of points in the cluster, centroid as a list of floats)
    """
    matrix = np.asarray(matrix)
    pointNum = len(pointsIndex)
    if pointNum == 0:
        print("无中心点")
        # BUG FIX: the original returned a numpy zero vector here while the
        # normal path returns a plain list; return a list in both cases so
        # callers get a consistent type.
        return pointNum, [0. for _ in range(matrix.shape[1])]
    # Vectorized mean replaces the original per-point accumulation loop.
    cpoint = matrix[list(pointsIndex)].mean(axis=0)
    return pointNum, list(cpoint)
def Kmeans(cpoints, pointsIndex, matrix, iteration=15):
    """Lloyd-style k-means clustering over the given point ids.

    :param cpoints: initial centroids
    :param pointsIndex: ids of the points to cluster
    :param matrix: real coordinates of the points
    :param iteration: maximum number of iterations
    :return: list of [centroid, list of point ids assigned to it]
    """
    flag = 0  # number of iterations performed so far
    while True:
        # Keep the previous centroids for the convergence test below.
        originalCpoints = cpoints.copy()
        clusterPoints = [[] for i in range(len(cpoints))]  # point ids per cluster
        for pointIndex in pointsIndex:
            # Assignment step: attach each point to its nearest centroid.
            minDistance = float("inf")
            selectCpoint = 0
            for cpointIndex in range(len(cpoints)):
                v1 = cpoints[cpointIndex]
                v2 = matrix[pointIndex]
                distance = Compute_Euclidean(v1, v2)
                if minDistance > distance:
                    minDistance = distance
                    selectCpoint = cpointIndex
            clusterPoints[selectCpoint].append(pointIndex)
        cpoints = []
        result = [[] for i in range(len(clusterPoints))]  # output: [0] centroid, [1] its member ids
        for i in range(len(clusterPoints)):
            cluster_pointsIndex = clusterPoints[i]
            if len(cluster_pointsIndex) == 0:
                # Empty clusters are dropped; their result slot stays [].
                # NOTE(review): this shrinks `cpoints`, so the element-wise
                # comparison with `originalCpoints` below can hit a shape
                # mismatch once a cluster empties — confirm inputs avoid this.
                continue
            _, cpoint = CentralPointOfCluster(cluster_pointsIndex, matrix)
            cpoints.append(cpoint)
            result[i].append(cpoint)
            result[i].append(cluster_pointsIndex)
        cpoints = np.asarray(cpoints)
        originalCpoints = np.asarray(originalCpoints)
        # Converged when every new centroid coordinate lies within ±5% of
        # the previous one.  NOTE(review): a relative band misbehaves for
        # zero or negative coordinates (0 * 1.05 == 0) — confirm the data
        # is strictly positive before relying on this test.
        up_bound = cpoints < originalCpoints * 1.05
        low_bound = originalCpoints * 0.95 < cpoints
        if False not in up_bound:
            if False not in low_bound:
                print("聚类完成,共聚类{}次".format(flag))
                break
        flag += 1
        if flag > iteration:
            print("已经聚类{}次,时间过长,已经终止".format(iteration))
            break
    return result
def Knn(K, query, pointset_index, matrix):
    """Exact k-nearest-neighbour search over a candidate set.

    :param K: number of neighbours to return (capped at the candidate count)
    :param query: query point
    :param pointset_index: indices of the candidate points
    :param matrix: coordinates of all points
    :return: (indices in matrix of the K nearest candidates,
              distances from query to those candidates)
    """
    pointset_index = np.array(list(map(int, pointset_index)))
    candid_matrix = np.asarray([matrix[index] for index in pointset_index])
    # Broadcasting subtracts `query` from every candidate row; the original
    # np.tile copy is unnecessary.
    dis_matrix = np.linalg.norm(candid_matrix - np.asarray(query), axis=1)
    # Slicing past the end is safe in numpy, so one code path covers both
    # K < n and K >= n — the original duplicated it in an if/else.
    order = np.argsort(dis_matrix)[:K]
    K_distance = dis_matrix[order]
    K_list = list(pointset_index[order])
    return K_list, K_distance
def ShowResult(tree, dataMatrix, testDataset, testKnn, file_name, k=10, c=1, hava_element=True):
    """Evaluate a tree index on the test queries and write a report file.

    :param tree: index structure providing search_C / searchElement_C
    :param dataMatrix: coordinates of all data points
    :param testDataset: query points, one per row
    :param testKnn: per-query ground truth as (indices, distances) pairs
    :param file_name: path of the report file to write
    :param k: number of neighbours to evaluate per query
    :param c: initial number of buckets/leaves to probe
    :param hava_element: use searchElement_C instead of search_C
    :return: (mean recall, mean predicted/true distance ratio AVG)
    """
    meanPrecise = 0.
    meanRecall = 0.
    AVG = 0.
    # `with` guarantees the report file is closed even if evaluation raises.
    with open(file_name, "w") as file:
        for index in range(testDataset.shape[0]):
            flag_c = 2
            sumPDRD = 0.
            testPoint = testDataset[index]
            if hava_element:
                candid, leafNodes = tree.searchElement_C(testPoint, c)
            else:
                candid, leafNodes = tree.search_C(testPoint, c)
            # Widen the probe until at least one candidate is found.
            while len(candid) == 0:
                candid, leafNodes = tree.search_C(testPoint, flag_c)
                flag_c += 1
            if len(candid) < k:
                predict, predictDistance = Knn(len(candid), testPoint, candid, dataMatrix)
            else:
                predict, predictDistance = Knn(k, testPoint, candid, dataMatrix)
            real, realDistance = testKnn[index]
            # Compare only as many ground-truth neighbours as were predicted.
            real = real[:len(predict)]
            realDistance = realDistance[:len(predict)]
            precise = Compute_P(real, predict)
            recall = Compute_R(real, predict)
            leafNodesList = []
            for lnode in leafNodes:
                leafNodesList.append(lnode.Id)  # NOTE(review): collected but never used
            file.write("点的准确率是:{},召回率是:{}\n".format(precise, recall))
            # BUG FIX: the original printed len(candid) where the message
            # announces the precision; print the precision itself.
            print("点的准确率是:{},召回率是:{}".format(precise, recall))
            meanPrecise += precise
            meanRecall += recall
            for i in range(len(predict)):
                # Treat a zero distance on either side as a perfect ratio of 1
                # (the query point itself is likely in the data).
                if predictDistance[i] == 0 or realDistance[i] == 0:
                    sumPDRD += 1
                    continue
                sumPDRD += predictDistance[i] / realDistance[i]
            sumPDRD = sumPDRD / len(predict)
            AVG += sumPDRD
        AVG = AVG / testDataset.shape[0]
        file.write("点的平均准确率是:{}\n".format(meanPrecise / testDataset.shape[0]))
        file.write("点的平均召回率是:{}\n".format(meanRecall / testDataset.shape[0]))
        file.write("AVG是:{}\n".format(AVG))
    print("点的平均准确率是:{}\n".format(meanPrecise / testDataset.shape[0]))
    print("点的平均召回率是:{}\n".format(meanRecall / testDataset.shape[0]))
    print("AVG是:{}\n".format(AVG))
    return meanRecall / testDataset.shape[0], AVG
if __name__ == "__main__":
    # Ad-hoc manual checks, kept commented out by the original author.
    # LoadDataset("Zipf/audio/datasetKnn.hdf5")
    # # Test Compute_Euclidean
    # v1 = [1, 2, 3]
    # v2 = [2, 3, 4]
    # a = Compute_Euclidean(v1, v2)
    # print(a)
    # # Test Kmeans
    # matrix = [[1, 2, 3],
    #           [4, 5, 6],
    #           [7, 8, 9],
    #           [5, 5, 5]]
    #
    # pointsIndex = [0, 1, 2, 3]
    #
    # cpoints = [[1, 1, 1],
    #            [9, 9, 9]]
    # result = Kmeans(cpoints, pointsIndex, matrix)
    # for node in result:
    #     print(node)
    # # Test Knn
    # matrix = [[1, 0, 0, 0, 0],
    #           [1, 0, 0, 0, 0],
    #           [1, 2, 3, 3, 3],
    #           [1, 2, 2, 2, 2],
    #           [1, 2, 2, 3, 3]]
    # pointset_index = [0, 1, 2, 3, 4]
    # point = [1, 1, 1, 1, 1]
    # K_list, _ = Knn(4, point, pointset_index, matrix)
    # print(K_list)
    pass