Module conversationkg.conversations.emails

Expand source code
import warnings
import json
import re

import datetime
from dateutil.parser import parse as du_parse

from email_reply_parser import EmailReplyParser
from email.utils import parseaddr

import spacy
nlp = spacy.load("en_core_web_md")

from rake_nltk import Rake
rake = Rake()

from .entities import Person, Link, Address, KeyWord, TopicInstance
#from .topics import TopicInstance
from .ledger import Universe

url_re = re.compile(r"http[s]?:\/\/(?:[a-zA-Z]|[0-9]|[$-_@.&+~]|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+")
address_pattern = re.compile(r'[\w\.-]+@[\w\.-]+')


# MERGING
def merge_reported_authors(author, from_, name, email):
    return name, email

def merge_reported_times(date, date_from_body, isosent):
    return date_from_body

def merge_reported_ids(id_, id_from_body):
    return id_


# PARSING
def parse_name_address(person_str):
    return parseaddr(person_str)

def parse_time_sent(s):
    try:
        dt = du_parse(s)
    except ValueError:
#        print("ValueError:", s)
        return datetime.datetime(1,1,1).replace(tzinfo=datetime.timezone.utc)
    
    if not dt.tzinfo:
            dt = dt.replace(tzinfo=datetime.timezone.utc)
    if dt.tzinfo.utcoffset(dt) > datetime.timedelta(hours=24) or\
            dt.tzinfo.utcoffset(dt) < -datetime.timedelta(hours=24):
        raise ValueError("Timezone outside of 24 hours: ", dt.tzinfo.utcoffset(dt))
    
    if dt is None: raise ValueError(s)
    return dt


class Email(metaclass=Universe):
    @classmethod
    def from_email_dict(cls, mail_dict, **unused_kwargs):
        return cls(EmailBody(mail_dict["body"]),
                   Person(*merge_reported_authors(mail_dict["author"],
                                             mail_dict["from"],
                                             mail_dict["name"],
                                             mail_dict["email"])),
                    Person(*parse_name_address(mail_dict["to"])),
                    parse_time_sent(merge_reported_times(mail_dict["date"],
                                        mail_dict["date_from_body"],
                                        mail_dict["isosent"])),
                    merge_reported_ids(mail_dict["id"],
                                     mail_dict["id_from_body"]),
                    mail_dict["inreplyto"],
                    mail_dict["subject"],
                    [], # attachments
                    []) # observers, i.e. persons in CC               
                    
        
    def __init__(self, body, sender, receiver, time, 
                 message_id, inreplyto_id, 
                 subject, observers, attachments, **unused_kwargs):
        self.message_id = message_id
        self.inreplyto_id = inreplyto_id
        
        self.body = body
        self.sender = sender
        self.receiver = receiver
        
        self.time = time
        self.subject = subject
        self.observers = observers
        
        self.attachments = attachments
        
        self.organisations = (self.sender.organisation,
                              self.receiver.organisation)
        
        self.first_observed_at = self.time
        
#        self.topic = None
        
        Universe.observe(body, self, "evidenced_by")
        Universe.observe(sender, self, "evidenced_by")
        Universe.observe(receiver, self, "evidenced_by")
        
        
    # for sorting
    def __lt__(self, other):
        if not isinstance(other, Email):
            raise TypeError(f"<Email> cannot be compared to {type(other)}!")
        
        if self.time < other.time:
            return True
        return False
    
    def __eq__(self, other):
        if not (type(self) == type(other) == Email):
            return False
        return hash(self) == hash(other)
    
    def __hash__(self):
        return hash((self.time, self.subject))    
    
    def __repr__(self):
        return f"Email from <{str(self.sender.address)}> to <{str(self.receiver.address)}>"
        
#        return f"Email({str(self.sender)}, {str(self.receiver)}, {self.time.date()})"
    
    def __str__(self):
        return repr(self)
        
    
    def to_json(self, dumps=False):
        d = {k:v for k, v in self.__dict__.items()}
        d["class"] = self.__class__.__name__
        d["time"] = str(self.time)
        d["body"] = self.body.to_json(dumps=False)
        d["sender"] = self.sender.to_json(dumps=False)
        d["receiver"] = self.receiver.to_json(dumps=False)
        del d["organisations"]
        d["observers"] = [e.to_json(dumps=False) for e in self.observers]
        
        if self.topic:
            d["topic"] = self.topic.to_json(dumps=False)
        
        if dumps: return json.dumps(d)
        return d
    
    
    @classmethod
    def from_json(cls, json_dict):
        body = EmailBody.from_json(json_dict["body"])
        sender = Person.from_json(json_dict["sender"])
        receiver = Person.from_json(json_dict["receiver"])
        time = parse_time_sent(json_dict["time"])
        message_id = json_dict["message_id"]
        inreplyto_id = json_dict["inreplyto_id"]
        subject = json_dict["subject"]
        observers = [Person.from_json(p_dict) for p_dict in json_dict["observers"]]
        
        email = cls(body, sender, receiver, time, message_id, inreplyto_id, subject, observers)
        
        if "topic" in json_dict:
            email.topic = TopicInstance.from_json(json_dict["topic"])
        
        return email
        

        
class EmailBody(str, metaclass=Universe):
    def __new__(cls, body_str, 
                 links=None, addresses=None, entities=None, **kwargs):        
        self = super().__new__(cls, body_str)
        return self
        
    
    def __init__(self, body_str, 
                 links=None, addresses=None, entities=[], **kwargs):
        
        self.body, self.signature, self.quoted = EmailBody.discern_quoted(body_str)

        self.normalised = self.normalise()
                
        self.links = links if links else tuple(self.extract_links())
        self.addresses = addresses if addresses else tuple(self.extract_addresses())
        self.code_snippets = []
        
        
        if entities:
            self.entities = entities
        else:
            self.entities = self.discover_entities()
        
        for entity in self.entities:
            Universe.observe(entity, self, "mentioned_in")
            
    
    def normalise(self):
        normalised_self = self.strip('"').strip("'").lower()
        return normalised_self
    
    def extract_links(self):
        for l in url_re.findall(self.normalised):
            link = Link(l)
            Universe.observe(link, self, "mentioned_in")
            yield link
    
    def extract_addresses(self):
        for addr in address_pattern.findall(self.normalised):
            address = Address(addr)
            Universe.observe(address, self, "mentioned_in")
            yield address
            
    def discover_entities(self):
        s = str(self.normalised)
        if len(s) > nlp.max_length:
            warnings.warn(f"Email body of {len(self)} characters exceeds spacy's maximum"
                            "of {nlp.max_length}! Clipping the body to the maximum length and proceeding.")
            
            s = s[:nlp.max_length]
        ents = nlp(s).ents
        ents = [(str(e).strip(), e.label_) for e in ents]
        return ents
    
    def discover_keywords(self):
        rake.extract_keywords_from_text(self.normalised)
        kws = rake.get_ranked_phrases_with_scores()
        return [KeyWord(phrase) for score, phrase in kws if score > 1.0]
    
    def to_json(self, dumps=False):
        d = {"class": self.__class__.__name__,
             "self": str(self), 
             "links": [l.to_json(dumps=False) for l in self.links],
             "addresses":[a for a in self.addresses],
             "entities":[(e, l) for e, l in self.entities]}  # e.to_json(dumps=False)
        
        if dumps: return json.dumps(d)
        return d
        
    
    @classmethod
    def from_json(cls, json_dict):
        body = json_dict["self"]
        links = [Link.from_json(l) for l in json_dict["links"]]
        addresses = [Address(a) for a in json_dict["addresses"]]
        entities = [(e_str, l) for e_str, l in json_dict["entities"]] 
        
        return cls(body, links, addresses, entities)
    
    
    @staticmethod
    def discern_quoted(body_text):
        parsed_email = EmailReplyParser.read(body_text)
        
        reply = ""
        quoted = ""
        signature = ""
        
        for fragment in parsed_email.fragments:
            if fragment.quoted:
                quoted += fragment.content
            elif fragment.signature:
                signature += fragment.content
            else:
                reply += fragment.content
                
        return (reply, signature, quoted)
        
        

Functions

def merge_reported_authors(author, from_, name, email)
Expand source code
def merge_reported_authors(author, from_, name, email):
    return name, email
def merge_reported_ids(id_, id_from_body)
Expand source code
def merge_reported_ids(id_, id_from_body):
    return id_
def merge_reported_times(date, date_from_body, isosent)
Expand source code
def merge_reported_times(date, date_from_body, isosent):
    return date_from_body
def parse_name_address(person_str)
Expand source code
def parse_name_address(person_str):
    return parseaddr(person_str)
def parse_time_sent(s)
Expand source code
def parse_time_sent(s):
    try:
        dt = du_parse(s)
    except ValueError:
#        print("ValueError:", s)
        return datetime.datetime(1,1,1).replace(tzinfo=datetime.timezone.utc)
    
    if not dt.tzinfo:
            dt = dt.replace(tzinfo=datetime.timezone.utc)
    if dt.tzinfo.utcoffset(dt) > datetime.timedelta(hours=24) or\
            dt.tzinfo.utcoffset(dt) < -datetime.timedelta(hours=24):
        raise ValueError("Timezone outside of 24 hours: ", dt.tzinfo.utcoffset(dt))
    
    if dt is None: raise ValueError(s)
    return dt

Classes

class Email (body, sender, receiver, time, message_id, inreplyto_id, subject, observers, attachments, **unused_kwargs)
Expand source code
class Email(metaclass=Universe):
    @classmethod
    def from_email_dict(cls, mail_dict, **unused_kwargs):
        return cls(EmailBody(mail_dict["body"]),
                   Person(*merge_reported_authors(mail_dict["author"],
                                             mail_dict["from"],
                                             mail_dict["name"],
                                             mail_dict["email"])),
                    Person(*parse_name_address(mail_dict["to"])),
                    parse_time_sent(merge_reported_times(mail_dict["date"],
                                        mail_dict["date_from_body"],
                                        mail_dict["isosent"])),
                    merge_reported_ids(mail_dict["id"],
                                     mail_dict["id_from_body"]),
                    mail_dict["inreplyto"],
                    mail_dict["subject"],
                    [], # attachments
                    []) # observers, i.e. persons in CC               
                    
        
    def __init__(self, body, sender, receiver, time, 
                 message_id, inreplyto_id, 
                 subject, observers, attachments, **unused_kwargs):
        self.message_id = message_id
        self.inreplyto_id = inreplyto_id
        
        self.body = body
        self.sender = sender
        self.receiver = receiver
        
        self.time = time
        self.subject = subject
        self.observers = observers
        
        self.attachments = attachments
        
        self.organisations = (self.sender.organisation,
                              self.receiver.organisation)
        
        self.first_observed_at = self.time
        
#        self.topic = None
        
        Universe.observe(body, self, "evidenced_by")
        Universe.observe(sender, self, "evidenced_by")
        Universe.observe(receiver, self, "evidenced_by")
        
        
    # for sorting
    def __lt__(self, other):
        if not isinstance(other, Email):
            raise TypeError(f"<Email> cannot be compared to {type(other)}!")
        
        if self.time < other.time:
            return True
        return False
    
    def __eq__(self, other):
        if not (type(self) == type(other) == Email):
            return False
        return hash(self) == hash(other)
    
    def __hash__(self):
        return hash((self.time, self.subject))    
    
    def __repr__(self):
        return f"Email from <{str(self.sender.address)}> to <{str(self.receiver.address)}>"
        
#        return f"Email({str(self.sender)}, {str(self.receiver)}, {self.time.date()})"
    
    def __str__(self):
        return repr(self)
        
    
    def to_json(self, dumps=False):
        d = {k:v for k, v in self.__dict__.items()}
        d["class"] = self.__class__.__name__
        d["time"] = str(self.time)
        d["body"] = self.body.to_json(dumps=False)
        d["sender"] = self.sender.to_json(dumps=False)
        d["receiver"] = self.receiver.to_json(dumps=False)
        del d["organisations"]
        d["observers"] = [e.to_json(dumps=False) for e in self.observers]
        
        if self.topic:
            d["topic"] = self.topic.to_json(dumps=False)
        
        if dumps: return json.dumps(d)
        return d
    
    
    @classmethod
    def from_json(cls, json_dict):
        body = EmailBody.from_json(json_dict["body"])
        sender = Person.from_json(json_dict["sender"])
        receiver = Person.from_json(json_dict["receiver"])
        time = parse_time_sent(json_dict["time"])
        message_id = json_dict["message_id"]
        inreplyto_id = json_dict["inreplyto_id"]
        subject = json_dict["subject"]
        observers = [Person.from_json(p_dict) for p_dict in json_dict["observers"]]
        
        email = cls(body, sender, receiver, time, message_id, inreplyto_id, subject, observers)
        
        if "topic" in json_dict:
            email.topic = TopicInstance.from_json(json_dict["topic"])
        
        return email

Static methods

def from_email_dict(mail_dict, **unused_kwargs)
Expand source code
@classmethod
def from_email_dict(cls, mail_dict, **unused_kwargs):
    return cls(EmailBody(mail_dict["body"]),
               Person(*merge_reported_authors(mail_dict["author"],
                                         mail_dict["from"],
                                         mail_dict["name"],
                                         mail_dict["email"])),
                Person(*parse_name_address(mail_dict["to"])),
                parse_time_sent(merge_reported_times(mail_dict["date"],
                                    mail_dict["date_from_body"],
                                    mail_dict["isosent"])),
                merge_reported_ids(mail_dict["id"],
                                 mail_dict["id_from_body"]),
                mail_dict["inreplyto"],
                mail_dict["subject"],
                [], # attachments
                []) # observers, i.e. persons in CC               
def from_json(json_dict)
Expand source code
@classmethod
def from_json(cls, json_dict):
    body = EmailBody.from_json(json_dict["body"])
    sender = Person.from_json(json_dict["sender"])
    receiver = Person.from_json(json_dict["receiver"])
    time = parse_time_sent(json_dict["time"])
    message_id = json_dict["message_id"]
    inreplyto_id = json_dict["inreplyto_id"]
    subject = json_dict["subject"]
    observers = [Person.from_json(p_dict) for p_dict in json_dict["observers"]]
    
    email = cls(body, sender, receiver, time, message_id, inreplyto_id, subject, observers)
    
    if "topic" in json_dict:
        email.topic = TopicInstance.from_json(json_dict["topic"])
    
    return email

Methods

def to_json(self, dumps=False)
Expand source code
def to_json(self, dumps=False):
    d = {k:v for k, v in self.__dict__.items()}
    d["class"] = self.__class__.__name__
    d["time"] = str(self.time)
    d["body"] = self.body.to_json(dumps=False)
    d["sender"] = self.sender.to_json(dumps=False)
    d["receiver"] = self.receiver.to_json(dumps=False)
    del d["organisations"]
    d["observers"] = [e.to_json(dumps=False) for e in self.observers]
    
    if self.topic:
        d["topic"] = self.topic.to_json(dumps=False)
    
    if dumps: return json.dumps(d)
    return d
class EmailBody (body_str, links=None, addresses=None, entities=[], **kwargs)

str(object='') -> str str(bytes_or_buffer[, encoding[, errors]]) -> str

Create a new string object from the given object. If encoding or errors is specified, then the object must expose a data buffer that will be decoded using the given encoding and error handler. Otherwise, returns the result of object.str() (if defined) or repr(object). encoding defaults to sys.getdefaultencoding(). errors defaults to 'strict'.

Expand source code
class EmailBody(str, metaclass=Universe):
    def __new__(cls, body_str, 
                 links=None, addresses=None, entities=None, **kwargs):        
        self = super().__new__(cls, body_str)
        return self
        
    
    def __init__(self, body_str, 
                 links=None, addresses=None, entities=[], **kwargs):
        
        self.body, self.signature, self.quoted = EmailBody.discern_quoted(body_str)

        self.normalised = self.normalise()
                
        self.links = links if links else tuple(self.extract_links())
        self.addresses = addresses if addresses else tuple(self.extract_addresses())
        self.code_snippets = []
        
        
        if entities:
            self.entities = entities
        else:
            self.entities = self.discover_entities()
        
        for entity in self.entities:
            Universe.observe(entity, self, "mentioned_in")
            
    
    def normalise(self):
        normalised_self = self.strip('"').strip("'").lower()
        return normalised_self
    
    def extract_links(self):
        for l in url_re.findall(self.normalised):
            link = Link(l)
            Universe.observe(link, self, "mentioned_in")
            yield link
    
    def extract_addresses(self):
        for addr in address_pattern.findall(self.normalised):
            address = Address(addr)
            Universe.observe(address, self, "mentioned_in")
            yield address
            
    def discover_entities(self):
        s = str(self.normalised)
        if len(s) > nlp.max_length:
            warnings.warn(f"Email body of {len(self)} characters exceeds spacy's maximum"
                            "of {nlp.max_length}! Clipping the body to the maximum length and proceeding.")
            
            s = s[:nlp.max_length]
        ents = nlp(s).ents
        ents = [(str(e).strip(), e.label_) for e in ents]
        return ents
    
    def discover_keywords(self):
        rake.extract_keywords_from_text(self.normalised)
        kws = rake.get_ranked_phrases_with_scores()
        return [KeyWord(phrase) for score, phrase in kws if score > 1.0]
    
    def to_json(self, dumps=False):
        d = {"class": self.__class__.__name__,
             "self": str(self), 
             "links": [l.to_json(dumps=False) for l in self.links],
             "addresses":[a for a in self.addresses],
             "entities":[(e, l) for e, l in self.entities]}  # e.to_json(dumps=False)
        
        if dumps: return json.dumps(d)
        return d
        
    
    @classmethod
    def from_json(cls, json_dict):
        body = json_dict["self"]
        links = [Link.from_json(l) for l in json_dict["links"]]
        addresses = [Address(a) for a in json_dict["addresses"]]
        entities = [(e_str, l) for e_str, l in json_dict["entities"]] 
        
        return cls(body, links, addresses, entities)
    
    
    @staticmethod
    def discern_quoted(body_text):
        parsed_email = EmailReplyParser.read(body_text)
        
        reply = ""
        quoted = ""
        signature = ""
        
        for fragment in parsed_email.fragments:
            if fragment.quoted:
                quoted += fragment.content
            elif fragment.signature:
                signature += fragment.content
            else:
                reply += fragment.content
                
        return (reply, signature, quoted)

Ancestors

  • builtins.str

Static methods

def discern_quoted(body_text)
Expand source code
@staticmethod
def discern_quoted(body_text):
    parsed_email = EmailReplyParser.read(body_text)
    
    reply = ""
    quoted = ""
    signature = ""
    
    for fragment in parsed_email.fragments:
        if fragment.quoted:
            quoted += fragment.content
        elif fragment.signature:
            signature += fragment.content
        else:
            reply += fragment.content
            
    return (reply, signature, quoted)
def from_json(json_dict)
Expand source code
@classmethod
def from_json(cls, json_dict):
    body = json_dict["self"]
    links = [Link.from_json(l) for l in json_dict["links"]]
    addresses = [Address(a) for a in json_dict["addresses"]]
    entities = [(e_str, l) for e_str, l in json_dict["entities"]] 
    
    return cls(body, links, addresses, entities)

Methods

def discover_entities(self)
Expand source code
def discover_entities(self):
    s = str(self.normalised)
    if len(s) > nlp.max_length:
        warnings.warn(f"Email body of {len(self)} characters exceeds spacy's maximum"
                        "of {nlp.max_length}! Clipping the body to the maximum length and proceeding.")
        
        s = s[:nlp.max_length]
    ents = nlp(s).ents
    ents = [(str(e).strip(), e.label_) for e in ents]
    return ents
def discover_keywords(self)
Expand source code
def discover_keywords(self):
    rake.extract_keywords_from_text(self.normalised)
    kws = rake.get_ranked_phrases_with_scores()
    return [KeyWord(phrase) for score, phrase in kws if score > 1.0]
def extract_addresses(self)
Expand source code
def extract_addresses(self):
    for addr in address_pattern.findall(self.normalised):
        address = Address(addr)
        Universe.observe(address, self, "mentioned_in")
        yield address
Expand source code
def extract_links(self):
    for l in url_re.findall(self.normalised):
        link = Link(l)
        Universe.observe(link, self, "mentioned_in")
        yield link
def normalise(self)
Expand source code
def normalise(self):
    normalised_self = self.strip('"').strip("'").lower()
    return normalised_self
def to_json(self, dumps=False)
Expand source code
def to_json(self, dumps=False):
    d = {"class": self.__class__.__name__,
         "self": str(self), 
         "links": [l.to_json(dumps=False) for l in self.links],
         "addresses":[a for a in self.addresses],
         "entities":[(e, l) for e, l in self.entities]}  # e.to_json(dumps=False)
    
    if dumps: return json.dumps(d)
    return d