import os
from pathlib import Path

# Configure cache directories before importing transformers/nltk, which read
# these environment variables at import time.
os.environ["TRANSFORMERS_CACHE"] = "/tmp/.cache/huggingface"
os.environ["HF_HOME"] = "/tmp/.cache/huggingface"
os.environ["NLTK_DATA"] = "/tmp/.cache/nltk"
Path("/tmp/.cache/huggingface").mkdir(parents=True, exist_ok=True)
Path("/tmp/.cache/nltk").mkdir(parents=True, exist_ok=True)

import re
import time
import logging
import asyncio
from typing import List, Tuple

from fastapi import FastAPI, UploadFile, File, HTTPException
from fastapi.middleware.cors import CORSMiddleware
import fitz  # PyMuPDF
import torch
import numpy as np
import nltk
from transformers import (
    AutoTokenizer,
    AutoModelForSequenceClassification,
    AutoModel,
)
from nltk.tokenize import sent_tokenize
from sklearn.metrics.pairwise import cosine_similarity
# Logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
# NLTK init: register the cache path once, then download punkt only if missing
nltk.data.path.append("/tmp/.cache/nltk")
try:
    nltk.data.find("tokenizers/punkt")
except LookupError:
    nltk.download("punkt", download_dir="/tmp/.cache/nltk")
# App init
app = FastAPI()
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["POST"],
    allow_headers=["*"],
)
# Model configs
MODEL_NAME = "Essay-Grader/roberta-ai-detector-20250401_232702"
EMBEDDING_MODEL = "sentence-transformers/all-MiniLM-L6-v2"
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MAX_TEXT_LENGTH = 10000  # max characters kept after extraction
AI_CHUNK_SIZE = 512  # token budget for the AI detector
PLAGIARISM_THRESHOLD = 0.75  # similarity above which plagiarism risk is flagged
TIMEOUT = 25  # seconds; total processing budget per request
# Load models once at startup
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
ai_model = AutoModelForSequenceClassification.from_pretrained(MODEL_NAME).to(DEVICE)
ai_model.eval()
embed_tokenizer = AutoTokenizer.from_pretrained(EMBEDDING_MODEL)
embed_model = AutoModel.from_pretrained(EMBEDDING_MODEL).to(DEVICE)
embed_model.eval()
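# Optional warm-up (a hedged sketch, not required by the API): one dummy
# forward pass at import time moves lazy CUDA/kernel initialization off the
# first request. Uncomment if cold-start latency matters.
# with torch.no_grad():
#     ai_model(**tokenizer("warm-up", return_tensors="pt").to(ai_model.device))
#     embed_model(**embed_tokenizer(["warm-up"], return_tensors="pt").to(embed_model.device))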
# Health check
@app.get("/health")
def health_check():
    return {"status": "healthy"}
def extract_text(pdf_bytes: bytes) -> str:
    try:
        with fitz.open(stream=pdf_bytes, filetype="pdf") as doc:
            text = []
            for page in doc:
                page_text = page.get_text().strip()
                # Heuristic: stop at the first page mentioning "reference"
                # to exclude the reference section from analysis.
                if "reference" in page_text.lower():
                    break
                text.append(page_text)
        full_text = re.sub(r"\s+", " ", "\n".join(text))[:MAX_TEXT_LENGTH]
        if len(full_text) < 150:
            raise ValueError("Extracted text too short (minimum 150 characters)")
        return full_text
    except Exception as e:
        logger.error(f"PDF error: {e}")
        raise HTTPException(400, f"Invalid PDF: {e}")
def predict_ai(text: str) -> float:
    """Score the first AI_CHUNK_SIZE tokens; returns P(AI-generated)."""
    inputs = tokenizer(
        text,
        truncation=True,
        max_length=AI_CHUNK_SIZE,
        return_tensors="pt",
    ).to(ai_model.device)
    with torch.no_grad():
        outputs = ai_model(**inputs)
        probs = torch.softmax(outputs.logits, dim=1)
    return float(probs[0][1])  # probability of the AI-generated class
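# Note: predict_ai scores only the first AI_CHUNK_SIZE tokens. Below is a
# minimal sketch of whole-document scoring (predict_ai_chunked is a
# hypothetical helper, not part of the original pipeline): split the token
# ids into fixed windows, re-decode, score each window, and average.
def predict_ai_chunked(text: str) -> float:
    ids = tokenizer(text, add_special_tokens=False)["input_ids"]
    window = AI_CHUNK_SIZE - 2  # leave room for special tokens when re-encoding
    chunks = [
        tokenizer.decode(ids[i : i + window]) for i in range(0, len(ids), window)
    ]
    scores = [predict_ai(chunk) for chunk in chunks]
    return sum(scores) / len(scores) if scores else 0.0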
def compute_embeddings(sentences: List[str]) -> torch.Tensor:
    """Mean-pooled sentence embeddings (masked average over token states)."""
    inputs = embed_tokenizer(
        sentences,
        padding=True,
        truncation=True,
        max_length=128,
        return_tensors="pt",
    ).to(embed_model.device)
    with torch.no_grad():
        outputs = embed_model(**inputs)
    attention_mask = inputs["attention_mask"]
    last_hidden = outputs.last_hidden_state
    # Zero out padding tokens, then average over the sequence dimension.
    summed = (last_hidden * attention_mask.unsqueeze(-1)).sum(1)
    return summed / attention_mask.sum(1, keepdim=True)
def check_plagiarism(text: str) -> Tuple[float, bool]:
    """Estimate internal redundancy from pairwise sentence similarity."""
    try:
        # Keep mid-length sentences only, capped at 40 for speed.
        sentences = [s for s in sent_tokenize(text) if 5 < len(s.split()) < 100][:40]
        if len(sentences) < 2:
            return 0.0, False
        embeddings = compute_embeddings(sentences).cpu().numpy()
        sim_matrix = cosine_similarity(embeddings)
        np.fill_diagonal(sim_matrix, 0)
        # Average the top 10% most similar sentence pairs.
        n = len(sim_matrix)
        top_k = max(1, int(0.1 * n * (n - 1) / 2))
        top_indices = np.argpartition(sim_matrix.flatten(), -top_k)[-top_k:]
        avg_similarity = float(np.mean(sim_matrix.flatten()[top_indices]))
        return round(avg_similarity * 100, 2), avg_similarity > PLAGIARISM_THRESHOLD
    except Exception as e:
        logger.error(f"Plagiarism check error: {e}")
        return 0.0, False
@app.post("/detect")
async def detect_ai_and_plagiarism(file: UploadFile = File(...)):
    start_time = time.time()
    try:
        if not file.filename.lower().endswith(".pdf"):
            raise HTTPException(400, "Only PDF files allowed")
        pdf_data = await file.read()
        text = extract_text(pdf_data)
        # Run both model passes in worker threads so they execute
        # concurrently without blocking the event loop.
        ai_score, (plag_score, plag_risk) = await asyncio.gather(
            asyncio.to_thread(predict_ai, text),
            asyncio.to_thread(check_plagiarism, text),
        )
        total_time = time.time() - start_time
        if total_time > TIMEOUT:
            raise HTTPException(500, "Processing timed out")
        return {
            "ai_generated_percentage": round(ai_score * 100, 2),
            "plagiarism_percentage": plag_score,
            "plagiarism_risk": plag_risk,
        }
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"Error: {e}", exc_info=True)
        raise HTTPException(500, f"Processing failed: {e}")
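# A minimal sketch for local testing (assumes uvicorn is installed and this
# module is named main.py; "paper.pdf" is a placeholder file):
#
#     uvicorn main:app --host 0.0.0.0 --port 8000
#     curl -X POST -F "file=@paper.pdf" http://localhost:8000/detect
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)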