|
from sentence_transformers import util |
|
from nltk.corpus import stopwords |
|
from nltk.tokenize import word_tokenize |
|
from gensim.models import KeyedVectors |
|
import numpy as np |
|
import nltk |
|
from gensim import corpora |
|
from gensim.models import FastText |
|
from gensim.similarities import SparseTermSimilarityMatrix, WordEmbeddingSimilarityIndex |
|
from gensim.downloader import load |
|
import sys |
|
import os |
|
import tempfile |
|
sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(__file__)))) |
|
from all_models import models, get_cache_dir, check_directory_permissions |
|
import torch |
|
import logging |
|
from utils import log_print |
|
|
|
|
|
logging.basicConfig(level=logging.INFO) |
|
logger = logging.getLogger(__name__) |
|
|
|
def verify_model_file(model_path):
    """Check that a model file exists, is non-empty, and can actually be read.

    Args:
        model_path: Filesystem path to the model file.

    Returns:
        True when the file exists, has non-zero size and a sample read
        succeeds; False otherwise (the reason is logged).
    """
    try:
        if not os.path.exists(model_path):
            logger.error(f"Model file does not exist: {model_path}")
            return False

        if os.path.getsize(model_path) == 0:
            logger.error(f"Model file is empty: {model_path}")
            return False

        # Read a small chunk to confirm the file is genuinely readable,
        # not just present in the directory listing.
        with open(model_path, 'rb') as handle:
            handle.read(1024)
        logger.info(f"Model file is readable: {model_path}")
        return True
    except Exception as e:
        logger.error(f"Error verifying model file {model_path}: {e}")
        return False
|
|
|
def get_fasttext_cache_dir():
    """Return a writable directory for caching the FastText model.

    Prefers a 'fasttext' subdirectory of the app cache; when that cannot
    be created or fails the permission check, falls back to a fresh
    temporary directory so model loading can still proceed.
    """
    fasttext_dir = os.path.join(get_cache_dir(), 'fasttext')

    logger.info(f"Setting up FastText cache directory: {fasttext_dir}")

    try:
        os.makedirs(fasttext_dir, mode=0o755, exist_ok=True)
        if check_directory_permissions(fasttext_dir):
            logger.info(f"FastText cache directory is ready: {fasttext_dir}")
            return fasttext_dir
    except Exception as e:
        logger.error(f"Error creating FastText cache directory: {e}")

    # Cache dir unusable (creation failed or permission check returned
    # False) -> use a throwaway temp dir instead.
    temp_dir = tempfile.mkdtemp()
    logger.info(f"Using temporary directory for FastText: {temp_dir}")
    return temp_dir
|
|
|
def ensure_full_permissions(path):
    """Recursively open up permissions: 0o777 for directories, 0o666 for files.

    Args:
        path: File or directory to make world-readable/writable.

    Returns:
        True on success, False if any chmod failed (error is logged).
    """
    try:
        if not os.path.isdir(path):
            # Single file: read/write for everyone.
            os.chmod(path, 0o666)
            return True

        # Directory tree: make every directory traversable and every
        # file writable, starting with the root itself.
        os.chmod(path, 0o777)
        for root, dirnames, filenames in os.walk(path):
            for name in dirnames:
                os.chmod(os.path.join(root, name), 0o777)
            for name in filenames:
                os.chmod(os.path.join(root, name), 0o666)
        return True
    except Exception as e:
        logger.error(f"Error setting permissions for {path}: {e}")
        return False
|
|
|
def load_fasttext_model():
    """Load the FastText word-embedding model, preferring a local cache.

    Resolution order:
      1. Load a cached copy from GENSIM_DATA_DIR (or
         ~/.cache/answer_grading_app/gensim); a corrupt cache file is
         deleted so the next step starts clean.
      2. Download 'fasttext-wiki-news-subwords-300' via gensim.downloader
         and best-effort save it back to the cache.
      3. On any failure, fall back to DummyFasttext (zero vectors) so the
         module stays usable.

    Returns:
        A gensim KeyedVectors instance on success, otherwise DummyFasttext.
    """
    try:
        # Allow operators to override the cache location via env var.
        model_dir = os.getenv('GENSIM_DATA_DIR')
        if not model_dir:
            model_dir = os.path.join(os.path.expanduser('~'), '.cache', 'answer_grading_app', 'gensim')
        os.makedirs(model_dir, mode=0o777, exist_ok=True)
        ensure_full_permissions(model_dir)

        model_path = os.path.join(model_dir, 'fasttext-wiki-news-subwords-300.gz')
        logger.info(f"Attempting to load FastText model from: {model_path}")

        if os.path.exists(model_path):
            # Make sure the cached file is readable before attempting a load.
            ensure_full_permissions(model_path)

            logger.info("Loading FastText model from cache...")
            try:
                # gensim's smart_open decompresses the .gz transparently.
                model = KeyedVectors.load_word2vec_format(model_path)
                logger.info("Successfully loaded FastText model from cache")
                return model
            except Exception as e:
                logger.error(f"Error loading cached model: {str(e)}")
                # Corrupt cache file: remove it so the download path below
                # can write a fresh copy.
                try:
                    os.remove(model_path)
                    logger.info("Removed corrupted model file, will try downloading again")
                except Exception as rm_error:
                    logger.error(f"Could not remove corrupted model file: {rm_error}")

        # No usable cache -> fetch the model from gensim's download service.
        logger.info("Downloading FastText model...")
        try:
            import gensim.downloader as api
            model = api.load('fasttext-wiki-news-subwords-300')
            logger.info("Successfully downloaded FastText model")

            # Best-effort persistence: failure to save only costs a
            # re-download on the next run, so it is logged, not raised.
            try:
                os.makedirs(os.path.dirname(model_path), mode=0o777, exist_ok=True)
                model.save_word2vec_format(model_path)
                ensure_full_permissions(model_path)
                logger.info(f"Saved FastText model to: {model_path}")
            except Exception as save_error:
                logger.warning(f"Could not save model to cache: {str(save_error)}")

            return model
        except Exception as e:
            logger.error(f"Error downloading FastText model: {str(e)}")
            return DummyFasttext()

    except Exception as e:
        logger.error(f"Error in load_fasttext_model: {str(e)}")
        return DummyFasttext()
|
|
|
class DummyFasttext:
    """Fallback stand-in used when the real FastText model fails to load.

    Mimics the small subset of the gensim KeyedVectors interface this
    module relies on, returning zero vectors so downstream scoring
    degrades gracefully instead of crashing.
    """

    def __init__(self):
        # Matches the dimensionality of fasttext-wiki-news-subwords-300.
        self.vector_size = 300
        log_print("Using dummy FastText model due to loading error", "WARNING")

    def get_vector(self, word):
        """Return an all-zero vector for any word."""
        return np.zeros(self.vector_size)

    def __getitem__(self, word):
        return self.get_vector(word)

    def __contains__(self, word):
        # Bug fix: without __contains__, `word in model` falls back to
        # the __getitem__ integer-iteration protocol; since __getitem__
        # never raises, that membership test loops forever. Report every
        # word as known, consistent with __getitem__ returning a (zero)
        # vector for any key.
        return True

    def most_similar(self, word, topn=10):
        """No neighbours exist for the dummy model; always empty."""
        return []

    def to(self, device):
        """No-op device move so torch-style `.to(device)` calls don't fail."""
        return self
|
|
|
|
|
# Load the FastText model once at import time so all scoring functions in
# this module share a single in-memory copy.
try:
    fasttext_model = load_fasttext_model()

    # NOTE(review): load_fasttext_model returns a DummyFasttext on failure
    # instead of raising, so this success message can fire even when the
    # dummy fallback is actually in use.
    logger.info("FastText model loaded successfully")
except Exception as e:
    logger.error(f"Error loading FastText model at module level: {e}")
    # Last-resort fallback keeps the module importable.
    fasttext_model = DummyFasttext()
|
|
|
def question_vector_sentence(correct_answer):
    """Encode a reference answer into a sentence-embedding tensor.

    Args:
        correct_answer: Reference answer text.

    Returns:
        A torch tensor embedding on the shared model's device, or None
        when encoding fails.
    """
    try:
        encoder = models.get_similarity_model()
        return encoder.encode(correct_answer, convert_to_tensor=True, device=models.device)
    except Exception as e:
        logger.error(f"Error in question_vector_sentence: {str(e)}")
        return None
    finally:
        # Always hand the shared model back, even on failure.
        models.release_similarity_model()
|
|
|
def similarity_model_score(sentence_vectors, answer):
    """Return the best cosine similarity between the answer and reference vectors.

    Args:
        sentence_vectors: Iterable of reference embeddings (None entries
            are skipped).
        answer: Student answer text to encode and compare.

    Returns:
        Highest cosine similarity found, or 0.0 when there are no usable
        reference vectors or an error occurs.
    """
    try:
        encoder = models.get_similarity_model()

        answer_embedding = encoder.encode(answer, convert_to_tensor=True, device=models.device)

        # Compare against every non-null reference embedding.
        scores = [
            util.pytorch_cos_sim(answer_embedding, ref).item()
            for ref in sentence_vectors
            if ref is not None
        ]

        return max(scores) if scores else 0.0
    except Exception as e:
        logger.error(f"Error in similarity_model_score: {str(e)}")
        return 0.0
    finally:
        # Always hand the shared model back, even on failure.
        models.release_similarity_model()
|
|
|
def preprocess(sentence):
    """Lowercase, tokenize and strip English stopwords from a sentence.

    Args:
        sentence: Raw input text.

    Returns:
        List of lowercase, non-stopword tokens; empty list on error
        (e.g. missing NLTK data).
    """
    try:
        # Build the stopword set once per call: membership tests become
        # O(1), instead of re-materializing the corpus word list for
        # every single token inside the comprehension.
        stop_words = set(stopwords.words('english'))

        tokens = word_tokenize(sentence.lower())

        return [word for word in tokens if word not in stop_words]
    except Exception as e:
        logger.error(f"Error in preprocess: {str(e)}")
        return []
|
|
|
def sentence_to_vec(tokens, model):
    """Average the embedding vectors of the in-vocabulary tokens.

    Args:
        tokens: List of word tokens.
        model: Word-vector model supporting `word in model`, `model[word]`
            and a `vector_size` attribute.

    Returns:
        Mean vector of the recognised tokens; a zero vector of
        `model.vector_size` when none are recognised, or of size 300 on
        unexpected errors.
    """
    try:
        known = [tok for tok in tokens if tok in model]

        # Nothing recognisable -> neutral zero vector.
        if not known:
            return np.zeros(model.vector_size)

        return np.mean([model[tok] for tok in known], axis=0)
    except Exception as e:
        logger.error(f"Error in sentence_to_vec: {str(e)}")
        return np.zeros(300)
|
|
|
def compute_scm(tokens1, tokens2, model):
    """Soft Cosine Measure between two token lists using word embeddings.

    Args:
        tokens1: First document as a list of tokens.
        tokens2: Second document as a list of tokens.
        model: Word-embedding model (gensim KeyedVectors-compatible).

    Returns:
        Normalized soft-cosine similarity as a float; 0.5 (neutral) on error.
    """
    try:
        vocab = corpora.Dictionary([tokens1, tokens2])
        bow1 = vocab.doc2bow(tokens1)
        bow2 = vocab.doc2bow(tokens2)
        # Term-similarity matrix built from embedding-space neighbours.
        sim_index = WordEmbeddingSimilarityIndex(model)
        sim_matrix = SparseTermSimilarityMatrix(sim_index, vocab)
        return float(sim_matrix.inner_product(bow1, bow2, normalized=(True, True)))
    except Exception as e:
        logger.error(f"Error in compute_scm: {str(e)}")
        return 0.5
|
|
|
def question_vector_word(correct_answer):
    """Mean FastText embedding of the preprocessed answer tokens.

    Args:
        correct_answer: Reference answer text.

    Returns:
        The mean word vector as a numpy array, or None when no token
        yields a vector (or on error).
    """
    try:
        words = preprocess(correct_answer)
        if not words:
            return None

        collected = []
        for word in words:
            try:
                collected.append(fasttext_model[word])
            except KeyError:
                # Token absent from the model vocabulary — skip it.
                continue

        if not collected:
            return None

        # Sentence embedding = average of its word embeddings.
        return np.mean(collected, axis=0)
    except Exception as e:
        logger.error(f"Error in question_vector_word: {str(e)}")
        return None
|
|
|
def fasttext_similarity(word_vectors, answer):
    """Best cosine similarity between the answer and reference word vectors.

    Args:
        word_vectors: Iterable of reference embeddings (None entries skipped).
        answer: Student answer text, embedded via question_vector_word.

    Returns:
        Highest cosine similarity as a float, or 0.0 when the answer has
        no embedding, no reference is usable, or an error occurs.
    """
    try:
        answer_embedding = question_vector_word(answer)

        # Fix: question_vector_word returns None for empty/OOV answers;
        # previously np.dot(None, vec) raised and was silently swallowed.
        if answer_embedding is None:
            return 0.0

        answer_norm = np.linalg.norm(answer_embedding)
        if answer_norm == 0:
            # Zero vector has no direction; cosine is undefined.
            return 0.0

        similarities = []
        for vec in word_vectors:
            if vec is None:
                continue
            vec_norm = np.linalg.norm(vec)
            if vec_norm == 0:
                # Fix: 0/0 here produced nan, which max() then returned,
                # poisoning the final score.
                continue
            similarities.append(float(np.dot(answer_embedding, vec) / (answer_norm * vec_norm)))

        if not similarities:
            return 0.0

        return max(similarities)
    except Exception as e:
        logger.error(f"Error in fasttext_similarity: {str(e)}")
        return 0.0
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|