Module conversationkg.conversations.factories
Expand source code
import numpy as np
import json
from tqdm import tqdm
from functools import reduce
# topic modelling
from sklearn.decomposition import LatentDirichletAllocation
# named entity recognition
import spacy
import stanza
# keyword extraction
from rake_nltk import Rake
#from .ledger import Universe
from .entities import Entity, StringEntity
from .entities import Topic, TopicInstance
from .entities import Person, Organisation
from .entities import KeyWord
from sklearn.feature_extraction.text import TfidfVectorizer as tfidf
from sklearn.feature_extraction.text import CountVectorizer as count
class FactoryChain:
"""
    Not implemented (yet). The idea is to wrap a sequence of factory classes into this FactoryChain,
    which runs the factories on a corpus.
Most important use: Some factories depend on the output of others (e.g. TopicFactory depends on VectorFactory),
so the FactoryChain could help ensure that factories are executed in the correct order and produce meaningful
error messages if this is not the case (rather than pure AttributeErrors).
    Other uses: FactoryChain could keep track of the factories that have been applied, gather statistics,
    help parallelisation, perform preparation and postprocessing tasks, etc.
"""
    def __init__(self, *classes, **keyword_args):
"""
FactoryChain is initialised with a list of factory classes and keyword arguments, where
each key is expected to be the name of a factory class and the value is a dict of parameters
        for that factory, e.g. CountVectorizer=dict(max_df=0.7, min_df=5).
"""
# no_dependencies = {c for c in classes if getattr(c, "depends_on", None) is None}
        for c in classes:
            if getattr(c, "depends_on", None) is not None and c.depends_on not in classes:
                raise ValueError(f"Factory {c} depends on {c.depends_on} but the latter "
                                 f"is not in the given chain of classes!")
from joblib import Parallel, delayed
class Factory:
"""
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc.
Use this class as base class to create a new factory family; perhaps for instance a StatsFactory
for a family of factories which observe statistics about emails and conversations and attach those to both.
TODO:
- default __init__? -> perhaps for checks if subclassing factory family is well-defined?
- default __call__ function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
"""
def __init__(self):
raise NotImplementedError
def process_conversation(self, conversation):
raise NotImplementedError
def process_email(self, email):
raise NotImplementedError
def parallelise_call(self, conversation_iter, n_jobs=-1, processing_function=None):
if processing_function is None:
processing_function = self.process_conversation
delayed_func = delayed(processing_function)
# Parallel(n_jobs=n_jobs, require='sharedmem')
return Parallel(n_jobs=n_jobs)(delayed_func(conv) for conv in conversation_iter)
def __call__(self, corpus, parallel=True, n_jobs=-1):
progressbar = tqdm(corpus,
desc=f"{self.__class__.__name__} iterating conversations"
f" {'in parallel' if parallel else ''}")
if parallel:
return self.parallelise_call(progressbar, n_jobs)
else:
processed_conversations = list(map(self.process_conversation, progressbar))
return processed_conversations
@staticmethod
def combine_processors(*processors):
"""
Convenience function which combines a list of processors (i.e. functions) into a single one, equivalent to
mathematical function composition. In terms of call order, the function at the first index of the list
is called last, while the function at the last index is called first (**unlike** the convention in
function composition).
May be unsafe to use if the given functions take and return different numbers and types of arguments.
Usage example:
to_lower = str.lower
remove_outer_whitespace = str.strip
remove_comma = lambda s: s.replace(",", "")
            str_normaliser = Factory.combine_processors(remove_comma, remove_outer_whitespace, to_lower)
"""
return reduce(lambda f, g: lambda x: f(g(x)), processors, lambda x: x)
# return reduce(lambda f, g: lambda *x, **y: f(g(*x, **y)), processors, lambda *x, **y: (x, y))
class VectorFactory(Factory):
def __init__(self, corpus, vectoriser_algorithm, add_matrix_to_corpus=True, **vectoriser_kwargs):
default_args = dict(max_df=0.7, min_df=5)
default_args.update(vectoriser_kwargs)
self.fitted_corpus = corpus
self.add_matrix = add_matrix_to_corpus
self.vectoriser = vectoriser_algorithm(**default_args)
self.matrix = self.vectoriser.fit_transform([
email.body.normalised for email in corpus.iter_emails()
])
self.conversation_matrix = self.vectoriser.transform(
[conv.get_email_bodies(attr="normalised", join_str=" ") for conv in corpus]
)
self.vocabulary = self.vectoriser.get_feature_names()
def process_conversation(self, conversation):
conversation.vectorised = next(self.conv_iter)
for email in conversation:
email_vector = self.process_email(email)
return conversation.vectorised
def process_email(self, email):
email.body.vectorised = next(self.email_iter)
return email.body.vectorised
def __call__(self, corpus=None, **kwargs):
if not corpus:
corpus = self.fitted_corpus
if self.add_matrix:
corpus.vectorised_vocabulary = self.vocabulary
corpus.vectorised = self.matrix
corpus.conversations_vectorised = self.conversation_matrix
self.conv_iter = iter(self.conversation_matrix)
self.email_iter = iter(self.matrix)
return super().__call__(corpus, **kwargs)
# if not corpus:
# corpus = self.fitted_corpus
# conv_matrix = self.conversation_matrix
# email_ind = 0
# for i, conv in enumerate(corpus):
# conv.vector = conv_matrix[i]
# for email in conv:
# email.body.vectorised = self.matrix[email_ind]
# email_ind += 1
#
# if self.add_matrix:
# corpus.vectorised_vocabulary = self.vocabulary
# corpus.vectorised = self.matrix
# corpus.conversations_vectorised = self.conversation_matrix
class CountVectorizer(VectorFactory):
"""
Wrapper for scikit-learn's equally-named [CountVectorizer](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.CountVectorizer.html).
"""
def __init__(self, corpus, **vectoriser_kwargs):
super().__init__(corpus, count, **vectoriser_kwargs)
class TfidfVectorizer(VectorFactory):
"""
Wrapper for scikit-learn's equally-named [TfidfVectorizer](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html).
"""
def __init__(self, corpus, **vectoriser_kwargs):
super().__init__(corpus, tfidf, **vectoriser_kwargs)
class TopicFactory(Factory):
depends_on = VectorFactory
def __init__(self, corpus, n_topics):
if not hasattr(self, "word_distribution"):
raise AttributeError("TopicFactory needs a word distribution to be well-defined!")
if not hasattr(self, "predict") or not callable(self.predict):
raise AttributeError("TopicFactory needs a prediction function to be well-defined!")
self.fitted_corpus = corpus
self.word_distribution = self.word_distribution/self.word_distribution.sum(axis=1)[:, np.newaxis]
self.topics = [Topic(i, dist, corpus.vectorised_vocabulary)
for i, dist in enumerate(self.word_distribution)]
def get_topic(self, topic_dist):
max_prob_ind = topic_dist.argmax()
return TopicInstance(self.topics[max_prob_ind], topic_dist[max_prob_ind])
def process_conversation(self, conversation):
conversation.topic = self.get_topic(next(self.conv_iter))
for email in conversation:
email_topic = self.process_email(email)
return conversation.topic
def process_email(self, email):
email.topic = self.get_topic(next(self.email_iter))
return email.topic
def __call__(self, corpus=None, **kwargs):
if not corpus:
corpus = self.fitted_corpus
self.conv_iter = iter(self.predict(corpus.conversations_vectorised))
self.email_iter = iter(self.predict(corpus.vectorised))
return super().__call__(corpus, **kwargs)
# email_ind = 0
# for i, conv in tqdm(enumerate(corpus),
# desc=f"{self.__class__.__name__} iterating conversations",
# total=len(corpus)):
# convo_topic_ind = convo_topic_dists[i].argmax()
# conv.topic = TopicInstance(self.topics[convo_topic_ind],
# convo_topic_dists[i][convo_topic_ind])
#
# for email in conv:
# email_topic_ind = email_topic_dists[email_ind].argmax()
# email.topic = TopicInstance(self.topics[email_topic_ind],
# email_topic_dists[email_ind][email_topic_ind])
# email_ind += 1
class SKLearnLDA(TopicFactory):
def __init__(self, corpus, n_topics, **kwargs):
self.model_args = dict(n_components=n_topics, max_iter=20, learning_method='batch',
random_state=0, verbose=1, n_jobs=-1)
        # guard: max_iter//100*10 is 0 for small max_iter, but sklearn expects learning_offset > 1
        self.model_args.update(learning_offset=max(10, self.model_args["max_iter"]//100*10))
self.model_args.update(dict(evaluate_every=self.model_args["max_iter"]//10))
self.model_args.update(kwargs)
model = LatentDirichletAllocation(**self.model_args)
model.fit(corpus.vectorised)
self.word_distribution = model.components_
self.predict = model.transform
super().__init__(corpus, n_topics)
def determine_n_components(self, ns_to_search, corpus=None):
if not corpus:
            corpus = self.fitted_corpus
models = {}
for n in ns_to_search:
cur_args = {**self.model_args, **dict(n_components=n)}
cur_model = LatentDirichletAllocation(**cur_args).fit(corpus.vectorised)
models[n] = cur_model
print("Perplexities:", [(n, models[n].bound_) for n in ns_to_search])
return models
class GensimLDA(TopicFactory):
def __init__(self, email_corpus, n_topics, **kwargs):
raise NotImplementedError("LDA with Gensim not implemented yet, please feel free to add!\n"
"Make sure to add attrs word_distribution and predict(), "
"as required by the parent TopicFactory.")
class NamedEntityFactory(Factory):
    def __init__(self, preprocessors=[], postprocessors=[]):
self.product_type = Entity
self.product_name = "entities"
self.pre = self.combine_processors(*preprocessors)
self.post = self.combine_processors(self.string_to_class, *postprocessors)
# super().__init__(self, corpus, preprocessors, postprocessors)
@staticmethod
def string_to_class(entity_list):
label_class_map = {"PERSON": lambda name: Person(name, ""),
"ORG": lambda name: Organisation(name, "")}
return [(label_class_map[l](e)) if l in label_class_map else NamedEntityFactory.entity_from_NER_label(e, l)
for e, l in entity_list]
@staticmethod
def entity_from_NER_label(entity_string, label):
cls = type(label.title(), (StringEntity, ), dict(class_dynamically_created=True))
return cls(entity_string)
def process_conversation(self, conversation):
conversation.entities = []
for email in conversation:
email_entities = self.process_email(email)
conversation.entities.append(email_entities)
return conversation.entities
def process_email(self, email):
email.entities = list(filter(None, self.post(self.get_entities_with_labels(self.pre(email.body.normalised)))))
return email.entities
# def __call__(self, corpus):
# for conv in tqdm(corpus,
# desc=f"{self.__class__.__name__} iterating conversations"):
# all_entities = []
# for email in conv:
# email.entities = list(filter(None,
# self.post(self.get_entities_with_labels(self.pre(email.body.normalised)))
# ))
# all_entities.extend(email.entities)
# conv.entities = all_entities
class SpaCyNER(NamedEntityFactory):
def __init__(self, preprocessors=[], postprocessors=[]):
self.nlp = spacy.load("en_core_web_md")
super().__init__(preprocessors, postprocessors)
def get_entities(self, text):
return [str(e) for e in self.nlp(text).ents]
def get_entities_with_labels(self, text):
return [(str(e), e.label_) for e in self.nlp(text).ents]
class StanzaNER(NamedEntityFactory):
def __init__(self, preprocessors=[], postprocessors=[]):
        self.nlp = stanza.Pipeline('en', processors="tokenize,mwt,ner")
super().__init__(preprocessors, postprocessors)
def get_entities(self, text):
return [d.text for d in self.nlp(text).ents]
def get_entities_with_labels(self, text):
return [(d.text, d.type) for d in self.nlp(text).ents]
class KeyWordFactory(Factory):
def __init__(self, preprocessors=[], postprocessors=[]):
self.product_type = KeyWord
self.product_name = "keywords"
self.pre = self.combine_processors(*preprocessors)
self.post = self.combine_processors(self.output_to_class, *postprocessors)
# # set to False if process_conversation calls process_email, to True otherwise
# self.process_emails_separately = False
# super().__init__(self, corpus=None, preprocessors, postprocessors)
def process_conversation(self, conversation):
conversation.keywords = []
for email in conversation:
email_keywords = self.process_email(email)
conversation.keywords.append(email_keywords)
return conversation.keywords
def process_email(self, email):
email.keywords = list(filter(None, self.post(self.get_keywords(self.pre(email.body.normalised)))))
return email.keywords
# def __call__(self, corpus):
# for conv in tqdm(corpus,
# desc=f"{self.__class__.__name__} iterating conversations"):
# all_keywords = []
# for email in conv:
# email.keywords = list(filter(None,
# self.post(self.get_keywords(self.pre(email.body.normalised)))
# ))
# all_keywords.extend(email.keywords)
# conv.keywords = all_keywords
@staticmethod
def output_to_class(keyword_list):
return [KeyWord(s) for s in keyword_list]
class RakeKeyWordExtractor(KeyWordFactory):
def __init__(self, preprocessors=[], postprocessors=[]):
self.rake = Rake()
postprocessors = [self.remove_low_scores, *postprocessors] # self.combine_processors(self.remove_low_scores, *postprocessors)
super().__init__(preprocessors, postprocessors)
@staticmethod
    def remove_low_scores(keyword_list, min_score=1.0):
        return [phrase for score, phrase in keyword_list if score > min_score]
def get_keywords(self, text):
self.rake.extract_keywords_from_text(text)
return self.rake.get_ranked_phrases_with_scores()
Classes
class CountVectorizer (corpus, **vectoriser_kwargs)
-
Wrapper for scikit-learn's equally-named CountVectorizer.
Expand source code
class CountVectorizer(VectorFactory):
    """
    Wrapper for scikit-learn's equally-named [CountVectorizer](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.CountVectorizer.html).
    """
    def __init__(self, corpus, **vectoriser_kwargs):
        super().__init__(corpus, count, **vectoriser_kwargs)
Ancestors
- VectorFactory
- Factory
Inherited members
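A minimal usage sketch (it assumes corpus is an already-loaded corpus object whose emails expose body.normalised, as the source above requires; run serially to keep the sketch simple):

    from conversationkg.conversations.factories import CountVectorizer

    vectoriser = CountVectorizer(corpus, max_df=0.9, min_df=2)  # extra kwargs pass through to scikit-learn
    vectoriser(parallel=False)      # attaches vectors to each email and conversation
    print(corpus.vectorised.shape)  # (number of emails, vocabulary size)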
class Factory
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class Factory:
    """
    The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc.
    Use this class as base class to create a new factory family; perhaps for instance a StatsFactory
    for a family of factories which observe statistics about emails and conversations and attach those to both.
    TODO:
     - default __init__? -> perhaps for checks if subclassing factory family is well-defined?
     - default __call__ function with basic functionality (that factories override if necessary)
     - parallelisation (including parameters)
    """
    def __init__(self):
        raise NotImplementedError

    def process_conversation(self, conversation):
        raise NotImplementedError

    def process_email(self, email):
        raise NotImplementedError

    def parallelise_call(self, conversation_iter, n_jobs=-1, processing_function=None):
        if processing_function is None:
            processing_function = self.process_conversation
        delayed_func = delayed(processing_function)
        # Parallel(n_jobs=n_jobs, require='sharedmem')
        return Parallel(n_jobs=n_jobs)(delayed_func(conv) for conv in conversation_iter)

    def __call__(self, corpus, parallel=True, n_jobs=-1):
        progressbar = tqdm(corpus,
                           desc=f"{self.__class__.__name__} iterating conversations"
                                f" {'in parallel' if parallel else ''}")
        if parallel:
            return self.parallelise_call(progressbar, n_jobs)
        else:
            processed_conversations = list(map(self.process_conversation, progressbar))
            return processed_conversations

    @staticmethod
    def combine_processors(*processors):
        """
        Convenience function which combines a list of processors (i.e. functions) into a single one, equivalent to
        mathematical function composition. In terms of call order, the function at the first index of the list
        is called last, while the function at the last index is called first (**unlike** the convention in
        function composition).
        May be unsafe to use if the given functions take and return different numbers and types of arguments.
        Usage example:
            to_lower = str.lower
            remove_outer_whitespace = str.strip
            remove_comma = lambda s: s.replace(",", "")
            str_normaliser = Factory.combine_processors(remove_comma, remove_outer_whitespace, to_lower)
        """
        return reduce(lambda f, g: lambda x: f(g(x)), processors, lambda x: x)
Subclasses
- VectorFactory
- TopicFactory
- NamedEntityFactory
- KeyWordFactory
Static methods
def combine_processors(*processors)
-
Convenience function which combines a list of processors (i.e. functions) into a single one, equivalent to mathematical function composition. In terms of call order, the function at the first index of the list is called last, while the function at the last index is called first (unlike the convention in function composition). May be unsafe to use if the given functions take and return different numbers and types of arguments.
Usage example:
    to_lower = str.lower
    remove_outer_whitespace = str.strip
    remove_comma = lambda s: s.replace(",", "")
    str_normaliser = Factory.combine_processors(remove_comma, remove_outer_whitespace, to_lower)
Expand source code
@staticmethod
def combine_processors(*processors):
    """
    Convenience function which combines a list of processors (i.e. functions) into a single one, equivalent to
    mathematical function composition. In terms of call order, the function at the first index of the list
    is called last, while the function at the last index is called first (**unlike** the convention in
    function composition).
    May be unsafe to use if the given functions take and return different numbers and types of arguments.
    Usage example:
        to_lower = str.lower
        remove_outer_whitespace = str.strip
        remove_comma = lambda s: s.replace(",", "")
        str_normaliser = Factory.combine_processors(remove_comma, remove_outer_whitespace, to_lower)
    """
    return reduce(lambda f, g: lambda x: f(g(x)), processors, lambda x: x)
Methods
def parallelise_call(self, conversation_iter, n_jobs=-1, processing_function=None)
-
Expand source code
def parallelise_call(self, conversation_iter, n_jobs=-1, processing_function=None):
    if processing_function is None:
        processing_function = self.process_conversation
    delayed_func = delayed(processing_function)
    # Parallel(n_jobs=n_jobs, require='sharedmem')
    return Parallel(n_jobs=n_jobs)(delayed_func(conv) for conv in conversation_iter)
def process_conversation(self, conversation)
-
Expand source code
def process_conversation(self, conversation):
    raise NotImplementedError
def process_email(self, email)
-
Expand source code
def process_email(self, email):
    raise NotImplementedError
class FactoryChain (*classes, **keyword_args)
-
Not implemented (yet). The idea is to wrap a sequence of factory classes into this FactoryChain, which runs the factories on a corpus. Most important use: some factories depend on the output of others (e.g. TopicFactory depends on VectorFactory), so the FactoryChain could help ensure that factories are executed in the correct order and produce meaningful error messages if this is not the case (rather than pure AttributeErrors). Other uses: FactoryChain could keep track of the factories that have been applied, gather statistics, help parallelisation, perform preparation and postprocessing tasks, etc.
FactoryChain is initialised with a list of factory classes and keyword arguments, where each key is expected to be the name of a factory class and the value is a dict of parameters for that factory, e.g. CountVectorizer=dict(max_df=0.7, min_df=5).
Expand source code
class FactoryChain:
    """
    Not implemented (yet). The idea is to wrap a sequence of factory classes into this FactoryChain,
    which runs the factories on a corpus.
    Most important use: Some factories depend on the output of others (e.g. TopicFactory depends on VectorFactory),
    so the FactoryChain could help ensure that factories are executed in the correct order and produce meaningful
    error messages if this is not the case (rather than pure AttributeErrors).
    Other uses: FactoryChain could keep track of the factories that have been applied, gather statistics,
    help parallelisation, perform preparation and postprocessing tasks, etc.
    """
    def __init__(self, *classes, **keyword_args):
        """
        FactoryChain is initialised with a list of factory classes and keyword arguments, where
        each key is expected to be the name of a factory class and the value is a dict of parameters
        for that factory, e.g. CountVectorizer=dict(max_df=0.7, min_df=5).
        """
        # no_dependencies = {c for c in classes if getattr(c, "depends_on", None) is None}
        for c in classes:
            if getattr(c, "depends_on", None) is not None and c.depends_on not in classes:
                raise ValueError(f"Factory {c} depends on {c.depends_on} but the latter "
                                 f"is not in the given chain of classes!")
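Since the class is not implemented yet, the following is only a sketch of the interface its docstring describes (the class-name keyword keys are hypothetical until the implementation lands):

    chain = FactoryChain(CountVectorizer, SKLearnLDA,
                         CountVectorizer=dict(max_df=0.7, min_df=5),
                         SKLearnLDA=dict(n_topics=10))
    # chain(corpus)  # intended: run each factory on the corpus in dependency order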
class GensimLDA (email_corpus, n_topics, **kwargs)
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class GensimLDA(TopicFactory):
    def __init__(self, email_corpus, n_topics, **kwargs):
        raise NotImplementedError("LDA with Gensim not implemented yet, please feel free to add!\n"
                                  "Make sure to add attrs word_distribution and predict(), "
                                  "as required by the parent TopicFactory.")
Ancestors
- TopicFactory
- Factory
Inherited members
class KeyWordFactory (preprocessors=[], postprocessors=[])
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class KeyWordFactory(Factory):
    def __init__(self, preprocessors=[], postprocessors=[]):
        self.product_type = KeyWord
        self.product_name = "keywords"
        self.pre = self.combine_processors(*preprocessors)
        self.post = self.combine_processors(self.output_to_class, *postprocessors)
        # # set to False if process_conversation calls process_email, to True otherwise
        # self.process_emails_separately = False
        # super().__init__(self, corpus=None, preprocessors, postprocessors)

    def process_conversation(self, conversation):
        conversation.keywords = []
        for email in conversation:
            email_keywords = self.process_email(email)
            conversation.keywords.append(email_keywords)
        return conversation.keywords

    def process_email(self, email):
        email.keywords = list(filter(None, self.post(self.get_keywords(self.pre(email.body.normalised)))))
        return email.keywords

    # def __call__(self, corpus):
    #     for conv in tqdm(corpus,
    #                      desc=f"{self.__class__.__name__} iterating conversations"):
    #         all_keywords = []
    #         for email in conv:
    #             email.keywords = list(filter(None,
    #                 self.post(self.get_keywords(self.pre(email.body.normalised)))
    #             ))
    #             all_keywords.extend(email.keywords)
    #         conv.keywords = all_keywords

    @staticmethod
    def output_to_class(keyword_list):
        return [KeyWord(s) for s in keyword_list]
Ancestors
- Factory
Subclasses
- RakeKeyWordExtractor
Static methods
def output_to_class(keyword_list)
-
Expand source code
@staticmethod
def output_to_class(keyword_list):
    return [KeyWord(s) for s in keyword_list]
Methods
def process_conversation(self, conversation)
-
Expand source code
def process_conversation(self, conversation):
    conversation.keywords = []
    for email in conversation:
        email_keywords = self.process_email(email)
        conversation.keywords.append(email_keywords)
    return conversation.keywords
def process_email(self, email)
-
Expand source code
def process_email(self, email):
    email.keywords = list(filter(None, self.post(self.get_keywords(self.pre(email.body.normalised)))))
    return email.keywords
Inherited members
class NamedEntityFactory (preprocessors=[], postprocessors=[])
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class NamedEntityFactory(Factory):
    def __init__(self, preprocessors=[], postprocessors=[]):
        self.product_type = Entity
        self.product_name = "entities"
        self.pre = self.combine_processors(*preprocessors)
        self.post = self.combine_processors(self.string_to_class, *postprocessors)

    @staticmethod
    def string_to_class(entity_list):
        label_class_map = {"PERSON": lambda name: Person(name, ""),
                           "ORG": lambda name: Organisation(name, "")}
        return [label_class_map[l](e) if l in label_class_map
                else NamedEntityFactory.entity_from_NER_label(e, l)
                for e, l in entity_list]

    @staticmethod
    def entity_from_NER_label(entity_string, label):
        cls = type(label.title(), (StringEntity, ), dict(class_dynamically_created=True))
        return cls(entity_string)

    def process_conversation(self, conversation):
        conversation.entities = []
        for email in conversation:
            email_entities = self.process_email(email)
            conversation.entities.append(email_entities)
        return conversation.entities

    def process_email(self, email):
        email.entities = list(filter(None, self.post(self.get_entities_with_labels(self.pre(email.body.normalised)))))
        return email.entities
Ancestors
- Factory
Subclasses
- SpaCyNER
- StanzaNER
Static methods
def entity_from_NER_label(entity_string, label)
-
Expand source code
@staticmethod
def entity_from_NER_label(entity_string, label):
    cls = type(label.title(), (StringEntity, ), dict(class_dynamically_created=True))
    return cls(entity_string)
def string_to_class(entity_list)
-
Expand source code
@staticmethod
def string_to_class(entity_list):
    label_class_map = {"PERSON": lambda name: Person(name, ""),
                       "ORG": lambda name: Organisation(name, "")}
    return [label_class_map[l](e) if l in label_class_map
            else NamedEntityFactory.entity_from_NER_label(e, l)
            for e, l in entity_list]
Methods
def process_conversation(self, conversation)
-
Expand source code
def process_conversation(self, conversation):
    conversation.entities = []
    for email in conversation:
        email_entities = self.process_email(email)
        conversation.entities.append(email_entities)
    return conversation.entities
def process_email(self, email)
-
Expand source code
def process_email(self, email):
    email.entities = list(filter(None, self.post(self.get_entities_with_labels(self.pre(email.body.normalised)))))
    return email.entities
Inherited members
class RakeKeyWordExtractor (preprocessors=[], postprocessors=[])
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class RakeKeyWordExtractor(KeyWordFactory):
    def __init__(self, preprocessors=[], postprocessors=[]):
        self.rake = Rake()
        postprocessors = [self.remove_low_scores, *postprocessors]
        super().__init__(preprocessors, postprocessors)

    @staticmethod
    def remove_low_scores(keyword_list, min_score=1.0):
        return [phrase for score, phrase in keyword_list if score > min_score]

    def get_keywords(self, text):
        self.rake.extract_keywords_from_text(text)
        return self.rake.get_ranked_phrases_with_scores()
Ancestors
- KeyWordFactory
- Factory
Static methods
def remove_low_scores(keyword_list, min_score=1.0)
-
Expand source code
@staticmethod
def remove_low_scores(keyword_list, min_score=1.0):
    return [phrase for score, phrase in keyword_list if score > min_score]
Methods
def get_keywords(self, text)
-
Expand source code
def get_keywords(self, text):
    self.rake.extract_keywords_from_text(text)
    return self.rake.get_ranked_phrases_with_scores()
Inherited members
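A minimal sketch for RakeKeyWordExtractor (rake_nltk needs the NLTK stopwords and punkt data, e.g. via nltk.download; corpus is assumed loaded as elsewhere in this module):

    extractor = RakeKeyWordExtractor()
    extractor(corpus, parallel=False)  # attaches .keywords to each email and conversation
    # or directly on raw text, bypassing the corpus machinery:
    extractor.get_keywords("Keyword extraction with RAKE works on plain text.")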
class SKLearnLDA (corpus, n_topics, **kwargs)
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class SKLearnLDA(TopicFactory):
    def __init__(self, corpus, n_topics, **kwargs):
        self.model_args = dict(n_components=n_topics, max_iter=20, learning_method='batch',
                               random_state=0, verbose=1, n_jobs=-1)
        # guard: max_iter//100*10 is 0 for small max_iter, but sklearn expects learning_offset > 1
        self.model_args.update(learning_offset=max(10, self.model_args["max_iter"]//100*10))
        self.model_args.update(dict(evaluate_every=self.model_args["max_iter"]//10))
        self.model_args.update(kwargs)

        model = LatentDirichletAllocation(**self.model_args)
        model.fit(corpus.vectorised)

        self.word_distribution = model.components_
        self.predict = model.transform

        super().__init__(corpus, n_topics)

    def determine_n_components(self, ns_to_search, corpus=None):
        if not corpus:
            corpus = self.fitted_corpus
        models = {}
        for n in ns_to_search:
            cur_args = {**self.model_args, **dict(n_components=n)}
            cur_model = LatentDirichletAllocation(**cur_args).fit(corpus.vectorised)
            models[n] = cur_model
        print("Perplexities:", [(n, models[n].bound_) for n in ns_to_search])
        return models
Ancestors
- TopicFactory
- Factory
Methods
def determine_n_components(self, ns_to_search, corpus=None)
-
Expand source code
def determine_n_components(self, ns_to_search, corpus=None):
    if not corpus:
        corpus = self.fitted_corpus
    models = {}
    for n in ns_to_search:
        cur_args = {**self.model_args, **dict(n_components=n)}
        cur_model = LatentDirichletAllocation(**cur_args).fit(corpus.vectorised)
        models[n] = cur_model
    print("Perplexities:", [(n, models[n].bound_) for n in ns_to_search])
    return models
Inherited members
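A sketch of the dependency chain the module implies: a VectorFactory must run first so that corpus.vectorised and corpus.vectorised_vocabulary exist (corpus is assumed loaded):

    CountVectorizer(corpus)(parallel=False)  # attaches corpus.vectorised etc.
    lda = SKLearnLDA(corpus, n_topics=10)    # fits LDA on corpus.vectorised
    lda(parallel=False)                      # attaches a .topic to every conversation and email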
class SpaCyNER (preprocessors=[], postprocessors=[])
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class SpaCyNER(NamedEntityFactory):
    def __init__(self, preprocessors=[], postprocessors=[]):
        self.nlp = spacy.load("en_core_web_md")
        super().__init__(preprocessors, postprocessors)

    def get_entities(self, text):
        return [str(e) for e in self.nlp(text).ents]

    def get_entities_with_labels(self, text):
        return [(str(e), e.label_) for e in self.nlp(text).ents]
Ancestors
- NamedEntityFactory
- Factory
Methods
def get_entities(self, text)
-
Expand source code
def get_entities(self, text):
    return [str(e) for e in self.nlp(text).ents]
def get_entities_with_labels(self, text)
-
Expand source code
def get_entities_with_labels(self, text):
    return [(str(e), e.label_) for e in self.nlp(text).ents]
Inherited members
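A minimal sketch (requires the spaCy model: python -m spacy download en_core_web_md):

    ner = SpaCyNER()
    ner(corpus, parallel=False)  # attaches .entities to each email and conversation
    ner.get_entities_with_labels("Tim Berners-Lee founded the W3C.")
    # e.g. [('Tim Berners-Lee', 'PERSON'), ('the W3C', 'ORG')], exact output is model-dependent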
class StanzaNER (preprocessors=[], postprocessors=[])
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class StanzaNER(NamedEntityFactory):
    def __init__(self, preprocessors=[], postprocessors=[]):
        self.nlp = stanza.Pipeline('en', processors="tokenize,mwt,ner")
        super().__init__(preprocessors, postprocessors)

    def get_entities(self, text):
        return [d.text for d in self.nlp(text).ents]

    def get_entities_with_labels(self, text):
        return [(d.text, d.type) for d in self.nlp(text).ents]
Ancestors
- NamedEntityFactory
- Factory
Methods
def get_entities(self, text)
-
Expand source code
def get_entities(self, text):
    return [d.text for d in self.nlp(text).ents]
def get_entities_with_labels(self, text)
-
Expand source code
def get_entities_with_labels(self, text):
    return [(d.text, d.type) for d in self.nlp(text).ents]
Inherited members
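The analogous sketch for the stanza backend (the English models must be fetched once, e.g. with stanza.download('en')):

    ner = StanzaNER()
    ner.get_entities_with_labels("Tim Berners-Lee founded the W3C.")  # [(text, NER type), ...]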
class TfidfVectorizer (corpus, **vectoriser_kwargs)
-
Wrapper for scikit-learn's equally-named TfidfVectorizer.
Expand source code
class TfidfVectorizer(VectorFactory):
    """
    Wrapper for scikit-learn's equally-named [TfidfVectorizer](https://scikit-learn.org/stable/modules/generated/sklearn.feature_extraction.text.TfidfVectorizer.html).
    """
    def __init__(self, corpus, **vectoriser_kwargs):
        super().__init__(corpus, tfidf, **vectoriser_kwargs)
Ancestors
- VectorFactory
- Factory
Inherited members
class TopicFactory (corpus, n_topics)
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class TopicFactory(Factory):
    depends_on = VectorFactory

    def __init__(self, corpus, n_topics):
        if not hasattr(self, "word_distribution"):
            raise AttributeError("TopicFactory needs a word distribution to be well-defined!")
        if not hasattr(self, "predict") or not callable(self.predict):
            raise AttributeError("TopicFactory needs a prediction function to be well-defined!")

        self.fitted_corpus = corpus
        self.word_distribution = self.word_distribution/self.word_distribution.sum(axis=1)[:, np.newaxis]
        self.topics = [Topic(i, dist, corpus.vectorised_vocabulary)
                       for i, dist in enumerate(self.word_distribution)]

    def get_topic(self, topic_dist):
        max_prob_ind = topic_dist.argmax()
        return TopicInstance(self.topics[max_prob_ind], topic_dist[max_prob_ind])

    def process_conversation(self, conversation):
        conversation.topic = self.get_topic(next(self.conv_iter))
        for email in conversation:
            email_topic = self.process_email(email)
        return conversation.topic

    def process_email(self, email):
        email.topic = self.get_topic(next(self.email_iter))
        return email.topic

    def __call__(self, corpus=None, **kwargs):
        if not corpus:
            corpus = self.fitted_corpus
        self.conv_iter = iter(self.predict(corpus.conversations_vectorised))
        self.email_iter = iter(self.predict(corpus.vectorised))
        return super().__call__(corpus, **kwargs)
Ancestors
- Factory
Subclasses
- SKLearnLDA
- GensimLDA
Class variables
var depends_on
-
Declares the factory class whose output a TopicFactory requires (here: VectorFactory); intended to be checked by FactoryChain before a chain is run.
Methods
def get_topic(self, topic_dist)
-
Expand source code
def get_topic(self, topic_dist):
    max_prob_ind = topic_dist.argmax()
    return TopicInstance(self.topics[max_prob_ind], topic_dist[max_prob_ind])
def process_conversation(self, conversation)
-
Expand source code
def process_conversation(self, conversation):
    conversation.topic = self.get_topic(next(self.conv_iter))
    for email in conversation:
        email_topic = self.process_email(email)
    return conversation.topic
def process_email(self, email)
-
Expand source code
def process_email(self, email):
    email.topic = self.get_topic(next(self.email_iter))
    return email.topic
Inherited members
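The subclassing contract, per the checks in __init__ and the note in GensimLDA: set word_distribution and predict before calling super().__init__. A hypothetical subclass sketch (fit_my_model and its attributes are placeholders, not a real API):

    class MyTopicModel(TopicFactory):
        def __init__(self, corpus, n_topics):
            model = fit_my_model(corpus.vectorised, n_topics)  # hypothetical model fit
            self.word_distribution = model.topic_word_matrix   # shape (n_topics, vocabulary size)
            self.predict = model.transform                     # documents -> topic distributions
            super().__init__(corpus, n_topics)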
class VectorFactory (corpus, vectoriser_algorithm, add_matrix_to_corpus=True, **vectoriser_kwargs)
-
The base class for any specialised factory family, such as VectorFactory, TopicFactory, etc. Use this class as base class to create a new factory family; perhaps for instance a StatsFactory for a family of factories which observe statistics about emails and conversations and attach those to both.
Todo
- default init? -> perhaps for checks if subclassing factory family is well-defined?
- default call function with basic functionality (that factories override if necessary)
- parallelisation (including parameters)
Expand source code
class VectorFactory(Factory):
    def __init__(self, corpus, vectoriser_algorithm, add_matrix_to_corpus=True, **vectoriser_kwargs):
        default_args = dict(max_df=0.7, min_df=5)
        default_args.update(vectoriser_kwargs)

        self.fitted_corpus = corpus
        self.add_matrix = add_matrix_to_corpus

        self.vectoriser = vectoriser_algorithm(**default_args)
        self.matrix = self.vectoriser.fit_transform([
            email.body.normalised for email in corpus.iter_emails()
        ])
        self.conversation_matrix = self.vectoriser.transform(
            [conv.get_email_bodies(attr="normalised", join_str=" ") for conv in corpus]
        )
        self.vocabulary = self.vectoriser.get_feature_names()

    def process_conversation(self, conversation):
        conversation.vectorised = next(self.conv_iter)
        for email in conversation:
            email_vector = self.process_email(email)
        return conversation.vectorised

    def process_email(self, email):
        email.body.vectorised = next(self.email_iter)
        return email.body.vectorised

    def __call__(self, corpus=None, **kwargs):
        if not corpus:
            corpus = self.fitted_corpus
        if self.add_matrix:
            corpus.vectorised_vocabulary = self.vocabulary
            corpus.vectorised = self.matrix
            corpus.conversations_vectorised = self.conversation_matrix
        self.conv_iter = iter(self.conversation_matrix)
        self.email_iter = iter(self.matrix)
        return super().__call__(corpus, **kwargs)
Ancestors
- Factory
Subclasses
- CountVectorizer
- TfidfVectorizer
Methods
def process_conversation(self, conversation)
-
Expand source code
def process_conversation(self, conversation):
    conversation.vectorised = next(self.conv_iter)
    for email in conversation:
        email_vector = self.process_email(email)
    return conversation.vectorised
def process_email(self, email)
-
Expand source code
def process_email(self, email):
    email.body.vectorised = next(self.email_iter)
    return email.body.vectorised
Inherited members
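Putting the factories together end to end (a sketch assuming a loaded corpus; run serially since the factories share iterator state):

    CountVectorizer(corpus)(parallel=False)          # must run first: attaches corpus.vectorised
    SKLearnLDA(corpus, n_topics=10)(parallel=False)  # depends on the vectoriser's output
    SpaCyNER()(corpus, parallel=False)
    RakeKeyWordExtractor()(corpus, parallel=False)

    first_conversation = next(iter(corpus))
    print(first_conversation.topic, first_conversation.entities, first_conversation.keywords)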