import json

import nltk
import torch
import evaluate  # `datasets.load_metric` is deprecated; Hugging Face metrics now live in the `evaluate` package
from transformers import pipeline, AutoTokenizer, AutoModelForCausalLM
from sentence_transformers import SentenceTransformer, util

# Download necessary NLTK data
nltk.download('punkt')
nltk.download('stopwords')
# --- Model and Metric Loading ---
class ContentEvaluator:
    def __init__(self):
        # Portuguese sentence encoder used for semantic-similarity estimates
        self.semantic_similarity_model = SentenceTransformer('neuralmind/bert-large-portuguese-cased')

        # Perplexity needs a decoder-only (causal) language model. The ptt5 checkpoints are
        # encoder-decoder (T5) models and will not load with AutoModelForCausalLM, so a
        # Portuguese GPT-2 checkpoint is used here instead.
        self.perplexity_model_name = "pierreguillou/gpt2-small-portuguese"
        self.perplexity_tokenizer = AutoTokenizer.from_pretrained(self.perplexity_model_name)
        self.perplexity_model = AutoModelForCausalLM.from_pretrained(self.perplexity_model_name)

        # Load Hugging Face evaluation metrics
        self.bertscore_metric = evaluate.load("bertscore")
        self.bleu_metric = evaluate.load("bleu")
        self.rouge_metric = evaluate.load("rouge")
        self.meteor_metric = evaluate.load("meteor")
        self.sacrebleu_metric = evaluate.load("sacrebleu")  # More robust BLEU implementation

        # LLM used to judge content and detect hallucinations.
        # NOTE: "gpt-3.5-turbo" is an OpenAI API model and cannot be loaded with a local
        # transformers pipeline; swap in an instruction-tuned model from the Hugging Face Hub
        # (or call the OpenAI/Gemini API directly) before running.
        self.judge_model_name = "gpt-3.5-turbo"  # Gemini or GPT-4 if available
        self.judge = pipeline("text-generation", model=self.judge_model_name)
    def calculate_perplexity(self, text):
        """
        Calculates the perplexity of a text using a Portuguese causal language model.

        Perplexity measures how well the language model predicts the text. Lower perplexity
        indicates the text is more predictable and likely to be well formed; higher perplexity
        suggests the text is surprising or unusual, which may indicate errors or nonsensical content.
        """
        try:
            with torch.no_grad():
                tensor_input = self.perplexity_tokenizer.encode(text, return_tensors='pt')
                # With labels == input_ids the model returns the cross-entropy loss;
                # exponentiating it gives the perplexity.
                loss = self.perplexity_model(tensor_input, labels=tensor_input).loss
                return torch.exp(loss).item()
        except Exception as e:
            print(f"Error calculating perplexity: {e}")
            return float('inf')
    def detect_hallucination_with_llm(self, text, window_size=200):
        """
        Detects potential hallucinations by asking the LLM judge to analyze the text
        in windows of `window_size` tokens, using a refined prompt.
        """
        hallucinations = []
        text_chunks = nltk.word_tokenize(text)
        for i in range(0, len(text_chunks), window_size):
            chunk = " ".join(text_chunks[i:i + window_size])
            prompt = f"""
            You are an expert in identifying factual errors and inconsistencies in educational text.
            Your task is to meticulously analyze the provided text excerpt and pinpoint any potential hallucinations.
            Focus on identifying claims or statements that exhibit the following characteristics:
            * **Factual Inaccuracy:** Assertions that are demonstrably false or lack credible supporting evidence.
            * **Logical Fallacies:** Statements containing flawed reasoning or internal contradictions.
            * **Nonsensical Claims:** Assertions that are absurd, meaningless, or defy common sense.
            * **Invented Information:** Fabricated details or events that have no basis in reality.

            Text Excerpt:
            ```
            {chunk}
            ```

            For each potential hallucination, provide:
            - **Hallucination:** The specific text you believe is a hallucination.
            - **Explanation:** A detailed and precise justification for why you classify it as a hallucination.

            Return your analysis as a JSON list of dictionaries, strictly adhering to the following format:
            ```json
            [
                {{"hallucination": "[The hallucinated text]", "explanation": "[Your detailed explanation]"}}
            ]
            ```
            """
            # `max_new_tokens` bounds only the generated answer (the original `max_length=300`
            # would be shorter than the prompt itself), and `return_full_text=False` keeps the
            # prompt out of the returned string.
            response = self.judge(prompt, max_new_tokens=300, return_full_text=False)[0]['generated_text'].strip()
            try:
                # Parse the JSON answer instead of eval()-ing arbitrary model output.
                chunk_hallucinations = json.loads(response)
                for hallucination in chunk_hallucinations:
                    hallucinations.append({
                        'chunk': chunk,
                        'hallucination': hallucination['hallucination'],
                        'explanation': hallucination['explanation']
                    })
            except Exception as e:
                print(f"Error parsing LLM response: {e}")
                print(f"LLM Response: {response}")
        return hallucinations
    def calculate_metrics(self, generated_text, reference_text):
        """Calculates BERTScore, BLEU, ROUGE, METEOR, and SacreBLEU metrics."""
        results = {}
        try:
            results['bertscore'] = self.bertscore_metric.compute(
                predictions=[generated_text], references=[reference_text], lang="pt")['f1'][0]

            # The `evaluate` implementations of BLEU/ROUGE/METEOR tokenize internally,
            # so raw strings are passed rather than pre-split token lists.
            bleu_results = self.bleu_metric.compute(predictions=[generated_text], references=[[reference_text]])
            results['bleu'] = bleu_results['bleu']

            rouge_results = self.rouge_metric.compute(predictions=[generated_text], references=[reference_text])
            results['rougeL'] = rouge_results['rougeL']

            meteor_results = self.meteor_metric.compute(predictions=[generated_text], references=[reference_text])
            results['meteor'] = meteor_results['meteor']

            # SacreBLEU (more robust BLEU implementation)
            sacrebleu_results = self.sacrebleu_metric.compute(predictions=[generated_text], references=[[reference_text]])
            results['sacrebleu'] = sacrebleu_results['score']
        except Exception as e:
            print(f"Error calculating metrics: {e}")
            results = {'bertscore': None, 'bleu': None, 'rougeL': None, 'meteor': None, 'sacrebleu': None}
        return results
    def analyze_text(self, text, perplexity_threshold=40):
        """
        Analyzes a text for perplexity and potential hallucinations, sentence by sentence.
        """
        results = []
        sentences = nltk.sent_tokenize(text)
        for i, sentence in enumerate(sentences):
            perplexity = self.calculate_perplexity(sentence)
            hallucinations = self.detect_hallucination_with_llm(sentence)

            issues = []
            if perplexity > perplexity_threshold:
                issues.append(f"- **High Perplexity:** ({perplexity:.2f}) The sentence might be grammatically incorrect or nonsensical.")
            if hallucinations:
                for hallucination in hallucinations:
                    issues.append(f"- **Potential Hallucination (LLM):** {hallucination['hallucination']} - {hallucination['explanation']}")

            review_flag = len(issues) > 0
            explanation = "\n".join(issues) if issues else "No potential issues detected."

            results.append({
                'sentence_index': i,
                'review_flag': review_flag,
                'explanation': explanation,
                'perplexity': perplexity,
                'hallucinations': hallucinations,
                'sentence': sentence
            })
        return results
def analyze_content_for_review(self, generated_text, reference_text, | |
similarity_threshold, | |
bertscore_threshold, | |
bleu_threshold, | |
rouge_threshold, | |
meteor_threshold): | |
"""Analyzes content and flags potential issues based on provided thresholds and LLM judgment.""" | |
similarity = self.estimate_semantic_similarity(generated_text, reference_text) | |
metrics = self.calculate_metrics(generated_text, reference_text) | |
llm_judgment = self.get_llm_judgment(generated_text, reference_text) | |
issues = [] | |
if similarity < similarity_threshold: | |
issues.append(f"- **Low Semantic Similarity:** ({similarity:.2f}) Content might be off-topic or not factually aligned.") | |
if metrics['bertscore'] and metrics['bertscore'] < bertscore_threshold: | |
issues.append(f"- **Low BERTScore:** ({metrics['bertscore']:.2f}) There might be factual inaccuracies or significant paraphrasing.") | |
if metrics['bleu'] and metrics['bleu'] < bleu_threshold: | |
issues.append(f"- **Low BLEU Score:** ({metrics['bleu']:.2f}) The generated text might not be fluent or use appropriate wording.") | |
if metrics['rougeL'] and metrics['rougeL'] < rouge_threshold: | |
issues.append(f"- **Low ROUGE-L Score:** ({metrics['rougeL']:.2f}) The generated text might not cover important information from the reference.") | |
if metrics['meteor'] and metrics['meteor'] < meteor_threshold: | |
issues.append(f"- **Low METEOR Score:** ({metrics['meteor']:.2f}) The generated text might have poor word alignment with the reference.") | |
# Use LLM judgment as the primary decision-maker | |
if llm_judgment == "major issues": | |
review_flag = True | |
explanation = f"LLM Judgment: **Major Issues**\n" + "\n".join(issues) | |
elif llm_judgment == "minor issues": | |
review_flag = True | |
explanation = f"LLM Judgment: **Minor Issues**\n" + "\n".join(issues) | |
else: | |
review_flag = False | |
explanation = "LLM Judgment: **No Issues**" | |
return { | |
'review_flag': review_flag, | |
'explanation': explanation, | |
'semantic_similarity': similarity, | |
'metrics': metrics, | |
'llm_judgment': llm_judgment, | |
'generated_text': generated_text, | |
'reference_text': reference_text | |
} | |
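    # --- Helper sketches (definitions missing from this excerpt) ---
    # `estimate_semantic_similarity`, `get_llm_judgment`, and `generate_educational_content`
    # are called above and below but are not defined in the excerpt. The versions here are
    # minimal, hedged sketches of what they plausibly look like: cosine similarity from the
    # SentenceTransformer loaded in __init__, and simple prompts to the judge pipeline.
    # Treat them as illustrative placeholders, not the original implementation.
    def estimate_semantic_similarity(self, generated_text, reference_text):
        """Cosine similarity between sentence embeddings (higher = more similar)."""
        embeddings = self.semantic_similarity_model.encode([generated_text, reference_text], convert_to_tensor=True)
        return util.cos_sim(embeddings[0], embeddings[1]).item()

    def get_llm_judgment(self, generated_text, reference_text):
        """Asks the judge LLM for a coarse verdict: 'no issues', 'minor issues', or 'major issues'."""
        prompt = (
            "Compare the generated text with the reference text and answer with exactly one of: "
            "'no issues', 'minor issues', 'major issues'.\n\n"
            f"Reference:\n{reference_text}\n\nGenerated:\n{generated_text}\n\nVerdict:"
        )
        verdict = self.judge(prompt, max_new_tokens=10, return_full_text=False)[0]['generated_text']
        verdict = verdict.strip().lower()
        for label in ("major issues", "minor issues", "no issues"):
            if label in verdict:
                return label
        return "no issues"

    def generate_educational_content(self, topic, max_new_tokens=512):
        """Illustrative placeholder: generates short educational content on `topic` with the judge pipeline."""
        prompt = f"Escreva um material educacional curto e factual sobre {topic}."
        return self.judge(prompt, max_new_tokens=max_new_tokens, return_full_text=False)[0]['generated_text']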
# --- Example Usage ---
if __name__ == "__main__":
    evaluator = ContentEvaluator()

    # Example text (replace with your actual data). The Portuguese sample deliberately mixes
    # one true statement with falsehoods ("the Earth is flat", "the Moon is made of cheese",
    # "dinosaurs still live in the Amazon") to exercise the hallucination detector.
    text = """
    A Terra é plana e o Sol gira em torno dela.
    A gravidade é uma força fraca.
    As plantas precisam de água para sobreviver.
    A Lua é feita de queijo.
    Os dinossauros ainda vivem na Amazônia.
    """

    analysis_results = evaluator.analyze_text(text)
    for result in analysis_results:
        print(f"----- Sentence {result['sentence_index'] + 1} -----")
        print(f"Review Flag: {result['review_flag']}")
        print(f"Explanation: {result['explanation']}")
        print(f"Perplexity: {result['perplexity']:.2f}")
        print(f"Sentence: {result['sentence']}\n")
    # 2. Content Evaluation Phase (using the best thresholds from the tuning phase,
    # which is not shown in this excerpt)
    new_generated_text = evaluator.generate_educational_content("Matemática")
    new_reference_text = "Content from your educational material..."
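    # Hypothetical placeholder thresholds so the snippet runs end-to-end. In the full
    # pipeline these values come from the threshold-tuning phase mentioned above; the
    # numbers below are illustrative assumptions only.
    best_thresholds = {
        'similarity_threshold': 0.70,
        'bertscore_threshold': 0.70,
        'bleu_threshold': 0.30,
        'rouge_threshold': 0.30,
        'meteor_threshold': 0.30,
    }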
    evaluation_result = evaluator.analyze_content_for_review(
        new_generated_text, new_reference_text,
        best_thresholds['similarity_threshold'],
        best_thresholds['bertscore_threshold'],
        best_thresholds['bleu_threshold'],
        best_thresholds['rouge_threshold'],
        best_thresholds['meteor_threshold']
    )

    print("\n----- Evaluation Result -----")
    print(f"Review Flag: {evaluation_result['review_flag']}")
    print(f"Explanation: {evaluation_result['explanation']}")
#######

from typing import Callable, List


def evaluate_retrieval_precision(
    questions: List[str],
    system: Callable[[str], List[str]],
    evaluator: Callable[[str, str], int],
    num_chunks_expected: int = 3,
    verbose: bool = True
) -> dict:
""" | |
Evaluates the retrieval precision of a system using an LLM evaluator. | |
Args: | |
questions: A list of evaluation questions. | |
system: A function that takes a question as input and returns a list of retrieved chunks. | |
evaluator: A function that takes a question and a chunk as input and returns a relevance score (0 or 1). | |
num_chunks_expected: The number of chunks the system is expected to return. Defaults to 3. | |
verbose: Whether to print warnings for questions with fewer returned chunks than expected. | |
Returns: | |
A dictionary containing: | |
- 'mean_precision': The mean retrieval precision score across all questions. | |
- 'precision_scores': A list of precision scores for each individual question. | |
- 'question_relevance': A list of tuples, where each tuple contains a question and the number of relevant chunks retrieved for that question. | |
""" | |
results = { | |
'mean_precision': 0.0, | |
'precision_scores': [], | |
'question_relevance': [] | |
} | |
for i, question in enumerate(questions): | |
retrieved_chunks = system(question) | |
# Warning if fewer chunks are returned than expected | |
if len(retrieved_chunks) < num_chunks_expected and verbose: | |
print(f"Warning: System returned {len(retrieved_chunks)} chunks (expected {num_chunks_expected}) for question {i+1}: {question}") | |
# Calculate precision for the current question | |
relevant_chunks = sum(evaluator(question, chunk) for chunk in retrieved_chunks) | |
precision = relevant_chunks / len(retrieved_chunks) if retrieved_chunks else 0 | |
results['precision_scores'].append(precision) | |
# Store the question and its relevant chunk count | |
results['question_relevance'].append((question, relevant_chunks)) | |
# Calculate mean precision | |
results['mean_precision'] = sum(results['precision_scores']) / len(questions) if questions else 0 | |
return results | |
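# The example below needs a retrieval `system` and a relevance `evaluator`. The stand-ins here
# are hypothetical sketches so the snippet is self-contained: a real setup would query your
# vector store for chunks and ask an LLM to grade each chunk's relevance to the question.
def system(question: str) -> List[str]:
    """Toy retriever: returns canned chunks regardless of the question."""
    return [
        "As plantas realizam fotossíntese para produzir energia.",
        "A fotossíntese ocorre nos cloroplastos.",
        "O futebol é o esporte mais popular do Brasil.",
    ]


def evaluator(question: str, chunk: str) -> int:
    """Toy relevance judge: counts a chunk as relevant if it shares several keywords with the question."""
    question_words = set(nltk.word_tokenize(question.lower()))
    chunk_words = set(nltk.word_tokenize(chunk.lower()))
    return int(len(question_words & chunk_words) > 2)


questions = ["Como as plantas produzem energia por meio da fotossíntese?"]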
# Example usage (replace 'questions', 'system', and 'evaluator' with your own):
evaluation_results = evaluate_retrieval_precision(
    questions, system, evaluator, num_chunks_expected=3, verbose=True
)

print(f"Mean Retrieval Precision: {evaluation_results['mean_precision']:.2f}")
print(f"Precision Scores for Each Question: {evaluation_results['precision_scores']}")
print(f"Question Relevance: {evaluation_results['question_relevance']}")