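# NOTE: the page-status text "Spaces: Sleeping" preceded this file where it was
# copied from; it is not part of the program.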
import csv
import json
import logging
import os
import pickle
import re
from collections import Counter
from datetime import datetime

import nltk
import numpy as np
import torch
from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
from transformers import AutoModelForSequenceClassification, AutoTokenizer
# Download the NLTK resources used by BookRecommender (stopword list,
# tokenizer data, WordNet for lemmatization).  Recent NLTK releases make
# nltk.word_tokenize look for the 'punkt_tab' resource, so it is requested in
# addition to the legacy 'punkt' package.
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)
nltk.download('wordnet', quiet=True)

# Configure logging: every record goes both to combined_api.log and to the
# console stream.
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler("combined_api.log"),
        logging.StreamHandler(),
    ],
)
logger = logging.getLogger(__name__)

app = Flask(__name__)
CORS(app)  # Enable Cross-Origin Resource Sharing

# Global variables and constants
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INTENT_MODEL_PATH = os.path.join(BASE_DIR, "model")
RECOMMENDER_MODEL_PATH = os.path.join(BASE_DIR, "recommender_model")
# NOTE: relative path, so the file is created in the current working directory.
EVAL_CSV = "model_evaluation.csv"

# Global model variables; they stay None/False until initialize_models() runs.
intent_model = None
intent_tokenizer = None
intent_classes = None
intent_thresholds = None
recommender = None
recommender_model_loaded = False
#################################################
# Book Recommender System
#################################################
class BookRecommender:
    """Recommend books by embedding similarity.

    A user query is preprocessed, encoded with a sentence-transformer model
    and compared (cosine similarity) against precomputed book embeddings; the
    best-matching rows of the loaded book table are returned.
    """

    # (DataFrame column, output key) pairs copied into every recommendation
    # when the column exists in the loaded table.
    _FIELD_MAP = (
        ('Title', 'title'),
        ('Authors', 'author'),
        ('Category', 'category'),
        ('Publish Date (Year)', 'year'),
    )
    # Descriptions longer than this many characters are cut and end in "...".
    _MAX_DESCRIPTION_LEN = 200

    def __init__(self, model_name='all-minilm-l6-v2'):
        """Initialize the book recommender with specified model.

        No model is loaded here; call ``load_model`` before requesting
        recommendations.
        """
        self.model_name = model_name
        self.model = None
        self.book_embeddings = None
        self.df = None
        self.stop_words = set(stopwords.words('english'))
        self.lemmatizer = WordNetLemmatizer()
        logger.info(f"BookRecommender initialized with model: {model_name}")

    def preprocess_text(self, text):
        """Lowercase, strip punctuation, drop stopwords and lemmatize *text*.

        Returns the cleaned tokens joined by single spaces, or an empty
        string when *text* is not a ``str``.
        """
        if not isinstance(text, str):
            return ""
        # Convert to lowercase and replace everything that is neither a word
        # character nor whitespace with a space.
        text = text.lower()
        text = re.sub(r'[^\w\s]', ' ', text)
        # Tokenize, remove stopwords, and lemmatize
        tokens = nltk.word_tokenize(text)
        tokens = [
            self.lemmatizer.lemmatize(word)
            for word in tokens
            if word not in self.stop_words
        ]
        return ' '.join(tokens)

    def load_model(self, folder_path=RECOMMENDER_MODEL_PATH):
        """Load a previously saved model and embeddings for inference.

        Expects *folder_path* to contain ``config.pkl``, a
        ``sentence_transformer`` directory, ``book_embeddings.pkl`` and
        ``books_data.pkl``.

        Returns:
            True when everything was loaded, False otherwise (the error is
            logged, never raised).
        """
        try:
            # Check if folder exists
            if not os.path.exists(folder_path):
                logger.error(f"Model folder {folder_path} does not exist.")
                return False

            # NOTE(security): pickle.load executes arbitrary code embedded in
            # the file.  Only load artifacts produced by a trusted pipeline.

            # Load configuration
            config_path = os.path.join(folder_path, "config.pkl")
            with open(config_path, 'rb') as f:
                config = pickle.load(f)
            self.model_name = config['model_name']
            logger.info(f"Loaded configuration: model_name={self.model_name}")

            # Load the sentence transformer model
            model_path = os.path.join(folder_path, "sentence_transformer")
            self.model = SentenceTransformer(model_path)
            logger.info(f"Model loaded from {model_path}")

            # Load book embeddings
            embeddings_path = os.path.join(folder_path, "book_embeddings.pkl")
            with open(embeddings_path, 'rb') as f:
                self.book_embeddings = pickle.load(f)
            logger.info(f"Embeddings loaded: {len(self.book_embeddings)} book vectors")

            # Load the DataFrame
            df_path = os.path.join(folder_path, "books_data.pkl")
            with open(df_path, 'rb') as f:
                self.df = pickle.load(f)
            logger.info(f"DataFrame loaded: {len(self.df)} books")
            return True
        except Exception as e:
            logger.error(f"Error loading model: {str(e)}", exc_info=True)
            return False

    def recommend_books(self, user_query, top_n=5, include_description=True):
        """Recommend books based on user query.

        Args:
            user_query: Free-text query; preprocessed with ``preprocess_text``.
            top_n: Maximum number of books to return.  Values below 1 yield
                an empty list; values above the number of books are capped.
            include_description: When true and the table has a
                ``Description`` column, add a (possibly truncated)
                description to each result.

        Returns:
            A list of dicts ordered from most to least similar.  Each dict
            holds the available book fields plus ``relevance_score`` (cosine
            similarity as ``float``) and ``rank`` (1-based).  An empty list is
            returned when the model is not loaded or an error occurs.
        """
        if self.model is None or self.book_embeddings is None or self.df is None:
            logger.error("Model not initialized. Cannot make recommendations.")
            return []
        logger.info(f"Finding books similar to: '{user_query}'")
        try:
            # Preprocess the query the same way as the book text
            processed_query = self.preprocess_text(user_query)
            # Encode user query
            user_embedding = self.model.encode([processed_query])
            # Compute similarity between query and books
            similarities = cosine_similarity(user_embedding, self.book_embeddings)[0]

            # Clamp the requested count.  Slicing with [-top_n:] would return
            # *every* book for top_n == 0 and skip the best matches for
            # negative values.
            limit = max(0, min(int(top_n), len(similarities)))
            # Indices of the `limit` highest scores, best first.
            similar_books_idx = np.argsort(similarities)[::-1][:limit]

            columns = self.df.columns
            recommendations = []
            for rank, idx in enumerate(similar_books_idx, start=1):
                row = self.df.iloc[idx]
                book_data = {
                    key: row[column]
                    for column, key in self._FIELD_MAP
                    if column in columns
                }
                if include_description and 'Description' in columns:
                    description = row['Description']
                    # A non-string value (for example a missing description
                    # stored as NaN) has no usable text; fall back to an
                    # empty string instead of failing the whole request.
                    if not isinstance(description, str):
                        description = ""
                    # Truncate long descriptions
                    if len(description) > self._MAX_DESCRIPTION_LEN:
                        description = description[:self._MAX_DESCRIPTION_LEN - 3] + "..."
                    book_data['description'] = description
                # Add similarity score
                book_data['relevance_score'] = float(similarities[idx])
                book_data['rank'] = rank
                recommendations.append(book_data)

            logger.info(f"Successfully generated {len(recommendations)} recommendations")
            return recommendations
        except Exception as e:
            logger.error(f"Error generating recommendations: {str(e)}", exc_info=True)
            return []
#################################################
# Intent Classification
#################################################
def setup_evaluation_csv():
    """Create the evaluation CSV with its header row if it does not exist.

    The file at ``EVAL_CSV`` is left untouched when it is already present.
    """
    if os.path.exists(EVAL_CSV):
        return

    header = [
        'timestamp',
        'input_text',
        'predicted_intent',
        'is_ood',
        'confidence',
        'energy_score',
        'detection_method',
    ]
    # newline='' lets the csv module control line endings itself.
    with open(EVAL_CSV, 'w', newline='') as f:
        csv.writer(f).writerow(header)
    logger.info(f"Created evaluation CSV file: {EVAL_CSV}")
def save_prediction_to_csv(input_text, result, method):
    """Append one prediction to the evaluation CSV for later analysis.

    Args:
        input_text: The text that was classified.
        result: Prediction dict; the keys ``intent``, ``is_ood``,
            ``confidence`` and ``energy_score`` are read from it.
        method: Name of the OOD detection method that was used.

    Raises:
        KeyError: If *result* lacks one of the keys listed above.
    """
    row = [
        datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        input_text,
        result['intent'],
        result['is_ood'],
        result['confidence'],
        result['energy_score'],
        method,
    ]
    with open(EVAL_CSV, 'a', newline='') as f:
        csv.writer(f).writerow(row)
def load_ood_thresholds(model_path):
    """Load the OOD thresholds from the model directory.

    Args:
        model_path: Directory expected to contain ``ood_thresholds.json``.

    Returns:
        The parsed JSON content of the threshold file, or a dict with the
        default ``energy_threshold`` (0.0) and ``msp_threshold`` (0.5) when
        the file does not exist.
    """
    threshold_path = os.path.join(model_path, "ood_thresholds.json")
    if not os.path.exists(threshold_path):
        # Provide default thresholds if file not found
        logger.warning(f"Threshold file not found at {threshold_path}. Using default values.")
        return {
            "energy_threshold": 0.0,  # Replace with your default value
            "msp_threshold": 0.5,  # Replace with your default value
        }

    with open(threshold_path, "r") as f:
        return json.load(f)
def load_intent_resources():
    """Load model, tokenizer, intent classes, and thresholds for intent classification.

    Everything is loaded into local variables first and the module-level
    globals are assigned only when every step succeeded.  A partial failure
    therefore never leaves the globals half-populated (for example a model
    without its class list).

    Returns:
        True on success, False when anything failed (the error is logged).
    """
    global intent_model, intent_tokenizer, intent_classes, intent_thresholds
    logger.info(f"Loading intent resources from {INTENT_MODEL_PATH}...")
    try:
        # Load model and tokenizer
        model = AutoModelForSequenceClassification.from_pretrained(INTENT_MODEL_PATH)
        tokenizer = AutoTokenizer.from_pretrained(INTENT_MODEL_PATH)

        # Load intent classes
        intent_classes_path = os.path.join(INTENT_MODEL_PATH, "intent_classes.pkl")
        if not os.path.exists(intent_classes_path):
            raise FileNotFoundError(f"Intent classes file not found at {intent_classes_path}")
        # NOTE(security): pickle.load executes arbitrary code embedded in the
        # file.  Only load artifacts produced by a trusted pipeline.
        with open(intent_classes_path, "rb") as f:
            classes = pickle.load(f)

        # Load OOD thresholds
        thresholds = load_ood_thresholds(INTENT_MODEL_PATH)

        logger.info("Intent resources loaded successfully")
        logger.info(f"Loaded {len(classes)} intent classes")
        logger.info(f"Thresholds: {thresholds}")
    except Exception as e:
        logger.error(f"Failed to load intent resources: {str(e)}", exc_info=True)
        return False

    # Publish all resources together now that nothing can fail any more.
    intent_model = model
    intent_tokenizer = tokenizer
    intent_classes = classes
    intent_thresholds = thresholds
    return True
def predict_intent_with_enhanced_ood(text, model, tokenizer, intent_classes,
                                     energy_threshold, msp_threshold, method='combined'):
    """
    Predict intent with enhanced out-of-distribution detection and detailed logging.

    The energy score is ``-logsumexp(logits)`` and the confidence is the
    maximum softmax probability.  The input is flagged as
    out-of-distribution (OOD) according to *method*:

    * ``'energy'``   - energy score above *energy_threshold*
    * ``'msp'``      - confidence below *msp_threshold*
    * ``'combined'`` - both conditions hold

    Any other *method* leaves ``is_ood`` False; a warning is logged.

    The scalar extraction via ``.item()`` requires a single prediction, so
    *text* must be one input, not a batch.

    Returns:
        dict with ``intent``, ``is_ood``, ``confidence``, ``energy_score``
        and ``class_probabilities`` (label -> probability).
    """
    logger.info("\n========== INTENT PREDICTION DEBUG ==========")
    logger.info(f"Input Text: {text}")
    logger.info(f"Detection Method: {method}")

    # Tokenize input
    inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)

    # Get model outputs without tracking gradients (inference only)
    with torch.no_grad():
        outputs = model(**inputs)
        logits = outputs.logits
    logger.info(f"Logits: {logits.numpy().tolist()}")

    # Get probabilities
    probs = torch.nn.functional.softmax(logits, dim=-1)
    max_prob, pred_idx = torch.max(probs, dim=-1)
    confidence = max_prob.item()
    predicted_index = pred_idx.item()
    logger.info(f"Softmax Probabilities: {probs.numpy().tolist()}")
    logger.info(f"Max Probability (Confidence): {confidence:.4f}")
    logger.info(f"Predicted Index: {predicted_index}")

    # Calculate energy score
    energy_score = (-torch.logsumexp(logits, dim=-1)).item()
    logger.info(f"Energy Score: {energy_score:.4f}")

    # OOD detection
    is_ood = False
    if method == 'energy':
        is_ood = energy_score > energy_threshold
    elif method == 'msp':
        is_ood = confidence < msp_threshold
    elif method == 'combined':
        is_ood = (energy_score > energy_threshold) and (confidence < msp_threshold)
    else:
        # Result is unchanged (treated as in-distribution), but make the
        # unsupported value visible instead of ignoring it silently.
        logger.warning(f"Unknown OOD detection method '{method}'; skipping OOD check.")
    logger.info(f"OOD Detection -> is_ood: {is_ood}")
    if is_ood:
        logger.info("Prediction marked as OUT-OF-DISTRIBUTION.")
    else:
        logger.info("Prediction marked as IN-DISTRIBUTION.")

    # Get intent label
    predicted_intent = intent_classes[predicted_index]
    logger.info(f"Predicted Intent: {predicted_intent}")
    logger.info("=============================================\n")

    return {
        "intent": predicted_intent,
        "is_ood": is_ood,
        "confidence": confidence,
        "energy_score": energy_score,
        # Add all class probabilities for detailed analysis
        "class_probabilities": {
            intent_classes[i]: float(prob)
            for i, prob in enumerate(probs[0].numpy())
        },
    }
#################################################
# Server Initialization
#################################################
def initialize_models():
    """Load all required models on startup.

    Creates the evaluation CSV if needed, loads the intent classification
    resources and the book recommender, and updates the module-level
    ``recommender`` / ``recommender_model_loaded`` globals.

    Returns:
        True only when both the intent model and the recommender loaded.
    """
    global recommender, recommender_model_loaded

    # Create evaluation CSV if it doesn't exist
    setup_evaluation_csv()

    # Load intent classification model
    intent_model_loaded = load_intent_resources()
    if intent_model_loaded:
        logger.info("Intent classification model loaded successfully!")
    else:
        logger.error("Failed to load intent model.")

    # Initialize book recommender
    recommender = BookRecommender()
    recommender_model_loaded = recommender.load_model()
    if recommender_model_loaded:
        logger.info("Book recommendation model loaded successfully!")
    else:
        logger.error("Failed to load book recommendation model.")

    return intent_model_loaded and recommender_model_loaded
#################################################
# API Routes
#################################################
@app.route('/api/health', methods=['GET'])
def health_check():
    """Endpoint to check if the API is running and models are loaded.

    ``status`` is ``healthy`` when both the intent model and the recommender
    are loaded, ``partially_healthy`` when exactly one is, and ``unhealthy``
    when neither is.
    """
    intent_models_loaded = intent_model is not None and intent_tokenizer is not None

    if intent_models_loaded and recommender_model_loaded:
        status = "healthy"
    elif intent_models_loaded or recommender_model_loaded:
        status = "partially_healthy"
    else:
        status = "unhealthy"

    return jsonify({
        "status": status,
        "intent_model_loaded": intent_models_loaded,
        "recommender_model_loaded": recommender_model_loaded,
        "available_endpoints": [
            "/api/health",
            "/api/analyze",
            "/api/recommend",
            "/api/stats",
            "/api/download_eval_data"
        ]
    })
#################################################
# Intent Classification Routes
#################################################
@app.route('/api/analyze', methods=['POST'])
def analyze():
    """Endpoint to predict intent from text.

    Expects a JSON object with a string ``text`` field and an optional
    ``method`` (``'energy'``, ``'msp'`` or ``'combined'``, default
    ``'combined'``).

    Responses:
        200 - prediction dict from ``predict_intent_with_enhanced_ood``
        400 - request is not JSON / not an object / invalid fields
        503 - intent resources are not loaded
        500 - prediction failed
    """
    # Refuse early when the intent resources never loaded, mirroring the
    # recommender endpoint, instead of failing inside the prediction.
    resources = (intent_model, intent_tokenizer, intent_classes, intent_thresholds)
    if any(resource is None for resource in resources):
        return jsonify({
            "error": "Model not loaded",
            "message": "The intent classification model is not properly loaded."
        }), 503

    # Check if request contains JSON
    if not request.is_json:
        return jsonify({"error": "Request must be JSON"}), 400

    # silent=True turns an unparsable body into None so this handler can
    # answer with a JSON error of its own.
    data = request.get_json(silent=True)
    if not isinstance(data, dict):
        return jsonify({"error": "Request body must be a JSON object"}), 400
    if 'text' not in data:
        return jsonify({"error": "Missing 'text' field in request"}), 400
    text = data['text']
    if not isinstance(text, str):
        return jsonify({"error": "'text' must be a string"}), 400

    # Default to combined method unless specified
    method = data.get('method', 'combined')
    if method not in ['energy', 'msp', 'combined']:
        return jsonify({"error": "Invalid method. Must be 'energy', 'msp', or 'combined'"}), 400

    # Make prediction
    try:
        result = predict_intent_with_enhanced_ood(
            text,
            intent_model,
            intent_tokenizer,
            intent_classes,
            intent_thresholds["energy_threshold"],
            intent_thresholds["msp_threshold"],
            method=method
        )
    except Exception as e:
        logger.error(f"Error in analyze endpoint: {str(e)}", exc_info=True)
        return jsonify({
            "error": "Processing error",
            "message": f"An error occurred while analyzing the text: {str(e)}"
        }), 500

    # Save result to CSV for evaluation.  This is bookkeeping only, so a
    # failed write is logged and must not discard a valid prediction.
    try:
        save_prediction_to_csv(text, result, method)
    except Exception as e:
        logger.error(f"Could not record prediction in {EVAL_CSV}: {str(e)}", exc_info=True)

    # Return prediction as JSON
    return jsonify(result)
@app.route('/api/stats', methods=['GET'])
def get_stats():
    """Get statistics about model usage and predictions.

    Returns model metadata plus, when the evaluation CSV exists, the total
    number of logged queries, how many were flagged OOD and the five most
    frequent predicted intents.  Any failure yields a 500 JSON error.
    """
    try:
        stats = {
            "intent_model_info": {
                # Explicit None check: truth-testing a multi-element NumPy
                # array (a plausible type for the unpickled class list)
                # raises ValueError.
                "num_intent_classes": len(intent_classes) if intent_classes is not None else 0,
                "model_path": INTENT_MODEL_PATH,
                "thresholds": intent_thresholds
            },
            "recommender_model_info": {
                "model_name": recommender.model_name if recommender is not None else None,
                "num_books": (
                    len(recommender.df)
                    if recommender is not None and recommender.df is not None
                    else 0
                )
            },
            "usage": {}
        }

        # Read CSV to generate statistics if it exists
        if os.path.exists(EVAL_CSV):
            # newline='' is what the csv module expects so that quoted
            # fields containing line breaks are read back correctly.
            with open(EVAL_CSV, 'r', newline='') as f:
                rows = list(csv.DictReader(f))

            # Count intents for statistical analysis
            intent_counts = Counter(row["predicted_intent"] for row in rows)
            stats["usage"] = {
                "total_queries": len(rows),
                # Booleans were written with str(), hence the string compare.
                "ood_count": sum(1 for row in rows if row["is_ood"] == "True"),
                # Top 5 intents, most frequent first
                "top_intents": dict(intent_counts.most_common(5))
            }

        return jsonify(stats)
    except Exception as e:
        logger.error(f"Error in stats endpoint: {str(e)}", exc_info=True)
        return jsonify({
            "error": "Processing error",
            "message": f"An error occurred while retrieving stats: {str(e)}"
        }), 500
@app.route('/api/download_eval_data', methods=['GET'])
def download_eval_data():
    """Return the evaluation data as JSON for analysis.

    Responds with ``{"count": <rows>, "data": [<row dict>, ...]}``, with 404
    when the evaluation CSV does not exist yet, and with 500 on any error.
    """
    try:
        if not os.path.exists(EVAL_CSV):
            return jsonify({"error": "No evaluation data available yet"}), 404

        # newline='' is what the csv module expects so that quoted fields
        # containing line breaks are read back correctly.
        with open(EVAL_CSV, 'r', newline='') as f:
            rows = list(csv.DictReader(f))

        return jsonify({
            "count": len(rows),
            "data": rows
        })
    except Exception as e:
        logger.error(f"Error downloading evaluation data: {str(e)}", exc_info=True)
        return jsonify({
            "error": "Processing error",
            "message": f"An error occurred: {str(e)}"
        }), 500
#################################################
# Book Recommender Routes
#################################################
def _to_json_safe(obj):
    """Recursively convert NumPy scalars/arrays in *obj* to plain Python values."""
    if isinstance(obj, np.generic):
        # Covers NumPy integers, floats, booleans and strings alike.
        return obj.item()
    if isinstance(obj, np.ndarray):
        return obj.tolist()
    if isinstance(obj, dict):
        return {key: _to_json_safe(value) for key, value in obj.items()}
    if isinstance(obj, list):
        return [_to_json_safe(item) for item in obj]
    return obj


@app.route('/api/recommend', methods=['POST'])
def recommend():
    """Endpoint to get book recommendations based on user query.

    Expects a JSON object with:
        query (str, required): the search text.
        top_n (int, optional, default 5): number of books, at least 1.
        include_description (optional, default True).
        threshold (number, optional, default 0.5): results with a
            ``relevance_score`` at or above it are returned as
            ``high_recommendations``, the rest as ``low_recommendations``.

    Responses: 200 with the split results, 400 for invalid input, 503 when
    the recommender is not loaded, 500 on processing errors.
    """
    if not recommender_model_loaded or recommender is None:
        return jsonify({
            "error": "Model not loaded",
            "message": "The recommendation model is not properly loaded."
        }), 503

    # silent=True returns None for a missing/invalid JSON body so the JSON
    # error below is used for those requests.
    data = request.get_json(silent=True)
    if not data or not isinstance(data, dict):
        return jsonify({
            "error": "Invalid request",
            "message": "No JSON data provided."
        }), 400

    query = data.get('query')
    if not isinstance(query, str) or not query.strip():
        return jsonify({
            "error": "Missing parameter",
            "message": "Query parameter is required."
        }), 400

    # Validate numeric parameters up front so bad input is answered with 400
    # instead of surfacing as a 500 from deep inside the processing code.
    try:
        top_n = int(data.get('top_n', 5))
    except (TypeError, ValueError):
        top_n = None
    if top_n is None or top_n < 1:
        return jsonify({
            "error": "Invalid parameter",
            "message": "'top_n' must be a positive integer."
        }), 400

    threshold = data.get('threshold', 0.5)  # default threshold
    # bool is a subclass of int but is not a meaningful score threshold.
    if isinstance(threshold, bool) or not isinstance(threshold, (int, float)):
        return jsonify({
            "error": "Invalid parameter",
            "message": "'threshold' must be a number."
        }), 400

    include_description = data.get('include_description', True)

    try:
        # Get recommendations
        recommendations = recommender.recommend_books(
            user_query=query,
            top_n=top_n,
            include_description=bool(include_description)
        )
        # Clean recommendations to make them JSON serializable
        recommendations_clean = _to_json_safe(recommendations)

        # Split based on threshold
        high_score = [rec for rec in recommendations_clean if rec['relevance_score'] >= threshold]
        low_score = [rec for rec in recommendations_clean if rec['relevance_score'] < threshold]

        return jsonify({
            "query": query,
            "threshold": threshold,
            "high_recommendations": high_score,
            "low_recommendations": low_score,
            "total_count": len(recommendations_clean),
            "high_count": len(high_score),
            "low_count": len(low_score)
        })
    except Exception as e:
        logger.error(f"Error in recommendation endpoint: {str(e)}", exc_info=True)
        return jsonify({
            "error": "Processing error",
            "message": f"An error occurred while processing your request: {str(e)}"
        }), 500
#################################################
# Main
#################################################
if __name__ == '__main__':
    # Initialize models when the app starts.  The server is started even if
    # loading failed so that /api/health can report the degraded state.
    models_loaded = initialize_models()
    if not models_loaded:
        logger.warning("Starting API with one or more models unavailable; see /api/health.")

    # Set port from environment variable or default to 5000
    port = int(os.environ.get('PORT', 5000))

    # For development use debug=True, for production use debug=False
    app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False)
# Example requests:
# curl -X POST http://localhost:5000/api/analyze -H "Content-Type: application/json" -d '{"text": "cariin buku", "method": "combined"}'
# curl -X POST http://localhost:5000/api/recommend -H "Content-Type: application/json" -d '{"query": "programming for beginners", "top_n": 10, "include_description": true}'