# -*- coding: iso-8859-1 -*- # Copyright 20003 - 2008: Julien Bourdaillet (julien.bourdaillet@lip6.fr), Jean-Gabriel Ganascia (jean-gabriel.ganascia@lip6.fr) # This file is part of MEDITE. # # MEDITE is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # MEDITE is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Foobar; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA import sys,string, logging, bisect, gc from math import * import psyco import suffix_tree, recouvrement, utile import numpy.oldnumeric as Numeric import numpy.numarray as numarray import numpy import aligne #import cost #print dir(cost) class Align(object): """ Interface, regroupe les fonctions communes """ def __init__(self): #,texte): """ Constructeur #pre: isinstance(texte,str) # len(texte)>0 """ #self.texte = texte # texte original self.tool = utile.Utile() def deplacements_pond(self, L1, L2): """ Fonction qui aligne 2 liste d'intervalles du texte pre: isinstance(L1,list) isinstance(L2,list) len(L1)>0 len(L2)>0 forall([L1[i][0] >= L1[i-1][1] for i in range(1, len(L1))]) forall([L2[i][0] >= L2[i-1][1] for i in range(1, len(L2))]) """ pass def repetition(self, liste_occ, l_texte1) : """ Détermine s'il y a une occurrence inférieure et une occurrence supérieure éà la frontière pre: 0<=l_texte1<=self.l_texte1 """ return (not liste_occ == [] and liste_occ[0] < l_texte1 and liste_occ[-1] >= l_texte1) def syserr(self,str): sys.stderr.write(str+"\n") #sys.stderr.flush() def ass__(self,L): if __debug__: for i in xrange(1,len(L)): assert L[i-1][1]<=L[i][0] def ass2__(self,L,d,f,texte): if __debug__: for i in xrange(1,len(L)): assert d<=L[i-1][1]<=L[i][0]<=f,"d "+str(d)+" / L["+str(i-1)+"][1] "+str(L[i-1][1])+" / L["+str(i)+"][0] "+str(L[i][0])+" / f "+str(f)+ \ "\nL "+str(L)+"\n"+self.lint2str(L,texte) def lint2str(self,L,texte): """ Transforme une liste d'intervalle sur self.texte en leur transcription textuelle pre: isinstance(L,list) post: isinstance(__return__,str) """ res = "" if len(L) == 0: return '' for x in L[:-1]: res += texte[x[0]:x[1]] + " / " res += texte[L[-1][0]:L[-1][1]] return res class AlignNaif(Align): """ Aligneur naif, 1ere version utilisée dans MEDITE """ def deplacements_pond(self, S1, S2): #print "appel deplacements_pond_ avec seq1 = ", S1, " et seq2 = ", S2 L = [] while S1 <> [] or S2 <> []: if S1 == []: L = self.tool.addition_intervalle(L, S2[0]) S2 = S2[1:] elif S2 == []: L = self.tool.addition_intervalle(L, S1[0]) S1 = S1[1:] elif self.texte[S1[0][0]:S1[0][1]] == self.texte[S2[0][0]:S2[0][1]]: S1 = S1[1:] S2 = S2[1:] elif self.absent(S1[0], S2): L = self.tool.addition_intervalle(L, S1[0]) S1 = S1[1:] elif self.absent(S2[0], S1): L = self.tool.addition_intervalle(L, S2[0]) S2 = S2[1:] else: ecart1 = self.rang(S1[0], S2) ecart2 = self.rang(S2[0], S1) if (ecart1 > ecart2 or (ecart1 == ecart2 and (S1[0][1] - S1[0][0]) < (S2[0][1] - S2[0][0]))): L = self.tool.addition_intervalle(L, S1[0]) S1 = S1[1:] else: L = self.tool.addition_intervalle(L, S2[0]) S2 = S2[1:] return L def absent(self, I, S): # vérifie que la chaine repérée par l'intervalle I n'est pas identique à une chaine # présente dans l'un des intervalles de S while S <> []: if self.texte[I[0]:I[1]] == self.texte[S[0][0]:S[0][1]]: return 0 else: S = S[1:] return 1 def rang(self, I, S): # calcule le rang de la chaine couverte par I dans S return self.rang_aux(I, S) def rang_aux(self, I, S): ncar = 0 while S <> [] and self.texte[I[0]:I[1]] <> self.texte[S[0][0]:S[0][1]]: ncar = ncar + S[0][1] - S[0][0] S = S[1:] return ncar class AlignDicho(Align): """ Alignement dichotomique """ def deplacements_pond(self, L1, L2): L=L2[:] i=0 # dico stockant pour chaque bloc de la 2e liste ses positions dans cette liste self.dicoPos = {} while L <> []: B2=L[0] chaine = self.texte[B2[0]:B2[1]] l=B2[1]-B2[0] # taille du bloc if not self.dicoPos.has_key(chaine): # ajout de sa position self.dicoPos[chaine] = [(i,l)] else:self.dicoPos[chaine].append((i,l)) i+=1 L=L[1:] self.L1=L1 self.L2=L2 LRes1,LRes2 = self.trouver_deplacement(0,0,len(L1),0,len(L2)) i=c=0.0 for b in self.dicoPos: i+=1 for p,l in self.dicoPos[b]: c+=1 #sys.stderr.write("Nb moyen d'occurence par bloc de L2 (d) = "+str(round(c/i,2))+" / Taille de L1 (n) = "+str(len(self.L1))+" / log2(n) = "+str(round(log(len(self.L1))/log(2),2))+"\n") del self.L1, self.L2, self.dicoPos #sys.stderr.write(str(LRes1+LRes2)+"\n") dicoBest1={} dicoBest2={} LRes=[] LResBC=[] for x in LRes1: dicoBest1[x[0]]=1 for x in LRes2: dicoBest2[x[0]]=1 for i in L1: if dicoBest1.has_key(i[0]):LRes.append(i) else:LResBC.append(i) for i in L2: if dicoBest2.has_key(i[0]):LRes.append(i) else:LResBC.append(i) return LRes,LResBC def trouver_deplacement(self,compt,deb1,fin1,deb2,fin2): # deb1, fin1 début et fin de la liste 1 courante, idem pour liste 2 #sys.stderr.write(str(compt)+" trouver_deplacement("+str(deb1)+","+str(fin1)+","+str(deb2)+","+str(fin2)+")\n") #sys.stderr.flush() compt+=1 ecartMaxInit = max(fin1-deb2,fin2-deb1) #ecart maximal initial de positions #ecartMax = ecartMaxInit #longMax = 0 # longueur maximale initiale de bloc scoreMax = 0 # score max initial LTemp1 = self.L1[deb1:fin1] # liste 1 courante PosB1 = deb1 # position du 1er bloc de la liste 1 courante PosCandidatB1 = PosCandidatB2 = 0 # position des blocs candidats de chaque liste candidatB = None # bloc candidat pour l'appariement for Bi in LTemp1: for (PosB2,longB2) in self.dicoPos[self.texte[Bi[0]:Bi[1]]]: ecart = abs(PosB2 - PosB1) # ecart entre les 2 blocs #if (deb2<=PosB2<=fin2 and ecartlongMax): #a = (ecartMaxInit-ecart) #b = longB2 score = 0*(ecartMaxInit-ecart) + 1*longB2 #if (a+2*b)==0:break #score = (2*2*a*b)/(a+2*b) if (deb2<=PosB2<=fin2 and score>scoreMax): #sys.stderr.write(self.texte[Bi[0]:Bi[1]]+" / "+str(longB2)+" TOUCHED\n") candidatB=Bi #ecartMax=ecart #longMax=longB2 scoreMax=score PosCandidatB1=PosB1 PosCandidatB2=PosB2 PosB1+=1 if candidatB != None: # si on a trouvé un candidat on applique la récurence # sinon on renvoie les listes courantes La1,La2 = self.trouver_deplacement(compt,deb1,PosCandidatB1,deb2,PosCandidatB2) Lb1,Lb2 = self.trouver_deplacement(compt,PosCandidatB1+1,fin1,PosCandidatB2+1,fin2) #if (deb1==PosCandidatB1 or deb2==PosCandidatB2): # La1,La2=self.L1[deb1:PosCandidatB1],self.L2[deb2:PosCandidatB2] # else: # La1,La2 = self.trouver_deplacement(compt,deb1,PosCandidatB1,deb2,PosCandidatB2) # if (PosCandidatB1+1>=fin1 or PosCandidatB2+1>=fin2): # Lb1,Lb2=self.L1[PosCandidatB1+1:fin1],self.L2[PosCandidatB2+1:fin2] # else: # Lb1,Lb2 = self.trouver_deplacement(compt,PosCandidatB1+1,fin1,PosCandidatB2+1,fin2) return La1+Lb1,La2+Lb2 else: return self.L1[deb1:fin1],self.L2[deb2:fin2] class AlignAstar(Align): """ Alignement avec ordre partiel et A* """ def calcPosCoutFixe(self, L): """ Calcule des dictionnaires des poistions et des couts fixes pre: isinstance(L,list) len(L)>0 post: forall([self.texte[x[0]:x[1]] in __return__[0].keys() for x in L]) """ dicoPos = {} coutFixe = {} cumul=0 for i in xrange(len(L)): chaine = self.texte[L[i][0]:L[i][1]] if not dicoPos.has_key(chaine): dicoPos[chaine] = [] dicoPos[chaine].append(i) # ajout de sa position coutFixe[i]=cumul #cumul des couts (leur longueur) des blocs précédents cumul+=L[i][1]-L[i][0] return dicoPos,coutFixe def deplacements_pond(self, L1, L2): # dicoPos1 stocke pour chaque bloc de L1 ses positions dans cette liste dicoPos = {} coutFixe = {} dicoPos[1], coutFixe[1] = self.calcPosCoutFixe(L1) dicoPos[2], coutFixe[2] = self.calcPosCoutFixe(L2) diffSym = self.preTraitDiffSym(L1,L2,dicoPos) # pré-calcul des différences symétriques best, closedSet = self.deplacements_pond_Astar(L1,L2,diffSym,dicoPos,coutFixe) # A* dicoBest1={} # dico des positions dans L1 du meilleur parcours dicoBest2={} # dico des positions dans L2 du meilleur parcours while best != (-1,-1): # on remonte le meilleur parcours trouvé dicoBest1[best[0]]=1 dicoBest2[best[1]]=1 best=closedSet[best][1] # noeud père LResDep=[] LResBC=[] for i in xrange(len(L1)): if not dicoBest1.has_key(i):LResDep.append(L1[i]) else:LResBC.append(L1[i]) for i in xrange(len(L2)): if not dicoBest2.has_key(i):LResDep.append(L2[i]) else:LResBC.append(L2[i]) del dicoPos, coutFixe, dicoBest1, dicoBest2 return LResDep,LResBC def deplacements_pond_Astar(self, L1Static, L2Static, diffSym, dicoPos, coutFixe): """ implémentation de A* pour rechercher le meilleur chemin dans l'arbre des appariement entre blocs Renvoie le noeud du dernier appariement """ openSet={} # ensemble des sommets ouverts pour A* closedSet={} # ensemble des sommets fermés pour A* L1=L1Static # liste L1 courante L2=L2Static best=(-1,-1) while True: for posB1 in xrange(len(L1)): for posB2 in dicoPos[2][self.texte[L1[posB1][0]:L1[posB1][1]]]: if posB2cout: openSet[node]=(cout,best) del closedSet[node] elif openSet.has_key(node): if openSet[node][0]>cout: openSet[node]=(cout,best) else: openSet[node]=(cout,best) best=None # noeud (posL1, posL2) bestValue=None # valeur (cout,pere) for node in openSet: # recherche du noeud de moindre cout if (best==None or openSet[node][0]0 """ diffSym={} # dico[(i,j)] stocke le cout heuristique et la longueur de la liste résultant # de la différence symétrique entre L1[i+1:] et L2[j+1:] if niveau>0: #sys.stderr.write(str(dicoPos)+"\n") #print self.pr(L1)+"\n"+self.pr(L2)+"\n" #sys.stderr.flush() pass posB1=0 for B1 in L1: for posB2 in dicoPos[2][self.texte[B1[0]:B1[1]]]: L=self.difference_symetrique(L1[posB1+1:],L2[posB2+1:],posB1,posB2,dicoPos) cout=0 for B in L: cout += B[1]-B[0] diffSym[(posB1,posB2)]=(cout,len(L)) posB1+=1 return diffSym def difference_symetrique(self,L1,L2,deb1,deb2,dicoPos): """ différence symétrique entre 2 listes de blocs pre: forall([self.texte[x[0]:x[1]] in dicoPos[1].iterkeys() for x in L1]) forall([self.texte[x[0]:x[1]] in dicoPos[2].iterkeys() for x in L1]) forall([self.texte[x[0]:x[1]] in dicoPos[1].iterkeys() for x in L2]) forall([self.texte[x[0]:x[1]] in dicoPos[2].iterkeys() for x in L2]) len(self.tool.difference(L1,L2))>0 forall(L1, lambda x:self.texte[x[0]:x[1]] in dicoPos[1].iterkeys()) forall(L1, lambda x:self.texte[x[0]:x[1]] in dicoPos[2].iterkeys()) forall(L2, lambda x:self.texte[x[0]:x[1]] in dicoPos[1].iterkeys()) forall(L2, lambda x:self.texte[x[0]:x[1]] in dicoPos[2].iterkeys()) """ if (len(L1)==0 and len(L2)==0): return [] elif (len(L1)==0): return L2 elif (len(L2)==0): return L1 LRes=[] for B1 in L1: found=False for posB2 in dicoPos[2][self.texte[B1[0]:B1[1]]]: if posB2>=deb2+1: found=True if not found: LRes.append(B1) for B2 in L2: found=False for posB1 in dicoPos[1][self.texte[B2[0]:B2[1]]]: if posB1>=deb1+1: found=True if not found: LRes.append(B2) #sys.stderr.write("\ndiff_sym L1="+str(L1)+"\nL2="+str(L2)+"\n="+str(LRes)+"\n") return LRes class Mset_(set): def evaluate(self): res = 0 for cle,val in self: res += val return res class Mset(dict): """Multiset implémenté par un dictionnaire""" def __init__(self, liste=[]): assert isinstance(liste, list) self.valeur = 0 self.length = 0 for (val,deb,fin) in liste: try: nb,longueur = self[val] self[val] = nb+1,longueur except KeyError: self[val] = (1, fin-deb) self.valeur += fin - deb self.length += 1 def difference(self, mset2, renvoieReste=False): """Différence entre l'objet courant et mset2, renvoie un nouveau mset""" assert isinstance(mset2, Mset) res = Mset() #; reste = MSet() for cle,(nb, longueur) in self.iteritems(): if cle not in mset2: res[cle] = (nb, longueur) res.valeur += nb * longueur res.length += nb else: (nb2, longueur2) = mset2[cle] if nb > nb2: res[cle] = (nb-nb2, longueur) res.valeur += (nb-nb2) * longueur res.length += nb-nb2 return res def difference_liste(self, mset2, renvoieReste=False): assert isinstance(mset2, list) res = Mset() #; reste = MSet() for cle,(nb, longueur) in self.iteritems(): if cle not in mset2: res[cle] = (nb, longueur) res.valeur += nb * longueur res.length += nb else: (nb2, longueur2) = mset2[cle] if nb > nb2: res[cle] = (nb-nb2, longueur) res.valeur += (nb-nb2) * longueur res.length += nb-nb2 return res def intersection(self, mset2): """Intersection entre l'objet courant et mset2, renvoie un nouveau mset""" assert isinstance(mset2, Mset) res = Mset() if self.length <= mset2.length: grand = mset2 ; petit = self else: grand = self ; petit = mset2 for cle,(nb, longueur) in petit.iteritems(): try: (nb2, longueur2) = grand[cle] if nb <= nb2: nbToAdd = nb else: nbToAdd = nb2 res[cle] = (nbToAdd, longueur) res.valeur += nbToAdd * longueur res.length += nbToAdd except KeyError: pass return res def union(self, mset2): """Union entre l'objet courant et mset2, ATTENTION modifie l'objet courant""" assert isinstance(mset2, Mset) for cle,(nb, longueur) in mset2.iteritems(): try: nb2, longueur2 = self[cle] self[cle] = nb+nb2, longueur except KeyError: self[cle] = nb, longueur self.valeur += nb * longueur self.length += nb class AlignAstar2(Align): """ texte passé en paramétre des fonctions """ def calcPosCoutFixe(self, L): #, texte): """ Calcule des dictionnaires des positions et des couts fixes pre: isinstance(L,list) len(L)>0 #post: forall([texte[x[0]:x[1]] in __return__[0].keys() for x in L]) """ dicoPos = {} coutFixe = {} cumul=0 for i in xrange(len(L)): #chaine = texte[L[i][0]:L[i][1]] #longueur = L[i][1]-L[i][0] chaine, longueur = L[i] try: dicoPos[chaine].append(i) # ajout de sa position except KeyError: dicoPos[chaine] = [i] coutFixe[i]=cumul #cumul des couts (leur longueur) des blocs précédents cumul+=longueur #L[i][1]-L[i][0] return dicoPos,coutFixe def deplacements_pond(self, Lv1, Lv2): L1 = [] ; L2 = [] ; Lcf1 = [] ; Lcf2 = [] ; LH1 = [] ; LH2 = [] for bloc in Lv1: cle = hash(self.texte[bloc[0]:bloc[1]]) L1.append(cle) Lcf1.append((cle,bloc[1]-bloc[0])) LH1.append((cle, bloc[0], bloc[1])) print LH1 logging.debug('init L1 done') for bloc in Lv2: cle = hash(self.texte[bloc[0]:bloc[1]]) L2.append(cle) Lcf2.append((cle,bloc[1]-bloc[0])) LH2.append((cle, bloc[0], bloc[1])) logging.debug('init L2 done') # dicoPos1 stocke pour chaque bloc de L1 ses positions dans cette liste dicoPos = {} coutFixe = {} dicoPos[1], coutFixe[1] = self.calcPosCoutFixe(Lcf1,self.texte) dicoPos[2], coutFixe[2] = self.calcPosCoutFixe(Lcf2,self.texte) for cle in dicoPos[1]: assert cle in dicoPos[2] for cle in dicoPos[2]: assert cle in dicoPos[1] diffSym = self.preTraitDiffSym(LH1,LH2,self.texte,dicoPos) # pré-calcul des différences symétriques best, closedSet = self.deplacements_pond_Astar(L1,L2,self.texte,diffSym,dicoPos,coutFixe) # A* dicoBest1={} # dico des positions dans L1 du meilleur parcours dicoBest2={} # dico des positions dans L2 du meilleur parcours while best != (-1,-1): # on remonte le meilleur parcours trouvé dicoBest1[best[0]]=1 dicoBest2[best[1]]=1 best=closedSet[best][1] # noeud père LResDep=[] LResBC=[] for i in xrange(len(L1)): if not dicoBest1.has_key(i):LResDep.append(L1[i]) else:LResBC.append(L1[i]) for i in xrange(len(L2)): if not dicoBest2.has_key(i):LResDep.append(L2[i]) else:LResBC.append(L2[i]) del dicoPos, coutFixe, dicoBest1, dicoBest2 return LResDep,LResBC def deplacements_pond_Astar_(self, L1Static, L2Static, diffSym, dicoPos, coutFixe): """ implémentation de A* pour rechercher le meilleur chemin dans l'arbre des appariement entre blocs Renvoie le noeud du dernier appariement """ openSet={} # ensemble des sommets ouverts pour A* closedSet={} # ensemble des sommets fermés pour A* L1=L1Static # liste L1 courante L2=L2Static logging.debug('len(L1)='+str(len(L1))+' / len(L2)='+str(len(L2))) #logging.debug('openSet = '+str(openSet)) #logging.debug('closedSet = '+str(closedSet)) best=(-1,-1) while True: cptEval = 0 for posB1 in xrange(len(L1)): #for posB2 in dicoPos[2][texte[L1[posB1][0]:L1[posB1][1]]]: for posB2 in dicoPos[2][L1[posB1]]: if posB2cout: openSet[node]=(cout,best) del closedSet[node] elif openSet.has_key(node): if openSet[node][0]>cout: openSet[node]=(cout,best) else: openSet[node]=(cout,best) best=None # noeud (posL1, posL2) bestValue=None # valeur (cout,pere) for node in openSet: # recherche du noeud de moindre cout if (best==None or openSet[node][0]len_L1-10 or i%100==0): logging.log(5,'itération LH1: '+str(i)) LL1 = Mset(LH1[posB1+1:]) if (i<10 or i>len_L1-10 or i%100==0): logging.log(5,' done MSet1') liste_pos2 = dicoPos[2][LH1[posB1][0]]#texte[L1[posB1][0]:L1[posB1][1]]] j = len(liste_pos2)-1 while j >= 0: if j == len(liste_pos2)-1: LL2 = Mset(LH2[liste_pos2[j]+1:]) ; ds = nb = 0 LL1res = LL1.difference(LL2) LL2res = LL2.difference(LL1) else: posB2 = liste_pos2[j] next_posB2 = liste_pos2[j+1] #(ds,nb,LL1,LL2) = diffSym[(posB1,next_posB2)] #diffSym[(posB1,next_posB2)] = (ds,nb) #new_LL1 = Mset(LH1[posB2+1:next_posB2+1]) #LL1res = LL1res.difference(LL2res.intersection(new_LL1)) #LL2res = LL2res.difference(new_LL1) new_LL2 = Mset(LH2[posB2+1:next_posB2]) t2 = new_LL2.difference(LL1res) LL2res.union(t2) LL1res = LL1res.difference(new_LL2) ds2 = LL1res.valeur + LL2res.valeur nb2 = LL1res.length + LL2.length #ds2 = LL1res.evaluate() + LL2res.evaluate() #nb2 = len(LL1res) + len(LL2res) diffSym[(posB1,liste_pos2[j])] = (ds2,nb2)#,LL1res, LL2res) j -= 1 diffSym[(posB1,liste_pos2[0])] = (ds2,nb2) if (i<10 or i>len_L1-10 or i%100==0): logging.log(5,' done itération interne') i+=1 return diffSym def preTraitDiffSymVect(self,L1,L2,dicoPos,niveau=0): logging.log(5,'begin preTraitDiffSymVect') diffSym={} # dico[(i,j)] stocke le cout heuristique et la longueur de la liste résultant de la différence symétrique entre L1[i+1:] et L2[j+1:] LH1=L1 ; LH2=L2 logging.log(5,'len(LH2)= '+str(len(LH2))+' / len(LH1)= '+str(len(LH1))) dic_alphabet = {} for cle,deb,fin in L1: dic_alphabet[cle] = fin-deb for cle,deb,fin in L2: dic_alphabet[cle] = fin-deb taille_alphabet = len(dic_alphabet) temp = [] for cle in dic_alphabet: bisect.insort_right(temp, cle) array_pos_alphabet = numarray.array(temp) # ordre des lettres de l'alphabet assert len(array_pos_alphabet) == taille_alphabet array_valeur_alphabet = numarray.zeros(taille_alphabet) # poids de chaque lettre for pos in xrange(taille_alphabet):# MAJ dico avec position cle = array_pos_alphabet[pos] val = dic_alphabet[cle] #dic_alphabet[cle] = val,pos dic_alphabet[cle] = pos array_valeur_alphabet[pos] = val LH1 = [cle for cle,deb,fin in LH1] LHH1 = [dic_alphabet[cle] for cle in LH1] #logging.debug('len(LHH1)=%d, taille_alphabet=%d,len(LHH1)*taille_alphabet=%d',len(LHH1), taille_alphabet,len(LHH1)*taille_alphabet) #array_LH1 = numarray.zeros((len(LHH1),taille_alphabet),numarray.Int8) #current = numarray.zeros(taille_alphabet) #accu = numarray.zeros(taille_alphabet) #logging.debug('i=%d',len(LHH1)) #for i in xrange(len(LHH1)-1,-1,-1): # if i%1000 ==0: logging.debug('i=%d',i) # pos = LHH1[i] # numarray.where(numarray.arange(taille_alphabet)==pos,1,0,out=current) # numarray.add(accu,current,accu) # array_LH1[i] = accu #logging.debug('array_LH1 done') LH2 = [cle for cle,deb,fin in LH2] LHH2 = [dic_alphabet[cle] for cle in LH2] #array_LH2 = numarray.zeros((len(LHH2),taille_alphabet),numarray.Int8) #current = numarray.zeros(taille_alphabet) #accu = numarray.zeros(taille_alphabet) #for i in xrange(len(LHH2)-1,-1,-1): # pos = LHH2[i] # numarray.where(numarray.arange(taille_alphabet)==pos,1,0,out=current) # numarray.add(accu,current,accu) # array_LH2[i] = accu #logging.debug('array_LH2 done') logging.log(4,'LH1 = '+str(LH1)) logging.log(4,'LH2 = '+str(LH2)) logging.log(4,'array_pos_alphabet = '+str(array_pos_alphabet)) logging.log(4,'array_valeur_alphabet = '+str(array_valeur_alphabet)) logging.log(4,'dic_alphabet = '+str(dic_alphabet)) #for pos in xrange(taille_alphabet): # cle = array_pos_alphabet[pos] # val = dic_alphabet[cle] # array_valeur_alphabet[pos] = val logging.log(5,'creation alphabet done') current = numarray.zeros(taille_alphabet) accu = numarray.zeros(taille_alphabet) def liste_to_alphab_array(liste,array): #c=0 #for cle,deb,fin in liste: #for cle in liste: #for pos in liste: #val,pos = dic_alphabet[cle] #array[pos] += 1 #array[dic_alphabet[cle]] += 1 # array[pos] += 1 #c +=1 numarray.logical_and(current,array_zero, current) numarray.logical_and(accu,array_zero, accu) for i in xrange(len(liste)-1,-1,-1): pos = LHH2[i] numarray.where(numarray.arange(taille_alphabet)==pos,1,0,out=current) numarray.add(accu,current,accu) array = accu #map(lambda pos:array[pos]=array[pos]+1,liste) #for p in [dic_alphabet[cle] for cle in liste]: array[p] +=1 #j=0 #for i in array: assert i>=0,i ; j+=i #print Numeric.sum(array), numarray.add.reduce(array),j #assert len(liste) == c assert len(liste) == numarray.sum(array),str(len(liste))+' / '+str(numarray.sum(array)) def alphab_array_val_length(array): val = length = 0 for pos in xrange(taille_alphabet): nb = max(0,array[pos]) length += nb val += nb * array_valeur_alphabet[pos] #if nb>0: assert val>0 #assert length == numarray.sum(array) assert 0 <= length <= val #if val == 1769: # print liste # print array return val, length len_L1 = len(LH1) ; len_L2 = len(LH2) i=0 array_zero = numarray.zeros(taille_alphabet)#,numarray.UInt8) array_inter = numarray.zeros(taille_alphabet)#,numarray.UInt8) LL1 = numarray.zeros(taille_alphabet)#,numarray.UInt8) LL2 = numarray.zeros(taille_alphabet)#,numarray.UInt8) LL1res = numarray.zeros(taille_alphabet)#,numarray.UInt8) LL2res = numarray.zeros(taille_alphabet)#,numarray.UInt8) new_LL1 = numarray.zeros(taille_alphabet)#,numarray.UInt8) for posB1 in xrange(len_L1-1,-1,-1): if (i<10 or i>len_L1-10 or i%100==0): logging.log(5,'itération LH1: '+str(i)) if i == 300: break #LL1 = numarray.zeros(taille_alphabet,numarray.Int8) numarray.logical_and(LL1,array_zero, LL1) assert numarray.sum(LL1) == 0 liste_to_alphab_array(LHH1[posB1+1:], LL1) #LL1 = array_LH1[posB1+1] #for pos in LHH1[posB1+1:]: # LL1[pos] += 1 #logging.log(4,'LL1 = '+str(LL1)) if (i<10 or i>len_L1-10 or i%100==0): logging.log(5,' done MSet1') #liste_pos2 = dicoPos[2][LH1[posB1][0]]#texte[L1[posB1][0]:L1[posB1][1]]] liste_pos2 = dicoPos[2][LH1[posB1]] j = len(liste_pos2)-1 while j >= 0: #posB2 = liste_pos2[j] #LL2 = array_LH2[posB2] #liste_to_alphab_array(LHH2[posB2+1:], LL2) #numarray.subtract(LL1,LL2, LL1res) #numarray.subtract(LL2,LL1, LL2res) if j == len(liste_pos2)-1: numarray.logical_and(LL2,array_zero, LL2) numarray.logical_and(LL1res,array_zero, LL1res) numarray.logical_and(LL2res,array_zero, LL2res) assert numarray.sum(LL2) == numarray.sum(LL1res) == numarray.sum(LL2res) == 0 #LL2 = liste_to_alphab_array(LHH2[liste_pos2[j]+1:], LL2) ; ds = nb = 0 #LL2 = array_LH2[liste_pos2[j]+1] #for pos in LHH2[liste_pos2[j]+1:]: # LL2[pos] += 1 #logging.log(4,'LL2 = '+str(LL2)) #LL1res = numarray.subtract(LL1,LL2, LL1res) #numarray.maximum(LL1res,array_zero,LL1res) #LL2res = numarray.subtract(LL2,LL1, LL2res) #numarray.maximum(LL2res,array_zero,LL2res) #logging.log(4,'LL1res = '+str(LL1res)) #logging.log(4,'LL2res = '+str(LL2res)) else: posB2 = liste_pos2[j] next_posB2 = liste_pos2[j+1] numarray.logical_and(new_LL1,array_zero, new_LL1) numarray.logical_and(array_inter,array_zero, array_inter) assert numarray.sum(new_LL1) == numarray.sum(array_inter) == 0 #new_LL1 = liste_to_alphab_array(LHH1[posB2+1:next_posB2+1], new_LL1) #new_LL1 = array_LH1[liste_pos2[j]+1] #for pos in LHH1[posB2+1:next_posB2+1]: # new_LL1[pos] += 1 # new_LL1 = Mset(LH1[posB2+1:next_posB2+1]) # LL1res = LL1res.difference(LL2res.intersection(new_LL1)) # LL2res = LL2res.difference(new_LL1) numarray.minimum(LL2res,new_LL1,array_inter) #numarray.maximum(array_inter,array_zero,array_inter) numarray.subtract(LL1res,array_inter, LL1res) #numarray.maximum(LL1res,array_zero,LL1res) numarray.subtract(LL2res,new_LL1, LL2res) #numarray.maximum(LL2res,array_zero,LL2res) #logging.log(4,'array_inter = '+str(array_inter)) #logging.log(4,'LL1res = '+str(LL1res)) #logging.log(4,'LL2res = '+str(LL2res)) #ds2 = LL1res.valeur + LL2res.valeur #nb2 = LL1res.length + LL2.length d1,n1 = alphab_array_val_length(LL1res) d2,n2 = alphab_array_val_length(LL2res) ds2 = d1 + d2 ; nb2 = n1 + n2 diffSym[(posB1,liste_pos2[j])] = (ds2,nb2)#,LL1res, LL2res) j -= 1 diffSym[(posB1,liste_pos2[0])] = (ds2,nb2) if (i<10 or i>len_L1-10 or i%100==0): logging.log(5,' done itération interne') i+=1 #logging.debug('diffSym = '+str(diffSym)) return diffSym def preTraitDiffSym2(self,L1,L2,texte,dicoPos,niveau=0): r='''class dicH(weakref.WeakKeyDictionary): def __init__(self, dico=None): if dico is None: self.totalItem = 0 self.valeur = 0 else: for cle,nbItem in dico.iteritems(): self[cle] = nbItem #[pos for pos in liste_pos] self.totalItem = dico.totalItem self.valeur = dico.valeur''' def _addBloc(bloc): (cle, debut, fin) = bloc try: #(nbItem, valeur) = self[cle] dico[cle] += 1 #(nbItem+1,valeur) except KeyError: dico[cle] = 1 #(1, fin-debut) #[debut] dico['valeur'] += fin - debut dico['totalItem'] += 1 def _testBloc(bloc): (cle, debut, fin) = bloc try: #(nbItem, valeur) = self[cle] # liste_pos_bloc = self[cle] #liste_pos_bloc.pop() #if len(liste_pos_bloc) == 0: # del self[cle] if diffSymCourante[cle] == 1: del diffSymCourante[cle] else: diffSymCourante[cle] -= 1 #(nbItem-1, valeur) diffSymCourante['valeur'] -= fin - debut diffSymCourante['totalItem'] -= 1 except KeyError: diffSymCourante[cle] = 1 #[debut] diffSymCourante['valeur'] += fin - debut diffSymCourante['totalItem'] += 1 diffSym={} # dico[(i,j)] stocke le cout heuristique et la longueur de la liste résultant # de la différence symétrique entre L1[i+1:] et L2[j+1:] LH1 = [] ; LH2 = [] for bloc in L1: LH1.append((hash(texte[bloc[0]:bloc[1]]), bloc[0], bloc[1])) logging.debug('init LH1 done') for bloc in L2: LH2.append((hash(texte[bloc[0]:bloc[1]]), bloc[0], bloc[1])) logging.debug('init LH2 done') dico = {'valeur':0, 'totalItem':0} #{} #dicH() # #dico.valeur = 0 ; dico.totalItem = 0 #for i in xrange(2,len(LH1)-1): # _addBloc(LH1[i+1]) #assert dico['totalItem'] == len(L1)-3 ld=i=0 logging.debug('len(LH2)= '+str(len(LH2))+' / len(LH1)= '+str(len(LH1))) for pos2 in xrange(len(LH2)-1,-1,-1): if (i<10 or i>len(LH2)-10 or i%10==0): logging.debug('itération LH2: '+str(i));k=True else:k=False if pos2 < (len(LH2)-1): _addBloc(LH2[pos2+1]) assert dico['totalItem'] == ld + 1 ; ld+=1 #print 'totalItem=',dico#['totalItem'] if pos2 > 0: diffSymCourante = weakref.WeakKeyDictionary(dico.copy()) else: diffSymCourante = dico logging.debug(' copie dico done') j=0 for pos1 in xrange(len(LH1)-1,-1,-1): #if k and (j<10 or j>len(LH1)-10 or j%10==0): logging.debug(' itération LH1: '+str(j)) if pos1 < (len(LH1)-1): _testBloc(LH1[pos1+1]) diffSym[(pos1,pos2)] = (diffSymCourante['valeur'],diffSymCourante['totalItem']) j+=1 logging.debug(' itération interne done') i+=1 a='''for pos2 in xrange(len(LH2)-1,-1,-1): if pos2 < len(LH2)-1: _testBloc(LH2[pos2+1], True) diffSymCourante = dico.copy() prev_taille = diffSymCourante['totalItem'] for pos1 in xrange(len(LH1)): if pos1 < len(LH1)-1: _testBloc(LH1[pos1+1]) diffSym[(pos1,pos2)] = (diffSymCourante['valeur'],diffSymCourante['totalItem']) #assert diffSymCourante['totalItem'] < prev_taille, str(diffSymCourante['totalItem'])+' '+str(prev_taille)''' assert len(diffSym) == len(L1) * len(L2), str(len(diffSym))+' '+str(len(L1))+' '+str(len(L2)) #str1 = '' ; str2 = '' #for i in xrange(len(L1)): # str1 += str(diffSym[(i,0)]) +' ' # str2 += str(diffSym[(i,len(LH2)-1)]) +' ' #print str1 #print str2 return diffSym def preTraitDiffSym__(self,L1,L2,texte,dicoPos,niveau=0): def _addBloc(bloc): cle = hash(texte[bloc[0]:bloc[1]]) try: dico[cle].append(bloc[0]) except KeyError: dico[cle] = [bloc[0]] dico['valeur'] += bloc[1] - bloc[0] dico['nbItem'] += 1 diffSym={} # dico[(i,j)] stocke le cout heuristique et la longueur de la liste résultant # de la différence symétrique entre L1[i+1:] et L2[j+1:] L = {1:L1, 2:L2} tabDiffSym = {} dico = {'valeur':0, 'nbItem':0} tabDiffSym[(len(L1),-1)] = {} for i in xrange(len(L1)-1,-1,-1): _addBloc(L1[i]) dico_courant = dico.copy() tabDiffSym[(i,-1)] = dico_courant assert dico['nbItem'] == len(L1) tabDiffSym[(len(L1),-1)] = {'valeur':0, 'nbItem':0} ; bloc = {} for j in xrange(len(L2)-1,-1,-1): bloc[2] = L2[j] for i in xrange(len(L1)-1,-1,-1): bloc[1] = L1[i] diffSymCourante = tabDiffSym[(i+1,-1)] #print diffSymCourante for k in [1,2]: cle = hash(texte[bloc[k][0]:bloc[k][1]]) try: liste_pos_bloc = diffSymCourante[cle] try: liste_pos_bloc.pop() except IndexError: del diffSymCourante[cle] diffSymCourante['valeur'] -= bloc[k][1] - bloc[k][0] diffSymCourante['nbItem'] -= 1 except KeyError: diffSymCourante[cle] = [bloc[k][0]] diffSymCourante['valeur'] += bloc[k][1] - bloc[k][0] diffSymCourante['nbItem'] += 1 tabDiffSym[(i,0)] = diffSymCourante diffSym[(i,j)] = (diffSymCourante['valeur'],diffSymCourante['nbItem']) for i in xrange(len(L1)): tabDiffSym[(i,-1)] = tabDiffSym[(i,0)] del tabDiffSym[(i,0)] dico = tabDiffSym[(len(L1),-1)] _addBloc(L2[j]) tabDiffSym[(len(L1),-1)] = dico assert len(tabDiffSym) == len(L1)+1 assert len(diffSym) == len(L1) * len(L2),str(len(diffSym))+str(len(L1))+str(len(L2)) return diffSym def preTraitDiffSym_(self,L1,L2,texte,dicoPos,niveau=0): """ Précalcul de toutes les différences symétriques possibles. Ainsi chacune est calculée une seule fois et non un nombre combinatoire de fois si on faisait le calcul à la demande pre: forall([texte[x[0]:x[1]] in dicoPos[1].keys() for x in L1]) forall([texte[x[0]:x[1]] in dicoPos[2].keys() for x in L1]) forall([texte[x[0]:x[1]] in dicoPos[1].keys() for x in L2]) forall([texte[x[0]:x[1]] in dicoPos[2].keys() for x in L2]) len(self.tool.difference(L1,L2))>0 """ diffSym={} # dico[(i,j)] stocke le cout heuristique et la longueur de la liste résultant # de la différence symétrique entre L1[i+1:] et L2[j+1:] posB1=0 for B1 in L1: for posB2 in dicoPos[2][texte[B1[0]:B1[1]]]: L=self.difference_symetrique(L1[posB1+1:],L2[posB2+1:],texte,posB1,posB2,dicoPos) cout=0 for B in L: cout += B[1]-B[0] diffSym[(posB1,posB2)]=(cout,len(L)) posB1+=1 assert len(diffSym) <= len(L1) * len(L2),str(len(diffSym))+' '+str(len(L1))+' '+str(len(L2)) return diffSym def difference_symetrique(self,L1,L2,texte,deb1,deb2,dicoPos): """ différence symétrique entre 2 listes de blocs pre: forall([texte[x[0]:x[1]] in dicoPos[1].iterkeys() for x in L1]) forall([texte[x[0]:x[1]] in dicoPos[2].iterkeys() for x in L1]) forall([texte[x[0]:x[1]] in dicoPos[1].iterkeys() for x in L2]) forall([texte[x[0]:x[1]] in dicoPos[2].iterkeys() for x in L2]) len(self.tool.difference(L1,L2))>0 forall(L1, lambda x:texte[x[0]:x[1]] in dicoPos[1].iterkeys()) forall(L1, lambda x:texte[x[0]:x[1]] in dicoPos[2].iterkeys()) forall(L2, lambda x:texte[x[0]:x[1]] in dicoPos[1].iterkeys()) forall(L2, lambda x:texte[x[0]:x[1]] in dicoPos[2].iterkeys()) """ if (len(L1)==0 and len(L2)==0): return [] elif (len(L1)==0): return L2 elif (len(L2)==0): return L1 LRes=[] for B1 in L1: found=False for posB2 in dicoPos[2][texte[B1[0]:B1[1]]]: if posB2>=deb2+1: found=True if not found: LRes.append(B1) for B2 in L2: found=False for posB1 in dicoPos[1][texte[B2[0]:B2[1]]]: if posB1>=deb1+1: found=True if not found: LRes.append(B2) #sys.stderr.write("\ndiff_sym L1="+str(L1)+"\nL2="+str(L2)+"\n="+str(LRes)+"\n") return LRes a=''' class AlignAstarRecur3(AlignAstar): def __init__(self,texte,l_texte1,long_min_pivots): AlignAstar.__init__(self,texte) self.long_min_pivots=long_min_pivots self.l_texte1 = l_texte1 def deplacements_pond(self, L1, L2): """ Fonction qui aligne 2 listes d'intervalles du texte pre: isinstance(L1,list) isinstance(L2,list) len(L1)>0 len(L2)>0 forall([self.l_texte1 >= L1[i][0] >= L1[i-1][1] >= 0 for i in range(1, len(L1))]) forall([len(self.texte) >= L2[i][0] >= L2[i-1][1] >= self.l_texte1 for i in range(1, len(L2))]) post:forall([len(self.texte) >=__return__[1][i][0] >= __return__[1][i-1][1] >= 0 for i in range(1, len(__return__[1]))]) """ #forall([len(self.texte) >=__return__[0][i][0] >= __return__[0][i-1][1] >= 0 for i in range(1, len(__return__[0]))]) #forall([len(self.texte) >=__return__[2][i][0] >= __return__[2][i-1][1] >= 0 for i in range(1, len(__return__[2]))]) LResDep1,LResDep2,LResBC1,LResBC2 = self.deplacements_pond__(L1, L2, 0,self.l_texte1,self.l_texte1,len(self.texte), 0) LDep,LUnique = self.removeUnique(LResDep1+LResDep2) return LDep,LResBC1+LResBC2,LUnique def removeUnique(self,L): """ Scinde L en 2 listes, une pour les chaines ayant plusieurs occurences et l'autre pour celles en ayant une seule #pre: forall([len(self.texte) >= L[i][0] >= L[i-1][1] >= 0 for i in range(1, len(L))]) #post: forall([len(self.texte) >=__return__[0][i][0] >= __return__[0][i-1][1] >= 0 for i in range(1, len(__return__[0]))]) # forall([len(self.texte) >=__return__[1][i][0] >= __return__[1][i-1][1] >= 0 for i in range(1, len(__return__[1]))]) """ dicDep = {} for x in L: t=self.texte[x[0]:x[1]] if not dicDep.has_key(t): dicDep[t]=[] dicDep[t].append(x[0]) LDep=[] LUnique=[] for clef in dicDep: locc=dicDep[clef] if self.repetition(locc,self.l_texte1): for occ in locc: LDep = self.tool.addition_intervalle(LDep,[occ, occ+len(clef)]) else: for occ in locc: LUnique = self.tool.addition_intervalle(LUnique,[occ, occ+len(clef)])#sans fusion return LDep,LUnique def deplacements_pond__(self, L1, L2, debT1, debT2, finT1, finT2, niveau): """ Fonction qui aligne 2 listes d'intervalles du texte pre: isinstance(L1,list) isinstance(L2,list) len(L1)>0 len(L2)>0 forall([debT1 >= L1[i][0] >= L1[i-1][1] >= finT1 for i in range(1, len(L1))]) forall([debT2 >= L2[i][0] >= L2[i-1][1] >= finT2 for i in range(1, len(L2))]) #post[debT1, debT2]: # forall([finT1>=__return__[2][i][0] >= __return__[2][i-1][1] >= __old__.debT1 for i in range(1, len(__return__[2]))]) # forall([finT2>=__return__[3][i][0] >= __return__[3][i-1][1] >= __old__.debT2 for i in range(1, len(__return__[3]))]) #forall([finT1>=__return__[0][i][0] >= __return__[0][i-1][1] >= __old__.debT1 for i in range(1, len(__return__[0]))]) #forall([finT2>=__return__[1][i][0] >= __return__[1][i-1][1] >= __old__.debT2 for i in range(1, len(__return__[1]))]) """ # dicoPos[1] stocke pour chaque bloc de L1 ses positions dans cette liste dicoPos = {} coutFixe = {} dicoPos[1], coutFixe[1] = self.calcPosCoutFixe(L1) dicoPos[2], coutFixe[2] = self.calcPosCoutFixe(L2) if niveau>0: #sys.stderr.write("t1="+self.texte[debT1:finT1]+"\n") #sys.stderr.write("t2="+self.texte[debT2:finT2]+"\n") #assert(dicoPos[1].has_key('.on.')) #sys.stderr.flush() pass for x in dicoPos[1]: assert(x in self.texte[debT1:finT1]) assert(x in self.texte[debT2:finT2]) for x in dicoPos[2]: assert(x in self.texte[debT1:finT1]) assert(x in self.texte[debT2:finT2]) diffSym = self.preTraitDiffSym(L1,L2,dicoPos,niveau) # pré-calcul des différences symétriques best, closedSet = self.deplacements_pond_Astar(L1,L2,diffSym,dicoPos,coutFixe) # A* LBest = {1:[], 2:[]} while best != (-1,-1): # on remonte le meilleur parcours trouvé LBest[1].append(best[0]) LBest[2].append(best[1]) best=closedSet[best][1] # noeud pére LBest[1].reverse() LBest[2].reverse() for i in xrange(1,len(LBest[1])): assert(0<=LBest[1][i-1]<=LBest[1][i]<=len(L1)) assert(0<=LBest[2][i-1]<=LBest[2][i]<=len(L2)) LResDep={1:[],2:[]} LResBC={1:[],2:[]} debL1=0 debL2=0 debdebT1=debT1 debdebT2=debT2 for i in xrange(len(LBest[1])): #print niveau,i posL1 = LBest[1][i] posL2 = LBest[2][i] B1 = L1[posL1] B2 = L2[posL2] t1 = self.texte[debT1:B1[0]] t2 = self.texte[debT2:B2[0]] assert(debT1<=B1[0]) assert(debT2<=B2[0]) LDepTemp = {1:L1[debL1:posL1], 2:L2[debL2:posL2]} assert(debL1<=posL1) assert(debL2<=posL2) self.ass2__(LDepTemp[1],debT1,B1[0]) self.ass2__(LDepTemp[2],debT2,B2[0]) LResBC,LDepTemp=self.fff(t1,t2,debT1,debT2,LResBC,LDepTemp,B1[0],B2[0],niveau) #for x in LResBC1: # print str(i)+self.texte[x[0]:x[1]] #print str(i)+"-----------------------------------------" LResBC[1].append(B1) LResBC[2].append(B2) self.ass2__(LResBC[1],debdebT1,B1[1]) self.ass2__(LResBC[2],debdebT2,B2[1]) LResDep[1].extend(LDepTemp[1]) LResDep[2].extend(LDepTemp[2]) #self.ass__(LResDep) #self.ass2__(LResDep[1],debdebT1,B1[0]) #self.ass2__(LResDep[2],debdebT2,B2[0]) debT1=B1[1] debT2=B2[1] debL1=posL1+1 debL2=posL2+1 # fin t1=self.texte[debT1:finT1] t2=self.texte[debT2:finT2] assert(debT1<=finT1) assert(debT2<=finT2) if len(t1)>0 and len(t2)>0: LDepTemp = {1:L1[debL1:], 2:L2[debL2:]} if len(LDepTemp[1])>0: self.ass2__(LDepTemp[1],L1[debL1][0],finT1) if len(LDepTemp[2])>0: self.ass2__(LDepTemp[2],L2[debL2][0],finT2) LResBC,LDepTemp=self.fff(t1,t2,debT1,debT2,LResBC,LDepTemp,finT1,finT2,niveau) self.ass__(LResBC) self.ass2__(LResBC[1],debdebT1,finT1) self.ass2__(LResBC[2],debdebT2,finT2) LResDep[1].extend(LDepTemp[1]) LResDep[2].extend(LDepTemp[2]) #self.ass2__(LResDep[1],debdebT1,finT1) #self.ass2__(LResDep[2],debdebT2,finT2) #self.ass__(LResDep) return LResDep[1],LResDep[2],LResBC[1],LResBC[2] def fff(self,t1,t2,debT1,debT2,LResBC,LDepTemp,fT1,fT2,niveau): """ pre: debT1>=0 #forall([fT1>=__return__[0][1][i][0] >= __return__[0][1][i-1][1] >= __old__.debT1 for i in range(1, len(__return__[0][1]))]) #forall([fT2>=__return__[0][2][i][0] >= __return__[0][2][i-1][1] >= __old__.debT2 for i in range(1, len(__return__[0][2]))]) #forall([fT1>=__return__[1][1][i][0] >= __return__[1][1][i-1][1] >= __old__.debT1 for i in range(1, len(__return__[1][1]))]) #forall([fT2>=__return__[1][2][i][0] >= __return__[1][2][i-1][1] >= __old__.debT2 for i in range(1, len(__return__[1][2]))]) """ #return LResBC,LDepTemp if len(t1)==0 or len(t2)==0: return LResBC,LDepTemp st = suffix_tree.GeneralisedSuffixTree([t1,t2]) blocs_texte = st.shared_substrings3(self.long_min_pivots) #self.syserr("BT1 "+str(blocs_texte)) recouv = MediteAppli.Recouvrement(t1+t2,blocs_texte,len(t1)) blocs_texte = recouv.eliminer_recouvrements() #self.syserr("BT2 "+str(blocs_texte)) blocs_texte,r2 = self.remUnique(blocs_texte,len(t1)) #self.syserr("BT3 "+str(blocs_texte)) #self.syserr("r2 "+str(r2)) NL1=[] NL2=[] for clef in blocs_texte: for occ in blocs_texte[clef]: if occ0 and len(NL2)>0: self.ass2__(NL1,debT1,fT1) self.ass2__(NL2,debT2,fT2) self.ass2__(NL1+NL2,debT1,fT2) #ds=self.tool.difference_sym(self.toLTexte(NL1),self.toLTexte(NL2)) #if len(ds)>0: self.syserr("dSym "+str(ds)) #if len(r2)>0: self.syserr("r2 "+str(r2)) NewLResDep1,NewLResDep2,NewLResBC1,NewLResBC2 = self.deplacements_pond__(NL1, NL2, debT1,debT2,fT1,fT2,niveau+1) self.ass2__(NewLResDep1,debT1,fT1) self.ass2__(NewLResDep2,debT2,fT2) self.ass2__(NewLResBC1,debT1,fT1) self.ass2__(NewLResBC2,debT2,fT2) LResBC[1].extend(NewLResBC1) LResBC[2].extend(NewLResBC2) self.ass__(LResBC) LDepTemp[1] = self.tool.soustr_l_intervalles(LDepTemp[1],NewLResBC1) LDepTemp[2] = self.tool.soustr_l_intervalles(LDepTemp[2],NewLResBC2) LDepTemp[1] = self.tool.addition_l_intervalle(LDepTemp[1],NewLResDep1) LDepTemp[2] = self.tool.addition_l_intervalle(LDepTemp[2],NewLResDep2) #self.ass2__(LDepTemp[1],debT1,fT1) #self.ass2__(LDepTemp[2],debT2,fT2) return LResBC,LDepTemp def remUnique(self,dic,l_texte1): """post: len(__return__[0])<=len(dic) forall([x in dic.keys()],lambda x:x in __return__[0].keys() and self.repetition(__return__[0][x])) """ notUnique={} unique={} for x in dic: y=dic[x] if self.repetition(dic[x],l_texte1): notUnique[x]=y else: unique[x]=y return notUnique,unique def toLTexte(self,L): """ #pre:forall(L,lambda x:isinstance(x,(int,int))) """ #print L res=[] for x in L: res.append(self.texte[x[0]:x[1]]) return res def pr(self,L): y="" for x in L: y += self.texte[x[0]:x[1]] +"/" return y def ass__(self,L): if __debug__: for x in L: for i in xrange(1,len(L[x])): assert(L[x][i-1][1]<=L[x][i][0]) class AlignAstarRecur4(AlignAstar2,AlignAstarRecur3): """ texte passé en paramétre des récursions """ def deplacements_pond(self, L1, L2): """ Fonction qui aligne 2 listes d'intervalles du texte pre: isinstance(L1,list) isinstance(L2,list) len(L1)>0 len(L2)>0 forall([self.l_texte1 >= L1[i][0] >= L1[i-1][1] >= 0 for i in range(1, len(L1))]) forall([len(self.texte) >= L2[i][0] >= L2[i-1][1] >= self.l_texte1 for i in range(1, len(L2))]) post:forall([len(self.texte) >=__return__[1][i][0] >= __return__[1][i-1][1] >= 0 for i in range(1, len(__return__[1]))]) #forall([len(self.texte) >=__return__[0][i][0] >= __return__[0][i-1][1] >= 0 for i in range(1, len(__return__[0]))]) #forall([len(self.texte) >=__return__[2][i][0] >= __return__[2][i-1][1] >= 0 for i in range(1, len(__return__[2]))]) """ LResDep1,LResDep2,LResBC1,LResBC2 = self.deplacements_pond__(L1, L2, self.texte, self.l_texte1, 0) LDep,LUnique = self.removeUnique(LResDep1+LResDep2) return LDep,LResBC1+LResBC2,LUnique def deplacements_pond__(self, L1, L2, texte, l_texte1, niveau): """ Fonction qui aligne 2 listes d'intervalles du texte pre: isinstance(L1,list) isinstance(L2,list) len(L1)>0 len(L2)>0 forall([0 >= L1[i][0] >= L1[i-1][1] >= l_texte1 for i in range(1, len(L1))]) forall([l_texte1 >= L2[i][0] >= L2[i-1][1] >= len(texte) for i in range(1, len(L2))]) #post[debT1, debT2]: # forall([finT1>=__return__[2][i][0] >= __return__[2][i-1][1] >= __old__.debT1 for i in range(1, len(__return__[2]))]) # forall([finT2>=__return__[3][i][0] >= __return__[3][i-1][1] >= __old__.debT2 for i in range(1, len(__return__[3]))]) # forall([finT1>=__return__[0][i][0] >= __return__[0][i-1][1] >= __old__.debT1 for i in range(1, len(__return__[0]))]) # forall([finT2>=__return__[1][i][0] >= __return__[1][i-1][1] >= __old__.debT2 for i in range(1, len(__return__[1]))]) """ debT1=0 finT1=debT2=l_texte1 finT2=len(texte) texte1=texte[:l_texte1] texte2=texte[l_texte1:] # dicoPos[1] stocke pour chaque bloc de L1 ses positions dans cette liste dicoPos = {} coutFixe = {} dicoPos[1], coutFixe[1] = self.calcPosCoutFixe(L1,texte) dicoPos[2], coutFixe[2] = self.calcPosCoutFixe(L2,texte) if niveau>0: #sys.stderr.write("t1="+self.texte[debT1:finT1]+"\n") #sys.stderr.write("t2="+self.texte[debT2:finT2]+"\n") #assert(dicoPos[1].has_key('.on.')) #sys.stderr.flush() pass for x in dicoPos[1]: assert(x in texte1) assert(x in texte2) for x in dicoPos[2]: assert(x in texte1) assert(x in texte2) diffSym = self.preTraitDiffSym(L1,L2,texte,dicoPos,niveau) # pré-calcul des différences symétriques best, closedSet = self.deplacements_pond_Astar(L1,L2,texte,diffSym,dicoPos,coutFixe) # A* LBest = {1:[], 2:[]} while best != (-1,-1): # on remonte le meilleur parcours trouvé LBest[1].append(best[0]) LBest[2].append(best[1]) best=closedSet[best][1] # noeud pére LBest[1].reverse() LBest[2].reverse() for i in xrange(1,len(LBest[1])): assert(0<=LBest[1][i-1]<=LBest[1][i]<=len(L1)) assert(0<=LBest[2][i-1]<=LBest[2][i]<=len(L2)) LResDep={1:[],2:[]} LResBC={1:[],2:[]} debL1=0 debL2=0 debdebT1=debT1 debdebT2=debT2 for i in xrange(len(LBest[1])): #print niveau,i posL1 = LBest[1][i] posL2 = LBest[2][i] B1 = L1[posL1] B2 = L2[posL2] t1 = texte[debT1:B1[0]] t2 = texte[debT2:B2[0]] assert(debT1<=B1[0]) assert(debT2<=B2[0]) LDepTemp = {1:L1[debL1:posL1], 2:L2[debL2:posL2]} assert(debL1<=posL1) assert(debL2<=posL2) self.ass2__(LDepTemp[1],debT1,B1[0]) self.ass2__(LDepTemp[2],debT2,B2[0]) LResBC,LDepTemp=self.fff(t1,t2,debT1,debT2,LResBC,LDepTemp,B1[0],B2[0],niveau) #for x in LResBC1: # print str(i)+self.texte[x[0]:x[1]] #print str(i)+"-----------------------------------------" LResBC[1].append(B1) LResBC[2].append(B2) self.ass2__(LResBC[1],debdebT1,B1[1]) self.ass2__(LResBC[2],debdebT2,B2[1]) LResDep[1].extend(LDepTemp[1]) LResDep[2].extend(LDepTemp[2]) #self.ass__(LResDep) #self.ass2__(LResDep[1],debdebT1,B1[0]) #self.ass2__(LResDep[2],debdebT2,B2[0]) debT1=B1[1] debT2=B2[1] debL1=posL1+1 debL2=posL2+1 # fin t1=texte[debT1:finT1] t2=texte[debT2:finT2] assert(debT1<=finT1) assert(debT2<=finT2) if len(t1)>0 and len(t2)>0: LDepTemp = {1:L1[debL1:], 2:L2[debL2:]} if len(LDepTemp[1])>0: self.ass2__(LDepTemp[1],L1[debL1][0],finT1) if len(LDepTemp[2])>0: self.ass2__(LDepTemp[2],L2[debL2][0],finT2) LResBC,LDepTemp=self.fff(t1,t2,debT1,debT2,LResBC,LDepTemp,finT1,finT2,niveau) self.ass__(LResBC) self.ass2__(LResBC[1],debdebT1,finT1) self.ass2__(LResBC[2],debdebT2,finT2) LResDep[1].extend(LDepTemp[1]) LResDep[2].extend(LDepTemp[2]) #self.ass2__(LResDep[1],debdebT1,finT1) #self.ass2__(LResDep[2],debdebT2,finT2) #self.ass__(LResDep) return LResDep[1],LResDep[2],LResBC[1],LResBC[2] def fff(self,t1,t2,debT1,debT2,LResBC,LDepTemp,fT1,fT2,niveau): """ pre: debT1>=0 #forall([fT1>=__return__[0][1][i][0] >= __return__[0][1][i-1][1] >= __old__.debT1 for i in range(1, len(__return__[0][1]))]) #forall([fT2>=__return__[0][2][i][0] >= __return__[0][2][i-1][1] >= __old__.debT2 for i in range(1, len(__return__[0][2]))]) #forall([fT1>=__return__[1][1][i][0] >= __return__[1][1][i-1][1] >= __old__.debT1 for i in range(1, len(__return__[1][1]))]) #forall([fT2>=__return__[1][2][i][0] >= __return__[1][2][i-1][1] >= __old__.debT2 for i in range(1, len(__return__[1][2]))]) """ #return LResBC,LDepTemp if len(t1)==0 or len(t2)==0: return LResBC,LDepTemp st = suffix_tree.GeneralisedSuffixTree([t1,t2]) blocs_texte = st.shared_substrings3(self.long_min_pivots) recouv = MediteAppli.Recouvrement(t1+t2,blocs_texte,len(t1)) blocs_texte = recouv.eliminer_recouvrements() blocs_texte,r2 = self.remUnique(blocs_texte,len(t1)) NL1=[] NL2=[] for clef in blocs_texte: for occ in blocs_texte[clef]: if occ0 and len(NL2)>0: self.ass2__(NL1,0,len(t1)) self.ass2__(NL2,len(t1),len(t1+t2)) self.ass2__(NL1+NL2,0,len(t1+t2)) NewLResDep1,NewLResDep2,NewLResBC1,NewLResBC2 = self.deplacements_pond__(NL1, NL2, t1+t2, len(t1),niveau+1) NewLResDep1=self.addNumLInter(NewLResDep1,debT1) NewLResDep2=self.addNumLInter(NewLResDep2,debT2-len(t1)) NewLResBC1=self.addNumLInter(NewLResBC1,debT1) NewLResBC2=self.addNumLInter(NewLResBC2,debT2-len(t1)) self.ass2__(NewLResDep1,debT1,fT1) self.ass2__(NewLResDep2,debT2,fT2) self.ass2__(NewLResBC1,debT1,fT1) self.ass2__(NewLResBC2,debT2,fT2) LResBC[1].extend(NewLResBC1) LResBC[2].extend(NewLResBC2) self.ass__(LResBC) LDepTemp[1] = self.tool.soustr_l_intervalles(LDepTemp[1],NewLResBC1) LDepTemp[2] = self.tool.soustr_l_intervalles(LDepTemp[2],NewLResBC2) LDepTemp[1] = self.tool.addition_l_intervalle(LDepTemp[1],NewLResDep1) LDepTemp[2] = self.tool.addition_l_intervalle(LDepTemp[2],NewLResDep2) #self.ass2__(LDepTemp[1],debT1,fT1) #self.ass2__(LDepTemp[2],debT2,fT2) return LResBC,LDepTemp def addNumLInter(self,L,num): res=[] for x in L: res.append([x[0]+num,x[1]+num]) return res ''' class AlignAstarRecur(AlignAstar2): #def __init__(self,texte,l_texte1,long_min_pivots): def __init__(self,l_texte1,long_min_pivots=1, algoAlign="", coeff=None, sep=True): """ Constructeur #pre: isinstance(l_texte1, int) and isinstance(long_min_pivots, int) @param texte1: la premiere version du texte a comparer @type texte1: String @param long_min_pivots: longueur minimale des chaines répétées @param long_min_pivots: integer @param algoAlign: algo d'alignement, par défaut A* @type algoAlign: string @param coeff: poids des params pour MOA* @type coeff: triplet @param sep: sensible aux séparateurs si Vrai @type sep: boolean """ AlignAstar2.__init__(self) #,texte) self.long_min_pivots=long_min_pivots self.l_texte1 = l_texte1 self.algoAligneur = algoAlign # algo d'alignement self.separatorSensivitive = sep # sensible aux séparateurs def run(self, t1, t2): """ pre: isinstance(t1,str) and isinstance(t2,str) """ # niveau max d'appel récursif self.MAXRECURSION = 1000 self.l_texte2 = len(t2) # application de psyco ? self.preTraitDiffSym = psyco.proxy(self.preTraitDiffSym) # ajoute-t-on les déplacements récursifs ? # par défaut non car on perd l'assertion de non-chevauchement des déplacements self.addSubDep = True # on enlève les $ car le suffix_tree ne les supporte pas sepTable = string.maketrans("$",".") t1 = t1.translate(sepTable) t2 = t2.translate(sepTable) lDEP1,lDEP2,lBC1,lBC2=self.deplacements_pond2(t1,t2) #LDEP = lDEP1+lDEP2 lDEP1.extend(lDEP2) #trace('LDEP = self.cleanDep(lDEP1,t1+t2)',locals()) LDEP = self.cleanDep(lDEP1,t1,t2) lBC1.extend(lBC2) return LDEP,lBC1#+lBC2#,LUnique def cleanDep(self,LDEP,texte1,texte2): """Enleve les deplacements inclus dans un autre deplacement et ceux qui ne sont plus répétés #pre: forall([len(texte) >= LDEP[i][0] >= LDEP[i-1][1] >= 0 for i in range(1, len(LDEP))]) #post: forall([len(texte1)+len(texte2) >=__return__[i][0] >= __return__[i-1][1] >= 0 for i in range(1, len(__return__))]) """ size_LDEP = len(LDEP)+1 while len(LDEP) < size_LDEP:# boucle jusqu'à un point fixe size_LDEP = len(LDEP) #print size_LDEP,len(LDEP) LDEP = self.removeInclude(LDEP) #print size_LDEP,len(LDEP) #LDEP,LUnique = self.removeUnique(LDEP,texte1+texte2) LDEP = self.removeUnique(LDEP,texte1,texte2) #print size_LDEP,len(LDEP),len(LUnique) #LDEP = self._filtreDepRec(LDEP) #print '---------------' #print LUnique return LDEP def removeInclude(self,L): """ Enleve les deplacements inclus dans un autre déplacement #pre: forall([len(texte) >= L[i][0] >= L[i-1][1] >= 0 for i in range(1, len(L))]) #post: forall([len(texte) >=__return__[i][0] >= __return__[i-1][1] >= 0 for i in range(1, len(__return__))]) # forall([len(texte) >=__return__[1][i][0] >= __return__[1][i-1][1] >= 0 for i in range(1, len(__return__[1]))]) """ LDep = [] prevInter = (0,0) for (deb,fin) in L: if prevInter[0] <= deb <= fin <= prevInter[1]: continue else: LDep.append([deb,fin]) prevInter = (deb,fin) return LDep def removeUnique__(self,L,texte1,texte2): """ Scinde L en 2 listes, une pour les chaines ayant plusieurs occurences et l'autre pour celles en ayant une seule #pre: forall([len(texte) >= L[i][0] >= L[i-1][1] >= 0 for i in range(1, len(L))]) #post: forall([len(texte) >=__return__[0][i][0] >= __return__[0][i-1][1] >= 0 for i in range(1, len(__return__[0]))]) # forall([len(texte) >=__return__[1][i][0] >= __return__[1][i-1][1] >= 0 for i in range(1, len(__return__[1]))]) """ texte = texte1+texte2 dicDep = {} for x in L: t=texte[x[0]:x[1]] try: dicDep[t].append(x[0]) except KeyError: dicDep[t]=[x[0]] #if not dicDep.has_key(t): dicDep[t]=[] #dicDep[t].append(x[0]) #print dicDep LDep=[] LUnique=[] for clef,locc in dicDep.iteritems(): len_clef = len(clef) if self.repetition(locc,self.l_texte1): assert texte[locc[0]:locc[0]+len_clef] == texte[locc[-1]:locc[-1]+len_clef] assert texte1[locc[0]:locc[0]+len_clef] == texte2[locc[-1]-self.l_texte1:locc[-1]-self.l_texte1+len_clef] , texte1[locc[0]:locc[0]+len_clef] + texte2[locc[-1]-self.l_texte1:locc[-1]-self.l_texte1+len_clef] for occ in locc: #LDep = self.tool.addition_intervalle(LDep,[occ, occ+len(clef)]) bisect.insort_right(LDep,[occ, occ+len_clef]) else: for occ in locc: #LUnique = self.tool.addition_intervalle(LUnique,[occ, occ+len(clef)])#sans fusion bisect.insort_right(LUnique,[occ, occ+len_clef])#sans fusion #print LDep return LDep#,LUnique def removeUnique(self,L,texte1,texte2): """ Scinde L en 2 listes, une pour les chaines ayant plusieurs occurences et l'autre pour celles en ayant une seule #pre: forall([len(texte) >= L[i][0] >= L[i-1][1] >= 0 for i in range(1, len(L))]) #post: forall([len(texte) >=__return__[0][i][0] >= __return__[0][i-1][1] >= 0 for i in range(1, len(__return__[0]))]) # forall([len(texte) >=__return__[1][i][0] >= __return__[1][i-1][1] >= 0 for i in range(1, len(__return__[1]))]) """ dicDep = {} for (deb,fin) in L: longueur = fin - deb if deb < self.l_texte1: cle = hash(texte1[deb:fin]) else: cle = hash(texte2[deb-self.l_texte1:fin-self.l_texte1]) try: dicDep[(cle,longueur)].append(deb) except KeyError: dicDep[(cle,longueur)]=[deb] #print dicDep LDep=[] #LUnique=[] for (cle,longueur),locc in dicDep.iteritems(): #len_clef = len(clef) if self.repetition(locc,self.l_texte1): assert texte1[locc[0]:locc[0]+longueur] == texte2[locc[-1]-self.l_texte1:locc[-1]-self.l_texte1+longueur] , texte1[locc[0]:locc[0]+longueur] + texte2[locc[-1]-self.l_texte1:locc[-1]-self.l_texte1+longueur] for occ in locc: #LDep = self.tool.addition_intervalle(LDep,[occ, occ+len(clef)]) bisect.insort_right(LDep,[occ, occ+longueur]) #else: # for occ in locc: #LUnique = self.tool.addition_intervalle(LUnique,[occ, occ+len(clef)])#sans fusion # bisect.insort_right(LUnique,[occ, occ+longueur])#sans fusion #print LDep return LDep #,LUnique def compute_alignement(self,t1,t2): """ prends les 2 textes en entrée et renvoie 2 listes au format [(BC,[BDeps le précédant])] """ aligneSMEMS = True if aligneSMEMS: s1,s2 = self._texteToSeqHomo(t1,t2) else: # 3e param, taille des ngrammes s1,s2 = self._texteToSeqHomoNGrammes(t1,t2,1,self.long_min_pivots) #print "S1:"+self.lint2str(s1,t1) ; print "S2:"+self.lint2str(s2,t1+t2) if __debug__: texte = t1+t2 self.ass2__(s1,0,len(t1),texte) self.ass2__(s2,len(t1),len(t1)+len(t2),texte) if len(s1)==0 or len(s2)==0: return [],[] #logging.debug('s1='+str(s1)) #logging.debug('s2='+str(s2)) #LResT1,LResT2 = self._appelAstar(s1,s2,t1,t2,len(t1)) LResT1,LResT2 = self._appelAlgo(s1,s2,t1,t2,len(t1)) return LResT1,LResT2 def _appelAlgo(self,s1,s2,t1,t2,t): if self.algoAligneur.lower() == 'LIS'.lower(): a = aligne.AlignLIS() return a.alignement(s1, s2, t1, t2, t) elif self.algoAligneur.lower() == 'HIS'.lower(): a = aligne.AlignHIS() return a.alignement(s1, s2, t1, t2, t) elif self.algoAligneur.lower() == 'HISCont'.lower(): a = aligne.AlignHISCont() return a.alignement(s1, s2, t1, t2, t) elif self.algoAligneur.lower() == 'HISVO'.lower(): a = aligne.AlignHISVo() return a.alignement(s1, s2, t1, t2, t) elif self.algoAligneur.lower() == 'glouton'.lower(): a = aligne.AlignGlouton() return a.alignement(s1, s2, t1, t2, t) elif self.algoAligneur.lower() == 'dyn'.lower(): a = aligne.AlignProgDyn() return a.alignement(s1, s2, t1, t2, t) elif self.algoAligneur.lower() == 'dyn2'.lower(): a = aligne.AlignProgDyn2() return a.alignement(s1, s2, t1, t2, t) else: return self._appelAstar(s1,s2,t1,t2,t) def deplacements_pond2(self, t1, t2, niveau=0): """ pre: isinstance(t1,str) and isinstance(t2,str) """ #print "T1:"+t1 ; print "T2:"+t2 if len(t1)==0 or len(t2)==0: return [],[],[],[] if niveau > self.MAXRECURSION: return [],[],[],[] logging.log(5,"debut dep_pond niveau "+str(niveau)) LResT1,LResT2 = self.compute_alignement(t1,t2) if len(LResT1)==0 or len(LResT2)==0: return [],[],[],[] texte=t1+t2 debutT1=i=0 ; debutT2=len(t1) lResBC1=[] ; lResBC2=[] ; lResDEP1=[] ; lResDEP2=[] #print "LResT1:"+str(LResT1); print "LResT2:"+str(LResT2) while (ilen(LResT2): assert(len(LResT1)==len(LResT2)+1 and LResT1[-1][0] is None) lResDEP1.extend(LResT1[-1][1]) elif len(LResT2)>len(LResT1): assert(len(LResT2)==len(LResT1)+1 and LResT2[-1][0] is None) lResDEP2.extend(LResT2[-1][1]) else: assert(len(LResT1)==len(LResT2)) logging.log(5,"fin dep_pond niveau "+str(niveau)) return lResDEP1,lResDEP2,lResBC1,lResBC2 def _filtreDepRec(self, liste): """recherche d'un point fixe""" taille_originale = len(liste) taille_prev = 0 liste2 = self.__filtreDepRec(liste) taille_courante = len(liste2) while taille_prev != taille_courante: taille_prev = taille_courante liste2 = self.__filtreDepRec(liste2) taille_courante = len(liste2) return liste2 def __filtreDepRec(self, liste): """filtrage des déplacements se chevauchant""" liste2 = [] if len(liste) < 2: liste2.extend(liste) else: i = 0 while i < len(liste)-1: segment1 = liste[i] ; long1 = segment1[1]-segment1[0] segment2 = liste[i+1] ; long2 = segment2[1]-segment2[0] if segment1[1] > segment2[0]: if long1 >= long2: liste2.append(segment1) i += 2 else: liste2.append(segment2) i += 1 else: liste2.append(segment1) i += 1 if i == len(liste)-1: liste2.append(liste[-1]) return liste2 def _texteToSeqHomo(self,t1,t2): """ Extrait des 2 textes, les 2 séquences de blocs répétés """ logging.log(5,"debut _texteToSeqHomo") st = suffix_tree.GeneralisedSuffixTree([t1,t2]) logging.log(5,"fin construction ST") #blocs_texte,seq = st.shared_substrings3(self.long_min_pivots) #blocs_texte = st.get_MEM_index_chaine(self.long_min_pivots) eliminationRecouv = True if self.algoAligneur.lower() == 'HISCont'.lower(): eliminationRecouv = False blocs_texte = st.get_MEM_index_chaine3(self.long_min_pivots, eliminRecouv=eliminationRecouv) #print 'blocs_texte:'+str(blocs_texte) logging.log(5,"fin extraction MEM") del st #gc.collect() #recouv = recouvrement.Recouvrement(t1+t2,blocs_texte,len(t1)) #recouv = recouvrement.Recouvrement2(t1+t2,seq,len(t1)) #blocs_texte = recouv.eliminer_recouvrements() #logging.debug("fin elim recouvrement") blocs_texte = self.remUnique(blocs_texte,len(t1), t1, t2) NL1=[] ; NL2=[] len_t1 = len(t1) for (cle,longueur),liste_occ in blocs_texte.iteritems(): #len_clef = fin - debut #len(clef) for occ in iter(liste_occ): if occ < len_t1: #NL1 = self.tool.addition_intervalle(NL1,[occ, occ+len(clef)]) bisect.insort_right(NL1,[occ, occ+longueur]) else: #NL2 = self.tool.addition_intervalle(NL2,[occ, occ+len(clef)]) bisect.insort_right(NL2,[occ, occ+longueur]) logging.log(5,"fin _texteToSeqHomo") #print NL1 return NL1,NL2 def _texteToSeqHomoNGrammes(self,t1,t2,n,min_size): """Recherche naive d'homologies @param t1: le texte 1 @param t2: le texte 2 @param n: le nombre de mots que l'on desire dans un bloc (1=> monogramme, 2=> bigramme etc...) @param min_size: taille minimale des blocs """ t12=t1+t2 ; lenT1 = len(t1) ; lenT12 = len(t12) # découpage effectif en listes de Ngrammes pour les textes 1 et 2 t1Split = self.__splitNGrammes(t1,n,0) t2Split = self.__splitNGrammes(t2, n,lenT1) # Mise sous le format dico[longueur][cle] = [listePositions] pour texte 1 dicoT1 = {} for key,(debut,fin) in t1Split: assert 0 <= debut < fin <= lenT1 longueur = fin - debut assert longueur > 0 if not dicoT1.has_key(longueur): dicoT1[longueur] = {} try: dicoT1[longueur][key].append(debut) except KeyError : dicoT1[longueur][key] = [debut] # Mise sous le format dico[longueur][cle] = [listePositions] pour texte 2 dicoT2 = {} for key,(debut,fin) in t2Split: assert lenT1 <= debut < fin <= lenT12 longueur = fin - debut assert longueur > 0 if not dicoT2.has_key(longueur): dicoT2[longueur] = {} try: dicoT2[longueur][key].append(debut) except KeyError : dicoT2[longueur][key] = [debut] #print len(dicoT1), len(dicoT2) # on ne garde dans dico que les occurrences des Ngrammes présents dans les 2 textes dico = {} for longueur,dicoLongueurT1 in dicoT1.iteritems(): if dicoT2.has_key(longueur): #print longueur for key,listePosT1 in dicoLongueurT1.iteritems(): if dicoT2[longueur].has_key(key): listePosT2 = dicoT2[longueur][key] if not dico.has_key(longueur): dico[longueur] = {} try: dico[longueur][key].extend(listePosT1) dico[longueur][key].extend(listePosT2) except KeyError: dico[longueur][key] = listePosT1 dico[longueur][key].extend(listePosT2) dico[longueur][key].sort() #print len(dico) if __debug__: # on va tester si toutes les occurrences d'un Ngrammes sont bien le même texte for longueur, dicoLongueur in dico.iteritems(): for key, listePos in dicoLongueur.iteritems(): texte = t12[listePos[0]:listePos[0]+longueur] listeTextes = [texte] i = 1 ; OK = True while i < len(listePos): #assert texte == t12[listePos[i]:listePos[i]+longueur], (texte, t12[listePos[i]:listePos[i]+longueur]) texte2 = t12[listePos[i]:listePos[i]+longueur] listeTextes.append(texte2) if texte != texte2: OK = False i += 1 if not OK: print listeTextes # élimination des recouvrements if len(dico) > 1: #logging.warning('len(dico)='+str(len(dico))) pass recouv = recouvrement.Recouvrement4(t12,dico,lenT1,min_size) blocs = recouv.eliminer_recouvrements() # les listes de positions arrivent def Recouvrement4 décroissantes # on doit les inverser pour remUnique qui enlève les ngrammes # n'ayant plus qu'une seule occurrence for cle, listePos in blocs.iteritems(): listePos.reverse() if len(blocs) > 1: #logging.warning('len(blocs)='+str(len(blocs))) pass blocs = self.remUnique(blocs,lenT1,t1,t2) if len(blocs) > 1: #logging.warning('len(blocs)2='+str(len(blocs))) pass # on met sous les bon format NL1=[] ; NL2=[] #on lit les blocs et on les places dans la bonne liste en fonction de leur position for key,pos in blocs.iteritems(): for posD in pos : if posD < len(t1) : NL1.append([posD,posD+key[1]]) else : NL2.append([posD,posD+key[1]]) NL1.sort() NL2.sort() return NL1,NL2 def __splitNGrammes(self, texte, tailleNgram, lenT1): """ Découpage en Ngrammes Appel plusieurs fois __splitNGrammes2 pour avoir tous les ngrammes car celle-ci ne renvoie que des ngrammes disjoints 2 à 2 @param texte: le texte @param tailleNgram: de 1 à 3, le nombre de mots que l'on desire dans un bloc (1=> monogramme, 2=> bigramme etc...) @param lenT1: lmongueur du texte1 """ assert 1 <= tailleNgram <= 3 if self.separatorSensivitive: sep=" " else: sep="." if tailleNgram == 1: # unigramme return self.__splitNGrammes2( texte, tailleNgram, lenT1) else: # r1 contient les bigrammes commençant en 0 et disjoints 2 à 2 r1 = self.__splitNGrammes2( texte, tailleNgram, lenT1) t = texte.split(sep) ; i = 0 if len(t) > 0: # on enlève les éventuels unigrammes vides au début de séquence while len(t[0]) == 0: t = t[1:] ; i += 1 # on enlève le 1er unigramme de la séquence pour décaler de 1 assert len(t[0]) > 0 i += len(t[0]) ; t = t[1:] # on enlève les éventuels unigrammes vides while len(t) > 0 and len(t[0]) == 0: t = t[1:] ; i += 1 else: i += 1 # r2 contient les bigrammes (ou trigrammes) commençant en 1 et disjoints 2 à 2 r2 = self.__splitNGrammes2( texte[i:], tailleNgram, lenT1+i) r1.extend(r2) if tailleNgram == 2: return r1 else: if len(t) > 0: # on enlève le 1er unigramme de la séquence pour décaler de 1 assert len(t[0]) > 0 i += len(t[0]) ; t = t[1:] # on enlève les éventuels unigrammes vides while len(t) > 0 and len(t[0]) == 0: t = t[1:] ; i += 1 else: i += 1 # r1 contient les trigrammes commençant en 2 et disjoints 2 à 2 r3 = self.__splitNGrammes2( texte[i:], tailleNgram, lenT1+i) r1.extend(r3) return r1 def __splitNGrammes2(self, texte, tailleNgram, lenT1): """Découpage effectif en Ngrammes !! Attention, la fonction ne retourne que les Ngrammes disjoints 2à2 Il faut l'appellet plusieurs fois en décalant le début pour avoir les autres ngrammes @param texte: le texte @param tailleNgram: le nombre de mots que l'on desire dans un bloc (1=> monogramme, 2=> bigramme etc...) @param lenT1: l'offset pour la position (0 pour le texte1, len(texte1) pour le texte 2 """ if self.separatorSensivitive: sep=" " else: sep="." res = [] # liste résultat i = 0 # indice courant dans le texte prev = 0 # indice de début du ngram curNgram = 0 # taille du ngram courant (en nb de gram) accu = [] # accumulateur du ngram courant while i < len(texte): car = texte[i] # caractère courant accu.append(car) if car == sep: # si séparateur, on vient d'ajouter un au ngram courant curNgram += 1 # si on a la taille de ngram souhaitée, on l'ajoute if curNgram == tailleNgram: ngram = ''.join(accu) debut = prev +lenT1 fin = debut + len(ngram) #print ngram,[debut,fin] assert ngram == texte[debut-lenT1:fin-lenT1], (ngram, texte[debut-lenT1:fin-lenT1]) res.append((hash(ngram),[debut,fin])) accu = [] ; curNgram = 0 ; prev = i + 1 i += 1 return res def ______splitNGrammes(self, texte, tailleNgram, lenT1): """Sort les blocs d'un texte en nGrammes @param texte: le texte @param tailleNgram: le nombre de mots que l'on desire dans un bloc (1=> monogramme, 2=> bigramme etc...) @param lenT1: l'offset pour la position (0 pour le texte1, len(texte1) pour le texte 2 """ if self.separatorSensivitive: sep=" " else: sep="." tSplit = texte.split(sep) assert len(texte) == sum([len(x) for x in tSplit]) + len(tSplit)-1 print tSplit[:5] res = [] ; posTexte = 0 ; pos = 0 while pos < len(tSplit)-tailleNgram+1: mot = tSplit[pos] if len(mot) == 0: pos += 1 ; posTexte += 1 else: valNgramme = mot tailleCurrentNgramme = 1 ; i = 1 ; nbSep = 0 while tailleCurrentNgramme < tailleNgram: next = tSplit[pos+i] if len(next) == 0: nbSep += 1 else: valNgramme += sep + nbSep * sep + next tailleCurrentNgramme += 1 i += 1 if tailleCurrentNgramme == tailleNgram: debut = posTexte + lenT1 fin = posTexte + lenT1 + len(valNgramme) + nbSep print (valNgramme, texte[debut:fin]) res.append((hash(valNgramme),[debut,fin])) assert valNgramme == texte[debut:fin], (valNgramme, texte[debut:fin]) pos += 1 ; posTexte += len(mot)+1 if __debug__: print pos, posTexte som = 0 for x in tSplit[:pos]: if len(x) == 0: som += 1 else: som += len(x)+1 assert posTexte-1 == som, (posTexte-1,som) def ____splitNGrammes(self, texte, tailleNgram, lenT1): """Sort les blocs d'un texte en nGrammes @param texte: le texte @param tailleNgram: le nombre de mots que l'on desire dans un bloc (1=> monogramme, 2=> bigramme etc...) @param lenT1: l'offset pour la position (0 pour le texte1, len(texte1) pour le texte 2 """ if self.separatorSensivitive: sep=" " else: sep="." tSplit = texte.split(sep) assert len(texte) == sum([len(x) for x in tSplit]) + len(tSplit)-1 posT = -1 res = [] for pos in xrange(len(tSplit)): mot = tSplit[pos] posT += 1 if len(mot) != 0: #len(mot) == 0 => split entre 2 " " valNgramme = mot lenNgramme = len(mot) #recherche du ngramme tailleCurrentNgramme = 1 nbEspace = 0 while (tailleCurrentNgramme < tailleNgram and pos+tailleCurrentNgramme+nbEspace < len(tSplit)): if len(tSplit[pos+tailleCurrentNgramme+nbEspace]) == 0 : nbEspace += 1 valNgramme += sep else: valNgramme += sep+tSplit[pos+tailleCurrentNgramme+nbEspace] tailleCurrentNgramme += 1 if tailleCurrentNgramme == tailleNgram: #on a crée un nGramme debut = posT + lenT1 #nombre de lettres+nombre d'espaces+un espace-1 par mot ds le nGramme fin = posT+lenT1 + len(valNgramme) + nbEspace+tailleNgram-1 res.append((hash(valNgramme),[debut,fin])) assert valNgramme == texte[debut:fin], (valNgramme, texte[debut:fin]) posT += len(mot) else: #pass posT += 1 return res def _appelAstar(self,L1,L2,texte1,texte2,lt1): """ Alignement A* entre les 2 séquences pre: isinstance(L1,list) and isinstance(L2,list) and isinstance(texte1,str) and isinstance(texte2,str) post: (len(__return__[0])==len(__return__[1])) or (len(__return__[0])==len(__return__[1])+1) or \ (len(__return__[0])+1==len(__return__[1]))""" # dicoPos1 stocke pour chaque bloc de L1 ses positions dans cette liste logging.log(5,"debut _appelAstar") LC1 = [] ; LC2 = [] # (cle) Lcf1 = [] ; Lcf2 = [] # (cle,longueur) LH1 = [] ; LH2 = [] # (cle,debut,fin) for bloc in L1: cle = hash(texte1[bloc[0]:bloc[1]]) LC1.append(cle) Lcf1.append((cle,bloc[1]-bloc[0])) LH1.append((cle, bloc[0], bloc[1])) #print LH1 logging.log(5,'init L1 done') for bloc in L2: cle = hash(texte2[bloc[0]-lt1:bloc[1]-lt1]) LC2.append(cle) Lcf2.append((cle,bloc[1]-bloc[0])) LH2.append((cle, bloc[0], bloc[1])) logging.log(5,'init L2 done') dicoPos = {} coutFixe = {} dicoPos[1], coutFixe[1] = self.calcPosCoutFixe(Lcf1) dicoPos[2], coutFixe[2] = self.calcPosCoutFixe(Lcf2) if __debug__: for cle in dicoPos[1]: assert cle in dicoPos[2], str(cle)+' / '+str([texte1[d:f] for d,f in L1])+' '+str([texte2[d-lt1:f-lt1] for d,f in L2]) for cle in dicoPos[2]: assert cle in dicoPos[1] dic = {} ; tri = [] for cle,deb,fin in LH1: try: val,lpos = dic[cle] lpos.append((deb,fin)) dic[cle] = val+1, lpos except KeyError: dic[cle] = 1 , [(deb,fin)] for cle,(nb,lpos) in dic.iteritems(): bisect.insort_right(tri,(nb,lpos)) for nb,lpos in tri[-20:]: deb = lpos[0][0] ; fin =lpos[0][1] #logging.debug('LH1: nb = '+str(nb)+' / '+texte1[deb:fin]) dic = {} ; tri = [] for cle,deb,fin in LH2: try: val,lpos = dic[cle] lpos.append((deb,fin)) dic[cle] = val+1, lpos except KeyError: dic[cle] = 1 , [(deb,fin)] for cle,(nb,lpos) in dic.iteritems(): bisect.insort_right(tri,(nb,lpos)) for nb,lpos in tri[-20:]: deb = lpos[0][0] ; fin =lpos[0][1] #logging.debug('LH2: nb = '+str(nb)+' / '+texte2[deb-lt1:fin-lt1]) tri = [] for cle,liste in dicoPos[1].iteritems(): bisect.insort_right(tri,len(liste)) #logging.debug('tri liste1 = '+str(tri)) logging.debug("longueur liste1 = %d, moyenne = %.2f, mediane = %d",len(tri),+(0.0+sum(tri))/len(tri),tri[len(tri)/2]) tri = [] for cle,liste in dicoPos[2].iteritems(): bisect.insort_right(tri,len(liste)) #logging.debug('tri liste2 = '+str(tri)) logging.debug('longueur liste2 = %d, moyenne = %.2f, mediane = %d',len(tri),(0.0+sum(tri))/len(tri),tri[len(tri)/2]) logging.log(5,"fin calcPosCoutFixe") #psyco.bind(self.preTraitDiffSym) diffSym = self.preTraitDiffSym(LH1,LH2,dicoPos) # pré-calcul des différences symétriques #logging.debug(LH1) #logging.debug(LH2) #logging.debug(dicoPos) #diffSym = self.preTraitDiffSymVect(LH1,LH2,dicoPos) # pré-calcul des différences symétriques #trace('diffSym = self.preTraitDiffSymVect(LH1,LH2,dicoPos)',locals()) logging.log(5,"fin preTraitDiffSym") #for cle,val in diffSym.iteritems(): # assert diffSym2.has_key(cle) # assert val == diffSym2[cle],str(val)+'/'+str(cle)+'/'+str(diffSym2[cle]) #for cle,val in diffSym2.iteritems(): # assert diffSym.has_key(cle) # assert val == diffSym[cle],str(val)+'/'+str(cle)+'/'+str(diffSym[cle]) best, closedSet = self.deplacements_pond_Astar(LC1,LC2,diffSym,dicoPos,coutFixe)# A* logging.log(5,"fin deplacements_pond_Astar") dicoBest={1:{}, 2:{}} # dico des positions dans L1 et L2 du meilleur parcours while best != (-1,-1): # on remonte le meilleur parcours trouvé dicoBest[1][best[0]]=1 dicoBest[2][best[1]]=1 best=closedSet[best][1] # noeud père s1=self._makeLRes(dicoBest[1],L1) s2=self._makeLRes(dicoBest[2],L2) logging.log(5,'s1 = '+str(s1)) logging.log(5,'s2 = '+str(s2)) if __debug__: s11=[] s12=[] for i in xrange(len(s1)): s11.append(s1[i][0]) s12.extend(s1[i][1]) if s11[-1] is None: s11=s11[:-1] s21=[] s22=[] for i in xrange(len(s2)): s21.append(s2[i][0]) s22.extend(s2[i][1]) if s21[-1] is None: s21=s21[:-1] self.ass2__(s11,0,lt1,texte1) self.ass2__(s12,0,lt1,texte1) texte=texte1+texte2 self.ass2__(s21,lt1,len(texte1)+len(texte2),texte) self.ass2__(s22,lt1,len(texte1)+len(texte2),texte) self.ass2__(s11+s21,0,len(texte1)+len(texte2),texte) self.ass2__(s12+s22,0,len(texte1)+len(texte2),texte) del dicoPos, coutFixe, dicoBest logging.log(5,"fin _appelAstar") return s1,s2 def _makeLRes(self,dicoBest,L): """ Donne une liste [(BC,[BDeps le précédant])] pre: isinstance(dicoBest,dict) and isinstance(L,list) #post: forall([ __return__[i-1][0][1] <= __return__[i][0][0] <= __return__[0][1][0][0] for i in range(1, len(__return__))]) """ LRes = [] accuDep = [] for i in xrange(len(L)): if not dicoBest.has_key(i): accuDep.append(L[i]) else: LRes.append((L[i],accuDep)) accuDep = [] if len(accuDep)>0: LRes.append((None,accuDep)) return LRes def remUnique(self,dic,l_texte1, texte1 ,texte2): """#post: len(__return__[0])<=len(dic) # forall([x in dic.keys()],lambda x:x in __return__[0].keys() and self.repetition(__return__[0][x])) """ notUnique={} unique={} for cle, listePos in dic.iteritems(): if self.repetition(listePos, l_texte1): notUnique[cle] = listePos else: unique[cle] = listePos a="""logging.debug('len(unique)='+str(len(unique))) unique1 = {} ; unique2 = {} for (cle,longueur),listePos in unique: if listePos[0] < l_texte1: unique1[(cle,longueur)] = listePos else: unique2[(cle,longueur)] = listePos assert len(unique) == len(unique1)+len(unique2) sep = "  " # espace + ALT+0160 i = 0 while i < len(unique1): (cle,longueur),listePos = unique1.popitem() pos = listePos[0] assert pos < l_texte1 segment1 = texte1[pos:pos+longueur] segment1_ = segment1.strip(sep) stop = False for (cle2,longueur2),listePos2 in unique2.iteritems(): assert listePos2[0] >= l_texte1 pos2 = listePos2[0] - l_texte1 segment2 = texte2[pos2:pos2+longueur2] segment2_ = segment2.strip(sep) if segment1_ == segment2_ and longueur == longueur2: stop = True if stop: break if stop: notUnique[(cle,longueur)] = listePos.extend(listePos2) unique2.pop((cle2,longueur2)) i += 1 """ return notUnique #,unique #def addNumLInter(self,L,num): # res=[] # for x in L: # res.append([x[0]+num,x[1]+num]) # return res def _appelAstar2(self,L1,L2,texte1,texte2,lt1): LC1 = [] ; LC2 = [] ; Lcf1 = [] ; Lcf2 = [] ; LH1 = [] ; LH2 = [] for bloc in L1: cle = hash(texte1[bloc[0]:bloc[1]]) LC1.append((cle,bloc[1]-bloc[0])) for bloc in L2: cle = hash(texte2[bloc[0]-lt1:bloc[1]-lt1]) LC2.append((cle,bloc[1]-bloc[0])) dom_matches = {} for i in xrange(len(L1)): # ligne x,lenght_x = LC1[i] for j in xrange(len(L2)): #colonne pass class Mset2(Mset): """Multiset gérant le + petit invariant et le + petit move trouvé""" def __init__(self, liste=[]): Mset.__init__(self, liste) self.inv_min = sys.maxint ; self.mov_min = sys.maxint self.inv_max = 0 ; self.mov_max = 0 def difference(self, mset2): """Différence entre l'objet courant et mset2, renvoie un nouveau mset2""" assert isinstance(mset2, Mset2) res = Mset2() for cle,(nb, longueur) in self.iteritems(): if cle not in mset2: res[cle] = (nb, longueur) res.valeur += nb * longueur res.length += nb self.mov_min = min(longueur, self.mov_min) self.mov_max = max(longueur, self.mov_max) else: (nb2, longueur2) = mset2[cle] self.inv_min = min(longueur, self.inv_min) self.inv_max = max(longueur, self.inv_max) if nb > nb2: res[cle] = (nb-nb2, longueur) res.valeur += (nb-nb2) * longueur res.length += nb-nb2 self.mov_min = min(longueur, self.mov_min) self.mov_max = max(longueur, self.mov_max) return res, self.inv_min, self.inv_max, self.mov_min, self.mov_max def union(self, mset2): """Union entre l'objet courant et mset2, ATTENTION modifie l'objet courant""" assert isinstance(mset2, Mset2) for cle,(nb, longueur) in mset2.iteritems(): try: nb2, longueur2 = self[cle] self[cle] = nb+nb2, longueur except KeyError: self[cle] = nb, longueur self.valeur += nb * longueur self.length += nb class AlignAstarRecur2(AlignAstarRecur): """ Recursive A* avec fonction d'évaluation des noeuds multi-critére """ def __init__(self, l_texte1,algoAlign="",long_min_pivots=1, seuil_remplacement=0.5, coeff = None, sep=True): AlignAstarRecur.__init__(self, l_texte1, long_min_pivots, sep=sep) self.seuil_remplacements = seuil_remplacement global algo algo=algoAlign if coeff == None: coeff = (0.34, 0.33, 0.33) #if coeff == None: coeff = (1, 0, 0) self.coeffX = coeff[0] self.coeffY = coeff[1] self.coeffZ = coeff[2] assert self.coeffX + self.coeffY + self.coeffZ == 1 # accélére beaucoup mais prend plus de mémoire self.cout = psyco.proxy(self.cout) def calcPosCoutFixe(self, L, longueur_texte, debut_texte=0): """ Calcule des dictionnaires des positions et des couts fixes pre: isinstance(L,list) and len(L)>0 isinstance(longueur_texte,int) #post: forall([texte[x[0]:x[1]] in __return__[0].keys() for x in L]) """ assert isinstance(longueur_texte,int) and longueur_texte > 0 dicoPos = {} coutFixeRepete = {-1 : 0} # cout fixe des blocs répétés (INV ou MOV) coutFixeNonRepete = {-1 : 0} # cout fixe des blocs non répétés (INS, SUP ou REP) cumulRepete = 0 # cumul des tailles des blocs répétés (INV ou MOV) cumulNonRepete = 0 # cumul des tailles des blocs non répétés (INS, SUP ou REP) nbRepete = 0 # nombre de blocs répétés (INV ou MOV) nbNonRepete = 0 # nombre de blocs non répétés (INS, SUP ou REP) pos_texte = 0 # position courante dans le texte lNonRepete = [] # liste des blocs non répétés (INS, SUP ou REP) for i in xrange(len(L)): chaine, debut, fin = L[i] debut -= debut_texte fin -= debut_texte assert 0 <= debut < fin <= longueur_texte, str(debut) +'/'+ str(fin) + '/'+ str(longueur_texte)+'/'+str(L) longueur = fin - debut try: dicoPos[chaine].append(i) # ajout de sa position except KeyError: dicoPos[chaine] = [i] # cumul des couts (leur longueur) des blocs répétés jusqu'à i INCLUS cumulRepete += longueur coutFixeRepete[i] = cumulRepete # cumul des couts (leur longueur) des blocs NON répétés précédent i cumulNonRepete += debut - pos_texte coutFixeNonRepete[i] = cumulNonRepete if debut - pos_texte > 0: # ajout du bloc non répété commencant en pos_texte et finissant en deb # cad précédent i lNonRepete.append((pos_texte,debut)) else: lNonRepete.append((-1,-1)) pos_texte = fin # la position courante équivaut à la somme des cumuls assert fin == cumulRepete + cumulNonRepete if fin != longueur_texte: cumulNonRepete += longueur_texte - fin coutFixeNonRepete[i+1] = cumulNonRepete pos_texte = longueur_texte lNonRepete.append((fin,longueur_texte)) assert pos_texte == longueur_texte # on est arrivé à la fin du texte # le 1er bloc est soit répété soit non répété #assert 0 == lNonRepete[0][0] < L[0][1]-debut_texte or 0 == L[0][1]-debut_texte < lNonRepete[0][0], str(lNonRepete[0][0]) + '/' +str( L[0][1]-debut_texte) assert (0 == lNonRepete[0][0] < L[0][1]-debut_texte or 0 == L[0][1]-debut_texte < lNonRepete[0][0] or -1 == lNonRepete[0][0] and L[0][1]-debut_texte == 0) , str(lNonRepete[0][0]) + '/' +str( L[0][1]-debut_texte) # le dernier bloc est soit répété soit non répété assert longueur_texte == lNonRepete[-1][1] > L[-1][2]-debut_texte or longueur_texte == L[-1][2]-debut_texte > lNonRepete[-1][1], str(lNonRepete[-1][1]) + '/' +str( L[-1][2]-debut_texte) + '/' +str(longueur_texte) assert abs(len(lNonRepete)-len(L)) <= 1, abs(len(lNonRepete)-len(L)) #print 'len(lNonRepete)-len(L)=', len(lNonRepete)-len(L) #print 'L',L #print 'lNonRepete=', lNonRepete return dicoPos, (coutFixeRepete, coutFixeNonRepete), lNonRepete def calcPosCoutFixe_(self, L, longueur_texte, debut_texte=0): """ Calcule des dictionnaires des positions et des couts fixes pre: isinstance(L,list) and len(L)>0 isinstance(longueur_texte,int) #post: forall([texte[x[0]:x[1]] in __return__[0].keys() for x in L]) """ assert isinstance(longueur_texte,int) and longueur_texte > 0 dicoPos = {} #coutFixeRepete = {-1 : 0} # cout fixe des blocs répétés (INV ou MOV) #coutFixeNonRepete = {-1 : 0} # cout fixe des blocs non répétés (INS, SUP ou REP) # coutFixe[0] = cout fixe des blocs répétés (INV ou MOV) # coutFixe[1] = cout fixe des blocs non répétés (INS, SUP ou REP) coutFixe = numpy.zeros((2,len(L)+1), numpy.int_) cumulRepete = 0 # cumul des tailles des blocs répétés (INV ou MOV) cumulNonRepete = 0 # cumul des tailles des blocs non répétés (INS, SUP ou REP) nbRepete = 0 # nombre de blocs répétés (INV ou MOV) nbNonRepete = 0 # nombre de blocs non répétés (INS, SUP ou REP) pos_texte = 0 # position courante dans le texte lNonRepete = [] # liste des blocs non répétés (INS, SUP ou REP) for i in xrange(len(L)): chaine, debut, fin = L[i] debut -= debut_texte fin -= debut_texte assert 0 <= debut < fin <= longueur_texte, str(debut) +'/'+ str(fin) + '/'+ str(longueur_texte)+'/'+str(L) longueur = fin - debut try: dicoPos[chaine].append(i) # ajout de sa position except KeyError: dicoPos[chaine] = [i] # cumul des couts (leur longueur) des blocs répétés jusqu'à i INCLUS cumulRepete += longueur #coutFixeRepete[i] = cumulRepete coutFixe[0][i] = cumulRepete # cumul des couts (leur longueur) des blocs NON répétés précédent i cumulNonRepete += debut - pos_texte #coutFixeNonRepete[i] = cumulNonRepete coutFixe[1][i] = cumulNonRepete if debut - pos_texte > 0: # ajout du bloc non répété commencant en pos_texte et finissant en deb # cad précédent i lNonRepete.append((pos_texte,debut)) else: lNonRepete.append((-1,-1)) pos_texte = fin # la position courante équivaut à la somme des cumuls assert fin == cumulRepete + cumulNonRepete if fin != longueur_texte: cumulNonRepete += longueur_texte - fin coutFixeNonRepete[i+1] = cumulNonRepete pos_texte = longueur_texte lNonRepete.append((fin,longueur_texte)) assert pos_texte == longueur_texte # on est arrivé à la fin du texte # le 1er bloc est soit répété soit non répété #assert 0 == lNonRepete[0][0] < L[0][1]-debut_texte or 0 == L[0][1]-debut_texte < lNonRepete[0][0], str(lNonRepete[0][0]) + '/' +str( L[0][1]-debut_texte) assert (0 == lNonRepete[0][0] < L[0][1]-debut_texte or 0 == L[0][1]-debut_texte < lNonRepete[0][0] or -1 == lNonRepete[0][0] and L[0][1]-debut_texte == 0) , str(lNonRepete[0][0]) + '/' +str( L[0][1]-debut_texte) # le dernier bloc est soit répété soit non répété assert longueur_texte == lNonRepete[-1][1] > L[-1][2]-debut_texte or longueur_texte == L[-1][2]-debut_texte > lNonRepete[-1][1], str(lNonRepete[-1][1]) + '/' +str( L[-1][2]-debut_texte) + '/' +str(longueur_texte) assert abs(len(lNonRepete)-len(L)) <= 1, abs(len(lNonRepete)-len(L)) #print 'len(lNonRepete)-len(L)=', len(lNonRepete)-len(L) #print 'L',L #print 'lNonRepete=', lNonRepete return dicoPos, (coutFixeRepete, coutFixeNonRepete), lNonRepete def extractMinMax(self, liste): """Extrait d'une liste le + petit et le + grand bloc situé avant et après chaque position i de cette liste dicoMinMax[0][i] = bloc le + petit avant la bloc i dicoMinMax[1][i] = bloc le + grand avant la bloc i dicoMinMax[2][i] = bloc le + petit après la bloc i dicoMinMax[3][i] = bloc le + grand après la bloc i """ dicoMinMax = {0 : {}, 1 : {}, 2 : {}, 3 : {}} #dicoMinMax = numpy.zeros((4,len(liste)), numpy.int_) # on utilise pas maxint car ça bug avec pyrex min_before = min_after = int(sys.maxint/10) max_before = max_after = 0 for i in xrange(len(liste)): debut,fin = liste[i] if fin - debut > 0: min_before = min(min_before, fin-debut) max_before = max(max_before, fin-debut) dicoMinMax[0][i] = min_before dicoMinMax[1][i] = max_before #assert 0 < min_before <= max_before, (min_before, max_before) for i in xrange(len(liste)-1, -1, -1): debut,fin = liste[i] if fin - debut > 0: min_after = min(min_after, fin-debut) max_after = max(max_after, fin-debut) dicoMinMax[2][i] = min_after dicoMinMax[3][i] = max_after #assert 0 < min_after <= max_after, (min_after, max_after) if __debug__: i = 0 while i < len(dicoMinMax[0])-2: assert dicoMinMax[0][i] >= dicoMinMax[0][i+1], dicoMinMax[0] i += 1 i = 0 while i < len(dicoMinMax[1])-2: assert dicoMinMax[1][i] <= dicoMinMax[1][i+1] i += 1 i = 0 while i < len(dicoMinMax[2])-2: assert dicoMinMax[2][i] <= dicoMinMax[2][i+1] i += 1 i = 0 while i < len(dicoMinMax[3])-2: assert dicoMinMax[3][i] >= dicoMinMax[3][i+1] i += 1 return dicoMinMax def extractRemplacements(self, L1, L2): #if len(L1) == 0 or len(L2) == 0: return numarray.array(), numarray.array() diff_taille = len(L1) - len(L2) if diff_taille < 0: #L1.extend([-1] * abs(diff_taille)) L2 = L2[:len(L1)] elif diff_taille > 0: #L2.extend([-1] * diff_taille) L1 = L1[:len(L2)] #print len(L1), len(L2) assert len(L1) == len(L2), str(len(L1))+'/'+str(len(L2)) # variable locale pour accélérer les where seuil_remp = self.seuil_remplacements inverse_seuil_remp = 1.0 / seuil_remp # obligatoirement 1 tableau float pour avoir les divisions non entières # 1 seul pour sauver un peu de place #print L1, type(L1) tab1 = numpy.array(L1, numpy.float_) tab2 = numpy.array(L2) #logging.debug(('1',tab1, tab2)) # on ne conserve que les valeurs qui sont au-dessus du seuil des remplacements tab1 = numpy.where((tab1 / tab2) > seuil_remp , tab1, 0) tab2 = numpy.where((tab1 / tab2) < inverse_seuil_remp , tab2, 0) #logging.debug(('2',tab1, tab2)) # masque où seul les positions ayant une valeur au-dessus du seuil sont vraies masque_remp = numpy.logical_and(tab1, tab2) # application du masque pour ne conserver que les positions ou les 2 conditions de seuil sont vraies tab1 = numpy.where(masque_remp, tab1, 0) tab2 = numpy.where(masque_remp, tab2, 0) # on filtre les -1 tab1 = numpy.where(tab1 >= 0, tab1, 0) tab2 = numpy.where(tab2 >= 0, tab2, 0) #logging.debug(('3',tab1, tab2)) return tab1, tab2 def extractRemplacementsNumarray(self, L1, L2): #if len(L1) == 0 or len(L2) == 0: return numarray.array(), numarray.array() diff_taille = len(L1) - len(L2) if diff_taille < 0: L1.extend([-1] * abs(diff_taille)) elif diff_taille > 0: L2.extend([-1] * diff_taille) #print len(L1), len(L2) assert len(L1) == len(L2), str(len(L1))+'/'+str(len(L2)) # variable locale pour accélérer les where seuil_remp = self.seuil_remplacements inverse_seuil_remp = 1.0 / seuil_remp # obligatoirement 1 tableau float pour avoir les divisions non entières # 1 seul pour sauver un peu de place tab1 = numarray.array(L1, type = 'Float32') tab2 = numarray.array(L2) # on ne conserve que les valeurs qui sont au-dessus du seuil des remplacements numarray.where((tab1 / tab2) > seuil_remp , tab1, 0, tab1) numarray.where((tab1 / tab2) < inverse_seuil_remp , tab2, 0, tab2) # masque où seul les positions ayant une valeur au-dessus du seuil sont vraies masque_remp = numarray.logical_and(tab1, tab2) # application du masque pour ne conserver que les positions ou les 2 conditions de seuil sont vraies numarray.where(masque_remp, tab1, 0, tab1) numarray.where(masque_remp, tab2, 0, tab2) return tab1, tab2 def extractRemplacementsNumeric(self, L1, L2): #if len(L1) == 0 or len(L2) == 0: return numarray.array(), numarray.array() diff_taille = len(L1) - len(L2) if diff_taille < 0: L1.extend([-1] * abs(diff_taille)) elif diff_taille > 0: L2.extend([-1] * diff_taille) #print len(L1), len(L2) assert len(L1) == len(L2), str(len(L1))+'/'+str(len(L2)) # variable locale pour accélérer les where seuil_remp = self.seuil_remplacements inverse_seuil_remp = 1.0 / seuil_remp # obligatoirement 1 tableau float pour avoir les divisions non entières # 1 seul pour sauver un peu de place tab1 = Numeric.array(L1, Numeric.Float) tab2 = Numeric.array(L2) # on ne conserve que les valeurs qui sont au-dessus du seuil des remplacements tab1 = Numeric.where((tab1 / tab2) > seuil_remp , tab1, 0) tab2 = Numeric.where((tab1 / tab2) < inverse_seuil_remp , tab2, 0) # masque où seul les positions ayant une valeur au-dessus du seuil sont vraies masque_remp = Numeric.logical_and(tab1, tab2) # application du masque pour ne conserver que les positions ou les 2 conditions de seuil sont vraies tab1 = Numeric.where(masque_remp, tab1, 0) tab2 = Numeric.where(masque_remp, tab2, 0) return tab1, tab2 def _appelAstar(self,L1,L2,texte1,texte2,lt1): """ Alignement A* entre les 2 séquences pre: isinstance(L1,list) and isinstance(L2,list) and isinstance(texte1,str) and isinstance(texte2,str) post: (len(__return__[0])==len(__return__[1])) or (len(__return__[0])==len(__return__[1])+1) or \ (len(__return__[0])+1==len(__return__[1]))""" # dicoPos1 stocke pour chaque bloc de L1 ses positions dans cette liste logging.log(5,"debut _appelAstar") bu_l_texte1 = self.l_texte1 ; bu_l_texte2 = self.l_texte2 self.l_texte1 = lt1 ; self.l_texte2 = len(texte2) LC1 = [] ; LC2 = [] ; LH1 = [] ; LH2 = [] ; lRepetT1 = [] ; lRepetT2 = [] for bloc in L1: cle = hash(texte1[bloc[0]:bloc[1]]) LC1.append(cle) LH1.append((cle, bloc[0], bloc[1])) lRepetT1.append(bloc[1]-bloc[0]) logging.log(5,'init L1 done') for bloc in L2: cle = hash(texte2[bloc[0]-lt1:bloc[1]-lt1]) LC2.append(cle) LH2.append((cle, bloc[0], bloc[1])) lRepetT2.append(bloc[1]-bloc[0]) logging.log(5,'init L2 done') arrayRepetT1 = numpy.array(lRepetT1) arrayRepetT2 = numpy.array(lRepetT2) dicoPos = {} ; coutFixe = {} ; dicoMinMax = {} dicoPos[1], (coutFixeRepeteT1, coutFixeNonRepeteT1), lSup = self.calcPosCoutFixe(LH1, self.l_texte1) dicoPos[2], (coutFixeRepeteT2, coutFixeNonRepeteT2), lIns = self.calcPosCoutFixe(LH2, self.l_texte2, debut_texte=self.l_texte1) coutFixe = ((coutFixeRepeteT1, coutFixeNonRepeteT1), (coutFixeRepeteT2, coutFixeNonRepeteT2)) logging.log(5,"fin calcPosCoutFixe") dicoMinMax1 = self.extractMinMax(lSup) dicoMinMax2 = self.extractMinMax(lIns) #logging.debug(dicoMinMax1) #logging.debug(dicoMinMax2) diffSym = self.preTraitDiffSym(LH1,LH2,dicoPos, lRepetT1, lRepetT2) # pré-calcul des différences symétriques logging.log(5,"fin preTraitDiffSym") lSuppression = [] ; lInsertion = [] for deb,fin in lSup: if fin-deb == 0: lSuppression.append(-1) else: lSuppression.append(fin-deb) for deb,fin in lIns: if fin-deb == 0: lInsertion.append(-1) else: lInsertion.append(fin-deb) # A* #trace1('best, closedSet = self.deplacements_pond_Astar(LC1,LC2,diffSym,dicoPos,coutFixe, dicoMinMax1,dicoMinMax2, lSuppression, lInsertion, arrayRepetT1, arrayRepetT2, MO=True)',locals()) best, closedSet = self.deplacements_pond_Astar(LC1,LC2,diffSym,dicoPos,coutFixe, dicoMinMax1,dicoMinMax2, lSuppression, lInsertion, arrayRepetT1, arrayRepetT2, MO=True) #best, closedSet = cost.deplacements_pond_Astar(LC1,LC2,diffSym,dicoPos,coutFixe, dicoMinMax1,dicoMinMax2, lSuppression, lInsertion, arrayRepetT1, arrayRepetT2, True, self.l_texte1, self.l_texte2, (self.coeffX, self.coeffY, self.coeffZ)) logging.log(5,"fin deplacements_pond_Astar") dicoBest={1:{}, 2:{}} # dico des positions dans L1 et L2 du meilleur parcours while best != (-1,-1): # on remonte le meilleur parcours trouvé dicoBest[1][best[0]]=1 dicoBest[2][best[1]]=1 best=closedSet[best][1] # noeud père s1=self._makeLRes(dicoBest[1],L1) s2=self._makeLRes(dicoBest[2],L2) logging.log(5,'s1 = '+str(s1)) logging.log(5,'s2 = '+str(s2)) del dicoPos, coutFixe, dicoBest self.l_texte1 = bu_l_texte1 ; self.l_texte2 = bu_l_texte2 logging.log(5,"fin _appelAstar") return s1,s2 def cout__(self,(k,l),(i,j),diffSym,((coutFixeRepeteT1, coutFixeNonRepeteT1), (coutFixeRepeteT2, coutFixeNonRepeteT2)), bestValue, dicoMinMax1, dicoMinMax2, lSuppression, lInsertion, arrayRepetT1, arrayRepetT2): return cost.fct_cout(k,l,i,j,diffSym,coutFixeRepeteT1, coutFixeNonRepeteT1, coutFixeRepeteT2, coutFixeNonRepeteT2, bestValue, dicoMinMax1, dicoMinMax2, lSuppression, lInsertion, arrayRepetT1, arrayRepetT2, self.coeffX, self.coeffY, self.coeffZ, self.l_texte1, self.l_texte2) #def cout(self,d1,d2,f1,f2,diffSym,(coutFixeRepete, coutFixeNonRepete)): def cout(self,(k,l),(i,j),diffSym,((coutFixeRepeteT1, coutFixeNonRepeteT1), (coutFixeRepeteT2, coutFixeNonRepeteT2)), bestValue, dicoMinMax1, dicoMinMax2, lSuppression, lInsertion, arrayRepetT1, arrayRepetT2): """ calcul du cout d'un noeud """ #calcRemp = False #if calcRemp: # liste_remp1 = lSuppression[k+1:i] ; liste_remp2 = lInsertion[l+1:j] # if len(liste_remp1) > 0 and len(liste_remp2) > 0: # remp1, remp2 = self.extractRemplacements(liste_remp1, liste_remp2) #tot_remp1 = numarray.sum(remp1) ; tot_remp2 = numarray.sum(remp2) #numarray.where(remp1 > 0, 1, 0, remp1) #numarray.where(remp2 > 0, 1, 0, remp2) #nb_remp1 = numarray.sum(remp1) ; nb_remp2 = numarray.sum(remp2) #tot_remp1 = Numeric.sum(remp1) ; tot_remp2 = Numeric.sum(remp2) #remp1 = Numeric.where(remp1 > 0, 1, 0) #remp2 = Numeric.where(remp2 > 0, 1, 0) #nb_remp1 = Numeric.sum(remp1) ; nb_remp2 = Numeric.sum(remp2) #print remp1, remp2 # tot_remp1 = numpy.sum(remp1) ; tot_remp2 = numpy.sum(remp2) # remp1 = numpy.where(remp1 > 0, 1, 0) # remp2 = numpy.where(remp2 > 0, 1, 0) # nb_remp1 = numpy.sum(remp1) ; nb_remp2 = numpy.sum(remp2) # assert nb_remp1 == nb_remp2 #print remp1, remp2,tot_remp1, tot_remp2, nb_remp1, nb_remp2 # else: # tot_remp1 = tot_remp2 = nb_remp1 = nb_remp2 = 0 #else: # tot_remp1 = tot_remp2 = nb_remp1 = nb_remp2 = 0 tot_remp1 = tot_remp2 = nb_remp1 = nb_remp2 = 0 assert nb_remp1 == nb_remp2 (best_cumulINV, best_cumulMOV, best_cumulINS, best_cumulSUP, best_cumulREP, best_nbINV, best_nbMOV, best_nbINS, best_nbSUP, best_nbREP, best_inv_max, best_mov_max) = bestValue[2] assert arrayRepetT1[i] == coutFixeRepeteT1[i] - coutFixeRepeteT1[i-1] assert arrayRepetT2[j] == coutFixeRepeteT2[j] - coutFixeRepeteT2[j-1] cumulINV = best_cumulINV + ( coutFixeRepeteT1[i] - coutFixeRepeteT1[i-1] + coutFixeRepeteT2[j] - coutFixeRepeteT2[j-1] ) #cumulINV = ( coutFixeRepeteT1[i] - coutFixeRepeteT1[i-1] + # coutFixeRepeteT2[j] - coutFixeRepeteT2[j-1] ) assert sum(arrayRepetT1[k+1:i]) == coutFixeRepeteT1[i-1] - coutFixeRepeteT1[k] assert sum(arrayRepetT2[l+1:j]) == coutFixeRepeteT2[j-1] - coutFixeRepeteT2[l] cumulMOV = best_cumulMOV + ( coutFixeRepeteT1[i-1] - coutFixeRepeteT1[k] + coutFixeRepeteT2[j-1] - coutFixeRepeteT2[l] ) #cumulMOV = ( coutFixeRepeteT1[i-1] - coutFixeRepeteT1[k] + # coutFixeRepeteT2[j-1] - coutFixeRepeteT2[l] ) assert coutFixeNonRepeteT2[j] - coutFixeNonRepeteT2[l] - tot_remp2 >= 0,(coutFixeNonRepeteT2[j],coutFixeNonRepeteT2[l],tot_remp2) cumulINS = best_cumulINS + ( coutFixeNonRepeteT2[j] - coutFixeNonRepeteT2[l] )#- tot_remp2) #cumulINS = (coutFixeNonRepeteT2[j] - # coutFixeNonRepeteT2[l] )#- tot_remp2) assert coutFixeNonRepeteT1[i] - coutFixeNonRepeteT1[k] - tot_remp1 >= 0, (coutFixeNonRepeteT1[i], coutFixeNonRepeteT1[k],tot_remp1) cumulSUP = best_cumulSUP + ( coutFixeNonRepeteT1[i] - coutFixeNonRepeteT1[k])# - tot_remp1) #cumulSUP = ( coutFixeNonRepeteT1[i] - # coutFixeNonRepeteT1[k] )#- tot_remp1) #print cumulSUP, best_cumulSUP,coutFixeNonRepeteT1[i], coutFixeNonRepeteT1[k], tot_remp1 assert tot_remp1 + tot_remp2 >= 0, (tot_remp1, tot_remp2) cumulREP = best_cumulREP + ( tot_remp1 + tot_remp2 ) #cumulREP = 0.0 + ( tot_remp1 + tot_remp2 ) assert best_cumulINV < cumulINV <= self.l_texte1 + self.l_texte2 - cumulMOV, (best_cumulINV, cumulINV, self.l_texte1, self.l_texte2, cumulMOV) assert best_cumulMOV <= cumulMOV <= self.l_texte1 + self.l_texte2 - cumulINV assert best_cumulINS <= cumulINS <= self.l_texte2, (best_cumulINS, cumulINS,coutFixeNonRepeteT2[j], coutFixeNonRepeteT2[l],tot_remp2, self.l_texte2) assert best_cumulSUP <= cumulSUP <= self.l_texte1 a="""assert best_cumulREP <= cumulREP <= self.l_texte1 + self.l_texte2, (best_cumulREP, cumulREP) """ # on calcule les couts totaux f en sommant les couts fixes g et les # couts heuristiques h, tels que f = g + h pour tous les paramètres (difference_sym, nb_blocs_diff_sym, inv_min_after, inv_max_after, mov_min_after, mov_max_after) = diffSym[(i,j)] #h_INV = (coutFixeRepeteT1[-1] - coutFixeRepeteT1[i] + # coutFixeRepeteT2[-1] - coutFixeRepeteT2[j] - difference_sym) # cout fixe, paramètre X g_x = 0.0 - (cumulMOV+cumulINV) #+ cumulINV - cumulINS - cumulSUP #- cumulREP # cout heuristique, paramètre X h_x = + difference_sym #- h_INV #diffSym[(i,j)][0] # cout total normalisé, paramètre X f_x = (1 + (g_x - h_x) / (self.l_texte1 + self.l_texte2)) / 2 #print f_x assert 0 <= f_x <= 1, f_x # pb indices entre arrayRepetT1 et cumulRepet probably # bloc invariant de taille max entre les précédents et le courant (i,j) #current_inv_max = max(best_inv_max, arrayRepetT1[i]) current_inv_max = best_inv_max if current_inv_max < arrayRepetT1[i]: current_inv_max = arrayRepetT1[i] # bloc move de taille max entre les précédents et les courants situés entre (k,l) et (i,j) nb_mov_added1 = nb_mov_added2 = max_mov_added1 = max_mov_added2 = 0 if i - (k+1) > 0: #a1 = arrayRepetT1[k+1:i] #max_mov_added1 = numpy.max(a1) max_mov_added1 = numpy.max(arrayRepetT1[k+1:i]) #max_mov_added1 = 0 #for x in xrange(k+1,i): # if arrayRepetT1[x] > max_mov_added1: max_mov_added1 = arrayRepetT1[x] nb_mov_added1 = i - (k + 1) #nb_mov_added1 = len(a1) if j - (l+1) > 0: #a2 = arrayRepetT2[l+1:j] #max_mov_added2 = numpy.max(a2) max_mov_added2 = numpy.max(arrayRepetT2[l+1:j]) #max_mov_added2 = 0 #for x in xrange(l+1,j): # if arrayRepetT2[x] > max_mov_added2: max_mov_added2 = arrayRepetT2[x] nb_mov_added2 = j - (l + 1) #nb_mov_added2 = len(a2) #current_mov_max = max(best_mov_max, max_mov_added1, max_mov_added2) current_mov_max = best_mov_max if current_mov_max < max_mov_added1: current_mov_max = max_mov_added1 if current_mov_max < max_mov_added2: current_mov_max = max_mov_added2 #if current_mov_max == 0: # print (k,l),(i,j),best_mov_max, max_mov_added1, max_mov_added2 nbINV = best_nbINV + 2 #nbMOV = best_nbMOV + (i - k - 1) + (j - l -1) nbMOV = best_nbMOV + nb_mov_added1 + nb_mov_added2 nbINS = best_nbINS + (j - l) - nb_remp2 nbSUP = best_nbSUP + (i - k) - nb_remp1 nbREP = best_nbREP + nb_remp1 + nb_remp2 # couts totaux, paramètres Yi #if current_inv_max == 0: # print (k,l),(i,j),arrayRepetT1[i] deno = current_inv_max if deno < inv_min_after: deno = inv_min_after f_yINV = ((0.0 + cumulINV + inv_min_after) / (nbINV +2)) / deno assert 0 < f_yINV <= 1, ((k,l),(i,j),f_yINV, cumulINV, inv_min_after, nbINV,current_inv_max,arrayRepetT1[:i+1],arrayRepetT2[:j+1]) #print (cumulMOV, mov_min_after, (nbMOV + 2),current_mov_max) if current_mov_max == 0: f_yMOV = 0 else: f_yMOV = ((cumulMOV + mov_min_after) / (nbMOV + 2)) / (current_mov_max) #else: f_yMOV = ((cumulMOV + diff_sym) / (nbMOV + nb_blocs_diff_sym)) / (current_mov_max) #logging.debug(dicoMinMax2) deno2 = dicoMinMax2[1][j] if deno2 < dicoMinMax2[2][j]: deno2 = dicoMinMax2[2][j] if deno2 == 0: f_yINS = 0 else: f_yINS = ((cumulINS + dicoMinMax2[2][j]) / (nbINS + 1)) / (deno2) #print (k,l),(i,j),f_yINS,(cumulINS, dicoMinMax2[2][j],(nbINS + 1),(dicoMinMax2[1][j],dicoMinMax2[2][j])) #print (cumulSUP, dicoMinMax1[2][i],(nbSUP + 1)),(dicoMinMax1[1][i],dicoMinMax1[2][i]) deno1 = dicoMinMax1[1][i] if deno1 < dicoMinMax1[2][i]: deno1 = dicoMinMax1[2][i] if deno1 == 0: f_ySUP = 0 else: f_ySUP = ((cumulSUP + dicoMinMax1[2][i]) / (nbSUP + 1)) / (deno1) #if max(dicoMinMax2[1][j],dicoMinMax1[1][i],dicoMinMax2[2][j],dicoMinMax1[2][i]) == 0: f_yREP = 0 if deno1+deno2 == 0: f_yREP = 0 else: f_yREP = (((cumulREP + dicoMinMax2[2][j] + dicoMinMax1[2][i]) / (nbREP + 2)) / (deno1+deno2)) # cout total normalisé, paramètre Y f_y = (f_yINV + f_yMOV + f_yINS + f_ySUP + f_yREP) / 5 #print (k,l),(i,j),(f_y, (f_yINV, f_yMOV, f_yINS, f_ySUP, f_yREP)) assert 0 <= f_y <= 1, (f_y, (f_yINV, f_yMOV, f_yINS, f_ySUP, f_yREP)) # cout total normalisé, paramètre Z1 denominateur_z1 = ((cumulINS + dicoMinMax2[2][j]) + (cumulSUP + dicoMinMax1[2][i]) + (cumulREP + dicoMinMax2[2][j] + dicoMinMax1[2][i]) + (cumulMOV + mov_min_after)) if denominateur_z1 == 0: f_z1 = 0 else: f_z1 = (1.0 + cumulMOV + mov_min_after) / denominateur_z1 # cout total normalisé, paramètre Z2 denominateur_z2 = ((cumulINS + dicoMinMax2[2][j]) + (cumulSUP + dicoMinMax1[2][i]) + (cumulREP + dicoMinMax2[2][j] + dicoMinMax1[2][i])) if denominateur_z2 == 0: f_z2 = 0 else: f_z2 = (1.0 + cumulREP + dicoMinMax2[2][j] + dicoMinMax1[2][i]) / (denominateur_z2) # cout total normalisé, paramètre Z f_z0 = (0.0 + cumulINV+ inv_min_after)/(cumulINV+ inv_min_after+cumulMOV+ mov_min_after) #f_z = (f_z0 + f_z1 + f_z2) / 3 f_z = (f_z0 + f_z2) / 2 f_temp = (1.0/3 * f_x + 1.0/3 * f_y + 1.0/3 * f_z) assert 0 <= f_temp <= 1 if f_temp == 0: f_temp = 1.0 / sys.maxint f = 1.0 / f_temp # f_temp # #logging.debug((f_temp,f_x, f_y , f_z)) return (f, (k,l), (cumulINV, cumulMOV, cumulINS, cumulSUP, cumulREP, nbINV, nbMOV, nbINS, nbSUP, nbREP, current_inv_max, current_mov_max)) def preTraitDiffSym(self,L1,L2,dicoPos, lRepetT1, lRepetT2, niveau=0): diffSym={} # dico[(i,j)] stocke le cout heuristique et la longueur de la liste résultant de la différence symétrique entre L1[i+1:] et L2[j+1:] LH1=L1 ; LH2=L2 logging.log(5,'len(LH2)= '+str(len(LH2))+' / len(LH1)= '+str(len(LH1))) len_L1 = len(LH1) ; len_L2 = len(LH2) i=0 for posB1 in xrange(len_L1-1,-1,-1): if (i<10 or i>len_L1-10 or i%100==0): logging.log(5,'itération LH1: '+str(i)) LL1 = Mset2(LH1[posB1+1:]) if (i<10 or i>len_L1-10 or i%100==0): logging.log(5,' done MSet1') liste_pos2 = dicoPos[2][LH1[posB1][0]]#texte[L1[posB1][0]:L1[posB1][1]]] j = len(liste_pos2)-1 while j >= 0: posB2 = liste_pos2[j] if j == len(liste_pos2)-1: LL2 = Mset2(LH2[posB2+1:]) #; ds = nb = 0 LL1res, inv_min1, inv_max1, mov_min1, mov_max1 = LL1.difference(LL2) LL2res, inv_min2, inv_max2, mov_min2, mov_max2 = LL2.difference(LL1) assert inv_min1 == inv_min2 assert inv_max1 == inv_max2 #print inv_min_long1, inv_min_long2 #if inv_min_long1 < sys.maxint: print inv_min_long1 else: next_posB2 = liste_pos2[j+1] #new_LL1 = Mset(LH1[posB2+1:next_posB2+1]) #LL1res, inv_min_long1, mov_min_long1 = LL1res.difference(LL2res.intersection(new_LL1)) #LL2res, inv_min_long2, mov_min_long2 = LL2res.difference(new_LL1) #assert inv_min_long1 == inv_min_long2, str(inv_min_long1) +'/'+ str(inv_min_long2)+'/'+str(new_LL1)+'/'+str(LL2res.intersection(new_LL1)) new_LL2 = Mset2(LH2[posB2+1:next_posB2]) #print 'new_LL2',new_LL2 t2, inv_min_t2, inv_max_t2, mov_min_t2, mov_max_t2 = new_LL2.difference(LL1res) LL2res.union(t2) inv_min2 = min(inv_min2, inv_min_t2) inv_max2 = max(inv_max2, inv_max_t2) mov_min2 = min(mov_min2, mov_min_t2) mov_max2 = max(mov_max2, mov_max_t2) LL1res, inv_min_t1, inv_max_t1, mov_min_t1, mov_max_t1 = LL1res.difference(new_LL2) inv_min1 = min(inv_min1, inv_min_t1) inv_max1 = max(inv_max1, inv_max_t1) mov_min1 = min(mov_min1, mov_min_t1) mov_max1 = max(mov_max1, mov_max_t1) #print inv_min_long1, inv_min_long2 assert inv_min1 == inv_min2, str(inv_min1) +'/'+ str(inv_min2) assert inv_max1 == inv_max2 # liste_remp1 = [] ; liste_remp2 = [] # if posB1 < len_L1-1: liste_remp1 = lRepetT1[posB1+1:] # if posB2 < len_L2-1: liste_remp2 = lRepetT2[posB2+1:] # #print liste_remp1, liste_remp2 # if len(liste_remp1) > 0 and len(liste_remp2) > 0: # remp1, remp2 = self.extractRemplacements(liste_remp1, liste_remp2) # nonzero_remp1 = numarray.nonzero(remp1)[0] # nonzero_remp2 = numarray.nonzero(remp2)[0] # if len(nonzero_remp1) > 0 and len(nonzero_remp2) > 0: # pos_first_remp1 = nonzero_remp1[0] # pos_first_remp2 = nonzero_remp2[0] # long_remp1 = remp1[pos_first_remp1] # long_remp2 = remp2[pos_first_remp2] # assert long_remp1 / long_remp2 > self.seuil_remplacements or long_remp1 / long_remp2 > 1 / self.seuil_remplacements # else:long_remp1 = long_remp2 = 0 # else: long_remp1 = long_remp2 = 0 ds2 = LL1res.valeur + LL2res.valeur nb2 = LL1res.length + LL2.length mov_min = min(mov_min1, mov_min2) mov_max = max(mov_max1, mov_max2) assert inv_min1 == inv_min2 assert inv_max1 == inv_max2 inv_min = inv_min1 if inv_min == sys.maxint: inv_min = 0 # pas de mov après (i,j), on considère donc que le mov minimum # après (i,j) vaut 0 if mov_min == sys.maxint: mov_min = 0 diffSym[(posB1,posB2)] = (ds2, nb2, inv_min, inv_max1, mov_min, mov_max)#, long_remp1+long_remp2) j -= 1 if (i<10 or i>len_L1-10 or i%100==0): logging.log(5,' done itération interne') i+=1 #print '------------------' return diffSym class AlignShapiraRecur(AlignAstarRecur): """ algo GREEDY de Sahpira et Storer 2002""" def deplacements_pond2(self, t1, t2, niveau=0): """ pre: isinstance(t1,str) and isinstance(t2,str) """ #print "T1:"+t1 ; print "T2:"+t2 if len(t1)==0 or len(t2)==0: return [],[],[],[] logging.log(5,"debut dep_pond niveau "+str(niveau)) LResT1,LResT2 = self.compute_alignement(t1,t2) if len(LResT1)==0 or len(LResT2)==0: return [],[],[],[] texte=t1+t2 debutT1=i=0 ; debutT2=len(t1) lResBC1=[] ; lResBC2=[] ; lResDEP1=[] ; lResDEP2=[] #print "LResT1:"+str(LResT1); print "LResT2:"+str(LResT2) while (ilen(LResT2): assert(len(LResT1)==len(LResT2)+1 and LResT1[-1][0] is None) lResDEP1.extend(LResT1[-1][1]) elif len(LResT2)>len(LResT1): assert(len(LResT2)==len(LResT1)+1 and LResT2[-1][0] is None) lResDEP2.extend(LResT2[-1][1]) else: assert(len(LResT1)==len(LResT2)) logging.log(5,"fin dep_pond niveau "+str(niveau)) return lResDEP1,lResDEP2,lResBC1,lResBC2 def compute_alignement(self,t1,t2): """ prends les 2 textes en entrée et renvoie 2 listes au format [(BC,[BDeps le précédant])] """ lLCS = self._compute_lLCS(t1,t2) if len(lLCS) == 0: return [],[] lTexte = list(t1) ; lTexte.extend(list(t2)) len_t1 = len(t1) while len(lLCS) > 1: (longueur,inv_len_lOcc,cle,lOcc) = lLCS.pop(0) # recherche du nb d'occurrences à remplacer lOcc_t1 = [] ; lOcc_t2 = [] for occ in lOcc: if occ >= len_t1: lOcc_t1.append(occ) else: lOcc_t2.append(occ) nb_occ_replace = min(len(lOcc_t1), len(lOcc_t2)) # remplacement des occurrences for i in xrange(nb_occ_replace): occ1 = lOcc_t1[i] ; occ2 = lOcc_t2[i] for j in xrange(occ1,occ1+longueur): lTexte[j] = (longueur,inv_len_lOcc,cle,occ1) for j in xrange(occ2,occ2+longueur): lTexte[j] = (longueur,inv_len_lOcc,cle,occ2) # transformation en une liste avec les blocs répétéss logging.debug('transformation en une liste avec les blocs répétéss') lTexte2 = [] ; i = 0 ; new_len_t1 = 0 while i < len(lTexte): item = lTexte[i] if isinstance(item, tuple): item2 = item ; i += item[0] else: item2 = (1,1,item,i) i += 1 if item2[3] < len_t1: new_len_t1 += 1 lTexte2.append(item2) if __debug__: longueur = 0 for car in lTexte2: if isinstance(car, tuple): longueur += car[0] else: longueur += 1 assert longueur == len(lTexte) == len(t1) + len(t2) LResT1,LResT2 = self.compute_ed_array(lTexte2, new_len_t1) return LResT1,LResT2 def compute_ed(self, texte, len_t1): logging.debug('compute_ed') t1 = texte[:len_t1] ; t2 = texte[len_t1:] ; len_t2 = len(t2) logging.debug('nb ligne = %d, nb_col = %d',len_t1,len_t2) # initialisation de la table de programmation dynamique table = {(-1,-1) : (0,None)} # indexé par (i,j) for i in xrange(len_t1): table[(i,-1)] = i,(i-1,-1),'D' for j in xrange(len_t2): table[(-1,j)] = j,(-1,j-1),'I' for i in xrange(len_t1): # calcul de toutes les cellules #logging.debug('ligne %d',i) for j in xrange(len_t2): deletion = table[(i-1,j)] insertion = table[(i,j-1)] diagonal = table[(i-1,j-1)] if (t1[i][2] == t2[j][2]): table[(i,j)] = min((diagonal[0],(i-1,j-1),'BC'), (deletion[0]+1,(i-1,j),'D'), (insertion[0]+1,(i,j-1),'I')) else: table[(i,j)] = min((deletion[0]+1,(i-1,j),'D'), (insertion[0]+1,(i,j-1),'I')) logging.debug("ed = "+str(table[i,j])) # check_move # extraction du chemin optimal i = len_t1-1 ; j = len_t2-1 ; path = [] while i != -1 and j != -1: cell = table[i,j] path.append(((i,j),cell[0],cell[1],cell[2])) i = cell[1][0] ; j = cell[1][1] path.reverse() # recherche du nb d'insertions et deletions pour chaque caractère pour les 2 textes dico_pos_t1 = {}; dico_pos_t2 = {} for (i,j),ed,pere,typ in path: if typ == 'D': item = t1[i] ; cle = item[2] try: dico_pos_t1[cle].append((i,j)) except KeyError: dico_pos_t1[cle] = [(i,j)] elif typ == 'I': item = t2[i] ; cle = item[2] try: dico_pos_t2[cle].append((i,j)) except KeyError: dico_pos_t2[cle] = [(i,j)] # remplacement des indels par des moves i = len_t1-1 ; j = len_t2-1 ; path = [] while i != -1 or j != -1: cell = table[i,j] ; typ = cell[2] #logging.debug(((i,j),cell)) if typ == 'D': item = t1[i] ; cle = item[2] if dico_pos_t2.has_key(cle): pos_t2 = dico_pos_t2[cle].pop() if len(dico_pos_t2[cle]) == 0: del dico_pos_t2[cle] cell_t2 = table[pos_t2] table[pos_t2] = cell_t2[0],cell_t2[1],'M' table[i,j] = cell[0],cell[1],'M' elif typ == 'I': item = t2[j] ; cle = item[2] if dico_pos_t1.has_key(cle): pos_t1 = dico_pos_t1[cle].pop() if len(dico_pos_t1[cle]) == 0: del dico_pos_t1[cle] cell_t1 = table[pos_t1] table[pos_t1] = cell_t1[0],cell_t1[1],'M' table[i,j] = cell[0],cell[1],'M' cell = table[i,j] path.append(((i,j),cell[0],cell[1],cell[2])) i = cell[1][0] ; j = cell[1][1] path.reverse() # transformation dans le format requis pour l'algo recursif identique à __makeLRes() LRes1 = [] ; LRes2 = [] ; accuDep1 = [] ; accuDep2 = [] for (i,j),ed,(i_pere,j_pere),typ in path: item1 = t1[i] ; item2 = t2[j] if typ == 'BC': item1 = t1[i] LRes1.append(([item1[3],item1[3]+item1[0]],accuDep1)) item2 = t2[j] LRes2.append(([item2[3],item2[3]+item2[0]],accuDep2)) accuDep1 = [] ; accuDep2 = [] elif typ == 'M': if i > i_pere: # deletion transformée en M item1 = t1[i] accuDep1.append([item1[3],item1[3]+item1[0]]) elif j > j_pere: # ins transformée en M item2 = t2[j] accuDep2.append([item2[3],item2[3]+item2[0]]) if len(accuDep1)>0: LRes1.append((None,accuDep1)) if len(accuDep2)>0: LRes2.append((None,accuDep2)) #logging.debug(LRes1) return LRes1, LRes2 def compute_ed_array(self, texte, len_t1): logging.debug('compute_ed_array') t1 = texte[:len_t1] ; t2 = texte[len_t1:] ; len_t2 = len(t2) logging.debug('nb ligne = %d, nb_col = %d',len_t1,len_t2) # initialisation de la table de programmation dynamique scoreTable = Numeric.zeros((len_t1+1,len_t2+1),Numeric.Int16) pathTable = Numeric.zeros((len_t1+1,len_t2+1),Numeric.Int8) #scoreTable = numarray.zeros((len_t1+1,len_t2+1),numarray.Int16) #pathTable = numarray.zeros((len_t1+1,len_t2+1),numarray.Int8) #scoreTable = numpy.zeros((len_t1+1,len_t2+1),numpy.int16) #pathTable = numpy.zeros((len_t1+1,len_t2+1),numpy.int8) #print table.shape for i in xrange(len_t1+1): scoreTable[i,0] = i pathTable[i,0] = 1 for j in xrange(len_t2+1): scoreTable[0,j] = j ; pathTable[0,j] = 2 scoreTable[0,0] = 0 ; pathTable[0,0] = 0 for i in xrange(1,len_t1+1): # calcul de toutes les cellules if i%100==0: logging.debug('debut ligne %d',i) for j in xrange(1,len_t2+1): deletion = scoreTable[i-1,j] insertion = scoreTable[i,j-1] diagonal = scoreTable[i-1,j-1] if (t1[i-1][2] == t2[j-1][2]): scoreTable[i,j], pathTable[i,j] = min((diagonal,0),(deletion+1,1),(insertion+1,2)) else: scoreTable[i,j], pathTable[i,j] = min((deletion+1,1),(insertion+1,2)) logging.debug("ed = "+str(scoreTable[i,j])) # check_move # extraction du chemin optimal i = len_t1 ; j = len_t2 ; path = [] while i != 0 and j != 0: score = scoreTable[i,j] ; direction = pathTable[i,j] if direction == 0: prev_i = i-1 ; prev_j = j-1 ; typeDir = 'BC' elif direction == 1: prev_i = i-1 ; prev_j = j ; typeDir = 'D' else: prev_i = i ; prev_j = j-1 ; typeDir = 'I' #ins path.append(((i,j), score, (prev_i,prev_j), typeDir)) i = prev_i ; j = prev_j path.reverse() # recherche du nb d'insertions et deletions pour chaque caractère pour les 2 textes dico_pos_t1 = {}; dico_pos_t2 = {} for (i,j),ed,pere,typ in path: if typ == 'D': if i == 0: continue assert 1 <= i <= len(t1), str(i)+'/'+str(len(t1)) item = t1[i-1] ; cle = item[2] try: dico_pos_t1[cle].append((i,j)) except KeyError: dico_pos_t1[cle] = [(i,j)] elif typ == 'I': if j == 0: continue assert 1 <= j <= len(t2), str(j)+'/'+str(len(t2)) item = t2[j-1] ; cle = item[2] try: dico_pos_t2[cle].append((i,j)) except KeyError: dico_pos_t2[cle] = [(i,j)] # remplacement des indels par des moves i = len_t1 ; j = len_t2 ; path = [] while i != 0 or j != 0: #print i,j score = scoreTable[i,j] direction = pathTable[i,j] #logging.debug(((i,j),cell)) if direction == 1: #deletion item = t1[i-1] ; cle = item[2] if dico_pos_t2.has_key(cle): pos_t2 = dico_pos_t2[cle].pop() if len(dico_pos_t2[cle]) == 0: del dico_pos_t2[cle] #cell_t2 = table[pos_t2] #table[pos_t2] = cell_t2[0],cell_t2[1],'M' pathTable[pos_t2] = 4 # ins transformé en move #table[i,j] = cell[0],cell[1],'M' pathTable[i,j] = 3 # del transformé en mov elif direction == 2: item = t2[j-1] ; cle = item[2] if dico_pos_t1.has_key(cle): pos_t1 = dico_pos_t1[cle].pop() if len(dico_pos_t1[cle]) == 0: del dico_pos_t1[cle] #cell_t1 = table[pos_t1] #table[pos_t1] = cell_t1[0],cell_t1[1],'M' pathTable[pos_t1] = 3 #table[i,j] = cell[0],cell[1],'M' pathTable[i,j] = 4 #else: assert direction == 0 , direction #cell = table[i,j] #path.append(((i,j),cell[0],cell[1],cell[2])) direction = pathTable[i,j] if direction == 0: pere = (i-1,j-1); typ='BC' elif direction == 1: pere = (i-1,j);typ='D' elif direction == 3: pere = (i-1,j);typ='M' elif direction == 2: pere = (i,j-1); typ='I' elif direction == 4: pere = (i,j-1); typ='M' path.append(((i,j),score,pere,typ)) i = pere[0] ; j = pere[1] path.reverse() # transformation dans le format requis pour l'algo recursif identique à __makeLRes() LRes1 = [] ; LRes2 = [] ; accuDep1 = [] ; accuDep2 = [] for (i,j),ed,(i_pere,j_pere),typ in path: item1 = t1[i-1] ; item2 = t2[j-1] if typ == 'BC': item1 = t1[i-1] LRes1.append(([item1[3],item1[3]+item1[0]],accuDep1)) item2 = t2[j-1] LRes2.append(([item2[3],item2[3]+item2[0]],accuDep2)) accuDep1 = [] ; accuDep2 = [] elif typ == 'M': if i > i_pere: # deletion transformée en M item1 = t1[i-1] accuDep1.append([item1[3],item1[3]+item1[0]]) elif j > j_pere: # ins transformée en M item2 = t2[j-1] accuDep2.append([item2[3],item2[3]+item2[0]]) if len(accuDep1)>0: LRes1.append((None,accuDep1)) if len(accuDep2)>0: LRes2.append((None,accuDep2)) #logging.debug(LRes1) return LRes1, LRes2 def _compute_lLCS(self,t1,t2): """ Extrait des 2 textes, les 2 séquences de blocs répétés """ st = suffix_tree.GeneralisedSuffixTree([t1,t2]) blocs_texte = st.get_MEM_index_chaine3(self.long_min_pivots) del st blocs_texte = self.remUnique(blocs_texte,len(t1)) lLCS = [] for (cle,longueur),lOcc in blocs_texte.iteritems(): bisect.insort_right(lLCS, (longueur,1.0/len(lOcc),cle,lOcc)) return lLCS class AlignShapiraRecurGap(AlignShapiraRecur): def __getGapMaxEtPos(self,arrayItem): """recherche le point précédent maximum et sa position et le plus proche du point""" pos = 0 maximum = -1 posMax = -1 if len(arrayItem)<=0 : return -1,-1 for elem in arrayItem: if elem >= maximum : #>= car on va chercher l'élément le plus proche posMax = pos maximum = elem pos+=1 #on doit retourner la postiotion relative a l'élément derriere l'arrayItem : [len|len-1| ... |2|1]X return self.wOuverture+maximum*self.wExtension, len(arrayItem)-posMax def compute_ed_array(self, texte, len_t1): self.wOuverture = 5 self.wExtension = 2 logging.debug('compute_ed_array') t1 = texte[:len_t1] ; t2 = texte[len_t1:] ; len_t2 = len(t2) logging.debug('nb ligne = %d, nb_col = %d',len_t1,len_t2) # initialisation de la table de programmation dynamique scoreTable = Numeric.zeros((len_t1+1,len_t2+1))#,Numeric.Int) pathTable = Numeric.zeros((len_t1+1,len_t2+1))#,Numeric.Int) pathTableVal = Numeric.zeros((len_t1+1,len_t2+1))#,Numeric.Int) #print table.shape for i in xrange(len_t1+1): scoreTable[i,0] = i pathTable[i,0] = 1 pathTableVal[i,0] = i for j in xrange(len_t2+1): scoreTable[0,j] = j ; pathTable[0,j] = 2; pathTableVal[0,j] = j scoreTable[0,0] = 0 ; pathTable[0,0] = 0 ; pathTableVal[0,0] = 0 for i in xrange(1,len_t1+1): # calcul de toutes les cellules logging.debug('ligne %d',i) for j in xrange(1,len_t2+1): (deletion,posDeletion) = self.__getGapMaxEtPos(scoreTable[i-1,:j]) (insertion,posInsertion) = self.__getGapMaxEtPos(scoreTable[:i,j-1]) diagonal = scoreTable[i-1,j-1] scoreTable[i,j],pathTable[i,j],pathTableVal[i,j] = max((deletion,1,posDeletion),(insertion,2,posInsertion),(diagonal,0,0)) # if pathTable[i,j] != 0 and pathTableVal[i,j]==0: # print scoreTable[i,j],pathTable[i,j], pathTableVal[i,j],">>>",(deletion,1,posDeletion),(insertion,2,posInsertion),(diagonal,0,0) # for i in xrange(1,len_t1+1): # calcul de toutes les cellules # logging.debug('ligne %d',i) # for j in xrange(1,len_t2+1): # deletion = scoreTable[i-1,j] # insertion = scoreTable[i,j-1] # diagonal = scoreTable[i-1,j-1] # if (t1[i-1][2] == t2[j-1][2]): # scoreTable[i,j], pathTable[i,j] = min((diagonal,0),(deletion+1,1),(insertion+1,2)) # else: # scoreTable[i,j], pathTable[i,j] = min((deletion+1,1),(insertion+1,2)) logging.debug("ed = "+str(scoreTable[i,j])) # check_move # extraction du chemin optimal i = len_t1 ; j = len_t2 ; path = [] while i != 0 and j != 0: score = scoreTable[i,j] ; direction = pathTable[i,j] #print ">>", i,j,abs(pathTableVal[i,j]),score,direction if direction == 0: prev_i = i-1 ; prev_j = j-1 ; typeDir = 'BC' elif direction == 1: prev_i = i-pathTableVal[i,j] ; prev_j = j ; typeDir = 'D' else: prev_i = i ; prev_j = j-pathTableVal[i,j] ; typeDir = 'I' #ins path.append(((i,j), score, (prev_i,prev_j), typeDir)) i = prev_i ; j = prev_j path.reverse() # recherche du nb d'insertions et deletions pour chaque caractère pour les 2 textes dico_pos_t1 = {}; dico_pos_t2 = {} for (i,j),ed,pere,typ in path: if typ == 'D': item = t1[i-1] ; cle = item[2] try: dico_pos_t1[cle].append((i,j)) except KeyError: dico_pos_t1[cle] = [(i,j)] elif typ == 'I': assert 0 <=i <= len(t2), str(i)+'/'+str(len(t2)) item = t2[i-1] ; cle = item[2] try: dico_pos_t2[cle].append((i,j)) except KeyError: dico_pos_t2[cle] = [(i,j)] # remplacement des indels par des moves i = len_t1 ; j = len_t2 ; path = [] while i != 0 or j != 0: #print ">>",i,j score = scoreTable[i,j] direction = pathTable[i,j] #logging.debug(((i,j),cell)) if direction == 1: #deletion item = t1[i-1] ; cle = item[2] if dico_pos_t2.has_key(cle): pos_t2 = dico_pos_t2[cle].pop() if len(dico_pos_t2[cle]) == 0: del dico_pos_t2[cle] #cell_t2 = table[pos_t2] #table[pos_t2] = cell_t2[0],cell_t2[1],'M' pathTable[pos_t2] = [4,pathTableVal[pos_t2]] # ins transformé en move #table[i,j] = cell[0],cell[1],'M' pathTable[i,j] = [3,pathTableVal[i,j]] # del transforme en mov elif direction == 2: item = t2[j-1] ; cle = item[2] if dico_pos_t1.has_key(cle): pos_t1 = dico_pos_t1[cle].pop() if len(dico_pos_t1[cle]) == 0: del dico_pos_t1[cle] #cell_t1 = table[pos_t1] #table[pos_t1] = cell_t1[0],cell_t1[1],'M' pathTable[pos_t1] = [3,abs(pathTableVal[pos_t1])] #table[i,j] = cell[0],cell[1],'M' pathTable[i,j] = [4,abs(pathTableVal[i,j])] #else: assert direction == 0 , direction #cell = table[i,j] #path.append(((i,j),cell[0],cell[1],cell[2])) direction = pathTable[i,j] if direction == 0: pere = (i-1,j-1); typ='BC' elif direction == 1: pere = (i-pathTableVal[i,j],j);typ='D' elif direction == 3: pere = (i-pathTableVal[i,j],j);typ='M' elif direction == 2: pere = (i,j-pathTableVal[i,j]); typ='I' elif direction == 4: pere = (i,j-pathTableVal[i,j]); typ='M' path.append(((i,j),score,pere,typ)) i = pere[0] ; j = pere[1] path.reverse() # transformation dans le format requis pour l'algo recursif identique à __makeLRes() LRes1 = [] ; LRes2 = [] ; accuDep1 = [] ; accuDep2 = [] for (i,j),ed,(i_pere,j_pere),typ in path: item1 = t1[i-1] ; item2 = t2[j-1] if typ == 'BC': item1 = t1[i-1] LRes1.append(([item1[3],item1[3]+item1[0]],accuDep1)) item2 = t2[j-1] LRes2.append(([item2[3],item2[3]+item2[0]],accuDep2)) accuDep1 = [] ; accuDep2 = [] elif typ == 'M': if i > i_pere: # deletion transformée en M item1 = t1[i-1] accuDep1.append([item1[3],item1[3]+item1[0]]) elif j > j_pere: # ins transformée en M item2 = t2[j-1] accuDep2.append([item2[3],item2[3]+item2[0]]) if len(accuDep1)>0: LRes1.append((None,accuDep1)) if len(accuDep2)>0: LRes2.append((None,accuDep2)) #logging.debug(LRes1) return LRes1, LRes2 class AlignAstarRecurBBL(AlignAstarRecur): def run(self, t1, t2): """ pre: isinstance(t1,str) and isinstance(t2,str) """ # application de psyco ? self.preTraitDiffSym = psyco.proxy(self.preTraitDiffSym) # ajoute-t-on les déplacements récursifs ? # par défaut non car on perd l'assertion de non-chevauchement des déplacements self.addSubDep = False # on enléve les $ car le suffix_tree ne les supporte pas sepTable = string.maketrans("$",".") if '$' in t1: t1 = t1.translate(sepTable) if '$' in t2: t2 = t2.translate(sepTable) bbl = self.deplacements_pond2(t1,t2) return bbl def deplacements_pond2(self, t1, t2): """ pre: isinstance(t1,str) and isinstance(t2,str) """ #print "T1:"+t1 ; print "T2:"+t2 if len(t1)==0 or len(t2)==0: return [] #if len(t1)==0 and len(t2)==0: return [] #elif len(t1) > 0 and len(t2) == 0: return [(('S',0,len(t1),[]),None)] #elif len(t1) == 0 and len(t2) > 0: return [(None,('I',0,len(t2),[]))] logging.log(5,"debut dep_pond") # recherche des 2 séquences d'homologies s1,s2 = self._texteToSeqHomo(t1,t2) if __debug__: self.ass2__(s1,0,len(t1),t1) self.ass2__(s2,len(t1),len(t1)+len(t2),t1+t2) #print "S1:"+self.lint2str(s1,t1) ; print "S2:"+self.lint2str(s2,t1+t2) if len(s1)==0 or len(s2)==0: return [] #if len(s1)==0 and len(s2)==0: return [] #elif len(s1) > 0 and len(s2) == 0: return [(('S',0,len(t1),[]),None)] #elif len(s1) == 0 and len(s2) > 0: return [(None,('I',0,len(t2),[]))] # identification des BC et D LResT1,LResT2 = self._appelAstar(s1,s2,t1,t2,len(t1)) assert -1 <= len(LResT1) - len(LResT2) <= 1 debutT1 = posT1 = 0 ; debutT2 = posT2 = len(t1) ; bbl = [] #print "LResT1:"+str(LResT1); print "LResT2:"+str(LResT2) for (BC1,lDep1),(BC2,lDep2) in zip(LResT1,LResT2): #print '(BC1,lDep1),(BC2,lDep2):'+str(((BC1,lDep1),(BC2,lDep2))) if BC1 is not None: finT1 = BC1[0] # bloc normal else: finT1 = len(t1) # dernier bloc de la liste if BC2 is not None: finT2 = BC2[0] else: finT2 = len(t1)+len(t2) # comparaison récursive #print debutT1,finT1,debutT2,finT2 #print "T1:"+t1 ; print "T2:"+t2 texteRec1 = t1[debutT1:finT1] ; texteRec2 = t2[debutT2-len(t1):finT2-len(t1)] bblRec = self.deplacements_pond2(texteRec1,texteRec2) #print 'bblRec:'+str(bblRec) if __debug__: for B1Rec,B2Rec in bblRec: # test validité corrdonnees résultat assert B1Rec is None or 0 <= B1Rec[1] < B1Rec[2] <= len(texteRec1), B1Rec assert B2Rec is None or len(texteRec1) <= B2Rec[1] < B2Rec[2] <= len(texteRec1)+len(texteRec2) assert posT1 == debutT1 and posT2 == debutT2 for i in xrange(len(bblRec)-1,-1,-1): # parcous bbl récursive # MAJ des coordonnées par rapport à la bbl principale B1,B2 = bblRec[i] if B1 is None: NB1 = None else: NB1 = (B1[0], B1[1] + posT1, B1[2] + posT1, [(d1+posT1,d2+posT1) for (d1,d2) in B1[3]]) if B2 is None: NB2 = None else: NB2 = (B2[0], B2[1] + posT2-len(texteRec1), B2[2] + posT2-len(texteRec1), [(d1+posT2-len(texteRec1),d2+posT2-len(texteRec1)) for (d1,d2) in B2[3]]) if __debug__: if NB1 is not None: assert posT1 <= NB1[1] < NB1[2] <= finT1 for d,f in NB1[3]: assert NB1[1] <= d < f <= NB1[2] , NB1 if NB2 is not None: assert posT2 <= NB2[1] < NB2[2] <= finT2 , str(posT2)+' <= '+str(NB2)+' <= '+str(finT2) for d,f in NB2[3]: NB2[1] <= d < f <= NB2[2] bblRec[i] = (NB1,NB2) if __debug__: assert posT1 == debutT1 and posT2 == debutT2 for B1,B2 in bblRec: # test coordonnées par rapport à t1 et t2 assert B1 is None or posT1 <= B1[1] < B1[2] <= finT1 assert B2 is None or posT2 <= B2[1] < B2[2] <= finT2, str(posT2)+' <= '+str(B2)+' <= '+str(finT2) # ajout de la bbl récursive dans la nouvelle bbl nbbl = [] #; debB1 = debutT1 ; debB2 = debutT2 for B1,B2 in bblRec: if B1 is not None and (B1[0] == 'S'): assert posT1 == B1[1] and B1[1] < B1[2] nbbl.append((B1,B2)) posT1 = B1[2] elif B2 is not None and (B2[0] == 'I'): assert posT2 == B2[1] and B2[1] < B2[2] nbbl.append((B1,B2)) posT2 = B2[2] else: assert B1 is not None and B1[0] == 'BC' and B2 is not None and B2[0] == 'BC' assert B1[1] < B1[2] and B2[1] < B2[2] nbbl.append((B1,B2)) posT1 = B1[2] ; posT2 = B2[2] else: # fin du parcours ou bblRec vide if posT1 < finT1: nbbl.append((('S',posT1,finT1,[]),None)) posT1 = finT1 if posT2 < finT2: nbbl.append((None,('I',posT2,finT2,[]))) posT2 = finT2 #debutT1 = debB1 ; debutT2 = debB2 assert nbbl > 0 if __debug__ and len(nbbl) > 1: # test ordre lastBB1,lastBB2 = nbbl[-1] ; lastLastBB1,lastLastBB2 = nbbl[-2] if lastBB1 is not None and lastLastBB1 is not None: assert debutT1 <= lastLastBB1[2] <= lastBB1[1] <= finT1 , lastLastBB1 + lastBB1 if lastBB2 is not None and lastLastBB2 is not None: assert debutT2 <= lastLastBB2[2] <= lastBB2[1] <= finT2 test1 = test2 = True for B1,B2 in nbbl: # test premier de la liste if test1 and B1 is not None: assert B1[1] == debutT1 ; test1 = False if test2 and B2 is not None: assert B2[1] == debutT2 ; test2 = False if not test1 and not test2: break test1 = test2 = True nbbl2 = nbbl[:] ; nbbl2.reverse() for B1,B2 in nbbl2: #test dernier de la liste if test1 and B1 is not None: assert B1[2] == finT1 ; test1 = False if test2 and B2 is not None: assert B2[2] == finT2 ; test2 = False if not test1 and not test2: break i = j = k = 0 while (k < len(nbbl)): B1,B2 = nbbl[k] if B1 is not None: while (i < len(lDep1)): D1 = lDep1[i] if B1[1] <= D1[0] < D1[1] <= B1[2]: #B1[3].append(D1) bisect.insort_right(B1[3], (D1[0],D1[1])) i += 1 else: assert B1[2] < D1[1] i += 1 break if B2 is not None: while (j < len(lDep2)): D2 = lDep2[j] if B2[1] <= D2[0] < D2[1] <= B2[2]: #B2[3].append(D2) bisect.insort_right(B2[3], (D2[0],D2[1])) j += 1 else: assert B2[2] < D2[1] j += 1 break k += 1 if __debug__: for B1,B2 in nbbl: if B1 is not None: assert debutT1 <= B1[1] < B1[2] <= finT1 , str(debutT1)+' <= '+str(B1)+' <= '+str(finT1) for d,f in B1[3]: assert B1[1] <= d < f <= B1[2] , B1 if B2 is not None: assert debutT2 <= B2[1] < B2[2] <= finT2 , str(debutT2)+' <= '+str(B2)+' <= '+str(finT2) for d,f in B2[3]: B2[1] <= d < f <= B2[2] bbl.extend(nbbl) # ajout de la nouvelle bbl dans la bbl générale if BC1 is not None: # ajout du BC terminal de l'itération assert BC2 is not None bbl.append((('BC',BC1[0],BC1[1],[]),('BC',BC2[0],BC2[1],[]))) posT1 = BC1[1] ; posT2 = BC2[1] debutT1 = posT1 ; debutT2 = posT2 assert posT1 == debutT1 and posT2 == debutT2 if len(LResT1) > len(LResT2): assert(len(LResT1)==len(LResT2)+1 and LResT1[-1][0] is None) if posT1 < len(t1): bbl.append((('S',posT1,len(t1),[(d,f) for d,f in LResT1[-1][1]]),None)) if posT2 < len(t1)+len(t2): bbl.append((None,('I',posT2,len(t1)+len(t2),[]))) # ?? elif len(LResT2) > len(LResT1): assert(len(LResT2)==len(LResT1)+1 and LResT2[-1][0] is None) if posT1 < len(t1): bbl.append((('S',posT1,len(t1),[]),None)) # ?? if posT2 < len(t1)+len(t2): bbl.append((None,('I',posT2,len(t1)+len(t2),[(d,f) for d,f in LResT2[-1][1]]))) else: assert len(LResT1) == len(LResT2) if posT1 < len(t1): bbl.append((('S',posT1,len(t1),[]),None)) if posT2 < len(t1)+len(t2): bbl.append((None,('I',posT2,len(t1)+len(t2),[]))) if __debug__: #print t1 ; print t2 ; print bbl self._testOrdre(bbl,len(t1)) test1 = test2 = True for B1,B2 in bbl: if test1 and B1 is not None: assert B1[1] == 0 ; test1 = False if test2 and B2 is not None: assert B2[1] == len(t1) ; test2 = False if not test1 and not test2: break test1 = test2 = True bbl2 = bbl[:] ; bbl2.reverse() for B1,B2 in bbl2: if test1 and B1 is not None: assert B1[2] == len(t1),str(B1)+'/'+str(len(t1)) ; test1 = False if test2 and B2 is not None: assert B2[2] == len(t1)+len(t2) ; test2 = False if not test1 and not test2: break logging.log(5,"fin dep_pond") return bbl def _testOrdre(self,liste,len_t1): """ Teste le bon ordre des blocs dans la liste """ posT1 = 0 # position courante dans le texte source posT2 = len_t1 # position courante dans le texte cible prevB1 = ('',0,0,[]) ; prevB2 = ('',posT2,posT2,[]) for (B1,B2) in liste: if B1 is not None: assert B1[0] == 'BC' or B1[0] == 'D' or B1[0] == 'S' # la position précédente est <= au début du bloc courant assert posT1 == B1[1],str(posT1)+' '+str(prevB1)+' '+str(B1) prevDep = posT1 for dep in B1[3]:# les déplacements sont bien inclus dans le bloc courant assert B1[1] <= dep[0] <= dep[1] <= B1[2] assert prevDep <= dep[0], B1[3] prevDep = dep[0] posT1 = B1[2] prevB1 = B1 if B2 is not None: assert B2[0] == 'BC' or B2[0] == 'D' or B2[0] == 'I' assert posT2 == B2[1], str(posT2)+' '+str(prevB2)+' '+str(B2) prevDep = posT2 for dep in B2[3]: assert B2[1] <= dep[0] <= dep[1] <= B2[2] assert prevDep <= dep[0] prevDep = dep[0] posT2 = B2[2] prevB2 = B2 import contract contract.checkmod(__name__) def trace1(commande,dic_locals): import profile,pstats profile.runctx(commande,globals(), dic_locals,'c:\workspace\medite\statFile') s = pstats.Stats('c:\workspace\medite\statFile') s.sort_stats('time') s.print_stats() sys.stderr.flush() sys.stdout.flush() def trace2(commande,dic_locals): import hotshot,hotshot.stats prof = hotshot.Profile("c:\workspace\medite\statFile") benchtime = prof.runctx(commande,globals(), dic_locals) prof.close() stats = hotshot.stats.load("c:\workspace\medite\statFile") stats.strip_dirs() stats.sort_stats('cumulative', 'time', 'calls') stats.print_stats() def _test(): import doctest, alignement return doctest.testmod(alignement) if __name__ == '__main__': _test()