from flask import Flask, request, jsonify, render_template
from flask_cors import CORS
from transformers import AutoModelForSequenceClassification, AutoTokenizer
from sentence_transformers import SentenceTransformer
from sklearn.metrics.pairwise import cosine_similarity
import torch
import numpy as np
import pickle
import os
import json
import logging
import csv
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from datetime import datetime
# Download necessary NLTK resources
nltk.download('stopwords', quiet=True)
nltk.download('punkt', quiet=True)
nltk.download('punkt_tab', quiet=True)  # required by word_tokenize on newer NLTK releases
nltk.download('wordnet', quiet=True)
# Configure logging
logging.basicConfig(level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s',
handlers=[logging.FileHandler("combined_api.log"),
logging.StreamHandler()])
logger = logging.getLogger(__name__)
app = Flask(__name__)
CORS(app) # Enable Cross-Origin Resource Sharing
# Global variables and constants
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
INTENT_MODEL_PATH = os.path.join(BASE_DIR, "model")
RECOMMENDER_MODEL_PATH = os.path.join(BASE_DIR, "recommender_model")
EVAL_CSV = os.path.join(BASE_DIR, "model_evaluation.csv")  # resolve relative to the app, not the working directory
# Global model variables
intent_model = None
intent_tokenizer = None
intent_classes = None
intent_thresholds = None
recommender = None
recommender_model_loaded = False
#################################################
# Book Recommender System
#################################################
class BookRecommender:
    def __init__(self, model_name='all-MiniLM-L6-v2'):
"""Initialize the book recommender with specified model."""
self.model_name = model_name
self.model = None
self.book_embeddings = None
self.df = None
self.stop_words = set(stopwords.words('english'))
self.lemmatizer = WordNetLemmatizer()
logger.info(f"BookRecommender initialized with model: {model_name}")
def preprocess_text(self, text):
"""Advanced text preprocessing with stopword removal and lemmatization."""
if not isinstance(text, str):
return ""
# Convert to lowercase and remove special characters
text = text.lower()
text = re.sub(r'[^\w\s]', ' ', text)
# Tokenize, remove stopwords, and lemmatize
tokens = nltk.word_tokenize(text)
tokens = [self.lemmatizer.lemmatize(word) for word in tokens if word not in self.stop_words]
return ' '.join(tokens)
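    # Illustrative behavior, assuming the standard NLTK English stopword list
    # and WordNet lemmas:
    #   preprocess_text("The Wizards' Great Books!") -> "wizard great book"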
def load_model(self, folder_path=RECOMMENDER_MODEL_PATH):
"""Load a previously saved model and embeddings for inference."""
try:
# Check if folder exists
if not os.path.exists(folder_path):
logger.error(f"Model folder {folder_path} does not exist.")
return False
# Load configuration
config_path = os.path.join(folder_path, "config.pkl")
with open(config_path, 'rb') as f:
config = pickle.load(f)
self.model_name = config['model_name']
logger.info(f"Loaded configuration: model_name={self.model_name}")
# Load the sentence transformer model
model_path = os.path.join(folder_path, "sentence_transformer")
self.model = SentenceTransformer(model_path)
logger.info(f"Model loaded from {model_path}")
# Load book embeddings
embeddings_path = os.path.join(folder_path, "book_embeddings.pkl")
with open(embeddings_path, 'rb') as f:
self.book_embeddings = pickle.load(f)
logger.info(f"Embeddings loaded: {len(self.book_embeddings)} book vectors")
# Load the DataFrame
df_path = os.path.join(folder_path, "books_data.pkl")
with open(df_path, 'rb') as f:
self.df = pickle.load(f)
logger.info(f"DataFrame loaded: {len(self.df)} books")
return True
except Exception as e:
logger.error(f"Error loading model: {str(e)}", exc_info=True)
return False
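    # Expected contents of RECOMMENDER_MODEL_PATH, as read by load_model() above:
    #   config.pkl            - dict with at least {'model_name': ...}
    #   sentence_transformer/ - a saved SentenceTransformer directory
    #   book_embeddings.pkl   - 2-D array of shape (num_books, embedding_dim)
    #   books_data.pkl        - pickled pandas DataFrame of book metadata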
def recommend_books(self, user_query, top_n=5, include_description=True):
"""Recommend books based on user query."""
if self.model is None or self.book_embeddings is None or self.df is None:
logger.error("Model not initialized. Cannot make recommendations.")
return []
logger.info(f"Finding books similar to: '{user_query}'")
try:
# Preprocess the query the same way as the book text
processed_query = self.preprocess_text(user_query)
# Encode user query
user_embedding = self.model.encode([processed_query])
# Compute similarity between query and books
similarities = cosine_similarity(user_embedding, self.book_embeddings)[0]
# Get top N most similar books
similar_books_idx = np.argsort(similarities)[-top_n:][::-1]
recommendations = []
for i, idx in enumerate(similar_books_idx):
book_data = {}
# Extract book information
if 'Title' in self.df.columns:
book_data['title'] = self.df.iloc[idx]['Title']
if 'Authors' in self.df.columns:
book_data['author'] = self.df.iloc[idx]['Authors']
if 'Category' in self.df.columns:
book_data['category'] = self.df.iloc[idx]['Category']
if 'Publish Date (Year)' in self.df.columns:
book_data['year'] = self.df.iloc[idx]['Publish Date (Year)']
                if include_description and 'Description' in self.df.columns:
                    # Guard against missing/NaN descriptions, then truncate long ones
                    description = self.df.iloc[idx]['Description']
                    if not isinstance(description, str):
                        description = ""
                    elif len(description) > 200:
                        description = description[:197] + "..."
                    book_data['description'] = description
# Add similarity score
book_data['relevance_score'] = float(similarities[idx])
book_data['rank'] = i + 1
recommendations.append(book_data)
logger.info(f"Successfully generated {len(recommendations)} recommendations")
return recommendations
except Exception as e:
logger.error(f"Error generating recommendations: {str(e)}", exc_info=True)
return []
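# Minimal standalone usage sketch (assumes a saved model exists under
# RECOMMENDER_MODEL_PATH; the query string is illustrative):
#
#   rec = BookRecommender()
#   if rec.load_model():
#       for book in rec.recommend_books("space adventure novels", top_n=3):
#           print(book["rank"], book.get("title", "?"), round(book["relevance_score"], 3))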
#################################################
# Intent Classification
#################################################
def setup_evaluation_csv():
"""Set up the CSV file for tracking model performance"""
if not os.path.exists(EVAL_CSV):
with open(EVAL_CSV, 'w', newline='') as f:
writer = csv.writer(f)
writer.writerow([
'timestamp',
'input_text',
'predicted_intent',
'is_ood',
'confidence',
'energy_score',
'detection_method'
])
logger.info(f"Created evaluation CSV file: {EVAL_CSV}")
def save_prediction_to_csv(input_text, result, method):
"""Save prediction results to CSV for later analysis"""
with open(EVAL_CSV, 'a', newline='') as f:
writer = csv.writer(f)
writer.writerow([
datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
input_text,
result['intent'],
result['is_ood'],
result['confidence'],
result['energy_score'],
method
])
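# Illustrative row appended by save_prediction_to_csv (the intent label and
# scores are made up):
#   2025-01-15 10:30:00,cariin buku,search_book,False,0.9731,-5.2145,combined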
def load_ood_thresholds(model_path):
"""Load the OOD thresholds from the model directory"""
threshold_path = os.path.join(model_path, "ood_thresholds.json")
if os.path.exists(threshold_path):
with open(threshold_path, "r") as f:
return json.load(f)
else:
# Provide default thresholds if file not found
logger.warning(f"Threshold file not found at {threshold_path}. Using default values.")
return {
"energy_threshold": 0.0, # Replace with your default value
"msp_threshold": 0.5 # Replace with your default value
}
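# Expected shape of model/ood_thresholds.json (the values here are illustrative,
# not recommendations):
#   {"energy_threshold": -4.25, "msp_threshold": 0.72}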
def load_intent_resources():
"""Load model, tokenizer, intent classes, and thresholds for intent classification."""
global intent_model, intent_tokenizer, intent_classes, intent_thresholds
logger.info(f"Loading intent resources from {INTENT_MODEL_PATH}...")
try:
# Load model and tokenizer
intent_model = AutoModelForSequenceClassification.from_pretrained(INTENT_MODEL_PATH)
intent_tokenizer = AutoTokenizer.from_pretrained(INTENT_MODEL_PATH)
# Load intent classes
intent_classes_path = os.path.join(INTENT_MODEL_PATH, "intent_classes.pkl")
if os.path.exists(intent_classes_path):
with open(intent_classes_path, "rb") as f:
intent_classes = pickle.load(f)
else:
raise FileNotFoundError(f"Intent classes file not found at {intent_classes_path}")
# Load OOD thresholds
intent_thresholds = load_ood_thresholds(INTENT_MODEL_PATH)
logger.info("Intent resources loaded successfully")
logger.info(f"Loaded {len(intent_classes)} intent classes")
logger.info(f"Thresholds: {intent_thresholds}")
return True
except Exception as e:
logger.error(f"Failed to load intent resources: {str(e)}", exc_info=True)
return False
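# Expected contents of INTENT_MODEL_PATH, as read by load_intent_resources() above:
#   config.json + weights + tokenizer files - standard Hugging Face format
#   intent_classes.pkl                      - pickled sequence of intent labels
#   ood_thresholds.json                     - see load_ood_thresholds() above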
def predict_intent_with_enhanced_ood(text, model, tokenizer, intent_classes,
energy_threshold, msp_threshold, method='combined'):
"""
Predict intent with enhanced out-of-distribution detection and detailed logging.
"""
logger.info("\n========== INTENT PREDICTION DEBUG ==========")
logger.info(f"Input Text: {text}")
logger.info(f"Detection Method: {method}")
# Tokenize input
inputs = tokenizer(text, return_tensors="pt", padding=True, truncation=True, max_length=512)
# Get model outputs
with torch.no_grad():
outputs = model(**inputs)
logits = outputs.logits
logger.info(f"Logits: {logits.numpy().tolist()}")
# Get probabilities
probs = torch.nn.functional.softmax(logits, dim=-1)
max_prob, pred_idx = torch.max(probs, dim=-1)
logger.info(f"Softmax Probabilities: {probs.numpy().tolist()}")
logger.info(f"Max Probability (Confidence): {max_prob.item():.4f}")
logger.info(f"Predicted Index: {pred_idx.item()}")
# Calculate energy score
energy = -torch.logsumexp(logits, dim=-1)
logger.info(f"Energy Score: {energy.item():.4f}")
# OOD detection
is_ood = False
if method == 'energy':
is_ood = energy.item() > energy_threshold
elif method == 'msp':
is_ood = max_prob.item() < msp_threshold
elif method == 'combined':
is_ood = (energy.item() > energy_threshold) and (max_prob.item() < msp_threshold)
logger.info(f"OOD Detection -> is_ood: {is_ood}")
if is_ood:
logger.info("Prediction marked as OUT-OF-DISTRIBUTION.")
else:
logger.info("Prediction marked as IN-DISTRIBUTION.")
# Get intent label
predicted_intent = intent_classes[pred_idx.item()]
logger.info(f"Predicted Intent: {predicted_intent}")
logger.info("=============================================\n")
return {
"intent": predicted_intent,
"is_ood": is_ood,
"confidence": max_prob.item(),
"energy_score": energy.item(),
# Add all class probabilities for detailed analysis
"class_probabilities": {
intent_classes[i]: float(prob)
for i, prob in enumerate(probs[0].numpy())
}
}
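# Worked example of the 'combined' OOD rule above (numbers are made up):
# with energy_threshold = -4.0 and msp_threshold = 0.7, an input scoring
# energy = -2.1 (> -4.0) with confidence 0.55 (< 0.7) satisfies both conditions
# and is flagged OOD; an input with energy = -6.3 or confidence 0.91 fails one
# condition and stays in-distribution.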
#################################################
# Server Initialization
#################################################
def initialize_models():
"""Load all required models on startup."""
global recommender, recommender_model_loaded
# Create evaluation CSV if it doesn't exist
setup_evaluation_csv()
# Load intent classification model
intent_model_loaded = load_intent_resources()
if intent_model_loaded:
logger.info("Intent classification model loaded successfully!")
else:
logger.error("Failed to load intent model.")
# Initialize book recommender
recommender = BookRecommender()
recommender_model_loaded = recommender.load_model()
if recommender_model_loaded:
logger.info("Book recommendation model loaded successfully!")
else:
logger.error("Failed to load book recommendation model.")
return intent_model_loaded and recommender_model_loaded
#################################################
# API Routes
#################################################
@app.route('/api/health', methods=['GET'])
def health_check():
"""Endpoint to check if the API is running and models are loaded."""
    intent_models_loaded = intent_model is not None and intent_tokenizer is not None
    if intent_models_loaded and recommender_model_loaded:
        status = "healthy"
    elif intent_models_loaded or recommender_model_loaded:
        status = "partially_healthy"
    else:
        status = "unhealthy"
    return jsonify({
        "status": status,
        "intent_model_loaded": intent_models_loaded,
        "recommender_model_loaded": recommender_model_loaded,
        "available_endpoints": [
            "/api/health",
            "/api/analyze",
            "/api/recommend",
            "/api/stats",
            "/api/download_eval_data"
        ]
    })
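# Illustrative /api/health response when both models are loaded:
#   {"status": "healthy", "intent_model_loaded": true,
#    "recommender_model_loaded": true, "available_endpoints": [...]}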
#################################################
# Intent Classification Routes
#################################################
@app.route('/api/analyze', methods=['POST'])
def analyze():
    """Endpoint to predict intent from text."""
    # Refuse requests until the intent model is loaded (mirrors /api/recommend)
    if intent_model is None or intent_tokenizer is None or intent_classes is None:
        return jsonify({
            "error": "Model not loaded",
            "message": "The intent classification model is not properly loaded."
        }), 503
    # Check if request contains JSON
    if not request.is_json:
        return jsonify({"error": "Request must be JSON"}), 400
    # Get text from request
    data = request.get_json()
    if 'text' not in data:
        return jsonify({"error": "Missing 'text' field in request"}), 400
    text = data['text']
# Default to combined method unless specified
method = data.get('method', 'combined')
if method not in ['energy', 'msp', 'combined']:
return jsonify({"error": "Invalid method. Must be 'energy', 'msp', or 'combined'"}), 400
# Make prediction
result = predict_intent_with_enhanced_ood(
text,
intent_model,
intent_tokenizer,
intent_classes,
intent_thresholds["energy_threshold"],
intent_thresholds["msp_threshold"],
method=method
)
# Save result to CSV for evaluation
save_prediction_to_csv(text, result, method)
# Return prediction as JSON
return jsonify(result)
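# Illustrative /api/analyze exchange (the intent label and scores are made up):
#   request:  {"text": "cariin buku", "method": "combined"}
#   response: {"intent": "search_book", "is_ood": false, "confidence": 0.97,
#              "energy_score": -5.21, "class_probabilities": {...}}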
@app.route('/api/stats', methods=['GET'])
def get_stats():
"""Get statistics about model usage and predictions."""
try:
stats = {
"intent_model_info": {
"num_intent_classes": len(intent_classes) if intent_classes else 0,
"model_path": INTENT_MODEL_PATH,
"thresholds": intent_thresholds
},
"recommender_model_info": {
"model_name": recommender.model_name if recommender else None,
"num_books": len(recommender.df) if recommender and recommender.df is not None else 0
},
"usage": {}
}
# Read CSV to generate statistics if it exists
if os.path.exists(EVAL_CSV):
with open(EVAL_CSV, 'r') as f:
reader = csv.DictReader(f)
rows = list(reader)
stats["usage"] = {
"total_queries": len(rows),
"ood_count": sum(1 for row in rows if row["is_ood"] == "True"),
"top_intents": {}
}
# Count intents for statistical analysis
intent_counts = {}
for row in rows:
intent = row["predicted_intent"]
if intent not in intent_counts:
intent_counts[intent] = 0
intent_counts[intent] += 1
# Get top 5 intents
top_intents = sorted(intent_counts.items(), key=lambda x: x[1], reverse=True)[:5]
stats["usage"]["top_intents"] = dict(top_intents)
return jsonify(stats)
except Exception as e:
logger.error(f"Error in stats endpoint: {str(e)}", exc_info=True)
return jsonify({
"error": "Processing error",
"message": f"An error occurred while retrieving stats: {str(e)}"
}), 500
@app.route('/api/download_eval_data', methods=['GET'])
def download_eval_data():
"""Return the evaluation data as JSON for analysis"""
try:
if not os.path.exists(EVAL_CSV):
return jsonify({"error": "No evaluation data available yet"}), 404
with open(EVAL_CSV, 'r') as f:
reader = csv.DictReader(f)
rows = list(reader)
return jsonify({
"count": len(rows),
"data": rows
})
except Exception as e:
logger.error(f"Error downloading evaluation data: {str(e)}", exc_info=True)
return jsonify({
"error": "Processing error",
"message": f"An error occurred: {str(e)}"
}), 500
#################################################
# Book Recommender Routes
#################################################
@app.route('/api/recommend', methods=['POST'])
def recommend():
"""Endpoint to get book recommendations based on user query."""
global recommender_model_loaded
if not recommender_model_loaded:
return jsonify({
"error": "Model not loaded",
"message": "The recommendation model is not properly loaded."
}), 503
data = request.get_json()
if not data:
return jsonify({
"error": "Invalid request",
"message": "No JSON data provided."
}), 400
query = data.get('query')
top_n = data.get('top_n', 5)
include_description = data.get('include_description', True)
    threshold = float(data.get('threshold', 0.5))  # minimum relevance score counted as "high"
if not query:
return jsonify({
"error": "Missing parameter",
"message": "Query parameter is required."
}), 400
try:
# Get recommendations
recommendations = recommender.recommend_books(
user_query=query,
top_n=int(top_n),
include_description=bool(include_description)
)
# Clean recommendations to make it JSON serializable
def clean_np(obj):
if isinstance(obj, np.integer):
return int(obj)
elif isinstance(obj, np.floating):
return float(obj)
elif isinstance(obj, np.ndarray):
return obj.tolist()
elif isinstance(obj, dict):
return {k: clean_np(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [clean_np(i) for i in obj]
else:
return obj
recommendations_clean = clean_np(recommendations)
# Split based on threshold
high_score = [rec for rec in recommendations_clean if rec['relevance_score'] >= threshold]
low_score = [rec for rec in recommendations_clean if rec['relevance_score'] < threshold]
return jsonify({
"query": query,
"threshold": threshold,
"high_recommendations": high_score,
"low_recommendations": low_score,
"total_count": len(recommendations_clean),
"high_count": len(high_score),
"low_count": len(low_score)
})
except Exception as e:
logger.error(f"Error in recommendation endpoint: {str(e)}", exc_info=True)
return jsonify({
"error": "Processing error",
"message": f"An error occurred while processing your request: {str(e)}"
}), 500
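# Illustrative /api/recommend exchange (titles and scores are made up):
#   request:  {"query": "programming for beginners", "top_n": 5, "threshold": 0.5}
#   response: {"query": "...", "threshold": 0.5,
#              "high_recommendations": [{"rank": 1, "title": "...", "relevance_score": 0.71, ...}],
#              "low_recommendations": [...],
#              "total_count": 5, "high_count": 1, "low_count": 4}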
#################################################
# Main
#################################################
if __name__ == '__main__':
    # Initialize models when the app starts and warn if anything failed to load
    models_loaded = initialize_models()
    if not models_loaded:
        logger.warning("One or more models failed to load; affected endpoints will return errors.")
# Set port from environment variable or default to 5000
port = int(os.environ.get('PORT', 5000))
# For development use debug=True, for production use debug=False
app.run(host='0.0.0.0', port=port, debug=False, use_reloader=False)
# Example requests ("cariin buku" is Indonesian for "find me a book"):
# curl -X POST http://localhost:5000/api/analyze -H "Content-Type: application/json" -d '{"text": "cariin buku", "method": "combined"}'
# curl -X POST http://localhost:5000/api/recommend -H "Content-Type: application/json" -d '{"query": "programming for beginners", "top_n": 10, "include_description": true}'
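# Equivalent Python client sketch (assumes the server is running locally and the
# `requests` package is installed; run from a separate process):
#
#   import requests
#   resp = requests.post("http://localhost:5000/api/recommend",
#                        json={"query": "programming for beginners", "top_n": 10})
#   print(resp.json()["high_recommendations"])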