# OptimAbstract / model.py
# tlemagueresse
# First model in WIP
# c53eedd
# raw
# history blame contribute delete
# 8.17 kB
import pickle
import time
from collections import Counter
from copy import deepcopy
import nltk
import numpy as np
import spacy
from nltk.corpus import stopwords
from textstat import textstat
from transformers import AutoModelForSeq2SeqLM, AutoTokenizer
from bert_score import score
from scipy.stats import entropy
# Fetch the NLTK resources when the module is imported. The "stopwords"
# corpus is read by extract_features; "punkt" and
# "averaged_perceptron_tagger" are not referenced in the visible code.
# NOTE(review): confirm the latter two are still required.
for _resource in ("punkt", "averaged_perceptron_tagger", "stopwords"):
    nltk.download(_resource)

# Shared spaCy pipeline; extract_features runs every text through it.
nlp = spacy.load("en_core_web_sm")
class T5Model:
    """
    A class to encapsulate a T5 summarization model.

    The tokenizer and the model are both loaded from ``model_name`` through
    ``AutoTokenizer`` and ``AutoModelForSeq2SeqLM``.

    Parameters
    ----------
    model_name : str
        The name of the pretrained T5 model.
    """

    def __init__(self, model_name):
        self.model_name = model_name
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.model = AutoModelForSeq2SeqLM.from_pretrained(model_name)

    def summarize(self, text, *, max_input_length=512, max_length=150, num_beams=4):
        """
        Generate a summary for the given text.

        Tokenize -> generate the summary -> decode the text.

        Parameters
        ----------
        text : str
            The input text to summarize.
        max_input_length : int, optional
            Maximum number of input tokens; longer inputs are truncated
            (default is 512).
        max_length : int, optional
            ``max_length`` forwarded to ``generate`` (default is 150).
        num_beams : int, optional
            Number of beams forwarded to ``generate`` (default is 4).

        Returns
        -------
        summary : str
            The generated summary.
        elapsed_time : float
            The time taken by the generation step in seconds. Tokenization
            and decoding are not included in the measurement.
        """
        inputs = self.tokenizer(
            text,
            return_tensors="pt",
            max_length=max_input_length,
            truncation=True,
        )

        # perf_counter is monotonic, so the measured duration cannot be
        # distorted (or go negative) when the system clock is adjusted.
        start_time = time.perf_counter()
        outputs = self.model.generate(
            **inputs,
            max_length=max_length,
            num_beams=num_beams,
            early_stopping=True,
        )
        elapsed_time = time.perf_counter() - start_time

        summary = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        return summary, elapsed_time
class MetaModel:
    """
    Route each text to one of several T5Model instances.

    A classifier is trained on the features returned by ``extract_features``
    to predict, for a given text, the index of the model to use.

    Parameters
    ----------
    model_names : list of str
        Names of the pretrained models; one T5Model is created per name.
    base_classifier : object
        Classifier exposing ``fit`` and ``predict``. A deep copy is stored,
        so the caller's instance is left untouched.
    tolerance : float, optional
        Tolerance passed to ``get_best_model`` when building the training
        labels (default is 0.01).
    """

    def __init__(self, model_names, base_classifier, tolerance=0.01):
        loaded_models = {}
        for model_name in model_names:
            loaded_models[model_name] = T5Model(model_name)
        self.models = loaded_models
        self.base_classifier = deepcopy(base_classifier)
        self.tolerance = tolerance

    def fit(self, texts, summaries):
        """
        Train the classifier to predict the best model for a text.

        Parameters
        ----------
        texts : list of str
            Input texts; their features form the training matrix.
        summaries : list of str
            Reference summaries used by ``get_best_model`` to build labels.
        """
        feature_rows = []
        for text in texts:
            feature_rows.append(list(extract_features(text).values()))
        X = np.array(feature_rows)

        # Labels are positions of the winning model in ``self.models``.
        y = get_best_model(self.models, texts, summaries, self.tolerance)
        self.base_classifier.fit(X, y)

    def summarize(self, text):
        """
        Summarize ``text`` with the model chosen by the classifier.

        Parameters
        ----------
        text : str
            The input text to summarize.

        Returns
        -------
        summary : str
            The generated summary.
        elapsed_time : float
            The time taken for summarization in seconds.
        """
        feature_values = list(extract_features(text).values())
        # The classifier receives a single-row 2-D array.
        feature_row = np.array(feature_values)[np.newaxis, :]
        predicted_index = self.base_classifier.predict(feature_row)[0]

        # The predicted label is a position in the insertion-ordered dict.
        ordered_names = list(self.models)
        chosen_name = ordered_names[predicted_index]
        return self.models[chosen_name].summarize(text)
def save_object(obj, filename):
    """
    Pickle an object to disk.

    Parameters
    ----------
    obj : object
        The object to serialize.
    filename : str
        Path of the file to write; it is opened in binary write mode, so an
        existing file is overwritten.
    """
    with open(filename, "wb") as handle:
        pickle.dump(obj, handle)
def load_object(filename):
    """
    Load a pickled object from disk.

    Unpickling can execute arbitrary code, so only call this on files that
    come from a trusted source.

    Parameters
    ----------
    filename : str
        Path of the pickle file to read.

    Returns
    -------
    obj : object
        The deserialized object.
    """
    with open(filename, "rb") as handle:
        restored = pickle.load(handle)
    return restored
def get_best_model(models, texts, summaries, tolerance):
    """
    Determine the best model for each text based on BERTScore and summarization time.

    Every model summarizes every text. All summaries are then scored against
    their reference in a single batched ``score`` call, and for each text the
    fastest model whose F1 is within ``tolerance`` of the best F1 is selected.

    Parameters
    ----------
    models : dict
        Dictionary mapping model names to T5Model instances.
    texts : list of str
        List of input texts.
    summaries : list of str
        List of reference summaries; ``summaries[i]`` is the reference for
        ``texts[i]``.
    tolerance : float
        Maximum F1 loss, relative to the best-scoring model for a text, that
        is accepted in exchange for a faster model.

    Returns
    -------
    y : np.ndarray
        Array of indices corresponding to the best model for each text.
        Indices follow the iteration order of ``models``. The array is empty
        when ``texts`` is empty.

    Raises
    ------
    ValueError
        If ``models`` is empty.
    """
    model_names = list(models)
    if not model_names:
        raise ValueError("models must contain at least one model")
    name_to_index = {name: index for index, name in enumerate(model_names)}

    # Generate every summary first so that BERTScore can be computed in one
    # batched call instead of one call per (text, model) pair.
    candidates = []
    references = []
    timings = []
    for i, text in enumerate(texts):
        for model_name in model_names:
            summary, elapsed_time = models[model_name].summarize(text)
            candidates.append(summary)
            references.append(summaries[i])
            timings.append(elapsed_time)

    if not candidates:
        return np.array([], dtype=int)

    _, _, F1 = score(candidates, references, lang="en", verbose=False)
    f1_scores = [value.item() for value in F1]

    # Results are laid out text by text, with one entry per model in
    # ``model_names`` order.
    n_models = len(model_names)
    best_model_labels = []
    for start in range(0, len(candidates), n_models):
        model_results = [
            (model_name, f1_scores[start + offset], timings[start + offset])
            for offset, model_name in enumerate(model_names)
        ]
        best_model = _select_within_tolerance(model_results, tolerance)
        best_model_labels.append(name_to_index[best_model])

    return np.array(best_model_labels)


def _select_within_tolerance(model_results, tolerance):
    """Return the name of the fastest model within ``tolerance`` of the top F1."""
    ranked = sorted(model_results, key=lambda result: (-result[1], result[2]))
    top_score = ranked[0][1]

    # Always compare against the top score. Comparing against the currently
    # selected model would let the choice drift further than ``tolerance``
    # below the best F1 through a chain of small steps.
    eligible = [ranked[0]]
    eligible.extend(result for result in ranked[1:] if top_score - result[1] <= tolerance)

    # ``min`` keeps the first minimum, so equal timings resolve to the
    # higher-scoring model thanks to the ranking above.
    return min(eligible, key=lambda result: result[2])[0]
def extract_features(text):
    """
    Extract linguistic and statistical features from a text.

    The text is parsed with the module-level spaCy pipeline ``nlp``. Word
    level statistics are computed on alphabetic tokens only; when the text
    contains none, those statistics are 0 rather than NaN.

    Parameters
    ----------
    text : str
        The input text.

    Returns
    -------
    features : dict
        Dictionary of extracted features, in this order:
        - num_words : int
            Total number of spaCy tokens (not restricted to alphabetic ones).
        - avg_word_length : float
            Mean character length of the alphabetic tokens.
        - num_sentences : int
        - avg_sentence_length : float
            ``num_words`` divided by ``num_sentences``.
        - avg_syntax_depth : float
            Mean signed offset between the index of a token's head and the
            token's own index, over tokens that are not their own head.
        - num_subordinates : int
            Number of tokens matching a fixed list of subordinating words.
        - num_verbs : int
        - num_passive : int
            Number of tokens with the ``auxpass`` dependency label.
        - type_token_ratio : float
        - lexical_entropy : float
            Entropy of the frequency distribution of alphabetic words.
        - syllables_per_word : float
        - complex_words : int
            Number of tokens (alphabetic or not) with at least 3 syllables.
        - stopword_ratio : float
            Number of alphabetic stopwords divided by ``num_words``.
    """
    doc = nlp(text)
    num_words = len(doc)

    # Alphabetic tokens drive all word-level statistics below.
    alpha_tokens = [token for token in doc if token.is_alpha]
    words = [token.text.lower() for token in alpha_tokens]

    # Guard on the alphabetic tokens themselves: a text made only of digits
    # or punctuation has tokens but no words, and np.mean([]) would be NaN.
    avg_word_length = np.mean([len(token.text) for token in alpha_tokens]) if alpha_tokens else 0

    num_sentences = len(list(doc.sents))
    avg_sentence_length = num_words / num_sentences if num_sentences > 0 else 0

    # Syntactic depth proxy: signed distance from each token to its head.
    depths = [token.head.i - token.i for token in doc if token.head != token]
    avg_syntax_depth = np.mean(depths) if depths else 0

    subordinate_conjunctions = {
        "because",
        "although",
        "since",
        "unless",
        "whereas",
        "while",
        "though",
        "if",
    }
    num_subordinates = sum(1 for token in doc if token.text.lower() in subordinate_conjunctions)
    num_verbs = sum(1 for token in doc if token.pos_ == "VERB")
    num_passive = sum(1 for token in doc if token.dep_ == "auxpass")

    type_token_ratio = len(set(words)) / len(words) if words else 0

    if words:
        word_counts = np.array(list(Counter(words).values()))
        # Normalise by the number of counted words so the probabilities sum to 1.
        lexical_entropy = entropy(word_counts / len(words))
    else:
        # No words at all: a degenerate distribution has zero entropy.
        lexical_entropy = 0.0

    # Count syllables once per token and reuse the result for both features.
    syllable_counts = [textstat.syllable_count(token.text) for token in doc]
    alpha_syllables = [
        count for token, count in zip(doc, syllable_counts) if token.is_alpha
    ]
    syllables_per_word = np.mean(alpha_syllables) if alpha_syllables else 0
    complex_words = sum(1 for count in syllable_counts if count >= 3)

    stop_words = set(stopwords.words("english"))
    stopword_ratio = (
        sum(1 for word in words if word in stop_words) / num_words if num_words > 0 else 0
    )

    return {
        "num_words": num_words,
        "avg_word_length": avg_word_length,
        "num_sentences": num_sentences,
        "avg_sentence_length": avg_sentence_length,
        "avg_syntax_depth": avg_syntax_depth,
        "num_subordinates": num_subordinates,
        "num_verbs": num_verbs,
        "num_passive": num_passive,
        "type_token_ratio": type_token_ratio,
        "lexical_entropy": lexical_entropy,
        "syllables_per_word": syllables_per_word,
        "complex_words": complex_words,
        "stopword_ratio": stopword_ratio,
    }