1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20 import sys,logging
21
22
23
24 from math import exp
25 from numpy import Infinity
26
27 import platform
28 if platform.system().lower() == 'linux':
29 from ghmm import *
30
31
32 -def mlog(x): return x
34 try:
35 return mlog(x)
36 except OverflowError:
37
38 return -1 * Infinity
39
40
41
42
43
44
46 """classe effectuant l'algorithme HMM viterbi pour l'algorithme de Feng & Manmatha
47 """
48 - def __init__(self,termes1,termes2,debugLevel=1):
49 """Constructeur
50 @param termes1,termes2: la liste des blocs que l'on va comparer
51 @type termes1,termes2: [bloc(val,index),...] (au moins les champs val et index doivent etre renseignés)
52 @param debugLevel : voir feng.py, optionnel
53 """
54
55 self.__k1 = 0.001
56 self.__logk1 = mlog(self.__k1)
57 self.__k2 = 0.8
58 self.__logk2 = mlog(self.__k2)
59 self.__lambda = 0.5
60 self.__mu1 = 0.99
61 self.__logmu1 = mlog(self.__mu1)
62 self.__mu2 = 0.0001
63 self.__logmu2 = mlog(self.__mu2)
64 self.__logZero = 0
65 self.__logUn = mlog(1)
66 logging.basicConfig(level=debugLevel,datefmt='%a, %d %b %Y %H:%M:%S',format='%(asctime)s %(levelname)s %(message)s')
67
68
69 self.listeTexte1 = [(termes1.index(x),x.index,x.val) for x in termes1]
70
71 self.listeTexte2 = [(termes2.index(x),x.index,x.val) for x in termes2]
72
73
74
75
76
77
78
79
80
81
82
83
84
85
87 """
88 Lance l'alignement HMM, ancienne version en python pur, lente car python
89 @return la sequence de termes sur laquelle on aligne
90 @rtype [[posListe1,posListe2]...]
91 """
92
93
94 logging.log(2,"Appel de l'alignement HMM")
95 viterbi = self.__viterbi()[1]
96 res=[]
97 if viterbi == None : res = []
98 else:
99 for index in xrange(len(viterbi)-1):
100 res.append((self.listeTexte1[viterbi[index]][1],self.listeTexte2[index][1]))
101
102 logging.log(1,"liste de termes 1 : %s",self.listeTexte1)
103 logging.log(1,"liste de termes 2 : %s",self.listeTexte2)
104 logging.log(1,"Termes alignés par le HMM : %s",res)
105 logging.log(2,"Fin de l'alignement HMM")
106 return res
107
109 """
110 Calul l'alignement de Viterbi avec la librairie GHMM
111 @return liste de couples apparies entre texte 1 et texte 2
112 @rtype liste de couples (index texte 1, index texte 2)
113 """
114
115 if self.listeTexte1 == [] or self.listeTexte2 == [] : return []
116
117 sigma = IntegerRange(0,max(len(self.listeTexte1),len(self.listeTexte2)))
118
119 A=[]
120 for i in xrange(len(self.listeTexte1)):
121 ATmp=[]
122 for j in xrange(len(self.listeTexte1)):
123 ATmp.append(self.__probaTransition(j, i))
124 A.append(ATmp)
125
126 B=[]
127 for i in xrange(len(self.listeTexte1)):
128 BTmp=[]
129 for j in xrange(len(self.listeTexte2)):
130 BTmp.append(self.__probaEmmision(self.listeTexte2[j][2], i))
131
132
133 for k in xrange(len(self.listeTexte1)-len(self.listeTexte2)):
134 BTmp.append(0)
135 B.append(BTmp)
136
137
138 pi = [0]*len(self.listeTexte1)
139 if len(pi) > 0 : pi[0]=1
140
141 m = HMMFromMatrices(sigma, DiscreteDistribution(sigma), A, B, pi)
142
143 test = EmissionSequence(sigma,[x[0] for x in self.listeTexte2])
144 v = m.viterbi(test)
145
146 viterbi = v[0]
147 res=[]
148 if viterbi == None : res = []
149 else:
150 for index in xrange(len(viterbi)):
151 res.append((self.listeTexte1[viterbi[index]][1],self.listeTexte2[index][1]))
152
153
154
155
156
157
158 return res
159
160
161
162
163
165 """P(Si|Si-1), et etat1 apres etat2
166 Partie 2.1 => (3)
167 """
168 if etatSuivant < etat : res = self.__logZero
169 elif etatSuivant == etat : res = self.__logk1
170 elif etatSuivant - etat == 1 : res = self.__logk2
171 elif etatSuivant - etat > 1 : res = log(self.__lambda * exp (-1 * self.__lambda * (etatSuivant-etat)))
172 return res
173
175 """P(Oi|Si)
176 Partie 2.1 => (4)
177 """
178 if observation == self.listeTexte1[etat][2] : res = self.__logmu1
179 else : res = self.__logmu2
180 return res
181
183 """p(Si)
184 """
185 if etat == 0 : res = self.__logUn
186 else : res = self.__logZero
187 return res
188
189
190
191
192
194 """Algorithme viterbi en Python, lent
195 @return: la proba de l'etat, la séquence d'etats, la probabilité d'apparition de la sequence
196 @rtype: decimal,[[posListe1,posListe2]...],decimal
197 """
198 T = {}
199 for etat in self.listeTexte1:
200
201
202 T[etat[1]] = ([], [etat[0]], self.__probaInitiale(etat[0]))
203 logging.log(1,"Debut viterbi")
204 logging.log(1,"T : %s",T)
205 if __debug__ :
206 cpt = 0
207 len2 = len(self.listeTexte2)+1
208 argMax=[]
209 for observation in self.listeTexte2:
210 U = {}
211 for etatSuivant in self.listeTexte1:
212
213
214 valMax = self.__logZero
215 for etat in self.listeTexte1:
216 (prob, viterbiSequence, viterbiProb) = T[etat[1]]
217 p = self.__probaEmmision(observation[2], etat[0]) + self.__probaTransition(etatSuivant[1], etat[1])
218 viterbiProb += p
219
220 if viterbiProb > valMax:
221 argMax = viterbiSequence+[etatSuivant[0]]
222 valMax = viterbiProb
223
224 U[etatSuivant[1]] = ([], argMax, valMax)
225 if __debug__ :
226 cpt=len(argMax)
227 logging.log(3, "Viterbi : %0.0f %% Done",float(cpt*len2**-1)*100)
228 T = U
229
230
231 argMax = None
232 valMax = self.__logZero
233 for etat in self.listeTexte1:
234 (prob, viterbiSequence, viterbiProb) = T[etat[1]]
235
236 if viterbiProb > valMax:
237 argMax = viterbiSequence
238 valMax = viterbiProb
239 logging.log(1,"T final : %s",T)
240 return ([], argMax, valMax)
241