# -*- coding: iso-8859-1 -*- # Copyright 20003 - 2008: Julien Bourdaillet (julien.bourdaillet@lip6.fr), Jean-Gabriel Ganascia (jean-gabriel.ganascia@lip6.fr) # This file is part of MEDITE. # # MEDITE is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # MEDITE is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Foobar; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #import la lib hmm ici import sys,logging #from numpy import exp,log,seterr #from math import log as mlog from math import exp from numpy import Infinity import platform if platform.system().lower() == 'linux': from ghmm import * def mlog(x): return x def log(x): try: return mlog(x) except OverflowError: #si x = 0 return -1 * Infinity # if x !=0 : # return mlog(x) # else : # return -Infinity class algoHMM: """classe effectuant l'algorithme HMM viterbi pour l'algorithme de Feng & Manmatha """ def __init__(self,termes1,termes2,debugLevel=1): """Constructeur @param termes1,termes2: la liste des blocs que l'on va comparer @type termes1,termes2: [bloc(val,index),...] (au moins les champs val et index doivent etre renseignés) @param debugLevel : voir feng.py, optionnel """ #Definition des constantes: self.__k1 = 0.001 self.__logk1 = mlog(self.__k1) self.__k2 = 0.8 self.__logk2 = mlog(self.__k2) self.__lambda = 0.5 self.__mu1 = 0.99 self.__logmu1 = mlog(self.__mu1) self.__mu2 = 0.0001 self.__logmu2 = mlog(self.__mu2) self.__logZero = 0#-Infinity self.__logUn = mlog(1) logging.basicConfig(level=debugLevel,datefmt='%a, %d %b %Y %H:%M:%S',format='%(asctime)s %(levelname)s %(message)s')#,filename='hmm.log',filemode='w') #TODO : DEFINIR LE FORMAT D ENTREE ([(mot/hash,pos),...]) # G liste des positions dans le texte1 de taille m self.listeTexte1 = [(termes1.index(x),x.index,x.val) for x in termes1] #O liste des positions dans le texte2 de taille n self.listeTexte2 = [(termes2.index(x),x.index,x.val) for x in termes2] # if len(self.listeTexte2)>len(self.listeTexte1) : # tmp = self.listeTexte1[:] # self.listeTexte1 = self.listeTexte2[:] # self.listeTexte2 = tmp[:] #par defaut numpy affiche un warning pour log(0), mais renvoie quand meme -infini #on ne souhaite pas afficher ce warning # e = seterr(divide='ignore') # e=seterr(**e) # seterr(**e) #bizarre, il faut le faire 2 fois ... def calculeAlignementPy(self): """ Lance l'alignement HMM, ancienne version en python pur, lente car python @return la sequence de termes sur laquelle on aligne @rtype [[posListe1,posListe2]...] """ #print "Debut viterbi :",len(self.listeTexte1),len(self.listeTexte2) #print self.listeTexte1,"\n____\n",self.listeTexte2 logging.log(2,"Appel de l'alignement HMM") viterbi = self.__viterbi()[1] res=[] if viterbi == None : res = [] else: for index in xrange(len(viterbi)-1): res.append((self.listeTexte1[viterbi[index]][1],self.listeTexte2[index][1])) logging.log(1,"liste de termes 1 : %s",self.listeTexte1) logging.log(1,"liste de termes 2 : %s",self.listeTexte2) logging.log(1,"Termes alignés par le HMM : %s",res) logging.log(2,"Fin de l'alignement HMM") return res def calculeAlignement(self): """ Calul l'alignement de Viterbi avec la librairie GHMM @return liste de couples apparies entre texte 1 et texte 2 @rtype liste de couples (index texte 1, index texte 2) """ #matrice A des probas de transition if self.listeTexte1 == [] or self.listeTexte2 == [] : return [] # sigma = liste/alphabet des etats possibles sigma = IntegerRange(0,max(len(self.listeTexte1),len(self.listeTexte2))) # A: matrice de transition A=[] for i in xrange(len(self.listeTexte1)): ATmp=[] for j in xrange(len(self.listeTexte1)): ATmp.append(self.__probaTransition(j, i)) A.append(ATmp) #matrice B des probas d'emissions B=[] for i in xrange(len(self.listeTexte1)): BTmp=[] for j in xrange(len(self.listeTexte2)): BTmp.append(self.__probaEmmision(self.listeTexte2[j][2], i)) #la proba d'emission est 0 pour les termes1 non présents dans le texte2 # cf le code de ghmm for k in xrange(len(self.listeTexte1)-len(self.listeTexte2)): BTmp.append(0) B.append(BTmp) #matrice pi des proba initiales pi = [0]*len(self.listeTexte1) if len(pi) > 0 : pi[0]=1 # éta 0, proba intiale de 1, les autres de 0 m = HMMFromMatrices(sigma, DiscreteDistribution(sigma), A, B, pi) #print m test = EmissionSequence(sigma,[x[0] for x in self.listeTexte2]) v = m.viterbi(test) # print v viterbi = v[0] res=[] if viterbi == None : res = [] else: for index in xrange(len(viterbi)): res.append((self.listeTexte1[viterbi[index]][1],self.listeTexte2[index][1])) # print self.listeTexte1 # print self.listeTexte2 # print res #sys.exit(0) # res: liste de couples appariés entre texte 1 et texte 2 return res ########## #Methodes privées ########### def __probaTransition(self, etatSuivant, etat): """P(Si|Si-1), et etat1 apres etat2 Partie 2.1 => (3) """ if etatSuivant < etat : res = self.__logZero elif etatSuivant == etat : res = self.__logk1 elif etatSuivant - etat == 1 : res = self.__logk2 elif etatSuivant - etat > 1 : res = log(self.__lambda * exp (-1 * self.__lambda * (etatSuivant-etat))) return res def __probaEmmision(self, observation, etat): """P(Oi|Si) Partie 2.1 => (4) """ if observation == self.listeTexte1[etat][2] : res = self.__logmu1 else : res = self.__logmu2 return res def __probaInitiale(self,etat): """p(Si) """ if etat == 0 : res = self.__logUn else : res = self.__logZero return res # return len(self.listeTexte1)**(-1) def __viterbi(self): """Algorithme viterbi en Python, lent @return: la proba de l'etat, la séquence d'etats, la probabilité d'apparition de la sequence @rtype: decimal,[[posListe1,posListe2]...],decimal """ T = {} for etat in self.listeTexte1: # prob. V. path V. prob. #T[etat[1]] = (self.__probaInitiale(etat[0]), [etat[0]], self.__probaInitiale(etat[0])) T[etat[1]] = ([], [etat[0]], self.__probaInitiale(etat[0])) logging.log(1,"Debut viterbi") logging.log(1,"T : %s",T) if __debug__ : cpt = 0 len2 = len(self.listeTexte2)+1 argMax=[] for observation in self.listeTexte2: U = {} for etatSuivant in self.listeTexte1: #total = log(0) #argMax = None valMax = self.__logZero for etat in self.listeTexte1: (prob, viterbiSequence, viterbiProb) = T[etat[1]] p = self.__probaEmmision(observation[2], etat[0]) + self.__probaTransition(etatSuivant[1], etat[1]) viterbiProb += p #total = log( exp(total)+ exp(p)) if viterbiProb > valMax: argMax = viterbiSequence+[etatSuivant[0]] valMax = viterbiProb # U[etatSuivant[1]] = (total, argMax, valMax) U[etatSuivant[1]] = ([], argMax, valMax) if __debug__ : cpt=len(argMax) logging.log(3, "Viterbi : %0.0f %% Done",float(cpt*len2**-1)*100) T = U ## apply sum/max to the final states: #total = log(0) argMax = None valMax = self.__logZero for etat in self.listeTexte1: (prob, viterbiSequence, viterbiProb) = T[etat[1]] #total += prob if viterbiProb > valMax: argMax = viterbiSequence valMax = viterbiProb logging.log(1,"T final : %s",T) return ([], argMax, valMax)