1+ import numpy as np
2+ from scipy .stats import multivariate_normal
3+ from utils .common_function import get_covariance_matrix , get_vector_norm
4+ from utils .generate_data import generate_cluster_data
5+ import matplotlib .pyplot as plt
6+
7+ class gaussian_mixture_model :
8+ def __init__ (self , k , tolerance ):
9+ self .k = k
10+ self .tolerance = tolerance
11+ self .responsibility_matrix_history = []
12+
13+ def initiate_random_gaussian (self , X ):
14+ N , D = X .shape
15+ self .pi = (1 / self .k ) * np .ones (self .k ) #This is also the prior
16+ self .mean = np .zeros ((self .k , D ))
17+ self .covariance = np .zeros ((self .k , D , D ))
18+ for k in range (self .k ):
19+ self .mean [k ] = X [np .random .choice (N ),:]
20+ self .covariance [k ] = get_covariance_matrix (X )
21+
22+ def calculate_likelihood (self , X ):
23+ N , D = X .shape
24+ multivariate_gaussian_likelihood = np .zeros ((N ,self .k ))
25+ for k in range (self .k ):
26+ mean = self .mean [k ]
27+ cov = self .covariance [k ]
28+ for n in range (N ):
29+ multivariate_gaussian_likelihood [n , k ] = multivariate_normal .pdf (X [n ], mean , cov )
30+ return multivariate_gaussian_likelihood #return array of size (N, k)
31+
32+ def expectation_step (self , X ):
33+ likelihood = self .calculate_likelihood (X )
34+ self .responsibility_matrix = (self .pi * likelihood ) / (np .sum (likelihood , axis = 1 , keepdims = True )) #size (N, k)
35+ self .assignment = np .argmax (self .responsibility_matrix , axis = 1 ) #size (N, )
36+ self .responsibility_matrix_history .append (self .responsibility_matrix .max (axis = 1 ))
37+
38+ def maximization_step (self , X ):
39+ N = X .shape [0 ]
40+ for k in range (self .k ):
41+ res = np .expand_dims (self .responsibility_matrix [:,k ], axis = 1 )
42+ self .mean [k ] = ((res * X ).sum (axis = 0 )) / (res .sum ())
43+ self .covariance [k ] = (X - self .mean [k ]).T .dot ((X - self .mean [k ]) * res ) / res .sum ()
44+
45+ self .priors = self .responsibility_matrix .sum (axis = 0 ) / N
46+
47+ def check_converge (self ):
48+ if len (self .responsibility_matrix_history ) < 2 :
49+ return False
50+ else :
51+ diff = get_vector_norm (self .responsibility_matrix_history [- 1 ] - self .responsibility_matrix_history [- 2 ], 2 )
52+ return diff <= self .tolerance
53+
54+ if __name__ == '__main__' :
55+ N , D , n_cluster = 600 , 3 , 3
56+ X , label = generate_cluster_data (N , D , n_cluster )
57+ k = 3
58+ tolerance = 1e-6
59+ model = gaussian_mixture_model (k , tolerance )
60+ model .initiate_random_gaussian (X [:,1 :])
61+ while not model .check_converge ():
62+ model .expectation_step (X [:,1 :])
63+ model .maximization_step (X [:,1 :])
64+ x_axis = np .arange (N )
65+ plt .scatter (x_axis , X .mean (axis = 1 ), c = model .assignment )
66+ plt .show ()
0 commit comments