Package medite :: Package MediteAppli :: Module HMM
[hide private]
[frames] | no frames]

Source Code for Module medite.MediteAppli.HMM

  1  # -*- coding: iso-8859-1 -*- 
  2  # Copyright 20003 - 2008: Julien Bourdaillet (julien.bourdaillet@lip6.fr), Jean-Gabriel Ganascia (jean-gabriel.ganascia@lip6.fr) 
  3  # This file is part of MEDITE. 
  4  # 
  5  #    MEDITE is free software; you can redistribute it and/or modify 
  6  #    it under the terms of the GNU General Public License as published by 
  7  #    the Free Software Foundation; either version 2 of the License, or 
  8  #    (at your option) any later version. 
  9  # 
 10  #    MEDITE is distributed in the hope that it will be useful, 
 11  #    but WITHOUT ANY WARRANTY; without even the implied warranty of 
 12  #    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the 
 13  #    GNU General Public License for more details. 
 14  # 
 15  #    You should have received a copy of the GNU General Public License 
 16  #    along with Foobar; if not, write to the Free Software 
 17  #    Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA  02110-1301  USA 
 18   
 19  #import la lib hmm ici 
 20  import sys,logging 
 21   
 22  #from numpy import exp,log,seterr 
 23  #from math import log as mlog 
 24  from math import exp 
 25  from numpy import Infinity 
 26   
 27  import platform 
 28  if platform.system().lower() == 'linux': 
 29      from ghmm import * 
 30   
 31   
32 -def mlog(x): return x
33 -def log(x):
34 try: 35 return mlog(x) 36 except OverflowError: 37 #si x = 0 38 return -1 * Infinity
39 # if x !=0 : 40 # return mlog(x) 41 # else : 42 # return -Infinity 43 44
45 -class algoHMM:
46 """classe effectuant l'algorithme HMM viterbi pour l'algorithme de Feng & Manmatha 47 """
48 - def __init__(self,termes1,termes2,debugLevel=1):
49 """Constructeur 50 @param termes1,termes2: la liste des blocs que l'on va comparer 51 @type termes1,termes2: [bloc(val,index),...] (au moins les champs val et index doivent etre renseignés) 52 @param debugLevel : voir feng.py, optionnel 53 """ 54 #Definition des constantes: 55 self.__k1 = 0.001 56 self.__logk1 = mlog(self.__k1) 57 self.__k2 = 0.8 58 self.__logk2 = mlog(self.__k2) 59 self.__lambda = 0.5 60 self.__mu1 = 0.99 61 self.__logmu1 = mlog(self.__mu1) 62 self.__mu2 = 0.0001 63 self.__logmu2 = mlog(self.__mu2) 64 self.__logZero = 0#-Infinity 65 self.__logUn = mlog(1) 66 logging.basicConfig(level=debugLevel,datefmt='%a, %d %b %Y %H:%M:%S',format='%(asctime)s %(levelname)s %(message)s')#,filename='hmm.log',filemode='w') 67 #TODO : DEFINIR LE FORMAT D ENTREE ([(mot/hash,pos),...]) 68 # G liste des positions dans le texte1 de taille m 69 self.listeTexte1 = [(termes1.index(x),x.index,x.val) for x in termes1] 70 #O liste des positions dans le texte2 de taille n 71 self.listeTexte2 = [(termes2.index(x),x.index,x.val) for x in termes2]
72 73 # if len(self.listeTexte2)>len(self.listeTexte1) : 74 # tmp = self.listeTexte1[:] 75 # self.listeTexte1 = self.listeTexte2[:] 76 # self.listeTexte2 = tmp[:] 77 78 79 #par defaut numpy affiche un warning pour log(0), mais renvoie quand meme -infini 80 #on ne souhaite pas afficher ce warning 81 # e = seterr(divide='ignore') 82 # e=seterr(**e) 83 # seterr(**e) #bizarre, il faut le faire 2 fois ... 84 85
86 - def calculeAlignementPy(self):
87 """ 88 Lance l'alignement HMM, ancienne version en python pur, lente car python 89 @return la sequence de termes sur laquelle on aligne 90 @rtype [[posListe1,posListe2]...] 91 """ 92 #print "Debut viterbi :",len(self.listeTexte1),len(self.listeTexte2) 93 #print self.listeTexte1,"\n____\n",self.listeTexte2 94 logging.log(2,"Appel de l'alignement HMM") 95 viterbi = self.__viterbi()[1] 96 res=[] 97 if viterbi == None : res = [] 98 else: 99 for index in xrange(len(viterbi)-1): 100 res.append((self.listeTexte1[viterbi[index]][1],self.listeTexte2[index][1])) 101 102 logging.log(1,"liste de termes 1 : %s",self.listeTexte1) 103 logging.log(1,"liste de termes 2 : %s",self.listeTexte2) 104 logging.log(1,"Termes alignés par le HMM : %s",res) 105 logging.log(2,"Fin de l'alignement HMM") 106 return res
107
108 - def calculeAlignement(self):
109 """ 110 Calul l'alignement de Viterbi avec la librairie GHMM 111 @return liste de couples apparies entre texte 1 et texte 2 112 @rtype liste de couples (index texte 1, index texte 2) 113 """ 114 #matrice A des probas de transition 115 if self.listeTexte1 == [] or self.listeTexte2 == [] : return [] 116 # sigma = liste/alphabet des etats possibles 117 sigma = IntegerRange(0,max(len(self.listeTexte1),len(self.listeTexte2))) 118 # A: matrice de transition 119 A=[] 120 for i in xrange(len(self.listeTexte1)): 121 ATmp=[] 122 for j in xrange(len(self.listeTexte1)): 123 ATmp.append(self.__probaTransition(j, i)) 124 A.append(ATmp) 125 #matrice B des probas d'emissions 126 B=[] 127 for i in xrange(len(self.listeTexte1)): 128 BTmp=[] 129 for j in xrange(len(self.listeTexte2)): 130 BTmp.append(self.__probaEmmision(self.listeTexte2[j][2], i)) 131 #la proba d'emission est 0 pour les termes1 non présents dans le texte2 132 # cf le code de ghmm 133 for k in xrange(len(self.listeTexte1)-len(self.listeTexte2)): 134 BTmp.append(0) 135 B.append(BTmp) 136 137 #matrice pi des proba initiales 138 pi = [0]*len(self.listeTexte1) 139 if len(pi) > 0 : pi[0]=1 # éta 0, proba intiale de 1, les autres de 0 140 141 m = HMMFromMatrices(sigma, DiscreteDistribution(sigma), A, B, pi) 142 #print m 143 test = EmissionSequence(sigma,[x[0] for x in self.listeTexte2]) 144 v = m.viterbi(test) 145 # print v 146 viterbi = v[0] 147 res=[] 148 if viterbi == None : res = [] 149 else: 150 for index in xrange(len(viterbi)): 151 res.append((self.listeTexte1[viterbi[index]][1],self.listeTexte2[index][1])) 152 153 # print self.listeTexte1 154 # print self.listeTexte2 155 # print res 156 #sys.exit(0) 157 # res: liste de couples appariés entre texte 1 et texte 2 158 return res
159 ########## 160 #Methodes privées 161 ########### 162 163
164 - def __probaTransition(self, etatSuivant, etat):
165 """P(Si|Si-1), et etat1 apres etat2 166 Partie 2.1 => (3) 167 """ 168 if etatSuivant < etat : res = self.__logZero 169 elif etatSuivant == etat : res = self.__logk1 170 elif etatSuivant - etat == 1 : res = self.__logk2 171 elif etatSuivant - etat > 1 : res = log(self.__lambda * exp (-1 * self.__lambda * (etatSuivant-etat))) 172 return res
173
174 - def __probaEmmision(self, observation, etat):
175 """P(Oi|Si) 176 Partie 2.1 => (4) 177 """ 178 if observation == self.listeTexte1[etat][2] : res = self.__logmu1 179 else : res = self.__logmu2 180 return res
181
182 - def __probaInitiale(self,etat):
183 """p(Si) 184 """ 185 if etat == 0 : res = self.__logUn 186 else : res = self.__logZero 187 return res
188 # return len(self.listeTexte1)**(-1) 189 190 191 192
193 - def __viterbi(self):
194 """Algorithme viterbi en Python, lent 195 @return: la proba de l'etat, la séquence d'etats, la probabilité d'apparition de la sequence 196 @rtype: decimal,[[posListe1,posListe2]...],decimal 197 """ 198 T = {} 199 for etat in self.listeTexte1: 200 # prob. V. path V. prob. 201 #T[etat[1]] = (self.__probaInitiale(etat[0]), [etat[0]], self.__probaInitiale(etat[0])) 202 T[etat[1]] = ([], [etat[0]], self.__probaInitiale(etat[0])) 203 logging.log(1,"Debut viterbi") 204 logging.log(1,"T : %s",T) 205 if __debug__ : 206 cpt = 0 207 len2 = len(self.listeTexte2)+1 208 argMax=[] 209 for observation in self.listeTexte2: 210 U = {} 211 for etatSuivant in self.listeTexte1: 212 #total = log(0) 213 #argMax = None 214 valMax = self.__logZero 215 for etat in self.listeTexte1: 216 (prob, viterbiSequence, viterbiProb) = T[etat[1]] 217 p = self.__probaEmmision(observation[2], etat[0]) + self.__probaTransition(etatSuivant[1], etat[1]) 218 viterbiProb += p 219 #total = log( exp(total)+ exp(p)) 220 if viterbiProb > valMax: 221 argMax = viterbiSequence+[etatSuivant[0]] 222 valMax = viterbiProb 223 # U[etatSuivant[1]] = (total, argMax, valMax) 224 U[etatSuivant[1]] = ([], argMax, valMax) 225 if __debug__ : 226 cpt=len(argMax) 227 logging.log(3, "Viterbi : %0.0f %% Done",float(cpt*len2**-1)*100) 228 T = U 229 ## apply sum/max to the final states: 230 #total = log(0) 231 argMax = None 232 valMax = self.__logZero 233 for etat in self.listeTexte1: 234 (prob, viterbiSequence, viterbiProb) = T[etat[1]] 235 #total += prob 236 if viterbiProb > valMax: 237 argMax = viterbiSequence 238 valMax = viterbiProb 239 logging.log(1,"T final : %s",T) 240 return ([], argMax, valMax)
241