python实现隐马尔科夫模型HMM
一份完全按照李航<<统计学习方法>>介绍的HMM代码,供大家参考,具体内容如下
#coding=utf8
'''''
Createdon2017-8-5
里面的代码许多地方可以精简,但为了百分百还原公式,就没有精简了。
@author:adzhua
'''
importnumpyasnp
classHMM(object):
def__init__(self,A,B,pi):
'''''
A:状态转移概率矩阵
B:输出观察概率矩阵
pi:初始化状态向量
'''
self.A=np.array(A)
self.B=np.array(B)
self.pi=np.array(pi)
self.N=self.A.shape[0]#总共状态个数
self.M=self.B.shape[1]#总共观察值个数
#输出HMM的参数信息
defprintHMM(self):
print("==================================================")
print("HMMcontent:N=",self.N,",M=",self.M)
foriinrange(self.N):
ifi==0:
print("hmm.A",self.A[i,:],"hmm.B",self.B[i,:])
else:
print("",self.A[i,:],"",self.B[i,:])
print("hmm.pi",self.pi)
print("==================================================")
#前向算法
defforwar(self,T,O,alpha,prob):
'''''
T:观察序列的长度
O:观察序列
alpha:运算中用到的临时数组
prob:返回值所要求的概率
'''
#初始化
foriinrange(self.N):
alpha[0,i]=self.pi[i]*self.B[i,O[0]]
#递归
fortinrange(T-1):
forjinrange(self.N):
sum=0.0
foriinrange(self.N):
sum+=alpha[t,i]*self.A[i,j]
alpha[t+1,j]=sum*self.B[j,O[t+1]]
#终止
sum=0.0
foriinrange(self.N):
sum+=alpha[T-1,i]
prob[0]*=sum
#带修正的前向算法
defforwardWithScale(self,T,O,alpha,scale,prob):
scale[0]=0.0
#初始化
foriinrange(self.N):
alpha[0,i]=self.pi[i]*self.B[i,O[0]]
scale[0]+=alpha[0,i]
foriinrange(self.N):
alpha[0,i]/=scale[0]
#递归
fortinrange(T-1):
scale[t+1]=0.0
forjinrange(self.N):
sum=0.0
foriinrange(self.N):
sum+=alpha[t,i]*self.A[i,j]
alpha[t+1,j]=sum*self.B[j,O[t+1]]
scale[t+1]+=alpha[t+1,j]
forjinrange(self.N):
alpha[t+1,j]/=scale[t+1]
#终止
fortinrange(T):
prob[0]+=np.log(scale[t])
defback(self,T,O,beta,prob):
'''''
T:观察序列的长度len(O)
O:观察序列
beta:计算时用到的临时数组
prob:返回值;所要求的概率
'''
#初始化
foriinrange(self.N):
beta[T-1,i]=1.0
#递归
fortinrange(T-2,-1,-1):#从T-2开始递减;即T-2,T-3,T-4,...,0
foriinrange(self.N):
sum=0.0
forjinrange(self.N):
sum+=self.A[i,j]*self.B[j,O[t+1]]*beta[t+1,j]
beta[t,i]=sum
#终止
sum=0.0
foriinrange(self.N):
sum+=self.pi[i]*self.B[i,O[0]]*beta[0,i]
prob[0]=sum
#带修正的后向算法
defbackwardWithScale(self,T,O,beta,scale):
'''''
T:观察序列的长度len(O)
O:观察序列
beta:计算时用到的临时数组
'''
#初始化
foriinrange(self.N):
beta[T-1,i]=1.0
#递归
fortinrange(T-2,-1,-1):
foriinrange(self.N):
sum=0.0
forjinrange(self.N):
sum+=self.A[i,j]*self.B[j,O[t+1]]*beta[t+1,j]
beta[t,i]=sum/scale[t+1]
#viterbi算法
defviterbi(self,O):
'''''
O:观察序列
'''
T=len(O)
#初始化
delta=np.zeros((T,self.N),np.float)
phi=np.zeros((T,self.N),np.float)
I=np.zeros(T)
foriinrange(self.N):
delta[0,i]=self.pi[i]*self.B[i,O[0]]
phi[0,i]=0.0
#递归
fortinrange(1,T):
foriinrange(self.N):
delta[t,i]=self.B[i,O[t]]*np.array([delta[t-1,j]*self.A[j,i]forjinrange(self.N)]).max()
phi=np.array([delta[t-1,j]*self.A[j,i]forjinrange(self.N)]).argmax()
#终止
prob=delta[T-1,:].max()
I[T-1]=delta[T-1,:].argmax()
fortinrange(T-2,-1,-1):
I[t]=phi[I[t+1]]
returnprob,I
#计算gamma(计算A所需的分母;详情见李航的统计学习):时刻t时马尔可夫链处于状态Si的概率
defcomputeGamma(self,T,alpha,beta,gamma):
''''''''
fortinrange(T):
foriinrange(self.N):
sum=0.0
forjinrange(self.N):
sum+=alpha[t,j]*beta[t,j]
gamma[t,i]=(alpha[t,i]*beta[t,i])/sum
#计算sai(i,j)(计算A所需的分子)为给定训练序列O和模型lambda时
defcomputeXi(self,T,O,alpha,beta,Xi):
fortinrange(T-1):
sum=0.0
foriinrange(self.N):
forjinrange(self.N):
Xi[t,i,j]=alpha[t,i]*self.A[i,j]*self.B[j,O[t+1]]*beta[t+1,j]
sum+=Xi[t,i,j]
foriinrange(self.N):
forjinrange(self.N):
Xi[t,i,j]/=sum
#输入L个观察序列O,初始模型:HMM={A,B,pi,N,M}
defBaumWelch(self,L,T,O,alpha,beta,gamma):
DELTA=0.01;round=0;flag=1;probf=[0.0]
delta=0.0;probprev=0.0;ratio=0.0;deltaprev=10e-70
xi=np.zeros((T,self.N,self.N))#计算A的分子
pi=np.zeros((T),np.float)#状态初始化概率
denominatorA=np.zeros((self.N),np.float)#辅助计算A的分母的变量
denominatorB=np.zeros((self.N),np.float)
numeratorA=np.zeros((self.N,self.N),np.float)#辅助计算A的分子的变量
numeratorB=np.zeros((self.N,self.M),np.float)#针对输出观察概率矩阵
scale=np.zeros((T),np.float)
whileTrue:
probf[0]=0
#E_step
forlinrange(L):
self.forwardWithScale(T,O[l],alpha,scale,probf)
self.backwardWithScale(T,O[l],beta,scale)
self.computeGamma(T,alpha,beta,gamma)#(t,i)
self.computeXi(T,O[l],alpha,beta,xi)#(t,i,j)
foriinrange(self.N):
pi[i]+=gamma[0,i]
fortinrange(T-1):
denominatorA[i]+=gamma[t,i]
denominatorB[i]+=gamma[t,i]
denominatorB[i]+=gamma[T-1,i]
forjinrange(self.N):
fortinrange(T-1):
numeratorA[i,j]+=xi[t,i,j]
forkinrange(self.M):#M为观察状态取值个数
fortinrange(T):
ifO[l][t]==k:
numeratorB[i,k]+=gamma[t,i]
#M_step。计算pi,A,B
foriinrange(self.N):#这个for循环也可以放到forlinrange(L)里面
self.pi[i]=0.001/self.N+0.999*pi[i]/L
forjinrange(self.N):
self.A[i,j]=0.001/self.N+0.999*numeratorA[i,j]/denominatorA[i]
numeratorA[i,j]=0.0
forkinrange(self.M):
self.B[i,k]=0.001/self.N+0.999*numeratorB[i,k]/denominatorB[i]
numeratorB[i,k]=0.0
#重置
pi[i]=denominatorA[i]=denominatorB[i]=0.0
ifflag==1:
flag=0
probprev=probf[0]
ratio=1
continue
delta=probf[0]-probprev
ratio=delta/deltaprev
probprev=probf[0]
deltaprev=delta
round+=1
ifratio<=DELTA:
print('numiteration:',round)
break
if__name__=='__main__':
print("pythonmyHMM")
#初始的状态概率矩阵pi;状态转移矩阵A;输出观察概率矩阵B;观察序列
pi=[0.5,0.5]
A=[[0.8125,0.1875],[0.2,0.8]]
B=[[0.875,0.125],[0.25,0.75]]
O=[
[1,0,0,1,1,0,0,0,0],
[1,1,0,1,0,0,1,1,0],
[0,0,1,1,0,0,1,1,1]
]
L=len(O)
T=len(O[0])#T等于最长序列的长度就好了
hmm=HMM(A,B,pi)
alpha=np.zeros((T,hmm.N),np.float)
beta=np.zeros((T,hmm.N),np.float)
gamma=np.zeros((T,hmm.N),np.float)
#训练
hmm.BaumWelch(L,T,O,alpha,beta,gamma)
#输出HMM参数信息
hmm.printHMM()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。