Module conversationkg.kgs.KGs

Expand source code
#from ..conversations.corpus import EmailCorpus, Conversation
#from ..conversations.emails import Email
#from ..conversations.topics import TopicModel
from ..conversations.entities import Person as WholePerson


from ..conversations import corpus, emails, entities  # , topics

conversations_modules = [corpus, emails, entities] # , topics]


import numpy as np

from tqdm import tqdm
import json

import Levenshtein
import spacy
nlp = spacy.load("en_core_web_md")




# distance threshold value of around 0.3 seems to capture identity quite well
class Person(WholePerson):
    """Graph-node wrapper around a ``WholePerson`` keyed by its lower-cased name.

    Equality and hashing look only at ``name`` (and, for equality, at the
    exact type), so two nodes with the same lower-cased name are treated as
    the same dictionary key / set member.
    """

    def __init__(self, person):  #, distance_threshold=0.0):
        """Take over the attributes of ``person`` and lower-case ``name``.

        Args:
            person: object whose instance attributes are copied; it must
                provide a string ``name`` attribute.
        """
        # Shallow-copy the attributes instead of aliasing ``person.__dict__``:
        # with a shared dict the assignment below would also lower-case the
        # name of the wrapped object.
        self.__dict__.update(person.__dict__)

        self.name = self.name.lower()

    def __repr__(self):
        return f"PersonNode({str(self)})"

    def __str__(self):
        # An empty name is rendered as a placeholder underscore.
        return str(self.name or "_")

    def distance_from(self, other):
        """Return the Levenshtein distance of the two names, scaled to [0, 1].

        The raw edit distance is divided by the length of the longer name;
        two empty names have distance ``0.``.
        """
        longest = max(len(self.name), len(other.name))

        if longest == 0:
            return 0.

        return Levenshtein.distance(self.name, other.name) / longest

    def __hash__(self):
        # Must stay consistent with __eq__, which compares names.
        return hash(self.name)

    def __eq__(self, other):
        """Equal only to objects of exactly the same type with the same name."""
        if type(self) is not type(other):
            return False
        return self.name == other.name


def put(d, x, i):
    """Look up ``x`` in ``d``, registering it under index ``i`` if it is new.

    Returns:
        ``(index_of_x, next_free_index)``; the second value is ``i + 1`` when
        ``x`` was inserted and ``i`` otherwise.
    """
    if x in d:
        return d[x], i

    d[x] = i
    return d[x], i + 1

def put_based_on_eq(d, x, i):
    """Variant of ``put`` that matches ``Person`` keys by scanning with ``==``.

    Anything that is not exactly a ``Person`` is delegated to ``put``. For a
    ``Person`` the name is printed, every key of ``d`` is compared with ``x``,
    and the index of one randomly chosen equal key is returned; if no key is
    equal, ``x`` is inserted under index ``i``.

    Returns:
        ``(index_of_x, next_free_index)``.
    """
    if type(x) is not Person:
        return put(d, x, i)

    print(x.name, end=", ")

    equal_keys = [key for key in d if x == key]

    if equal_keys:
        return d[np.random.choice(equal_keys)], i

    d[x] = i
    return d[x], i + 1




class KG:
    """Knowledge graph stored as a list of ``(subject, predicate, object)`` triples.

    ``provenances`` is a second list kept next to ``triples``;
    ``from_email_corpus`` appends exactly one provenance (a message id) per
    triple. ``translate`` assigns integer indices to entities and predicates,
    and ``store``/``restore`` write and read that integer form as JSON.
    """

    @classmethod
    def from_email_corpus(cls, email_corpus, triples=None, provenances=None):
        """Build a graph from an email corpus.

        Every email gets a ``part_of`` triple to its conversation and one
        ``mentions`` triple per entry of ``email.body.links`` and
        ``email.body.addresses``. Consecutive conversations and consecutive
        emails are linked with ``before`` triples. One ``message_id`` is
        appended to ``provenances`` for every triple.

        Args:
            email_corpus: sliceable sequence of conversations, each of which
                is a sliceable sequence of emails.
            triples: optional list that is extended in place; a new list is
                created when omitted.
            provenances: optional list that is extended in place; a new list
                is created when omitted.

        Returns:
            A new ``cls`` instance holding the two lists.
        """
        # ``None`` sentinels instead of ``[]`` defaults: a default list is
        # created once and would keep growing across calls.
        if triples is None:
            triples = []
        if provenances is None:
            provenances = []

        for conv in email_corpus:
            for email in conv:
                triples.append((email, "part_of", conv))
                provenances.append(email.message_id)

                for link in email.body.links:
                    triples.append((email, "mentions", link))
                    provenances.append(email.message_id)

                for addr in email.body.addresses:
                    triples.append((email, "mentions", addr))
                    provenances.append(email.message_id)

        for c1, c2 in zip(email_corpus, email_corpus[1:]):
            triples.append((c1, "before", c2))
            provenances.append(c2[0].message_id)

            # NOTE(review): this loop is nested in the conversation-pair loop,
            # so the emails of the last conversation never get "before"
            # triples -- confirm whether that is intended.
            for e1, e2 in zip(c1, c1[1:]):
                triples.append((e1, "before", e2))
                provenances.append(e2.message_id)

        return cls(triples, provenances)

    def __init__(self, triples, provenances):
        """Keep references to the given triple and provenance lists."""
        self.triples = triples
        self.provenances = provenances

    def translate(self, entity2ind=None, pred2ind=None, attach=False):
        """Map every triple to integer indices.

        Args:
            entity2ind: existing entity -> index dict to extend in place, or
                ``None`` to start a new one.
            pred2ind: existing predicate -> index dict to extend in place, or
                ``None`` to start a new one.
            attach: when true, store the results on ``self`` as
                ``translated``, ``entity2ind`` and ``pred2ind`` and return
                ``None``.

        Returns:
            ``(translated, entity2ind, pred2ind)`` unless ``attach`` is true.

        Raises:
            ValueError: if only one of the two mappings is given.
        """
        # Compare against None rather than truthiness, so an empty dict
        # counts as "provided".
        if (entity2ind is None) != (pred2ind is None):
            raise ValueError("Please provide both entity2ind and pred2ind or none!")
        if entity2ind is None:
            entity2ind, pred2ind = {}, {}

        # Continue numbering after the largest index already in use.
        i = max(entity2ind.values(), default=-1) + 1
        j = max(pred2ind.values(), default=-1) + 1

        translated = []
        for s, p, o in tqdm(self.triples, desc="translating"):
            s_prime, i = put(entity2ind, s, i)
            o_prime, i = put(entity2ind, o, i)
            p_prime, j = put(pred2ind, p, j)

            translated.append((s_prime, p_prime, o_prime))

        if attach:
            self.translated = translated
            self.entity2ind = entity2ind
            self.pred2ind = pred2ind
        else:
            return translated, entity2ind, pred2ind

    @staticmethod
    def unified_translation(*kgs, attach=False):
        """Translate several graphs against one shared pair of index mappings.

        Returns:
            ``(entity2ind, pred2ind)`` when ``attach`` is false. When it is
            true, each graph gets ``translated``, ``entity2ind`` and
            ``pred2ind`` attributes instead and ``None`` is returned.
        """
        shared_e2i, shared_p2i = {}, {}
        for kg in tqdm(kgs, desc="unified translating"):
            translated, shared_e2i, shared_p2i = kg.translate(
                shared_e2i, shared_p2i, attach=False)
            if attach:
                kg.translated = translated

        if not attach:
            return shared_e2i, shared_p2i

        # Attach only after all graphs are processed so that every graph
        # references the complete mappings.
        for kg in kgs:
            kg.entity2ind = shared_e2i
            kg.pred2ind = shared_p2i

    def tuples(self):
        """Return the ``(subject, object)`` pair of every triple, in order."""
        return [(s, o) for s, _, o in self.triples]

    def entities(self, filter_f=lambda x: True):
        """Return the set of subjects and objects accepted by ``filter_f``."""
        return {e for s, _, o in self.triples for e in (s, o) if filter_f(e)}

    def predicates(self):
        """Return the set of predicates used in the triples."""
        return {p for _, p, _ in self.triples}

    def store(self, name, save_mapping=True):
        """Write the translated graph to ``{name}*.json`` files.

        Reads ``self.translated``, ``self.provenances`` and, when
        ``save_mapping`` is true, ``self.entity2ind`` and ``self.pred2ind``;
        entities are serialised through their ``to_json()`` method.
        """
        with open(f"{name}.json", "w") as handle:
            json.dump(self.translated, handle)

        with open(f"{name}.provenances.json", "w") as handle:
            json.dump(self.provenances, handle)

        if not save_mapping:
            return

        ind2entity = self.reverse_mapping(self.entity2ind)
        entity_json = {i: e.to_json() for i, e in ind2entity.items()}
        with open(f"{name}.ind2entity.json", "w") as handle:
            json.dump(entity_json, handle)

        ind2pred = self.reverse_mapping(self.pred2ind)
        with open(f"{name}.ind2pred.json", "w") as handle:
            json.dump(ind2pred, handle)

    @classmethod
    def restore(cls, name, load_mapping_of=None):
        """Rebuild a graph from the files written by ``store``.

        Args:
            name: file prefix of the triples and provenances.
            load_mapping_of: file prefix of the index mappings; defaults to
                ``name``.

        Returns:
            A ``cls`` instance with ``translated``, ``entity2ind`` and
            ``pred2ind`` attached.
        """
        def get_class(cls_name):
            # Search the conversation modules for a class of that name.
            for mod in conversations_modules:
                try:
                    return getattr(mod, cls_name)
                except AttributeError:
                    pass
            raise AttributeError(f"{cls_name} could not be found in any of the modules!")

        def json_to_entity(json_dict):
            try:
                cls_name = json_dict["class"]
            except KeyError:
                # Show which keys are present before propagating the error.
                print(json_dict.keys())
                raise
            return get_class(cls_name).from_json(json_dict)

        if load_mapping_of is None:
            load_mapping_of = name

        with open(f"{load_mapping_of}.ind2entity.json") as handle:
            # JSON object keys are strings; convert them back to int indices.
            loaded_entity_mapping = {int(i): d for i, d in json.load(handle).items()}
            ind2entity = {i: json_to_entity(d) for i, d in loaded_entity_mapping.items()}

        ind2entity = {i: (Person(x) if type(x) is WholePerson else x)
                      for i, x in ind2entity.items()}

        with open(f"{load_mapping_of}.ind2pred.json") as handle:
            ind2pred = {int(i): d for i, d in json.load(handle).items()}

        with open(f"{name}.json") as handle:
            loaded = json.load(handle)

        restored_triples = [(ind2entity[s], ind2pred[p], ind2entity[o])
                            for s, p, o in loaded]

        with open(f"{name}.provenances.json") as handle:
            provenances = json.load(handle)

        # Use ``cls`` so that subclasses restore instances of themselves.
        kg = cls(restored_triples, provenances)

        kg.translated = loaded
        kg.entity2ind = kg.reverse_mapping(ind2entity)
        kg.pred2ind = kg.reverse_mapping(ind2pred)

        return kg

    @staticmethod
    def reverse_mapping(d):
        """Return ``d`` with keys and values swapped.

        A repeated value is reported with ``print``; the first key seen for
        it is kept. Repeated values that are not exactly of type ``Person``
        raise ``ValueError``.
        """
        rev_d = {}

        for k, v in d.items():
            if v not in rev_d:
                rev_d[v] = k
                continue

            print("duplicate:", v)
            if type(v) is not Person:
                raise ValueError("Non-bijective mapping!")

        return rev_d

    def to_csv(self, save_path):
        """Always raises ``DeprecationWarning`` pointing to ``CSVWriter``."""
        raise DeprecationWarning("Deprecated; Use:\nfrom kgs.writers import CSVWriter\n"
                                 "CSVWriter(kg).to_csv(save_path)")

    @staticmethod
    def _merge_nodes(kg, node_type, distance_threshold):
        """Compute which nodes of ``node_type`` get replaced by which other node.

        Returns:
            Dict mapping a node to the node it should be replaced with.
        """
        # Only nodes whose type is exactly ``node_type`` take part.
        entities = kg.entities(lambda x: type(x) is node_type)
        merge_with = {}
        for x in entities:
            # All other (non-equal) nodes within the distance threshold.
            matches = [x2 for x2 in entities
                       if x.distance_from(x2) <= distance_threshold
                          and not x == x2]

            merge_with[x] = set(matches)

        # Descending sort: nodes with more matches first; among equal match
        # counts, shorter names first.
        sorted_d = sorted(merge_with.items(),
                          key=lambda it: (len(it[1]), -len(it[0].name)),
                          reverse=True)

        merging_f = {}
        donotmerge = set()     # nodes that already serve as a merge target
        alreadymerged = set()  # nodes that have been mapped onto a target

        for x, s in sorted_d:
            if not x in alreadymerged:
                for x2 in s:
                    if not x2 in donotmerge:
                        merging_f[x2] = x
                        donotmerge.add(x)
                        alreadymerged.add(x2)

        return merging_f

    @classmethod
    def merge_persons_of(cls, kg, distance_threshold):
        """Return a new graph in which similar ``Person`` nodes are merged.

        Triples whose subject and object become equal after merging are
        printed and dropped. The mapping used is attached to the result as
        ``merge_mapping``.
        """
        merging_f = cls._merge_nodes(kg, Person, distance_threshold)

        # When provenances line up one-to-one with the triples, drop the
        # provenance of every dropped triple so the lists stay in step.
        # Otherwise the provenance list is passed through unchanged.
        aligned = len(kg.provenances) == len(kg.triples)
        new_triples = []
        new_provenances = [] if aligned else kg.provenances

        for idx, (s, p, o) in enumerate(kg.triples):
            s_ = merging_f.get(s, s)
            o_ = merging_f.get(o, o)

            if s_ == o_:
                print(s, p, o)
                continue

            new_triples.append((s_, p, o_))
            if aligned:
                new_provenances.append(kg.provenances[idx])

        new_kg = cls(new_triples, new_provenances)
        new_kg.merge_mapping = merging_f

        return new_kg

Functions

def put(d, x, i)
Expand source code
def put(d, x, i):
    """Look up ``x`` in ``d``, registering it under index ``i`` if it is new.

    Returns:
        ``(index_of_x, next_free_index)``; the second value is ``i + 1`` when
        ``x`` was inserted and ``i`` otherwise.
    """
    if x in d:
        return d[x], i

    d[x] = i
    return d[x], i + 1
def put_based_on_eq(d, x, i)
Expand source code
def put_based_on_eq(d, x, i):
    """Variant of ``put`` that matches ``Person`` keys by scanning with ``==``.

    Anything that is not exactly a ``Person`` is delegated to ``put``. For a
    ``Person`` the name is printed, every key of ``d`` is compared with ``x``,
    and the index of one randomly chosen equal key is returned; if no key is
    equal, ``x`` is inserted under index ``i``.

    Returns:
        ``(index_of_x, next_free_index)``.
    """
    if type(x) is not Person:
        return put(d, x, i)

    print(x.name, end=", ")

    equal_keys = [key for key in d if x == key]

    if equal_keys:
        return d[np.random.choice(equal_keys)], i

    d[x] = i
    return d[x], i + 1

Classes

class KG (triples, provenances)
Expand source code
class KG:
    """Knowledge graph stored as a list of ``(subject, predicate, object)`` triples.

    ``provenances`` is a second list kept next to ``triples``;
    ``from_email_corpus`` appends exactly one provenance (a message id) per
    triple. ``translate`` assigns integer indices to entities and predicates,
    and ``store``/``restore`` write and read that integer form as JSON.
    """

    @classmethod
    def from_email_corpus(cls, email_corpus, triples=None, provenances=None):
        """Build a graph from an email corpus.

        Every email gets a ``part_of`` triple to its conversation and one
        ``mentions`` triple per entry of ``email.body.links`` and
        ``email.body.addresses``. Consecutive conversations and consecutive
        emails are linked with ``before`` triples. One ``message_id`` is
        appended to ``provenances`` for every triple.

        Args:
            email_corpus: sliceable sequence of conversations, each of which
                is a sliceable sequence of emails.
            triples: optional list that is extended in place; a new list is
                created when omitted.
            provenances: optional list that is extended in place; a new list
                is created when omitted.

        Returns:
            A new ``cls`` instance holding the two lists.
        """
        # ``None`` sentinels instead of ``[]`` defaults: a default list is
        # created once and would keep growing across calls.
        if triples is None:
            triples = []
        if provenances is None:
            provenances = []

        for conv in email_corpus:
            for email in conv:
                triples.append((email, "part_of", conv))
                provenances.append(email.message_id)

                for link in email.body.links:
                    triples.append((email, "mentions", link))
                    provenances.append(email.message_id)

                for addr in email.body.addresses:
                    triples.append((email, "mentions", addr))
                    provenances.append(email.message_id)

        for c1, c2 in zip(email_corpus, email_corpus[1:]):
            triples.append((c1, "before", c2))
            provenances.append(c2[0].message_id)

            # NOTE(review): this loop is nested in the conversation-pair loop,
            # so the emails of the last conversation never get "before"
            # triples -- confirm whether that is intended.
            for e1, e2 in zip(c1, c1[1:]):
                triples.append((e1, "before", e2))
                provenances.append(e2.message_id)

        return cls(triples, provenances)

    def __init__(self, triples, provenances):
        """Keep references to the given triple and provenance lists."""
        self.triples = triples
        self.provenances = provenances

    def translate(self, entity2ind=None, pred2ind=None, attach=False):
        """Map every triple to integer indices.

        Args:
            entity2ind: existing entity -> index dict to extend in place, or
                ``None`` to start a new one.
            pred2ind: existing predicate -> index dict to extend in place, or
                ``None`` to start a new one.
            attach: when true, store the results on ``self`` as
                ``translated``, ``entity2ind`` and ``pred2ind`` and return
                ``None``.

        Returns:
            ``(translated, entity2ind, pred2ind)`` unless ``attach`` is true.

        Raises:
            ValueError: if only one of the two mappings is given.
        """
        # Compare against None rather than truthiness, so an empty dict
        # counts as "provided".
        if (entity2ind is None) != (pred2ind is None):
            raise ValueError("Please provide both entity2ind and pred2ind or none!")
        if entity2ind is None:
            entity2ind, pred2ind = {}, {}

        # Continue numbering after the largest index already in use.
        i = max(entity2ind.values(), default=-1) + 1
        j = max(pred2ind.values(), default=-1) + 1

        translated = []
        for s, p, o in tqdm(self.triples, desc="translating"):
            s_prime, i = put(entity2ind, s, i)
            o_prime, i = put(entity2ind, o, i)
            p_prime, j = put(pred2ind, p, j)

            translated.append((s_prime, p_prime, o_prime))

        if attach:
            self.translated = translated
            self.entity2ind = entity2ind
            self.pred2ind = pred2ind
        else:
            return translated, entity2ind, pred2ind

    @staticmethod
    def unified_translation(*kgs, attach=False):
        """Translate several graphs against one shared pair of index mappings.

        Returns:
            ``(entity2ind, pred2ind)`` when ``attach`` is false. When it is
            true, each graph gets ``translated``, ``entity2ind`` and
            ``pred2ind`` attributes instead and ``None`` is returned.
        """
        shared_e2i, shared_p2i = {}, {}
        for kg in tqdm(kgs, desc="unified translating"):
            translated, shared_e2i, shared_p2i = kg.translate(
                shared_e2i, shared_p2i, attach=False)
            if attach:
                kg.translated = translated

        if not attach:
            return shared_e2i, shared_p2i

        # Attach only after all graphs are processed so that every graph
        # references the complete mappings.
        for kg in kgs:
            kg.entity2ind = shared_e2i
            kg.pred2ind = shared_p2i

    def tuples(self):
        """Return the ``(subject, object)`` pair of every triple, in order."""
        return [(s, o) for s, _, o in self.triples]

    def entities(self, filter_f=lambda x: True):
        """Return the set of subjects and objects accepted by ``filter_f``."""
        return {e for s, _, o in self.triples for e in (s, o) if filter_f(e)}

    def predicates(self):
        """Return the set of predicates used in the triples."""
        return {p for _, p, _ in self.triples}

    def store(self, name, save_mapping=True):
        """Write the translated graph to ``{name}*.json`` files.

        Reads ``self.translated``, ``self.provenances`` and, when
        ``save_mapping`` is true, ``self.entity2ind`` and ``self.pred2ind``;
        entities are serialised through their ``to_json()`` method.
        """
        with open(f"{name}.json", "w") as handle:
            json.dump(self.translated, handle)

        with open(f"{name}.provenances.json", "w") as handle:
            json.dump(self.provenances, handle)

        if not save_mapping:
            return

        ind2entity = self.reverse_mapping(self.entity2ind)
        entity_json = {i: e.to_json() for i, e in ind2entity.items()}
        with open(f"{name}.ind2entity.json", "w") as handle:
            json.dump(entity_json, handle)

        ind2pred = self.reverse_mapping(self.pred2ind)
        with open(f"{name}.ind2pred.json", "w") as handle:
            json.dump(ind2pred, handle)

    @classmethod
    def restore(cls, name, load_mapping_of=None):
        """Rebuild a graph from the files written by ``store``.

        Args:
            name: file prefix of the triples and provenances.
            load_mapping_of: file prefix of the index mappings; defaults to
                ``name``.

        Returns:
            A ``cls`` instance with ``translated``, ``entity2ind`` and
            ``pred2ind`` attached.
        """
        def get_class(cls_name):
            # Search the conversation modules for a class of that name.
            for mod in conversations_modules:
                try:
                    return getattr(mod, cls_name)
                except AttributeError:
                    pass
            raise AttributeError(f"{cls_name} could not be found in any of the modules!")

        def json_to_entity(json_dict):
            try:
                cls_name = json_dict["class"]
            except KeyError:
                # Show which keys are present before propagating the error.
                print(json_dict.keys())
                raise
            return get_class(cls_name).from_json(json_dict)

        if load_mapping_of is None:
            load_mapping_of = name

        with open(f"{load_mapping_of}.ind2entity.json") as handle:
            # JSON object keys are strings; convert them back to int indices.
            loaded_entity_mapping = {int(i): d for i, d in json.load(handle).items()}
            ind2entity = {i: json_to_entity(d) for i, d in loaded_entity_mapping.items()}

        ind2entity = {i: (Person(x) if type(x) is WholePerson else x)
                      for i, x in ind2entity.items()}

        with open(f"{load_mapping_of}.ind2pred.json") as handle:
            ind2pred = {int(i): d for i, d in json.load(handle).items()}

        with open(f"{name}.json") as handle:
            loaded = json.load(handle)

        restored_triples = [(ind2entity[s], ind2pred[p], ind2entity[o])
                            for s, p, o in loaded]

        with open(f"{name}.provenances.json") as handle:
            provenances = json.load(handle)

        # Use ``cls`` so that subclasses restore instances of themselves.
        kg = cls(restored_triples, provenances)

        kg.translated = loaded
        kg.entity2ind = kg.reverse_mapping(ind2entity)
        kg.pred2ind = kg.reverse_mapping(ind2pred)

        return kg

    @staticmethod
    def reverse_mapping(d):
        """Return ``d`` with keys and values swapped.

        A repeated value is reported with ``print``; the first key seen for
        it is kept. Repeated values that are not exactly of type ``Person``
        raise ``ValueError``.
        """
        rev_d = {}

        for k, v in d.items():
            if v not in rev_d:
                rev_d[v] = k
                continue

            print("duplicate:", v)
            if type(v) is not Person:
                raise ValueError("Non-bijective mapping!")

        return rev_d

    def to_csv(self, save_path):
        """Always raises ``DeprecationWarning`` pointing to ``CSVWriter``."""
        raise DeprecationWarning("Deprecated; Use:\nfrom kgs.writers import CSVWriter\n"
                                 "CSVWriter(kg).to_csv(save_path)")

    @staticmethod
    def _merge_nodes(kg, node_type, distance_threshold):
        """Compute which nodes of ``node_type`` get replaced by which other node.

        Returns:
            Dict mapping a node to the node it should be replaced with.
        """
        # Only nodes whose type is exactly ``node_type`` take part.
        entities = kg.entities(lambda x: type(x) is node_type)
        merge_with = {}
        for x in entities:
            # All other (non-equal) nodes within the distance threshold.
            matches = [x2 for x2 in entities
                       if x.distance_from(x2) <= distance_threshold
                          and not x == x2]

            merge_with[x] = set(matches)

        # Descending sort: nodes with more matches first; among equal match
        # counts, shorter names first.
        sorted_d = sorted(merge_with.items(),
                          key=lambda it: (len(it[1]), -len(it[0].name)),
                          reverse=True)

        merging_f = {}
        donotmerge = set()     # nodes that already serve as a merge target
        alreadymerged = set()  # nodes that have been mapped onto a target

        for x, s in sorted_d:
            if not x in alreadymerged:
                for x2 in s:
                    if not x2 in donotmerge:
                        merging_f[x2] = x
                        donotmerge.add(x)
                        alreadymerged.add(x2)

        return merging_f

    @classmethod
    def merge_persons_of(cls, kg, distance_threshold):
        """Return a new graph in which similar ``Person`` nodes are merged.

        Triples whose subject and object become equal after merging are
        printed and dropped. The mapping used is attached to the result as
        ``merge_mapping``.
        """
        merging_f = cls._merge_nodes(kg, Person, distance_threshold)

        # When provenances line up one-to-one with the triples, drop the
        # provenance of every dropped triple so the lists stay in step.
        # Otherwise the provenance list is passed through unchanged.
        aligned = len(kg.provenances) == len(kg.triples)
        new_triples = []
        new_provenances = [] if aligned else kg.provenances

        for idx, (s, p, o) in enumerate(kg.triples):
            s_ = merging_f.get(s, s)
            o_ = merging_f.get(o, o)

            if s_ == o_:
                print(s, p, o)
                continue

            new_triples.append((s_, p, o_))
            if aligned:
                new_provenances.append(kg.provenances[idx])

        new_kg = cls(new_triples, new_provenances)
        new_kg.merge_mapping = merging_f

        return new_kg

Subclasses

Static methods

def from_email_corpus(email_corpus, triples=[], provenances=[])
Expand source code
@classmethod
def from_email_corpus(cls, email_corpus, triples=None, provenances=None):
    """Build a graph from an email corpus.

    Every email gets a ``part_of`` triple to its conversation and one
    ``mentions`` triple per entry of ``email.body.links`` and
    ``email.body.addresses``. Consecutive conversations and consecutive
    emails are linked with ``before`` triples. One ``message_id`` is
    appended to ``provenances`` for every triple.

    Args:
        email_corpus: sliceable sequence of conversations, each of which is
            a sliceable sequence of emails.
        triples: optional list that is extended in place; a new list is
            created when omitted.
        provenances: optional list that is extended in place; a new list is
            created when omitted.

    Returns:
        The result of ``cls(triples, provenances)``.
    """
    # ``None`` sentinels instead of ``[]`` defaults: a default list is
    # created once and would keep growing across calls.
    if triples is None:
        triples = []
    if provenances is None:
        provenances = []

    for conv in email_corpus:
        for email in conv:
            triples.append((email, "part_of", conv))
            provenances.append(email.message_id)

            for link in email.body.links:
                triples.append((email, "mentions", link))
                provenances.append(email.message_id)

            for addr in email.body.addresses:
                triples.append((email, "mentions", addr))
                provenances.append(email.message_id)

    for c1, c2 in zip(email_corpus, email_corpus[1:]):
        triples.append((c1, "before", c2))
        provenances.append(c2[0].message_id)

        # NOTE(review): this loop is nested in the conversation-pair loop, so
        # the emails of the last conversation never get "before" triples --
        # confirm whether that is intended.
        for e1, e2 in zip(c1, c1[1:]):
            triples.append((e1, "before", e2))
            provenances.append(e2.message_id)

    return cls(triples, provenances)
def merge_persons_of(kg, distance_threshold)
Expand source code
@classmethod
def merge_persons_of(cls, kg, distance_threshold):
    """Return a new graph in which similar ``Person`` nodes are merged.

    Triples whose subject and object become equal after merging are printed
    and dropped. The mapping used is attached to the result as
    ``merge_mapping``.
    """
    merging_f = cls._merge_nodes(kg, Person, distance_threshold)

    # When provenances line up one-to-one with the triples, drop the
    # provenance of every dropped triple so the lists stay in step.
    # Otherwise the provenance list is passed through unchanged.
    aligned = len(kg.provenances) == len(kg.triples)
    new_triples = []
    new_provenances = [] if aligned else kg.provenances

    for idx, (s, p, o) in enumerate(kg.triples):
        s_ = merging_f.get(s, s)
        o_ = merging_f.get(o, o)

        if s_ == o_:
            print(s, p, o)
            continue

        new_triples.append((s_, p, o_))
        if aligned:
            new_provenances.append(kg.provenances[idx])

    new_kg = cls(new_triples, new_provenances)
    new_kg.merge_mapping = merging_f

    return new_kg
def restore(name, load_mapping_of=None)
Expand source code
@classmethod
def restore(cls, name, load_mapping_of=None):
    """Rebuild a graph from the files written by ``store``.

    Args:
        name: file prefix of the triples and provenances.
        load_mapping_of: file prefix of the index mappings; defaults to
            ``name``.

    Returns:
        A ``cls`` instance with ``translated``, ``entity2ind`` and
        ``pred2ind`` attached.
    """
    def get_class(cls_name):
        # Search the conversation modules for a class of that name.
        for mod in conversations_modules:
            try:
                return getattr(mod, cls_name)
            except AttributeError:
                pass
        raise AttributeError(f"{cls_name} could not be found in any of the modules!")

    def json_to_entity(json_dict):
        try:
            cls_name = json_dict["class"]
        except KeyError:
            # Show which keys are present before propagating the error.
            print(json_dict.keys())
            raise
        return get_class(cls_name).from_json(json_dict)

    if load_mapping_of is None:
        load_mapping_of = name

    with open(f"{load_mapping_of}.ind2entity.json") as handle:
        # JSON object keys are strings; convert them back to int indices.
        loaded_entity_mapping = {int(i): d for i, d in json.load(handle).items()}
        ind2entity = {i: json_to_entity(d) for i, d in loaded_entity_mapping.items()}

    ind2entity = {i: (Person(x) if type(x) is WholePerson else x)
                  for i, x in ind2entity.items()}

    with open(f"{load_mapping_of}.ind2pred.json") as handle:
        ind2pred = {int(i): d for i, d in json.load(handle).items()}

    with open(f"{name}.json") as handle:
        loaded = json.load(handle)

    restored_triples = [(ind2entity[s], ind2pred[p], ind2entity[o])
                        for s, p, o in loaded]

    with open(f"{name}.provenances.json") as handle:
        provenances = json.load(handle)

    # Use ``cls`` so that subclasses restore instances of themselves.
    kg = cls(restored_triples, provenances)

    kg.translated = loaded
    kg.entity2ind = kg.reverse_mapping(ind2entity)
    kg.pred2ind = kg.reverse_mapping(ind2pred)

    return kg
def reverse_mapping(d)
Expand source code
@staticmethod
def reverse_mapping(d):
    """Return ``d`` with keys and values swapped.

    A repeated value is reported with ``print``; the first key seen for it is
    kept. Repeated values that are not exactly of type ``Person`` raise
    ``ValueError``.
    """
    inverted = {}

    for key, value in d.items():
        if value not in inverted:
            inverted[value] = key
            continue

        print("duplicate:", value)
        if type(value) is not Person:
            raise ValueError("Non-bijective mapping!")

    return inverted
def unified_translation(*kgs, attach=False)
Expand source code
@staticmethod
def unified_translation(*kgs, attach=False):
    """Translate several graphs against one shared pair of index mappings.

    Returns:
        ``(entity2ind, pred2ind)`` when ``attach`` is false. When it is true,
        each graph gets ``translated``, ``entity2ind`` and ``pred2ind``
        attributes instead and ``None`` is returned.
    """
    shared_e2i, shared_p2i = {}, {}
    for graph in tqdm(kgs, desc="unified translating"):
        result, shared_e2i, shared_p2i = graph.translate(
            shared_e2i, shared_p2i, attach=False)
        if attach:
            graph.translated = result

    if not attach:
        return shared_e2i, shared_p2i

    # Attach only after all graphs are processed so that every graph
    # references the complete mappings.
    for graph in kgs:
        graph.entity2ind = shared_e2i
        graph.pred2ind = shared_p2i

Methods

def entities(self, filter_f=<function KG.<lambda>>)
Expand source code
def entities(self, filter_f=lambda x: True):
    """Return the set of subjects and objects accepted by ``filter_f``."""
    found = set()
    for subj, _pred, obj in self.triples:
        for node in (subj, obj):
            if filter_f(node):
                found.add(node)
    return found
def predicates(self)
Expand source code
def predicates(self):
    """Return the set of predicates used in the triples."""
    seen = set()
    for _subj, pred, _obj in self.triples:
        seen.add(pred)
    return seen
def store(self, name, save_mapping=True)
Expand source code
def store(self, name, save_mapping=True):
    """Write the translated graph to ``{name}*.json`` files.

    Reads ``self.translated``, ``self.provenances`` and, when
    ``save_mapping`` is true, ``self.entity2ind`` and ``self.pred2ind``;
    entities are serialised through their ``to_json()`` method.
    """
    with open(f"{name}.json", "w") as out:
        json.dump(self.translated, out)

    with open(f"{name}.provenances.json", "w") as out:
        json.dump(self.provenances, out)

    if not save_mapping:
        return

    ind2entity = self.reverse_mapping(self.entity2ind)
    entity_json = {ind: entity.to_json() for ind, entity in ind2entity.items()}
    with open(f"{name}.ind2entity.json", "w") as out:
        json.dump(entity_json, out)

    ind2pred = self.reverse_mapping(self.pred2ind)
    with open(f"{name}.ind2pred.json", "w") as out:
        json.dump(ind2pred, out)
def to_csv(self, save_path)
Expand source code
def to_csv(self, save_path):
    """Always raises ``DeprecationWarning`` pointing to ``CSVWriter``."""
    message = ("Deprecated; Use:\nfrom kgs.writers import CSVWriter\n"
               "CSVWriter(kg).to_csv(save_path)")
    raise DeprecationWarning(message)
def translate(self, entity2ind=None, pred2ind=None, attach=False)
Expand source code
def translate(self, entity2ind=None, pred2ind=None, attach=False):
    """Map every triple to integer indices.

    Args:
        entity2ind: existing entity -> index dict to extend in place, or
            ``None`` to start a new one.
        pred2ind: existing predicate -> index dict to extend in place, or
            ``None`` to start a new one.
        attach: when true, store the results on ``self`` as ``translated``,
            ``entity2ind`` and ``pred2ind`` and return ``None``.

    Returns:
        ``(translated, entity2ind, pred2ind)`` unless ``attach`` is true.

    Raises:
        ValueError: if only one of the two mappings is given.
    """
    # Compare against None rather than truthiness, so an empty dict counts
    # as "provided".
    if (entity2ind is None) != (pred2ind is None):
        raise ValueError("Please provide both entity2ind and pred2ind or none!")
    if entity2ind is None:
        entity2ind, pred2ind = {}, {}

    # Continue numbering after the largest index already in use.
    i = max(entity2ind.values(), default=-1) + 1
    j = max(pred2ind.values(), default=-1) + 1

    translated = []
    for s, p, o in tqdm(self.triples, desc="translating"):
        s_prime, i = put(entity2ind, s, i)
        o_prime, i = put(entity2ind, o, i)
        p_prime, j = put(pred2ind, p, j)

        translated.append((s_prime, p_prime, o_prime))

    if attach:
        self.translated = translated
        self.entity2ind = entity2ind
        self.pred2ind = pred2ind
    else:
        return translated, entity2ind, pred2ind
def tuples(self)
Expand source code
def tuples(self):
    """Return the ``(subject, object)`` pair of every triple, in order."""
    pairs = []
    for subj, _pred, obj in self.triples:
        pairs.append((subj, obj))
    return pairs
class Person (person)
Expand source code
class Person(WholePerson):
    """Graph-node wrapper around a ``WholePerson`` keyed by its lower-cased name.

    Equality and hashing look only at ``name`` (and, for equality, at the
    exact type), so two nodes with the same lower-cased name are treated as
    the same dictionary key / set member.
    """

    def __init__(self, person):  #, distance_threshold=0.0):
        """Take over the attributes of ``person`` and lower-case ``name``.

        Args:
            person: object whose instance attributes are copied; it must
                provide a string ``name`` attribute.
        """
        # Shallow-copy the attributes instead of aliasing ``person.__dict__``:
        # with a shared dict the assignment below would also lower-case the
        # name of the wrapped object.
        self.__dict__.update(person.__dict__)

        self.name = self.name.lower()

    def __repr__(self):
        return f"PersonNode({str(self)})"

    def __str__(self):
        # An empty name is rendered as a placeholder underscore.
        return str(self.name or "_")

    def distance_from(self, other):
        """Return the Levenshtein distance of the two names, scaled to [0, 1].

        The raw edit distance is divided by the length of the longer name;
        two empty names have distance ``0.``.
        """
        longest = max(len(self.name), len(other.name))

        if longest == 0:
            return 0.

        return Levenshtein.distance(self.name, other.name) / longest

    def __hash__(self):
        # Must stay consistent with __eq__, which compares names.
        return hash(self.name)

    def __eq__(self, other):
        """Equal only to objects of exactly the same type with the same name."""
        if type(self) is not type(other):
            return False
        return self.name == other.name

Ancestors

Methods

def distance_from(self, other)
Expand source code
def distance_from(self, other):
    """Return the Levenshtein distance of the two names, scaled to [0, 1].

    The raw edit distance is divided by the length of the longer name; two
    empty names have distance ``0.``.
    """
    longest = max(len(self.name), len(other.name))

    if longest == 0:
        return 0.

    return Levenshtein.distance(self.name, other.name) / longest