# -*- coding: iso-8859-1 -*- # Copyright 20003 - 2008: Julien Bourdaillet (julien.bourdaillet@lip6.fr), Jean-Gabriel Ganascia (jean-gabriel.ganascia@lip6.fr) # This file is part of MEDITE. # # MEDITE is free software; you can redistribute it and/or modify # it under the terms of the GNU General Public License as published by # the Free Software Foundation; either version 2 of the License, or # (at your option) any later version. # # MEDITE is distributed in the hope that it will be useful, # but WITHOUT ANY WARRANTY; without even the implied warranty of # MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the # GNU General Public License for more details. # # You should have received a copy of the GNU General Public License # along with Foobar; if not, write to the Free Software # Foundation, Inc., 51 Franklin St, Fifth Floor, Boston, MA 02110-1301 USA #import fengHMM from HMM import * #import sys import string from blocTexte import * import logging from MediteAppli import transDiacritic #import de la fonction de conversion #import hotshot,hotshot.stats,sys class algoFeng: """Classe effectuant la comparaison des textes en utilisant la methode définie par S. Feng et R. Manmatha """ def __init__(self, txt1, txt2, ignorerSeparateurs=False, ignorerCase=True, ignorerDiacritiques=False, p2=2, calculDep=True , lenMinBlocDep=4, debugLevel = 4): """Constructeur de l'objet @param txt1: la premiere version du texte a comparer @type txt1: String @param txt1: la premiere version du texte a comparer @type txt1: String @param ignorerSeparateurs: Optionnel,Définit si l'on souhaite ou non ignorer les separateurs (ponctuations etc..), Faux par defaut @type ignorerSeparateurs: Bool @param ignorerCase: ignore la casse @type ignorerCase: bool @param ignorerDiacritiques: sensibilité aux accents @type ignorerDiacritiques: bool @param p2: paramètre ratio des remplacements pour insertions/suppressions @type p2: integer @param calculDep: Optionnel, defininit si l'on souhaite ou non calculer les deplacements @type calculDep: Bool @param lenMinBlocDep: longueue minimale bloc déplacé @type lenMinBlocDep: integer @param debugLevel: Optionnel,definit le niveau de log, les valeurs suivantes sont utilisées : logging.INFO = tres peu verbeux (affichage du debut et fin de l'algo seulement) logging.DEBUG = peu verbeux 4 verbeux (affichage de certaines variables, affichage des textes, affichage de la progression générale du hmm) 3 tres verbeux (affichage dans les boucles / methodes) 2 tres tres verbeux (affichage des tableaux etc...), tres peu lisible 1 maxi debug (affichage de tous les contenus des appels du fichier HMM.py) /!\ les dictionaires de 150 termes c'est pas joli a voir, ne pas utiliser !! @type debugLevel: Integer """ self.texte1 = txt1 self.texte2 = txt2 self.ignorerSeparateurs = ignorerSeparateurs self.ignorerCase = ignorerCase self.ignorerDiacritiques = ignorerDiacritiques self.calculDep = calculDep self.lenMinBlocDep = lenMinBlocDep self.p2 = p2 #config des logs #level logging.INFO = tres peu verbeux (affichage du debut et fin de l'algo seulement) #level logging.DEBUG = peu verbeux #level 4 verbeux (affichage de certaines variables, affichage des textes, affichage de la progression générale du hmm) #level 3 tres verbeux (affichage dans les boucles / methodes) #level 2 tres tres verbeux (affichage des tableaux etc...), tres peu lisible #level 1 maxi debug (affichage de tous les contenus des appels du fichier HMM.py) /!\ les dictionaires de 150 termes c'est pas joli a voir, ne pas utiliser !! logging.basicConfig(level=debugLevel,datefmt='%a, %d %b %Y %H:%M:%S',format='%(asctime)s %(levelname)s %(message)s')#,filename='hmm.log',filemode='w') logging.info("algoFeng initialisé, separateurs ignorés : %s, loglevel : %s", ignorerSeparateurs,debugLevel) if __debug__ : logging.log(4,"texte1 :\n%s",self.texte1) logging.log(4,"texte2 :\n%s",self.texte2) if self.ignorerDiacritiques: #self.sepTable = string.maketrans('çéèàùâêîôûäëïöüÿÇÉÈÀÙÂÊÎÔÛÄËÏÖÜ','ceeauaeiouaeiouyCEEAUAEIOUAEIOU') #self.texte1 = self.texte1.translate(self.sepTable) #self.texte2 = self.texte2.translate(self.sepTable) # fait la conversion pour les chaine unicode # self.texte1 = unicodedata.normalize('NFKD', unicode(self.texte1,'ascii')).encode('ASCII', 'ignore') # self.texte2 = unicodedata.normalize('NFKD', unicode(self.texte2,'ascii')).encode('ASCII', 'ignore') self.texte1 = transDiacritic(self.texte1) self.texte2 = transDiacritic(self.texte2) if self.ignorerCase: # si comparaison insensible à la casse, on convertit les 2 chaines en minuscule self.texte1 = self.texte1.lower() self.texte2 = self.texte2.lower() # self.prof = hotshot.Profile("./statFile") def aligne(self): """ Effectue la comparaison des textes, fonction principale Renvoie un tuplet ((blocsAlignes1,blocsAlignes2),(blocsSupprimés Texte1,blocs Ajouté Texte 2), (dep1,dep2),(corespondances entre Dep),(remp1,remp2)) @return: blocs sur lesquels on aligne pour le texte1 et texte2, blocs supprimés, blocs ajoutés, blocs deplacés du texte1, blocs deplacés du texte2 @type: ([(posAligneTxt1Debut,posAligneTxt1Fin),...],[(posAligneTxt2Debut,posAligneTxt2Fin),...]),([(posSupprTxt1Debut,posSupprTxt1Fin),...],[(posAjoutTxt2Debut,posAjoutTxt2Fin),...]),([(posDepTxt1Debut,posDepTxt1Fin),...],[(posDepTxt2Debut,posDepTxt2Fin),...]),([(paires de blocs déplacés)]), ([(posRempTxt1Debut,posRempTxt1Fin),...],[(posRempTxt2Debut,posRempTxt2Fin),...]) avec pos des Integers """ #PHASE 1 logging.info("debut algoFeng.aligne") #split des textes 1 et 2 => liste de (hash, poslistemots, posdebut,posfin+1) => texte[posdebut:posfin] donne le mot et posfin - posdebut la taille # crée des listes de blocs listeMots1 = self.__creerListeMots(self.texte1) listeMots2 = self.__creerListeMots(self.texte2) logging.debug("listeMots crées") logging.log(2,"listeMots1 : %s",listeMots1) logging.log(2,"listeMots2 : %s", listeMots2) #création des dictionaires: pour chaque mot, ses positions et ses occurrences dicoTexte1 = self.__creerDico(listeMots1) dicoTexte2 = self.__creerDico(listeMots2) logging.debug("dicoTexte crées") logging.log(2,"dicoTexte1 : %s",dicoTexte1) logging.log(2,"dicoTexte2 : %s", dicoTexte2) #pour tous les mots de dicoTexte1 occurent une fois dans le texte on crée la liste A # ancienne méthode avec des voisins identiques, et occurrences pouvant plusieurs # fois dans le texte 2, tel que dans le papier de Feng # donnait des listes avec moins d'apax car les contextes des hapax devait être identiques également # et non seulement les apax # A,B = self.__creerListesFeng(dicoTexte1,dicoTexte2,listeMots1,listeMots2) ################################################################################################# #PHASE 1.(a) et 1.(b) #On crée les listes des mots présents une seul fois dans le texte et commun aux deux textes # cad les listes d'apax ################################################################################################# A,B = self.__creerListes(dicoTexte1,dicoTexte2) logging.debug("listes A et B crées") logging.debug("(nbBlocsListe / nbBlocsTexte) : texte1 -> %d / %d , texte2 -> %d / %d",len(A),len(listeMots1),len(B),len(listeMots2)) logging.log(2,"liste A : %s",A) logging.log(2,"liste B : %s",B) if __debug__ : setA = set(x.val for x in A) setB = set(x.val for x in B) assert setA.symmetric_difference(setB) == set([]) #il n'y a pas d'element qui ne soit pas commun aux deux liste ################################################################################################# #PHASE1.(c) #On aligne les mots présents une seul fois dans le texte et commun aux deux textes #C'est sur le resultat de cet alignement que l'on va déterminer les bornes pour la phase 2 ################################################################################################# hmmPhase1 = algoHMM(A, B) logging.debug("Debut calcul hmm phase1 (mot ancres)") # motAncres = apax alignés conformément au papier de Feng motAncres = hmmPhase1.calculeAlignement() logging.debug("Fin calcul hmm phase1") blocsAjoutes1 = [] # cad blocs supprimés dans le texte1 blocsAjoutes2 = [] # cad blocs insérés dans le texte2 blocsAlignes1 = [] # blocs invariants texte 1 blocsAlignes2 = [] # blocs invariants texte 2 logging.debug("Debut postprocessing Phase1") # filtre et garde uniquement les paires effectivement identiques (blocsAlignes1,blocsAlignes2) = self.__filtrageApareillementsIdentiques(motAncres,listeMots1,listeMots2) logging.log(2,"motAncres : %s",motAncres) if __debug__ : logging.log(3,"Liste des mots ancrés (mot texte1 | mot texte2):") for blocs in motAncres : #on a des mots identiques assert listeMots1[blocs[0]].val == listeMots2[blocs[1]].val logging.log(3,"(%s | %s)",self.texte1[listeMots1[blocs[0]].debut:listeMots1[blocs[0]].fin], self.texte2[listeMots2[blocs[1]].debut:listeMots2[blocs[1]].fin]) assert len(blocsAlignes1) == len(blocsAlignes2) for i in xrange(len(blocsAlignes1)): #les indices des blocsAlignes sont corrects assert hash(self.texte1[blocsAlignes1[i][0]:blocsAlignes1[i][1]]) == hash(self.texte2[blocsAlignes2[i][0]:blocsAlignes2[i][1]]) logging.debug("Fin postrpoc Phase1") logging.log(2,"blocsAlignes1 : %s",blocsAlignes1) logging.log(2,"blocsAlignes2 : %s",blocsAlignes2) ################################################################################################## #PHASE 2 #On lance le hmm entre les bornes definies par les mots ancrés # et récursivement, la phase 3 et lancée à chaque phase 2 ################################################################################################# logging.debug("Debut Phase2") ancreGauche1 = -1 ancreGauche2 = -1 if __debug__ : lAncre = len(listeMots2) for i in xrange(len(motAncres)): ancreDroit1 = motAncres[i][0] ancreDroit2 = motAncres[i][1] #on selectionne les mots compris entre les deux mots ancrés exclus (retAlignes1,retAlignes2),(retAjoutes1,retAjoutes2) = self.__lanceHMMPhase2([(ancreGauche1,ancreGauche2),(ancreDroit1,ancreDroit2)],listeMots1,listeMots2) blocsAlignes1.extend(retAlignes1) blocsAlignes2.extend(retAlignes2) blocsAjoutes1.extend(retAjoutes1) blocsAjoutes2.extend(retAjoutes2) #traitement blocAlignes ancreGauche1 = ancreDroit1 ancreGauche2 = ancreDroit2 if __debug__: logging.log(4,"Phase2 : %0.0f %% Done",float((ancreGauche2*lAncre**-1)*100)) #dernier traitement pour la partie a droite du dernier mot ancré logging.log(4,"Phase2 : traitement partie droite finale : longueur liste1 : %s, longueur liste2 : %s", len(listeMots1)-1-ancreGauche1,len(listeMots2)-1-ancreGauche2) if ancreGauche1 < len(listeMots1)-1 and ancreGauche2 < len(listeMots2)-1: (retAlignes1,retAlignes2),(retAjoutes1,retAjoutes2) = self.__lanceHMMPhase2([(ancreGauche1,ancreGauche2),(len(listeMots1),len(listeMots2))],listeMots1,listeMots2) #postprocessing repetitions ?? #traitement blocs alignés blocsAlignes1.extend(retAlignes1) blocsAlignes2.extend(retAlignes2) blocsAjoutes1.extend(retAjoutes1) blocsAjoutes2.extend(retAjoutes2) #comme on a mis les mots ancrés d'abord, les listes de blocs alignés ne sont pas triées # car on a en premier ajouté les blocs invariants qui étaient les anchors # et ensuite les invariants entre les anchors blocsAlignes1.sort() blocsAlignes2.sort() # blocsAjoutes1.sort() # blocsAjoutes2.sort() logging.debug("Fin phase2 et 3") #traitement sur les blocs alignés (regroupement de blocs consecutifs etc..) if __debug__ : #les listes de mots ajoutés sont normalement triées en retour de __lanceHMMPhase2 bAj1 = blocsAjoutes1[:] bAj1.sort() bAj2 = blocsAjoutes2[:] bAj2.sort() assert blocsAjoutes1 == bAj1 assert blocsAjoutes2 == bAj2 # recherche des blocs déplacés dep1=[] dep2=[] coresDep=[] if self.calculDep: # regroupe les blocs adjacents self.__regrouperBlocs(blocsAjoutes1, self.texte1,True) self.__regrouperBlocs(blocsAjoutes2, self.texte2,True) #recherche des blocs deplacés #(nbocc,len,posListe,posTxt) logging.debug("Debut du calcul des blocs déplacés") # # for k,v in dicoTexte1.iteritems() : # if dicoTexte2.has_key(k) : # depk1 = [] # depk2=[] # for pos1 in v[3]: # if (pos1,pos1+v[1]) in blocsAjoutes1 : # depk1.append((pos1,pos1+v[1])) # # for pos2 in dicoTexte2[k][3]: # if (pos2,pos2+v[1]) in blocsAjoutes2 : # depk2.append((pos2,pos2+v[1])) # for i in xrange(min(len(depk1),len(depk2))): # blocsAjoutes1.pop(blocsAjoutes1.index(depk1[i])) # dep1.append(depk1[i]) # blocsAjoutes2.pop(blocsAjoutes2.index(depk2[i])) # dep2.append(depk2[i]) # hash : nbOcc1,[(posD1,posF2)...],nbOcc2,[(posD2,posF2)..] # pour calculer les déplacements, on regarde tous les blocs supprimés et insérés # que l'on essaye d'apparier comme déplacements: # on a un dicoDep qui regroupe les positions et occurrences de blocs qui sont # identiques dans texte 1 et 2 dicoDep={} # on parcours les suppresisons for posDebut,posFin in blocsAjoutes1 : # on garde celles de taille minimale if posFin-posDebut >= self.lenMinBlocDep: # on les ajoute au dicoDep ou on met à jour les entrées du dico val = hash(self.texte1[posDebut:posFin]) if not dicoDep.has_key(val): dicoDep[val]=(1,[(posDebut,posFin)],0,[]) else : nbOcc1,lPos1,nbOcc2,lPos2 = dicoDep[val] lPos1.append((posDebut,posFin)) lPos1.sort() dicoDep[val] = (nbOcc1+1,lPos1,nbOcc2,lPos2) for posDebut,posFin in blocsAjoutes2 : if posFin-posDebut >= self.lenMinBlocDep: val = hash(self.texte2[posDebut:posFin]) if not dicoDep.has_key(val): dicoDep[val]=(0,[],1,[(posDebut,posFin)]) else : nbOcc1,lPos1,nbOcc2,lPos2 = dicoDep[val] lPos2.append((posDebut,posFin)) lPos2.sort() dicoDep[val] = (nbOcc1,lPos1,nbOcc2+1,lPos2) for val,(nbOcc1,lPos1,nbOcc2,lPos2) in dicoDep.iteritems(): # on ne garde que des blocs présents dans les 2 textes for i in xrange(min(nbOcc1,nbOcc2)): dep1.append(lPos1[i]) dep2.append(lPos2[i]) coresDep.append((lPos1[i],lPos2[i])) # paires de blocs déplacés coresDep.sort() # positions des deplacements dans les textes 1 et 2 dep1.sort() dep2.sort() #suppression des éléments deplacés des liste des ajoutés for ind in dep1 : blocsAjoutes1.pop(blocsAjoutes1.index(ind)) for ind in dep2 : blocsAjoutes2.pop(blocsAjoutes2.index(ind)) logging.log(2,"Blocs déplacés1 : %s",dep1) logging.log(2,"Blocs déplacés2 : %s",dep2) assert len(blocsAlignes1)==len(blocsAlignes2) remp1=[] remp2=[] aSupr1=[] aSupr2=[] logging.debug("Debut du regroupage des liste de blocs") # permet de regrouper si on a enlevé des déplacements ou si on ne # recherchait pas les déplacements self.__regrouperBlocs(blocsAjoutes1, self.texte1) self.__regrouperBlocs(blocsAjoutes2, self.texte2) logging.debug("Fin du regroupage des listes de blocs") # on parcours les textes 1 et 2 entre les blocs invariants # et on regarde leur ratio de taille pour voir si on peut en faire des remplacements for i in xrange(len(blocsAlignes1)-1): # entre 2 invariants if (blocsAlignes2[i+1][0]-blocsAlignes2[i][1]) != 0: ratio = float(blocsAlignes1[i+1][0]-blocsAlignes1[i][1])/float(blocsAlignes2[i+1][0]-blocsAlignes2[i][1]) if ratio>=float(self.p2**-1) and ratio<=self.p2 and self.texte1[blocsAlignes1[i][1]:blocsAlignes1[i+1][0]]!=" ": remp1.append((blocsAlignes1[i][1],blocsAlignes1[i+1][0])) remp2.append((blocsAlignes2[i][1],blocsAlignes2[i+1][0])) # enlève les blocs remplacés que l'on vient juste de trouver des blocs # insérés et supprimés # PB: on cherche des remplacements dans blocsAlignes sans tenir compte du fait # que certains blocs on put être considérés comme déplacés auparavant car on a # recherché les déplacements uniquement dans blocsAjoutés (et pas dans blocsAlignés) for bloc in remp1: try: blocsAjoutes1.pop(blocsAjoutes1.index(bloc)) except: #bloc déplacé on ne fait rien # a tester: le bloc devrait être dans les déplacés et ou pourrait le # supprimer de remp1 pass for bloc in remp2: try: blocsAjoutes2.pop(blocsAjoutes2.index(bloc)) except: #c'est un bloc déplacé pass remp1.sort() remp2.sort() #self.__regrouperBlocs(blocsAlignes1,self.texte1) #self.__regrouperBlocs(blocsAlignes2,self.texte2) self.__regrouperBlocs(remp1, self.texte1,False) self.__regrouperBlocs(remp2, self.texte2,False) if __debug__ : #affichage du texte aligné res = "" for deb,fin in blocsAlignes1 : res += self.texte1[deb:fin] +" " res+="\n============\n" for deb,fin in blocsAlignes2 : res += self.texte2[deb:fin] + " " logging.log(4,"Texte aligné :\n%s",res) logging.info("fin algoFeng.aligne") # self.prof.close() # stats = hotshot.stats.load("./statFile") # stats.sort_stats('time','calls') # stats.print_stats(50) return (blocsAlignes1,blocsAlignes2),(blocsAjoutes1,blocsAjoutes2),(dep1,dep2),(coresDep),(remp1,remp2) ############################### #Definition des methodes privées ############################## def __lanceHMMPhase2(self,bornesAncres,listeMots1,listeMots2): """Lance la phase 2 de l'algo. Ici on lance la comparaison HMM au niveau des MOTS @param listeMots1: la liste de blocs pour le texte1 @param listeMots2: la liste de blocs pour le texte2 @param bornesAncres: les bornes pour l'alignement, sur lequel on va couper les listeMots @type liste1,liste2,listeMots1,listeMots2 : liste de blocs ou les champs val, posDebut, posFin et index sont renseignés @type bornesAncres: [(borneGaucheListeMots1,borneGaucheListeMots2),(borneDroiteListeMots1,borneDroiteListeMots2)] @return: liste des blocs alignes, liste des blocs ajoutes @type: ([(DebutBlocTxt1,FinBlocTxt1)...],[(DebutBlocTxt2,FinBlocTxt2)...]),([(DebutBlocTxt1,FinBlocTxt1)...],[(DebutBlocTxt2,FinBlocTxt2)...]) """ #partie du texte sur laquelle on cherche a aligner liste1 = listeMots1[bornesAncres[0][0]+1:bornesAncres[1][0]] liste2 = listeMots2[bornesAncres[0][1]+1:bornesAncres[1][1]] logging.log(3,"Fonction __lanceHMMPhase2 invoquée bornes listeMots1 :[%s;%s] bornes listeMots2 :[%s;%s]",bornesAncres[0][0],bornesAncres[1][0],bornesAncres[0][1],bornesAncres[1][1]) hmmPhase2 = algoHMM(liste1,liste2) # self.prof.runctx("motAlignes = hmmPhase2.calculeAlignement()",globals(),locals()) motAlignes = hmmPhase2.calculeAlignement() #postprocessing apareillements blocsAjoutes1 = [] blocsAjoutes2 = [] blocAlignes1 = [] blocAlignes2 = [] (blocAlignes1,blocAlignes2) = self.__filtrageApareillementsIdentiques(motAlignes,listeMots1,listeMots2) if __debug__ : logging.log(3,"Mots Alignés phase2 : (texte1 | texte2)") #les mots alignés sont bien identiques for blocs in motAlignes : assert listeMots1[blocs[0]].val == listeMots2[blocs[1]].val logging.log(3,"(%s | %s)",self.texte1[listeMots1[blocs[0]].debut:listeMots1[blocs[0]].fin], self.texte2[listeMots2[blocs[1]].debut:listeMots2[blocs[1]].fin]) #on a autant de blocs alignés pour les 2 textes assert len(blocAlignes1) == len(blocAlignes2) for i in xrange(len(blocAlignes1)): #les indices des blocsAlignes sont corrects assert hash(self.texte1[blocAlignes1[i][0]:blocAlignes1[i][1]]) == hash(self.texte2[blocAlignes2[i][0]:blocAlignes2[i][1]]) logging.log(2,"blocsAlignés1 : %s",blocAlignes1) logging.log(2,"blocsAlignés2 : %s",blocAlignes2) ########################################################################################## #PHASE3 #Alignement au niveau des caracteres entre chaque mot aligné dans la phase 2 ########################################################################################## logging.log(3,"Debut phase 3") (retAlignes1,retAlignes2),(retAjoutes1,retAjoutes2) = self.__alignePhase3(motAlignes, bornesAncres,listeMots1,listeMots2) blocAlignes1.extend(retAlignes1) blocAlignes2.extend(retAlignes2) blocsAjoutes1.extend(retAjoutes1) blocsAjoutes2.extend(retAjoutes2) #bien que blocAlignes et retAlignes soit triés, leur concaténation ne l'est pas necessairement puisque la phase 3 aligne entre les mots de la phase2 blocAlignes1.sort() blocAlignes2.sort() blocsAjoutes1.sort() blocsAjoutes2.sort() logging.log(3,"Fin phase3") logging.log(3,"Fin __lanceHMMPhase2") #print "fin __lanceHMMPhase2 : ",blocAlignes1,blocAlignes2 return (blocAlignes1,blocAlignes2),(blocsAjoutes1,blocsAjoutes2) def __alignePhase3(self,motsAlignes, bornesAncres,listeMots1, listeMots2): """Lance la Phase 3 de l'algo Alignement au niveau des caracteres @param motsAlignes: la liste des mots alignés en phase 2 @param bornesAncres: les bornes entre lesquelles on travaille @param listeMots1: la liste des blocs du texte 1 @param listeMots2: la liste des blocs du texte 2 @type motsAlignes: [(posListeMots1,posListeMots2)...] @type bornesAncres: [(borneGaucheListeMots1,borneGaucheListeMots2),(borneDroiteListeMots1,borneDroiteListeMots2)] @type liste1,liste2,listeMots1,listeMots2 : liste de blocs ou les champs val, posDebut, posFin et index sont renseignés @return: liste de blocs alignés, liste des blocs ajoutes @rtype: ([(DebutBlocTxt1,FinBlocTxt1)...],[(DebutBlocTxt2,FinBlocTxt2)...]),([(DebutBlocTxt1,FinBlocTxt1)...],[(DebutBlocTxt2,FinBlocTxt2)...]) """ #print "Debut __alignePhase3 : ",bornesAncres,motsAlignes #concaténation des liste de mots alignés et mots ancrés, trié par ordre de position dans les textes logging.log(3,"Fonction __alignePhase3 invoquée, bornes liste1 :[%s %s] liste2:[%s %s]",bornesAncres[0][0],bornesAncres[1][0],bornesAncres[0][1],bornesAncres[1][1]) borneGauche1 = bornesAncres[0][0] borneGauche2 = bornesAncres[0][1] blocAlignes1 = [] blocAlignes2 = [] blocAjoutes1 = [] blocAjoutes2 = [] for i in xrange(len(motsAlignes)): borneDroite1 = motsAlignes[i][0] borneDroite2 = motsAlignes[i][1] (retAlignes1,retAlignes2),(retAjoutes1,retAjoutes2) = self.__lanceHMMPhase3(listeMots1[borneGauche1+1:borneDroite1],listeMots2[borneGauche2+1:borneDroite2]) if __debug__ : #on verifie que les car alignés soient bien situés dans les bornes for pos in retAlignes1+retAjoutes1: for subPos in pos : assert listeMots1[borneGauche1+1].debut <= subPos #le bloc commence ou termine apres la borne de gauche assert listeMots1[borneDroite1].fin >= subPos #le bloc commence ou termine avant la borne droite for pos in retAlignes2+retAjoutes2: for subPos in pos : assert listeMots2[borneGauche2+1].debut <= subPos #le bloc commence ou termine apres la borne de gauche assert listeMots2[borneDroite2].fin >= subPos #le bloc commence ou termine avant la borne droite blocAlignes1.extend(retAlignes1) blocAlignes2.extend(retAlignes2) blocAjoutes1.extend(retAjoutes1) blocAjoutes2.extend(retAjoutes2) if __debug__ : #les liste sont normalement triées en retour de __lanceHMMPhase3 bAl1 = blocAlignes1[:] bAl1.sort() bAl2 = blocAlignes2[:] bAl2.sort() bAj1 = blocAjoutes1[:] bAj1.sort() bAj2 = blocAjoutes2[:] bAj2.sort() assert blocAlignes1 == bAl1 assert blocAlignes2 == bAl2 assert blocAjoutes1 == bAj1 assert blocAjoutes2 == bAj2 borneGauche1 = borneDroite1 borneGauche2 = borneDroite2 #traitement pour la partie a droite du dernier mot aligne if borneGauche1 < len(listeMots1)-1 and borneGauche2 < len(listeMots2)-1: (retAlignes1,retAlignes2),(retAjoutes1,retAjoutes2) = self.__lanceHMMPhase3(listeMots1[borneGauche1+1:bornesAncres[1][0]], listeMots2[borneGauche2+1:bornesAncres[1][1]]) #blocAlignes = ... if __debug__ : #on verifie que les car alignés soient bien situés dans les bornes for pos in retAlignes1+retAjoutes1: for subPos in pos : assert listeMots1[borneGauche1+1].debut <= subPos #le bloc commence ou termine apres la borne de gauche assert listeMots1[bornesAncres[1][0]-1].fin >= subPos #le bloc commence ou termine avant la borne droite for pos in retAlignes2+retAjoutes2: for subPos in pos : assert listeMots2[borneGauche2+1].debut <= subPos #le bloc commence ou termine apres la borne de gauche assert listeMots2[bornesAncres[1][1]-1].fin >= subPos #le bloc commence ou termine avant la borne droite blocAlignes1.extend(retAlignes1) blocAlignes2.extend(retAlignes2) blocAjoutes1.extend(retAjoutes1) blocAjoutes2.extend(retAjoutes2) if __debug__ : #les liste sont normalement triées en retour de __lanceHMMPhase3 bAl1 = blocAlignes1[:] bAl1.sort() bAl2 = blocAlignes2[:] bAl2.sort() bAj1 = blocAjoutes1[:] bAj1.sort() bAj2 = blocAjoutes2[:] bAj2.sort() assert blocAlignes1 == bAl1 assert blocAlignes2 == bAl2 assert blocAjoutes1 == bAj1 assert blocAjoutes2 == bAj2 logging.log(3,"Fin __alignePhase3") #print "fin __alignePhase3 : ",blocAlignes1,blocAlignes2 return (blocAlignes1,blocAlignes2),(blocAjoutes1,blocAjoutes2) def __lanceHMMPhase3(self,liste1,liste2): """Crée les listes de caracteres et effectue la comparaison HMM sur ces 2 listes recherche les blocs ajoutes / supprimes @param liste1: les elements du texte1 que l'on souhaite aligner @param liste2: les elements du texte2 que l'on souhaite aligner @type liste1,liste2: liste de blocs ou les champs val, posDebut, posFin et index sont renseignés @return: liste de blocs alignés, liste des blocs ajoutes @rtype: ([(DebutBlocTxt1,FinBlocTxt1)...],[(DebutBlocTxt2,FinBlocTxt2)...]),([(DebutBlocTxt1,FinBlocTxt1)...],[(DebutBlocTxt2,FinBlocTxt2)...]) """ #on créer une liste de blocs pour chaque texte ou un bloc est un caractere logging.log(3,"Fonction __lanceHMMPhase3 invoquée") listeCar1 = [] listeCar2 = [] # print "debut __lanceHMMPhase3:\n" logging.log(3,"Création des listes de caracteres") index = 0 for bloc in liste1 : for i in xrange(bloc.fin-bloc.debut) : listeCar1.append(bloc(self.texte1[i+bloc.debut],bloc.debut+i,bloc.debut+i+1,index)) index += 1 index = 0 for bloc in liste2: for i in xrange(bloc.fin-bloc.debut) : listeCar2.append(bloc(self.texte2[i+bloc.debut],bloc.debut+i,bloc.debut+i+1,index)) index += 1 logging.log(2,"listeCar1 : %s",listeCar1) logging.log(2,"listeCar2 : %s",listeCar2) hmmPhase3 = algoHMM(listeCar1,listeCar2) # self.prof.runctx("carAlignes = hmmPhase3.calculeAlignement()",globals(),locals()) carAlignes = hmmPhase3.calculeAlignement() (blocAlignes1,blocAlignes2) = self.__filtrageApareillementsIdentiques(carAlignes, listeCar1, listeCar2) if __debug__ : logging.log(2,"Resultat HMM (apres postprocessing): %s",carAlignes) logging.log(2,"Caracteres alignés : (texte1 | texte2)") for bloc in carAlignes : assert listeCar1[bloc[0]].val == listeCar2[bloc[1]].val logging.log(2,"(%s | %s)",listeCar1[bloc[0]].val, listeCar2[bloc[1]].val) assert len(blocAlignes1) == len(blocAlignes2) for i in xrange(len(blocAlignes1)): #les indices des blocsAlignes sont corrects assert hash(self.texte1[blocAlignes1[i][0]:blocAlignes1[i][1]]) == hash(self.texte2[blocAlignes2[i][0]:blocAlignes2[i][1]]) i = 0 #calcul des blocs ajoutes / supprimes : bloc non aligne de txt1 => supr, txt2 => ajout logging.log(3,"Debut du calcul des blocs supprimés / ajoutés") for bloc1,bloc2 in carAlignes: #on supprime les blocs alignés des listes de blocs listeCar1.pop(bloc1-i) listeCar2.pop(bloc2-i) i+=1 blocAjoutes1 = [] blocAjoutes2 =[] for bloc in listeCar1: blocAjoutes1.append((bloc.debut,bloc.fin)) for bloc in listeCar2: blocAjoutes2.append((bloc.debut,bloc.fin)) logging.log(2,"BlocAjoutes1 : %s",blocAjoutes1) logging.log(2,"BlocAjoutes2 : %s", blocAjoutes2) logging.log(3,"fin __lanceHMMPhase3") return (blocAlignes1,blocAlignes2),(blocAjoutes1,blocAjoutes2) def __creerListeMots(self,texte): """effectue la séparation des mots du texte @param texte: le texte que l'on souhaite separer @type texte: String @return: liste de bloc(hash,posdebut,posfin+1,posListeMots) """ if self.ignorerSeparateurs : #si on ignore les separateurs, on va tous les remplacer par le caractere " " #separateurs : !"#$%&'()*+, -./:;<=>?@[\]^_`{|}~\n\r\t #separateurs = string.punctuation+"\n\r\t " separateurs = """!\r,\n:\t;-?"'`’()""" sepTable = string.maketrans(separateurs," "*len(separateurs)) texte = texte.translate(sepTable) texteSplit = texte.split(" ") pos = -1 posListe = 0 listeMots = [] for mot in texteSplit: if len(mot) == 0 : #on a splité entre 2 separateurs pos+=1 else: pos+=1 listeMots.append( bloc(hash(mot), pos, pos+len(mot), posListe) ) #(hash,posDebut,posFin+1,index) pos+=len(mot) posListe+=1 return listeMots def __creerDico(self,liste): """Crée pour chaque mot un dictionnaire comportant son nombre d'occurences et ses positions @param liste: la liste a traiter @type liste: liste de bloc(hash,posdebut,posfin+1,posListeMots) @return: le dictionnaire des mots présentds dans la liste @type dico de (hash : (nbOccurences,len,[posOccListe,...],[posOccTxt...]) """ #index = 0 dico = {} for mot in liste : #si le hash n'est pas déja indexé dans le dictionnaire hashMot=mot.val if not(dico.has_key(hashMot)): #ajout du mot au dictionnaire dico[hashMot] = (1,mot.fin-mot.debut,[mot.index],[mot.debut]) #(nbocc,len,posListe,posTxt) else: #modification des valeurs associées au mot dans le dictionnaire (nbOcc,lenMot,posListe,posTexte) = dico[hashMot] posListe.append(mot.index) posTexte.append(mot.debut) dico[hashMot] = (nbOcc+1,lenMot,posListe,posTexte) #index+=1 return dico def __creerListes(self,dicoTexte1,dicoTexte2): """Cree les listes de la phase 1.(a) et 1.(b) de l'algo liste des apax : termes uniques dans chaque texte et communs aux 2 textes @param dicoTexte1,dicoTexte2 : les dictionnaires pour les deux textes @type dicoTexte1,dicoTexte2: dico de (hash : (nbOccurences,len,[posOccListe,...],[posOccTxt...]) @return les listes A,B des apax dans les textes 1 et 2 @type: listes de bloc(val,index) """ #PHASE 1.(a) A = [] # liste apax texte 1 for key,v in dicoTexte1.iteritems(): if v[0]==1 : A.append(bloc(key , index=v[2][0])) #sa position dans listeMots1 #tri de la liste a dans l'ordre d'apparition des mots dans le texte1 A.sort(cmp=lambda x,y: cmp(x.index,y.index)) #PHASE 1.(b) #filtrer la liste #print self.texte1,"\n",self.texte2,"\n",A,"\n=========================" #création de la liste B : liste des mots correspondants a la liste A dans le texte2, et filtrage de la liste 1 de telle sorte que chaque éléement de A aparaisse dans B B=[] # liste apax texte 2 aSupprimer=[] # cpt = 1 for mot in A: if not dicoTexte2.has_key(mot.val): aSupprimer.append(mot) elif dicoTexte2[mot.val][0]>1 : aSupprimer.append(mot) else : listeIndex = dicoTexte2[mot.val][2] for index in listeIndex: B.append(bloc(mot.val,index=index)) for mot in aSupprimer : A.pop(A.index(mot)) #tri de la liste B dans l'ordre d'apparition des mots dans le texte2 B.sort(cmp=lambda x,y: cmp(x.index,y.index)) #TODO : créer la liste B return A,B def __filtrageApareillementsIdentiques(self,aligneHMM,listeMots1,listeMots2): """determine l'alignement une fois que le HMM a renvoyé les blocs considérés comme alignés /!\ EFFECTUE DES EFFETS DE BORD SUR aligneHMM /!\ renvoie la liste des blocs alignés, et le cas échéant la liste des blocs insérés sous forme de liste de (posDeb,posFin) @param aligneHMM: le resultat de l'alignement HMM */!\_MODIFIE EN RETOUR PAR CETTE METHODE_/!\* @type aligneHMM: [(posBlocAligné1,posBlocAligné2)...] @param listeMots1,listeMots2: les listes de mots pour les textes @type listeMots1,listeMots2: liste de blocs ou les champs val, posDebut, posFin et index sont renseignés """ aSupprimer = [] blocsAlignes1 = [] blocsAlignes2 = [] aligneHMM.sort() for pos1,pos2 in aligneHMM : if listeMots1[pos1].val != listeMots2[pos2].val : #on ajoute le couple a la liste des couples a supprimer car les mots ne correspondent pas aSupprimer.append((pos1,pos2)) for pos in aSupprimer : aligneHMM.pop(aligneHMM.index(pos)) #on a filtré tous les éléments dont le hash / valeur ne correspondent pas #si on a plusieurs correspondances pour un bloc, on prend le premier # cad qu'un état à émis plusieurs mots dans le texte 2 aSupprimer = [] posActuelle=-1 for pos1,pos2 in aligneHMM : if pos1 != posActuelle : posActuelle = pos1 blocsAlignes1.append((listeMots1[pos1].debut,listeMots1[pos1].fin)) #(blocDebut,blocFin) blocsAlignes2.append((listeMots2[pos2].debut,listeMots2[pos2].fin)) #(blocDebut,blocFin) else : aSupprimer.append((pos1,pos2)) for pos in aSupprimer : aligneHMM.pop(aligneHMM.index(pos)) return (blocsAlignes1,blocsAlignes2) def __regrouperBlocs(self,listeBlocs,txt,ignorerPonctuation=False): """regroupe les blocs adjacents d'une liste de blocs /!\ EFFECTUE UN EFFET DE BORD SUR listeBlocs /!\ @param listeBlocs: la liste des blocs @type listeBlocs: [(posDebut,posFin+1)] @param txt: le texte correspondant @type txt: String """ separateurs = " " if self.ignorerSeparateurs : #si on ignore les separateurs, on va tous les remplacer par le caractere " " #separateurs : !"#$%&'()*+, -./:;<=>?@[\]^_`{|}~\n\r\t separateurs = """!\r,\n:\t;-?"'`’()""" i=0 while i != len(listeBlocs)-1: if listeBlocs[i][1] == listeBlocs[i+1][0] : listeBlocs[i] = (listeBlocs[i][0],listeBlocs[i+1][1]) del listeBlocs[i+1] # à priori à enlever, mais ne change pas les résultats pour les XP # en ignorer ponctuation elif (not ignorerPonctuation) and txt[listeBlocs[i][1]] in separateurs: listeBlocs[i] = (listeBlocs[i][0],listeBlocs[i][1]+1) else : i += 1 ####################################################################################################### #Depracated ####################################################################################################### def __creerListesFeng(self,dicoTexte1,dicoTexte2,listeMots1,listeMots2): """ @deprecated: nous n'utilisons pas les methodes definies dans le papier de Feng & Manmatha, qui s'averent tres contraignantes sur les HMM @see: __creerListes """ #PHASE 1.(a) A = [] for key,v in dicoTexte1.iteritems(): if v[0]==1 : A.append(bloc(key , index=v[2][0])) #sa position dans listeMots1 #tri de la liste a dans l'ordre d'apparition des mots dans le texte1 A.sort(cmp=lambda x,y: cmp(x.index,y.index)) #PHASE 1.(b) #filtrer la liste #print self.texte1,"\n",self.texte2,"\n",A,"\n=========================" self.__filtrerListe(A,dicoTexte1,dicoTexte2,listeMots1,listeMots2) #print A,"\n==================" #création de la liste B : liste des mots correspondants a la liste A dans le texte2 B=[] # cpt = 1 for mot in A: for index in dicoTexte2[mot.val][2]: B.append(bloc(mot.val,index=index)) #tri de la liste B dans l'ordre d'apparition des mots dans le texte2 B.sort(cmp=lambda x,y: cmp(x.index,y.index)) #TODO : créer la liste B return A,B def __filtrerListe(self,liste,dicoTexte1,dicoTexte2,listeMots1,listeMots2): """ """ aSupprimmer = [] for element in liste: #suppression des éléments n'aparaissant pas dans le texte2 mot = element.val if not dicoTexte2.has_key(mot) : aSupprimmer.append(element) else: #suppresion des mots dont les voisins immédiats ne sont pas identiques pos1 = dicoTexte1[mot][2][0] #position du mot dans listeMots1 suivant1 = None precedant1 = None if pos1 > 0 : precedant1 = listeMots1[pos1-1].val #mot précédent le mot actuel if pos1 < len(listeMots1)-1: suivant1 = listeMots1[pos1+1].val #mot suivant le mot actuel motTrouveDansTexte2 = False for pos2 in dicoTexte2[mot][2]: #pos2 position du mot dans listeMots2 identique = True if pos2 > 0 and precedant1 != None : precedant2 = listeMots2[pos2-1].val #mot précédent le mot correspondant dans le texte2 if precedant1 != precedant2 : identique = False if pos2 < len(listeMots1)-1 and suivant1 != None: suivant2 = listeMots2[pos2+1].val #mot suivant le mot correspondant dans le texte2 if suivant1 != suivant2: identique = False if identique : motTrouveDansTexte2 = True if not motTrouveDansTexte2: #on n'a pas trouvé de mot ayant des voisins immédiats identiques dans le texte2 aSupprimmer.append(element) for elem in aSupprimmer : liste.pop(liste.index(elem)) def __filtrageApareillementsIdentiquesOld(self,aligneHMM,listeMots1,listeMots2): """determine l'alignement une fois que le HMM a renvoyé les blocs considérés comme alignés /!\ EFFECTUE DES EFFETS DE BORD SUR aligneHMM /!\ renvoie la liste des blocs alignés, et le cas échéant la liste des blocs insérés sous forme de liste de (posDeb,posFin) @deprecated: cette fonction est moche (mais vraiment tres moche en fait) :( @see: __filtrageApareillementsIdentiques """ #print "debut __filtrageApareillementsIdentiques" aligne = {} blocsAlignes1 = [] blocsAlignes2 = [] blocsAjoutes1 = [] blocsAjoutes2 = [] aSupprimer = [] for pos in aligneHMM : pos1 = pos[0] pos2 = pos[1] hashMot = listeMots1[pos1].val if hashMot == listeMots2[pos2].val : #le mot est identique if not aligne.has_key(hashMot): #on n'a pas encore mis ce mot ds le dico des mots alignés aligne[hashMot] = (1,[(pos1,pos2)]) else : #on a deja mis ce mot ds le dico des mots alignés (cpt,listePos) = aligne[hashMot] cpt += 1 listePos.append((pos1,pos2)) aligne[hashMot] = (cpt,listePos) else : #on ajoute le couple a la liste des couples a supprimer car les mots ne correspondent pas aSupprimer.append(pos) # for pos in aSupprimer : # #le mot n'est pas identique, on l'enleve de la liste des mots alignés par les HMM # #/!\ LA LISTE EST MODIFIEE EN RETOUR # del aligneHMM[aligneHMM.index(pos)] #traitement du dico des mots alignes for mot, (cpt,listePos) in aligne.iteritems(): if cpt == 1 : #il n'y a qu'un seul alignement possible blocsAlignes1.append((listeMots1[listePos[0][0]].debut,listeMots1[listePos[0][0]].fin)) #(blocDebut,blocFin) blocsAlignes2.append((listeMots2[listePos[0][1]].debut,listeMots2[listePos[0][1]].fin)) #(blocDebut,blocFin) #sort sur le premier element des tuples par defaut blocsAlignes1.sort() blocsAlignes2.sort() #A FAIRE §§§§§§ else : # print mot, (cpt,listePos) #plus d'un alignement possible #plusieurs cas : [a,b] [c,d] .... les couples n'ont aucun élément commun : ils sont tous ok #[a,b][a,c][c,d].... les couples ont des éléments communs : on doit choisir les meilleurs couples, les mots restant sont des insertions / suppresions # => on va créer un dictionnaire par texte afin de determiner l'unicité des index #on ne peut pas utiliser les dictionnaires principaux car il est posible que l'on ne travaille que sur une sous-liste de listeMots dicoPos1={} dicoPos2={} for pos in listePos : pos1=pos[0] pos2=pos[1] if not dicoPos1.has_key(pos1): dicoPos1[pos1] = (1,[pos2]) #nbOcc,liste des positions des termes alignés dans le texte2 else : #plus d'une occurence de cette position nbOcc,listeAlignement = dicoPos1[pos1] nbOcc += 1 listeAlignement.append(pos2) dicoPos1[pos1] = (nbOcc,listeAlignement) #de meme pour pos2 if not dicoPos2.has_key(pos2): dicoPos2[pos2] = (1,[pos1]) #nbOcc,liste des positions des termes alignés dans le texte1 else : #plus d'une occurence de cette position nbOcc,listeAlignement = dicoPos2[pos2] nbOcc += 1 listeAlignement.append(pos1) dicoPos2[pos2] = (nbOcc,listeAlignement) # print "dico1: ",dicoPos1 # print "dico2: ",dicoPos2 for pos1,(nbOcc1,listeLies1) in dicoPos1.iteritems(): if nbOcc1 == 1 and len(listeLies1) == 1: #une seule occurence de ce terme #on a un seul terme aligné avec celui ci dans le texte2 blocsAlignes1.append((listeMots1[pos1].debut,listeMots1[pos1].fin)) blocsAlignes2.append((listeMots2[listeLies1[0]].debut,listeMots2[listeLies1[0]].fin)) else : #plus d'un terme possible listeLies1.sort() # print ">",listeLies1 bons=[] #liste des positions n'etant liées qu'a pos1 autres=[] #liste des positions auant plus d'un alignement possible for pos2 in listeLies1: if dicoPos2[pos2][0]==1 : bons.append(pos2) else : autres.append(pos2) # print "bons :",bons # print "autres; ",autres if len(bons)>0: # # /§\ ON PEUT MODIFIER ICI # #on prend le premier element blocsAlignes1.append((listeMots1[pos1].debut,listeMots1[pos1].fin)) blocsAlignes2.append((listeMots2[bons[0]].debut,listeMots2[bons[0]].fin)) bons.pop(0) for pos2 in bons : aSupprimer.append((pos1,pos2)) # else : # #il faut trouver le meilleur des autres alignements # on ne peut normalement pas rentrer dans ce bloc for pos in aSupprimer : #le mot n'est pas identique, on l'enleve de la liste des mots alignés par les HMM #/!\ LA LISTE EST MODIFIEE EN RETOUR del aligneHMM[aligneHMM.index(pos)] #tri des listes blocsAlignes1.sort() blocsAlignes2.sort() return (blocsAlignes1,blocsAlignes2),(blocsAjoutes1,blocsAjoutes2)