python实现隐马尔科夫模型HMM
一份完全按照李航<<统计学习方法>>介绍的HMM代码,供大家参考,具体内容如下
#coding=utf8 ''''' Createdon2017-8-5 里面的代码许多地方可以精简,但为了百分百还原公式,就没有精简了。 @author:adzhua ''' importnumpyasnp classHMM(object): def__init__(self,A,B,pi): ''''' A:状态转移概率矩阵 B:输出观察概率矩阵 pi:初始化状态向量 ''' self.A=np.array(A) self.B=np.array(B) self.pi=np.array(pi) self.N=self.A.shape[0]#总共状态个数 self.M=self.B.shape[1]#总共观察值个数 #输出HMM的参数信息 defprintHMM(self): print("==================================================") print("HMMcontent:N=",self.N,",M=",self.M) foriinrange(self.N): ifi==0: print("hmm.A",self.A[i,:],"hmm.B",self.B[i,:]) else: print("",self.A[i,:],"",self.B[i,:]) print("hmm.pi",self.pi) print("==================================================") #前向算法 defforwar(self,T,O,alpha,prob): ''''' T:观察序列的长度 O:观察序列 alpha:运算中用到的临时数组 prob:返回值所要求的概率 ''' #初始化 foriinrange(self.N): alpha[0,i]=self.pi[i]*self.B[i,O[0]] #递归 fortinrange(T-1): forjinrange(self.N): sum=0.0 foriinrange(self.N): sum+=alpha[t,i]*self.A[i,j] alpha[t+1,j]=sum*self.B[j,O[t+1]] #终止 sum=0.0 foriinrange(self.N): sum+=alpha[T-1,i] prob[0]*=sum #带修正的前向算法 defforwardWithScale(self,T,O,alpha,scale,prob): scale[0]=0.0 #初始化 foriinrange(self.N): alpha[0,i]=self.pi[i]*self.B[i,O[0]] scale[0]+=alpha[0,i] foriinrange(self.N): alpha[0,i]/=scale[0] #递归 fortinrange(T-1): scale[t+1]=0.0 forjinrange(self.N): sum=0.0 foriinrange(self.N): sum+=alpha[t,i]*self.A[i,j] alpha[t+1,j]=sum*self.B[j,O[t+1]] scale[t+1]+=alpha[t+1,j] forjinrange(self.N): alpha[t+1,j]/=scale[t+1] #终止 fortinrange(T): prob[0]+=np.log(scale[t]) defback(self,T,O,beta,prob): ''''' T:观察序列的长度len(O) O:观察序列 beta:计算时用到的临时数组 prob:返回值;所要求的概率 ''' #初始化 foriinrange(self.N): beta[T-1,i]=1.0 #递归 fortinrange(T-2,-1,-1):#从T-2开始递减;即T-2,T-3,T-4,...,0 foriinrange(self.N): sum=0.0 forjinrange(self.N): sum+=self.A[i,j]*self.B[j,O[t+1]]*beta[t+1,j] beta[t,i]=sum #终止 sum=0.0 foriinrange(self.N): sum+=self.pi[i]*self.B[i,O[0]]*beta[0,i] prob[0]=sum #带修正的后向算法 defbackwardWithScale(self,T,O,beta,scale): ''''' T:观察序列的长度len(O) O:观察序列 beta:计算时用到的临时数组 ''' #初始化 foriinrange(self.N): beta[T-1,i]=1.0 #递归 fortinrange(T-2,-1,-1): foriinrange(self.N): sum=0.0 forjinrange(self.N): sum+=self.A[i,j]*self.B[j,O[t+1]]*beta[t+1,j] beta[t,i]=sum/scale[t+1] #viterbi算法 defviterbi(self,O): ''''' O:观察序列 ''' T=len(O) #初始化 delta=np.zeros((T,self.N),np.float) phi=np.zeros((T,self.N),np.float) I=np.zeros(T) foriinrange(self.N): delta[0,i]=self.pi[i]*self.B[i,O[0]] phi[0,i]=0.0 #递归 fortinrange(1,T): foriinrange(self.N): delta[t,i]=self.B[i,O[t]]*np.array([delta[t-1,j]*self.A[j,i]forjinrange(self.N)]).max() phi=np.array([delta[t-1,j]*self.A[j,i]forjinrange(self.N)]).argmax() #终止 prob=delta[T-1,:].max() I[T-1]=delta[T-1,:].argmax() fortinrange(T-2,-1,-1): I[t]=phi[I[t+1]] returnprob,I #计算gamma(计算A所需的分母;详情见李航的统计学习):时刻t时马尔可夫链处于状态Si的概率 defcomputeGamma(self,T,alpha,beta,gamma): '''''''' fortinrange(T): foriinrange(self.N): sum=0.0 forjinrange(self.N): sum+=alpha[t,j]*beta[t,j] gamma[t,i]=(alpha[t,i]*beta[t,i])/sum #计算sai(i,j)(计算A所需的分子)为给定训练序列O和模型lambda时 defcomputeXi(self,T,O,alpha,beta,Xi): fortinrange(T-1): sum=0.0 foriinrange(self.N): forjinrange(self.N): Xi[t,i,j]=alpha[t,i]*self.A[i,j]*self.B[j,O[t+1]]*beta[t+1,j] sum+=Xi[t,i,j] foriinrange(self.N): forjinrange(self.N): Xi[t,i,j]/=sum #输入L个观察序列O,初始模型:HMM={A,B,pi,N,M} defBaumWelch(self,L,T,O,alpha,beta,gamma): DELTA=0.01;round=0;flag=1;probf=[0.0] delta=0.0;probprev=0.0;ratio=0.0;deltaprev=10e-70 xi=np.zeros((T,self.N,self.N))#计算A的分子 pi=np.zeros((T),np.float)#状态初始化概率 denominatorA=np.zeros((self.N),np.float)#辅助计算A的分母的变量 denominatorB=np.zeros((self.N),np.float) numeratorA=np.zeros((self.N,self.N),np.float)#辅助计算A的分子的变量 numeratorB=np.zeros((self.N,self.M),np.float)#针对输出观察概率矩阵 scale=np.zeros((T),np.float) whileTrue: probf[0]=0 #E_step forlinrange(L): self.forwardWithScale(T,O[l],alpha,scale,probf) self.backwardWithScale(T,O[l],beta,scale) self.computeGamma(T,alpha,beta,gamma)#(t,i) self.computeXi(T,O[l],alpha,beta,xi)#(t,i,j) foriinrange(self.N): pi[i]+=gamma[0,i] fortinrange(T-1): denominatorA[i]+=gamma[t,i] denominatorB[i]+=gamma[t,i] denominatorB[i]+=gamma[T-1,i] forjinrange(self.N): fortinrange(T-1): numeratorA[i,j]+=xi[t,i,j] forkinrange(self.M):#M为观察状态取值个数 fortinrange(T): ifO[l][t]==k: numeratorB[i,k]+=gamma[t,i] #M_step。计算pi,A,B foriinrange(self.N):#这个for循环也可以放到forlinrange(L)里面 self.pi[i]=0.001/self.N+0.999*pi[i]/L forjinrange(self.N): self.A[i,j]=0.001/self.N+0.999*numeratorA[i,j]/denominatorA[i] numeratorA[i,j]=0.0 forkinrange(self.M): self.B[i,k]=0.001/self.N+0.999*numeratorB[i,k]/denominatorB[i] numeratorB[i,k]=0.0 #重置 pi[i]=denominatorA[i]=denominatorB[i]=0.0 ifflag==1: flag=0 probprev=probf[0] ratio=1 continue delta=probf[0]-probprev ratio=delta/deltaprev probprev=probf[0] deltaprev=delta round+=1 ifratio<=DELTA: print('numiteration:',round) break if__name__=='__main__': print("pythonmyHMM") #初始的状态概率矩阵pi;状态转移矩阵A;输出观察概率矩阵B;观察序列 pi=[0.5,0.5] A=[[0.8125,0.1875],[0.2,0.8]] B=[[0.875,0.125],[0.25,0.75]] O=[ [1,0,0,1,1,0,0,0,0], [1,1,0,1,0,0,1,1,0], [0,0,1,1,0,0,1,1,1] ] L=len(O) T=len(O[0])#T等于最长序列的长度就好了 hmm=HMM(A,B,pi) alpha=np.zeros((T,hmm.N),np.float) beta=np.zeros((T,hmm.N),np.float) gamma=np.zeros((T,hmm.N),np.float) #训练 hmm.BaumWelch(L,T,O,alpha,beta,gamma) #输出HMM参数信息 hmm.printHMM()
以上就是本文的全部内容,希望对大家的学习有所帮助,也希望大家多多支持毛票票。
声明:本文内容来源于网络,版权归原作者所有,内容由互联网用户自发贡献自行上传,本网站不拥有所有权,未作人工编辑处理,也不承担相关法律责任。如果您发现有涉嫌版权的内容,欢迎发送邮件至:czq8825#qq.com(发邮件时,请将#更换为@)进行举报,并提供相关证据,一经查实,本站将立刻删除涉嫌侵权内容。