-
Notifications
You must be signed in to change notification settings - Fork 114
/
hmm.py
359 lines (307 loc) · 14.7 KB
/
hmm.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
# -*- coding:utf-8 -*-
# 隐马尔科夫链模型
# By tostq <tostq216@163.com>
# 博客: blog.csdn.net/tostq
import numpy as np
from math import pi,sqrt,exp,pow,log
from numpy.linalg import det, inv
from abc import ABCMeta, abstractmethod
from sklearn import cluster
class _BaseHMM():
"""
基本HMM虚类,需要重写关于发射概率的相关虚函数
n_state : 隐藏状态的数目
n_iter : 迭代次数
x_size : 观测值维度
start_prob : 初始概率
transmat_prob : 状态转换概率
"""
__metaclass__ = ABCMeta # 虚类声明
def __init__(self, n_state=1, x_size=1, iter=20):
self.n_state = n_state
self.x_size = x_size
self.start_prob = np.ones(n_state) * (1.0 / n_state) # 初始状态概率
self.transmat_prob = np.ones((n_state, n_state)) * (1.0 / n_state) # 状态转换概率矩阵
self.trained = False # 是否需要重新训练
self.n_iter = iter # EM训练的迭代次数
# 初始化发射参数
@abstractmethod
def _init(self,X):
pass
# 虚函数:返回发射概率
@abstractmethod
def emit_prob(self, x): # 求x在状态k下的发射概率 P(X|Z)
return np.array([0])
# 虚函数
@abstractmethod
def generate_x(self, z): # 根据隐状态生成观测值x p(x|z)
return np.array([0])
# 虚函数:发射概率的更新
@abstractmethod
def emit_prob_updated(self, X, post_state):
pass
# 通过HMM生成序列
def generate_seq(self, seq_length):
X = np.zeros((seq_length, self.x_size))
Z = np.zeros(seq_length)
Z_pre = np.random.choice(self.n_state, 1, p=self.start_prob) # 采样初始状态
X[0] = self.generate_x(Z_pre) # 采样得到序列第一个值
Z[0] = Z_pre
for i in range(seq_length):
if i == 0: continue
# P(Zn+1)=P(Zn+1|Zn)P(Zn)
Z_next = np.random.choice(self.n_state, 1, p=self.transmat_prob[Z_pre,:][0])
Z_pre = Z_next
# P(Xn+1|Zn+1)
X[i] = self.generate_x(Z_pre)
Z[i] = Z_pre
return X,Z
# 估计序列X出现的概率
def X_prob(self, X, Z_seq=np.array([])):
# 状态序列预处理
# 判断是否已知隐藏状态
X_length = len(X)
if Z_seq.any():
Z = np.zeros((X_length, self.n_state))
for i in range(X_length):
Z[i][int(Z_seq[i])] = 1
else:
Z = np.ones((X_length, self.n_state))
# 向前向后传递因子
_, c = self.forward(X, Z) # P(x,z)
# 序列的出现概率估计
prob_X = np.sum(np.log(c)) # P(X)
return prob_X
# 已知当前序列预测未来(下一个)观测值的概率
def predict(self, X, x_next, Z_seq=np.array([]), istrain=True):
if self.trained == False or istrain == False: # 需要根据该序列重新训练
self.train(X)
X_length = len(X)
if Z_seq.any():
Z = np.zeros((X_length, self.n_state))
for i in range(X_length):
Z[i][int(Z_seq[i])] = 1
else:
Z = np.ones((X_length, self.n_state))
# 向前向后传递因子
alpha, _ = self.forward(X, Z) # P(x,z)
prob_x_next = self.emit_prob(np.array([x_next]))*np.dot(alpha[X_length - 1],self.transmat_prob)
return prob_x_next
def decode(self, X, istrain=True):
"""
利用维特比算法,已知序列求其隐藏状态值
:param X: 观测值序列
:param istrain: 是否根据该序列进行训练
:return: 隐藏状态序列
"""
if self.trained == False or istrain == False: # 需要根据该序列重新训练
self.train(X)
X_length = len(X) # 序列长度
state = np.zeros(X_length) # 隐藏状态
pre_state = np.zeros((X_length, self.n_state)) # 保存转换到当前隐藏状态的最可能的前一状态
max_pro_state = np.zeros((X_length, self.n_state)) # 保存传递到序列某位置当前状态的最大概率
_,c=self.forward(X,np.ones((X_length, self.n_state)))
max_pro_state[0] = self.emit_prob(X[0]) * self.start_prob * (1/c[0]) # 初始概率
# 前向过程
for i in range(X_length):
if i == 0: continue
for k in range(self.n_state):
prob_state = self.emit_prob(X[i])[k] * self.transmat_prob[:,k] * max_pro_state[i-1]
max_pro_state[i][k] = np.max(prob_state)* (1/c[i])
pre_state[i][k] = np.argmax(prob_state)
# 后向过程
state[X_length - 1] = np.argmax(max_pro_state[X_length - 1,:])
for i in reversed(range(X_length)):
if i == X_length - 1: continue
state[i] = pre_state[i + 1][int(state[i + 1])]
return state
# 针对于多个序列的训练问题
def train_batch(self, X, Z_seq=list()):
# 针对于多个序列的训练问题,其实最简单的方法是将多个序列合并成一个序列,而唯一需要调整的是初始状态概率
# 输入X类型:list(array),数组链表的形式
# 输入Z类型: list(array),数组链表的形式,默认为空列表(即未知隐状态情况)
self.trained = True
X_num = len(X) # 序列个数
self._init(self.expand_list(X)) # 发射概率的初始化
# 状态序列预处理,将单个状态转换为1-to-k的形式
# 判断是否已知隐藏状态
if Z_seq==list():
Z = [] # 初始化状态序列list
for n in range(X_num):
Z.append(list(np.ones((len(X[n]), self.n_state))))
else:
Z = []
for n in range(X_num):
Z.append(np.zeros((len(X[n]),self.n_state)))
for i in range(len(Z[n])):
Z[n][i][int(Z_seq[n][i])] = 1
for e in range(self.n_iter): # EM步骤迭代
# 更新初始概率过程
# E步骤
print "iter: ", e
b_post_state = [] # 批量累积:状态的后验概率,类型list(array)
b_post_adj_state = np.zeros((self.n_state, self.n_state)) # 批量累积:相邻状态的联合后验概率,数组
b_start_prob = np.zeros(self.n_state) # 批量累积初始概率
for n in range(X_num): # 对于每个序列的处理
X_length = len(X[n])
alpha, c = self.forward(X[n], Z[n]) # P(x,z)
beta = self.backward(X[n], Z[n], c) # P(x|z)
post_state = alpha * beta / np.sum(alpha * beta) # 归一化!
b_post_state.append(post_state)
post_adj_state = np.zeros((self.n_state, self.n_state)) # 相邻状态的联合后验概率
for i in range(X_length):
if i == 0: continue
if c[i]==0: continue
post_adj_state += (1 / c[i]) * np.outer(alpha[i - 1],
beta[i] * self.emit_prob(X[n][i])) * self.transmat_prob
if np.sum(post_adj_state)!=0:
post_adj_state = post_adj_state/np.sum(post_adj_state) # 归一化!
b_post_adj_state += post_adj_state # 批量累积:状态的后验概率
b_start_prob += b_post_state[n][0] # 批量累积初始概率
# M步骤,估计参数,最好不要让初始概率都为0出现,这会导致alpha也为0
b_start_prob += 0.001*np.ones(self.n_state)
self.start_prob = b_start_prob / np.sum(b_start_prob)
b_post_adj_state += 0.001
for k in range(self.n_state):
if np.sum(b_post_adj_state[k])==0: continue
self.transmat_prob[k] = b_post_adj_state[k] / np.sum(b_post_adj_state[k])
self.emit_prob_updated(self.expand_list(X), self.expand_list(b_post_state))
def expand_list(self, X):
# 将list(array)类型的数据展开成array类型
C = []
for i in range(len(X)):
C += list(X[i])
return np.array(C)
# 针对于单个长序列的训练
def train(self, X, Z_seq=np.array([])):
# 输入X类型:array,数组的形式
# 输入Z类型: array,一维数组的形式,默认为空列表(即未知隐状态情况)
self.trained = True
X_length = len(X)
self._init(X)
# 状态序列预处理
# 判断是否已知隐藏状态
if Z_seq.any():
Z = np.zeros((X_length, self.n_state))
for i in range(X_length):
Z[i][int(Z_seq[i])] = 1
else:
Z = np.ones((X_length, self.n_state))
for e in range(self.n_iter): # EM步骤迭代
# 中间参数
print e, " iter"
# E步骤
# 向前向后传递因子
alpha, c = self.forward(X, Z) # P(x,z)
beta = self.backward(X, Z, c) # P(x|z)
post_state = alpha * beta
post_adj_state = np.zeros((self.n_state, self.n_state)) # 相邻状态的联合后验概率
for i in range(X_length):
if i == 0: continue
if c[i]==0: continue
post_adj_state += (1 / c[i])*np.outer(alpha[i - 1],beta[i]*self.emit_prob(X[i]))*self.transmat_prob
# M步骤,估计参数
self.start_prob = post_state[0] / np.sum(post_state[0])
for k in range(self.n_state):
self.transmat_prob[k] = post_adj_state[k] / np.sum(post_adj_state[k])
self.emit_prob_updated(X, post_state)
# 求向前传递因子
def forward(self, X, Z):
X_length = len(X)
alpha = np.zeros((X_length, self.n_state)) # P(x,z)
alpha[0] = self.emit_prob(X[0]) * self.start_prob * Z[0] # 初始值
# 归一化因子
c = np.zeros(X_length)
c[0] = np.sum(alpha[0])
alpha[0] = alpha[0] / c[0]
# 递归传递
for i in range(X_length):
if i == 0: continue
alpha[i] = self.emit_prob(X[i]) * np.dot(alpha[i - 1], self.transmat_prob) * Z[i]
c[i] = np.sum(alpha[i])
if c[i]==0: continue
alpha[i] = alpha[i] / c[i]
return alpha, c
# 求向后传递因子
def backward(self, X, Z, c):
X_length = len(X)
beta = np.zeros((X_length, self.n_state)) # P(x|z)
beta[X_length - 1] = np.ones((self.n_state))
# 递归传递
for i in reversed(range(X_length)):
if i == X_length - 1: continue
beta[i] = np.dot(beta[i + 1] * self.emit_prob(X[i + 1]), self.transmat_prob.T) * Z[i]
if c[i+1]==0: continue
beta[i] = beta[i] / c[i + 1]
return beta
# 二元高斯分布函数
def gauss2D(x, mean, cov):
# x, mean, cov均为numpy.array类型
z = -np.dot(np.dot((x-mean).T,inv(cov)),(x-mean))/2.0
temp = pow(sqrt(2.0*pi),len(x))*sqrt(det(cov))
return (1.0/temp)*exp(z)
class GaussianHMM(_BaseHMM):
"""
发射概率为高斯分布的HMM
参数:
emit_means: 高斯发射概率的均值
emit_covars: 高斯发射概率的方差
"""
def __init__(self, n_state=1, x_size=1, iter=20):
_BaseHMM.__init__(self, n_state=n_state, x_size=x_size, iter=iter)
self.emit_means = np.zeros((n_state, x_size)) # 高斯分布的发射概率均值
self.emit_covars = np.zeros((n_state, x_size, x_size)) # 高斯分布的发射概率协方差
for i in range(n_state): self.emit_covars[i] = np.eye(x_size) # 初始化为均值为0,方差为1的高斯分布函数
def _init(self,X):
# 通过K均值聚类,确定状态初始值
mean_kmeans = cluster.KMeans(n_clusters=self.n_state)
mean_kmeans.fit(X)
self.emit_means = mean_kmeans.cluster_centers_
for i in range(self.n_state):
self.emit_covars[i] = np.cov(X.T) + 0.01 * np.eye(len(X[0]))
def emit_prob(self, x): # 求x在状态k下的发射概率
prob = np.zeros((self.n_state))
for i in range(self.n_state):
prob[i]=gauss2D(x,self.emit_means[i],self.emit_covars[i])
return prob
def generate_x(self, z): # 根据状态生成x p(x|z)
return np.random.multivariate_normal(self.emit_means[z][0],self.emit_covars[z][0],1)
def emit_prob_updated(self, X, post_state): # 更新发射概率
for k in range(self.n_state):
for j in range(self.x_size):
self.emit_means[k][j] = np.sum(post_state[:,k] *X[:,j]) / np.sum(post_state[:,k])
X_cov = np.dot((X-self.emit_means[k]).T, (post_state[:,k]*(X-self.emit_means[k]).T).T)
self.emit_covars[k] = X_cov / np.sum(post_state[:,k])
if det(self.emit_covars[k]) == 0: # 对奇异矩阵的处理
self.emit_covars[k] = self.emit_covars[k] + 0.01*np.eye(len(X[0]))
class DiscreteHMM(_BaseHMM):
"""
发射概率为离散分布的HMM
参数:
emit_prob : 离散概率分布
x_num:表示观测值的种类
此时观测值大小x_size默认为1
"""
def __init__(self, n_state=1, x_num=1, iter=20):
_BaseHMM.__init__(self, n_state=n_state, x_size=1, iter=iter)
self.emission_prob = np.ones((n_state, x_num)) * (1.0/x_num) # 初始化发射概率均值
self.x_num = x_num
def _init(self, X):
self.emission_prob = np.random.random(size=(self.n_state,self.x_num))
for k in range(self.n_state):
self.emission_prob[k] = self.emission_prob[k]/np.sum(self.emission_prob[k])
def emit_prob(self, x): # 求x在状态k下的发射概率
prob = np.zeros(self.n_state)
for i in range(self.n_state): prob[i]=self.emission_prob[i][int(x[0])]
return prob
def generate_x(self, z): # 根据状态生成x p(x|z)
return np.random.choice(self.x_num, 1, p=self.emission_prob[z][0])
def emit_prob_updated(self, X, post_state): # 更新发射概率
self.emission_prob = np.zeros((self.n_state, self.x_num))
X_length = len(X)
for n in range(X_length):
self.emission_prob[:,int(X[n])] += post_state[n]
self.emission_prob+= 0.1/self.x_num
for k in range(self.n_state):
if np.sum(post_state[:,k])==0: continue
self.emission_prob[k] = self.emission_prob[k]/np.sum(post_state[:,k])