-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathEM算法.py
159 lines (101 loc) · 3.91 KB
/
EM算法.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
#!/usr/bin/env python
# coding: utf-8
# In[ ]:
'''
EM算法
-------------------------------------
数据集:两个生成的正态分布
训练集数量:10000
---------------------------------
训练结果:
时间:0.04s
真实参数:mu1 1.20 sigma1 0.70 alpha1 0.30 mu2 2.30 sigma2 1.20 alpha2 0.70
模型参数:mu1 1.18 sigma1 0.65 alpha1 0.26 mu2 2.24 sigma2 1.21 alpha2 0.74
'''
import numpy as np
import time
# In[ ]:
'''生成数据集'''
def CreateData(mu1, sigma1, alpha1, mu2, sigma2, alpha2, length):
# 生成第一个分模型的数据
First = np.random.normal(loc = mu1, scale = sigma1, size = int(length * alpha1))
# 生成第二个分模型的数据
Second = np.random.normal(loc = mu2, scale = sigma2, size = int(length * alpha2))
# 混合两个数据
ObservedData = np.concatenate((First, Second), axis = 0)
# 打乱顺序(打不打乱其实对算法没有影响)
np.random.shuffle(ObservedData)
return ObservedData
# In[ ]:
'''根据高斯密度函数计算'''
def Cal_Gaussian(Data, mu, sigma):
GaussianP = 1 / (sigma * np.sqrt(2 * np.pi)) * np.exp(-1 * np.square(Data - mu) / (2 * np.square(sigma)))
return GaussianP
# In[ ]:
'''E步'''
def E_step(Data, mu1, sigma1, alpha1, mu2, sigma2, alpha2):
# 计算分模型对每个样本的响应度
# 这里Data使用的是ndarray格式
# 因此得到的数列结果中,包含该分模型对每个样本的计算响应度公式的分子项
Gamma1 = alpha1 * Cal_Gaussian(Data, mu1, sigma1)
Gamma2 = alpha2 * Cal_Gaussian(Data, mu2, sigma2)
# 计算响应度公式的分母项
Summary = Gamma1 + Gamma2
# 计算响应度
Gamma1 = Gamma1 / Summary
Gamma2 = Gamma2 / Summary
return Gamma1, Gamma2
# In[ ]:
'''M步'''
def M_step(Data, mu1_old, mu2_old, Gamma1, Gamma2):
# 计算新的参数mu
mu1_new = np.dot(Gamma1, Data) / np.sum(Gamma1)
mu2_new = np.dot(Gamma2, Data) / np.sum(Gamma2)
# 计算新的参数sigma
sigma1_new = np.sqrt(np.dot(Gamma1, np.square(Data - mu1_old)) / np.sum(Gamma1))
sigma2_new = np.sqrt(np.dot(Gamma2, np.square(Data - mu2_old)) / np.sum(Gamma2))
# 计算新的参数alpha
m = len(Data)
alpha1_new = np.sum(Gamma1) / m
alpha2_new = np.sum(Gamma2) / m
return mu1_new, sigma1_new, alpha1_new, mu2_new, sigma2_new, alpha2_new
# In[ ]:
'''EM算法'''
def EM(Data, mu1, sigma1, alpha1, mu2, sigma2, alpha2, itertime):
# 迭代
for i in range(itertime):
# 计算响应度,E步
Gamma1, Gamma2 = E_step(Data, mu1, sigma1, alpha1, mu2, sigma2, alpha2)
# 更新参数,M步
mu1, sigma1, alpha1, mu2, sigma2, alpha2 = M_step(Data, mu1, mu2, Gamma1, Gamma2)
return mu1, sigma1, alpha1, mu2, sigma2, alpha2
# In[ ]:
'''测试模型'''
if __name__ == '__main__':
# 生成数据
mu1 = 1.2
sigma1 = 0.7
alpha1 = 0.3
mu2 = 2.3
sigma2 = 1.2
alpha2 = 0.7
length = 10000
Data = CreateData(mu1, sigma1, alpha1, mu2, sigma2, alpha2, length)
# 初始化数据
init_mu1 = 0.9
init_sigma1 = 0.4
init_alpha1 = 0.2
init_mu2 = 2
init_sigma2 = 1.4
init_alpha2 = 0.8
itertime = 100
# 训练模型
start = time.time()
print('start training')
model_mu1, model_sigma1, model_alpha1, model_mu2, model_sigma2, model_alpha2 = EM(Data, init_mu1, init_sigma1, init_alpha1, init_mu2, init_sigma2, init_alpha2, itertime)
print('end training')
end = time.time()
print('training time: ', end - start)
print('真实参数:mu1 ', mu1, 'sigma1 ', sigma1, 'alpha1 ', alpha1, 'mu2 ', mu2, 'sigma2 ', sigma2, 'alpha2 ', alpha2)
print('模型参数:mu1 ', model_mu1, 'sigma1 ', model_sigma1, 'alpha1 ', model_alpha1, 'mu2 ', model_mu2, 'sigma2 ', model_sigma2, 'alpha2 ', model_alpha2)
# In[ ]: