# LeetCodePredictions / related_topics_prediction.py
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, jaccard_score, hamming_loss
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
import xgboost as xgb
from sklearn.metrics import precision_score, recall_score, f1_score
import warnings
import joblib
import random
import os
import torch
import tensorflow as tf
def set_all_seeds(seed=42):
    """Set all seeds to make results reproducible."""
    random.seed(seed)                          # Python
    np.random.seed(seed)                       # NumPy (also used by scikit-learn)
    tf.random.set_seed(seed)                   # TensorFlow
    torch.manual_seed(seed)                    # PyTorch
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)   # Environment
class MultiLabelThresholdOptimizer:
    def __init__(self, n_splits=5, random_state=42):
        self.n_splits = n_splits
        self.random_state = random_state
        self.optimal_thresholds = {}
    def find_optimal_thresholds(self, y_true, y_pred_proba):
        """Find the optimal threshold for each label using the F1 score."""
        n_labels = y_true.shape[1]
        thresholds = np.zeros(n_labels)
        for label in range(n_labels):
            best_f1 = 0
            best_threshold = 0.5
            # Use a fixed grid of thresholds to ensure reproducibility
            for threshold in np.arange(0.1, 0.9, 0.05):
                y_pred = (y_pred_proba[:, label] >= threshold).astype(int)
                f1 = f1_score(y_true[:, label], y_pred, zero_division=0)
                if f1 > best_f1:
                    best_f1 = f1
                    best_threshold = threshold
            thresholds[label] = best_threshold
        return thresholds
    def fit(self, X, y, model, model_name):
        """Find and save optimal thresholds using cross-validation."""
        kf = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
        fold_thresholds = []
        # Stratify on the first label; the model is already fitted, so each
        # validation fold is only used to score predicted probabilities.
        for train_idx, val_idx in kf.split(X, y[:, 0]):
            X_val = X[val_idx]
            y_val = y[val_idx]
            y_pred_proba = model.predict_proba(X_val)
            fold_thresholds.append(self.find_optimal_thresholds(y_val, y_pred_proba))
        final_thresholds = np.median(fold_thresholds, axis=0)
        self.optimal_thresholds[model_name] = final_thresholds
        return final_thresholds
    def predict(self, model, X, model_name):
        if model_name not in self.optimal_thresholds:
            raise ValueError(f"No thresholds found for model: {model_name}")
        y_pred_proba = model.predict_proba(X)
        thresholds = self.optimal_thresholds[model_name]
        y_pred = np.zeros_like(y_pred_proba)
        for label in range(y_pred_proba.shape[1]):
            y_pred[:, label] = (y_pred_proba[:, label] >= thresholds[label]).astype(int)
        return y_pred
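# Minimal usage sketch of the optimizer above (assumes X_train/X_test are a
# TF-IDF feature matrix and y_train a binarized label matrix; names are
# illustrative, not part of this script's pipeline):
#
#     clf = OneVsRestClassifier(LogisticRegression(max_iter=1000)).fit(X_train, y_train)
#     opt = MultiLabelThresholdOptimizer(n_splits=5, random_state=42)
#     opt.fit(X_train, y_train, clf, "Logistic_Regression")    # per-label thresholds
#     y_hat = opt.predict(clf, X_test, "Logistic_Regression")  # thresholded 0/1 matrix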
def compare_models(results):
    """Compare models across all metrics and provide rankings.

    Rankings cover:
    - Precision
    - Recall
    - F1 Score
    - Subset Accuracy
    - Hamming Accuracy
    - Jaccard Score
    """
    metrics = ['precision', 'recall', 'f1', 'subset_accuracy', 'hamming_accuracy', 'jaccard_score']
    rankings = {metric: {} for metric in metrics}
    # Rank models for each metric
    for metric in metrics:
        sorted_models = sorted(results.items(), key=lambda x: x[1][metric], reverse=True)
        for rank, (model_name, _) in enumerate(sorted_models, 1):
            rankings[metric][model_name] = rank
    # Compute average ranking across all metrics
    average_rankings = {}
    for model_name in results.keys():
        model_ranks = [rankings[metric][model_name] for metric in metrics]
        average_rankings[model_name] = sum(model_ranks) / len(metrics)
    # Sort models by average ranking (lower is better)
    final_ranking = sorted(average_rankings.items(), key=lambda x: x[1])
    # Print detailed comparison
    print("\n🏆 Model Comparison Results:")
    print("\n📊 Detailed Metrics and Rankings:")
    headers = ['Model', 'Precision', 'Recall', 'F1 Score', 'Subset Acc', 'Hamming Acc', 'Jaccard', 'Avg Rank']
    print('-' * 120)
    print(f"{headers[0]:<24} {headers[1]:>11} {headers[2]:>11} {headers[3]:>11} "
          f"{headers[4]:>11} {headers[5]:>11} {headers[6]:>11} {headers[7]:>9}")
    print('-' * 120)
    for model_name in results.keys():
        model_metrics = results[model_name]  # avoid shadowing the `metrics` list above
        print(f"{model_name:<24} "
              f"{model_metrics['precision']:>11.3f} "
              f"{model_metrics['recall']:>11.3f} "
              f"{model_metrics['f1']:>11.3f} "
              f"{model_metrics['subset_accuracy']:>11.3f} "
              f"{model_metrics['hamming_accuracy']:>11.3f} "
              f"{model_metrics['jaccard_score']:>11.3f} "
              f"{average_rankings[model_name]:>9.2f}")
    print('-' * 120)
    # Print final rankings
    print("\n🎯 Final Model Rankings (based on average performance across all metrics):")
    for rank, (model_name, avg_rank) in enumerate(final_ranking, 1):
        print(f"{rank}. {model_name:<20} (Average Rank: {avg_rank:.2f})")
    # Identify the best model
    best_model = final_ranking[0][0]
    print(f"\n🥇 Best Overall Model: {best_model}")
    print("\n📌 Detailed strengths of the best model:")
    print(f"  - Precision: {results[best_model]['precision']:.3f}")
    print(f"  - Recall: {results[best_model]['recall']:.3f}")
    print(f"  - F1 Score: {results[best_model]['f1']:.3f}")
    print(f"  - Subset Accuracy: {results[best_model]['subset_accuracy']:.3f}")
    print(f"  - Hamming Accuracy: {results[best_model]['hamming_accuracy']:.3f}")
    print(f"  - Jaccard Score: {results[best_model]['jaccard_score']:.3f}")
    return best_model, results[best_model]
def save_best_model_info(best_model_name, model_metrics, threshold):
    """Save information about the best model."""
    best_model_info = {
        'model_name': best_model_name,
        'metrics': model_metrics,
        'threshold': threshold
    }
    joblib.dump(best_model_info, 'best_model_related_topics_info.pkl')
def evaluate_model_related(y_test, y_pred, model_name):
    """Evaluate model performance with additional multi-label accuracy metrics."""
    precision_weighted = precision_score(y_test, y_pred, average='weighted', zero_division=0)
    recall_weighted = recall_score(y_test, y_pred, average='weighted', zero_division=0)
    f1_weighted = f1_score(y_test, y_pred, average='weighted', zero_division=0)
    # Subset accuracy (exact match ratio)
    subset_accuracy = accuracy_score(y_test, y_pred)
    # Hamming accuracy (1 - Hamming loss)
    hamming_acc = 1 - hamming_loss(y_test, y_pred)
    # Jaccard similarity score (averaged per sample)
    jaccard_samples = jaccard_score(y_test, y_pred, average='samples', zero_division=0)
    return {
        'precision': precision_weighted,
        'recall': recall_weighted,
        'f1': f1_weighted,
        'subset_accuracy': subset_accuracy,
        'hamming_accuracy': hamming_acc,
        'jaccard_score': jaccard_samples
    }
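# Quick illustration of the metrics above on a hypothetical 2-sample, 3-label case:
#
#     y_true = np.array([[1, 0, 1],
#                        [0, 1, 0]])
#     y_hat  = np.array([[1, 0, 0],
#                        [0, 1, 0]])
#     # subset accuracy   = 1/2 (only the second row matches exactly)
#     # Hamming accuracy  = 1 - 1/6 ≈ 0.833 (one wrong cell out of six)
#     # samples-avg Jaccard = (1/2 + 1/1) / 2 = 0.75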
def related_topics_prediction():
    # Set all seeds for reproducibility
    SEED = 42
    set_all_seeds(SEED)
    warnings.filterwarnings("ignore", category=UserWarning)
    # Load and preprocess data
    print("Loading and preprocessing data...")
    df = pd.read_csv("data.csv")
    df = df.dropna(subset=['related_topics'])
    df['description'] = df['description'].str.lower().fillna('')
    df['related_topics'] = df['related_topics'].apply(lambda x: x.split(',') if isinstance(x, str) else [])
    # Extract unique topics
    all_possible_topics = sorted(set(topic for topics in df['related_topics'] for topic in topics))
    print(f"\n✅ Found {len(all_possible_topics)} unique topics.")
    # Prepare features and labels with deterministic behavior
    vectorizer = TfidfVectorizer(
        max_features=5000,
        ngram_range=(1, 3),
        stop_words='english'
    )
    X = vectorizer.fit_transform(df['description'])
    joblib.dump(vectorizer, 'related_topics_vectorizer.pkl')
    mlb = MultiLabelBinarizer(classes=all_possible_topics)
    y = mlb.fit_transform(df['related_topics'])
    joblib.dump(mlb, 'related_topics_label_binarizer.pkl')
    # Split the dataset with a fixed random state
    X_train, X_test, y_train, y_test, desc_train, desc_test = train_test_split(
        X, y, df['description'], test_size=0.2, random_state=SEED, shuffle=True
    )
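    # Illustration of the label encoding above (hypothetical topic lists):
    # MultiLabelBinarizer turns
    #     [['array', 'hash-table'], ['dynamic-programming']]
    # into a binary indicator matrix with one column per topic, e.g.
    #     [[1, 0, 1],
    #      [0, 1, 0]]
    # for the column order ['array', 'dynamic-programming', 'hash-table'].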
    # Initialize models with fixed random states
    models = {
        'SVM': OneVsRestClassifier(SVC(kernel='linear', probability=True, random_state=SEED)),
        'Logistic_Regression': OneVsRestClassifier(LogisticRegression(max_iter=1000, random_state=SEED)),
        'Random_Forest': OneVsRestClassifier(RandomForestClassifier(n_estimators=100, random_state=SEED)),
        'KNN': OneVsRestClassifier(KNeighborsClassifier(n_neighbors=5)),
        'Gradient_Boosting': OneVsRestClassifier(GradientBoostingClassifier(n_estimators=100, random_state=SEED)),
        'XGBoost': xgb.XGBClassifier(
            n_estimators=100,
            use_label_encoder=False,
            eval_metric='mlogloss',
            random_state=SEED,
            seed=SEED
        )
    }
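    # Each OneVsRestClassifier wrapper trains one binary classifier per topic, so
    # predict_proba returns an (n_samples, n_topics) matrix of per-label probabilities
    # that the threshold optimizer below can cut at a different point for each label.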
    # Initialize the threshold optimizer
    optimizer = MultiLabelThresholdOptimizer(random_state=SEED)
    results = {}
    results_threshold = {}
    # Train and optimize each model
    for model_name, model in models.items():
        print(f"\n⏳ Training {model_name} model...")
        model.fit(X_train, y_train)
        print(f"Finding optimal thresholds for {model_name}...")
        thresholds = optimizer.fit(
            X_train.toarray() if not isinstance(X_train, np.ndarray) else X_train,
            y_train, model, model_name
        )
        results_threshold[model_name] = thresholds
        y_pred = optimizer.predict(model, X_test, model_name)
        results[model_name] = evaluate_model_related(y_test, y_pred, model_name)
print("\nSelecting best model...")
best_model_name, best_model_metrics = compare_models(results)
save_best_model_info(best_model_name, best_model_metrics, results_threshold[best_model_name])
trained_best_model = models[best_model_name]
# If it's a GridSearchCV model, extract the best estimator
if isinstance(trained_best_model, GridSearchCV):
trained_best_model = trained_best_model.best_estimator_
joblib.dump(trained_best_model, "best_related_topics_model.pkl")
print(f"โœ… Best trained model saved as best_related_topics_model.pkl")
    # Display sample predictions with fixed indices
    print("\n📌 Sample Predictions with Optimized Thresholds:")
    num_samples = 5
    # Use fixed indices instead of random sampling
    sample_indices = list(range(min(num_samples, X_test.shape[0])))
    for idx in sample_indices:
        print(f"\nDescription: {desc_test.iloc[idx][:100]}...")
        print(f"✅ True Topics: {', '.join(mlb.inverse_transform(np.array([y_test[idx]]))[0])}")
        for model_name in models.keys():
            y_pred = optimizer.predict(models[model_name], X_test[idx], model_name)
            predicted_labels = mlb.inverse_transform(y_pred)[0]
            print(f"🔮 Predicted ({model_name}): {', '.join(predicted_labels) if predicted_labels else 'None'}")
    print("\n✅ Training and evaluation completed. Models and thresholds saved.")
if __name__ == "__main__":
    related_topics_prediction()