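"""
Multi-label topic prediction from text descriptions.

This script loads data.csv, builds TF-IDF features over the 'description' column,
binarizes the comma-separated 'related_topics' labels, trains several one-vs-rest
classifiers plus XGBoost, tunes per-label decision thresholds with cross-validated
F1, compares the models across six metrics, and saves the best model together with
its vectorizer, label binarizer, and thresholds.
"""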
import pandas as pd
import numpy as np
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.preprocessing import MultiLabelBinarizer
from sklearn.model_selection import train_test_split, StratifiedKFold, GridSearchCV
from sklearn.multiclass import OneVsRestClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import (accuracy_score, jaccard_score, hamming_loss,
                             precision_score, recall_score, f1_score)
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, GradientBoostingClassifier
from sklearn.neighbors import KNeighborsClassifier
import xgboost as xgb
import warnings
import joblib
import random
import os
import torch
import tensorflow as tf
def set_all_seeds(seed=42):
    """Set all seeds to make results reproducible."""
    random.seed(seed)                         # Python
    np.random.seed(seed)                      # NumPy (also used by scikit-learn)
    tf.random.set_seed(seed)                  # TensorFlow
    torch.manual_seed(seed)                   # PyTorch
    if torch.cuda.is_available():
        torch.cuda.manual_seed(seed)
    os.environ['PYTHONHASHSEED'] = str(seed)  # Environment
class MultiLabelThresholdOptimizer:
    def __init__(self, n_splits=5, random_state=42):
        self.n_splits = n_splits
        self.random_state = random_state
        self.optimal_thresholds = {}

    def find_optimal_thresholds(self, y_true, y_pred_proba):
        """Find the optimal decision threshold for each label by maximizing F1 score."""
        n_labels = y_true.shape[1]
        thresholds = np.zeros(n_labels)
        for label in range(n_labels):
            best_f1 = 0
            best_threshold = 0.5
            # Use a fixed threshold grid to ensure reproducibility
            for threshold in np.arange(0.1, 0.9, 0.05):
                y_pred = (y_pred_proba[:, label] >= threshold).astype(int)
                f1 = f1_score(y_true[:, label], y_pred, zero_division=0)
                if f1 > best_f1:
                    best_f1 = f1
                    best_threshold = threshold
            thresholds[label] = best_threshold
        return thresholds
    def fit(self, X, y, model, model_name):
        """Find and save optimal per-label thresholds using cross-validation.

        The model is expected to be already fitted; the validation split of each
        fold is only used to score candidate thresholds.
        """
        kf = StratifiedKFold(n_splits=self.n_splits, shuffle=True, random_state=self.random_state)
        fold_thresholds = []
        # Stratify on the first label only (StratifiedKFold expects a 1-D target)
        for train_idx, val_idx in kf.split(X, y[:, 0]):
            X_val = X[val_idx]
            y_val = y[val_idx]
            y_pred_proba = model.predict_proba(X_val)
            fold_thresholds.append(self.find_optimal_thresholds(y_val, y_pred_proba))
        # Use the median across folds as the final per-label threshold
        final_thresholds = np.median(fold_thresholds, axis=0)
        self.optimal_thresholds[model_name] = final_thresholds
        return final_thresholds
    def predict(self, model, X, model_name):
        """Predict labels by applying the stored per-label thresholds to predicted probabilities."""
        if model_name not in self.optimal_thresholds:
            raise ValueError(f"No thresholds found for model: {model_name}")
        y_pred_proba = model.predict_proba(X)
        thresholds = self.optimal_thresholds[model_name]
        y_pred = np.zeros_like(y_pred_proba)
        for label in range(y_pred_proba.shape[1]):
            y_pred[:, label] = (y_pred_proba[:, label] >= thresholds[label]).astype(int)
        return y_pred
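# Usage sketch (illustrative only; 'clf', 'X_train', 'y_train', 'X_new' are
# hypothetical names). The optimizer expects an already-fitted estimator whose
# predict_proba returns one probability column per label, as OneVsRestClassifier
# does in this script:
#
#   clf = OneVsRestClassifier(LogisticRegression(max_iter=1000)).fit(X_train, y_train)
#   opt = MultiLabelThresholdOptimizer(n_splits=5, random_state=42)
#   opt.fit(X_train, y_train, clf, 'Logistic_Regression')   # stores per-label thresholds
#   y_hat = opt.predict(clf, X_new, 'Logistic_Regression')  # thresholded 0/1 label matrix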
def compare_models(results):
    """Compare models across all metrics and provide rankings.

    Rankings cover:
    - Precision
    - Recall
    - F1 Score
    - Subset Accuracy
    - Hamming Accuracy
    - Jaccard Score
    """
    metrics = ['precision', 'recall', 'f1', 'subset_accuracy', 'hamming_accuracy', 'jaccard_score']
    rankings = {metric: {} for metric in metrics}
    # Rank models for each metric (rank 1 = best)
    for metric in metrics:
        sorted_models = sorted(results.items(), key=lambda x: x[1][metric], reverse=True)
        for rank, (model_name, _) in enumerate(sorted_models, 1):
            rankings[metric][model_name] = rank
    # Compute average ranking across all metrics
    average_rankings = {}
    for model_name in results.keys():
        model_ranks = [rankings[metric][model_name] for metric in metrics]
        average_rankings[model_name] = sum(model_ranks) / len(metrics)
    # Sort models by average ranking (lower is better)
    final_ranking = sorted(average_rankings.items(), key=lambda x: x[1])
    # Print detailed comparison
    print("\n📊 Model Comparison Results:")
    print("\n📋 Detailed Metrics and Rankings:")
    headers = ['Model', 'Precision', 'Recall', 'F1 Score', 'Subset Acc', 'Hamming Acc', 'Jaccard', 'Avg Rank']
    print('-' * 120)
    print(f"{headers[0]:<24} {headers[1]:>11} {headers[2]:>11} {headers[3]:>11} "
          f"{headers[4]:>11} {headers[5]:>11} {headers[6]:>11} {headers[7]:>8}")
    print('-' * 120)
    for model_name in results.keys():
        model_metrics = results[model_name]
        print(f"{model_name:<24} "
              f"{model_metrics['precision']:>11.3f} "
              f"{model_metrics['recall']:>11.3f} "
              f"{model_metrics['f1']:>11.3f} "
              f"{model_metrics['subset_accuracy']:>11.3f} "
              f"{model_metrics['hamming_accuracy']:>11.3f} "
              f"{model_metrics['jaccard_score']:>11.3f} "
              f"{average_rankings[model_name]:>8.2f}")
    print('-' * 120)
    # Print final rankings
    print("\n🎯 Final Model Rankings (based on average performance across all metrics):")
    for rank, (model_name, avg_rank) in enumerate(final_ranking, 1):
        print(f"{rank}. {model_name:<20} (Average Rank: {avg_rank:.2f})")
    # Identify the best model
    best_model = final_ranking[0][0]
    print(f"\n🥇 Best Overall Model: {best_model}")
    print("\n📈 Detailed strengths of the best model:")
    print(f"   - Precision:        {results[best_model]['precision']:.3f}")
    print(f"   - Recall:           {results[best_model]['recall']:.3f}")
    print(f"   - F1 Score:         {results[best_model]['f1']:.3f}")
    print(f"   - Subset Accuracy:  {results[best_model]['subset_accuracy']:.3f}")
    print(f"   - Hamming Accuracy: {results[best_model]['hamming_accuracy']:.3f}")
    print(f"   - Jaccard Score:    {results[best_model]['jaccard_score']:.3f}")
    return best_model, results[best_model]
def save_best_model_info(best_model_name, model_metrics, threshold):
    """Save information about the best model."""
    best_model_info = {
        'model_name': best_model_name,
        'metrics': model_metrics,
        'threshold': threshold
    }
    joblib.dump(best_model_info, 'best_model_related_topics_info.pkl')
def evaluate_model_related(y_test, y_pred, model_name):
    """Evaluate model performance with additional accuracy metrics."""
    precision_weighted = precision_score(y_test, y_pred, average='weighted', zero_division=0)
    recall_weighted = recall_score(y_test, y_pred, average='weighted', zero_division=0)
    f1_weighted = f1_score(y_test, y_pred, average='weighted', zero_division=0)
    # Subset accuracy (exact match ratio)
    subset_accuracy = accuracy_score(y_test, y_pred)
    # Hamming accuracy (1 - Hamming loss)
    hamming_acc = 1 - hamming_loss(y_test, y_pred)
    # Jaccard similarity score (sample-averaged across instances)
    jaccard_samples = jaccard_score(y_test, y_pred, average='samples', zero_division=0)
    return {
        'precision': precision_weighted,
        'recall': recall_weighted,
        'f1': f1_weighted,
        'subset_accuracy': subset_accuracy,
        'hamming_accuracy': hamming_acc,
        'jaccard_score': jaccard_samples
    }
def related_topics_prediction():
    # Set all seeds for reproducibility
    SEED = 42
    set_all_seeds(SEED)
    warnings.filterwarnings("ignore", category=UserWarning)

    # Load and preprocess data. data.csv is expected to contain a 'description'
    # column and a 'related_topics' column with comma-separated topic names.
    print("Loading and preprocessing data...")
    df = pd.read_csv("data.csv")
    df = df.dropna(subset=['related_topics'])
    df['description'] = df['description'].str.lower().fillna('')
    df['related_topics'] = df['related_topics'].apply(lambda x: x.split(',') if isinstance(x, str) else [])

    # Extract unique topics
    all_possible_topics = sorted(set(topic for topics in df['related_topics'] for topic in topics))
    print(f"\n✅ Found {len(all_possible_topics)} unique topics.")

    # Prepare features and labels with deterministic behavior
    vectorizer = TfidfVectorizer(
        max_features=5000,
        ngram_range=(1, 3),
        stop_words='english'
    )
    X = vectorizer.fit_transform(df['description'])
    joblib.dump(vectorizer, 'related_topics_vectorizer.pkl')

    mlb = MultiLabelBinarizer(classes=all_possible_topics)
    y = mlb.fit_transform(df['related_topics'])
    joblib.dump(mlb, 'related_topics_label_binarizer.pkl')

    # Split the dataset with a fixed random state
    X_train, X_test, y_train, y_test, desc_train, desc_test = train_test_split(
        X, y, df['description'], test_size=0.2, random_state=SEED, shuffle=True
    )
    # Initialize models with fixed random states. Scikit-learn estimators are
    # wrapped in OneVsRestClassifier for multi-label output; XGBClassifier is
    # given the 2-D label matrix directly (supported by recent XGBoost versions).
    models = {
        'SVM': OneVsRestClassifier(SVC(kernel='linear', probability=True, random_state=SEED)),
        'Logistic_Regression': OneVsRestClassifier(LogisticRegression(max_iter=1000, random_state=SEED)),
        'Random_Forest': OneVsRestClassifier(RandomForestClassifier(n_estimators=100, random_state=SEED)),
        'KNN': OneVsRestClassifier(KNeighborsClassifier(n_neighbors=5)),
        'Gradient_Boosting': OneVsRestClassifier(GradientBoostingClassifier(n_estimators=100, random_state=SEED)),
        'XGBoost': xgb.XGBClassifier(
            n_estimators=100,
            use_label_encoder=False,
            eval_metric='mlogloss',
            random_state=SEED,
            seed=SEED
        )
    }
    # Initialize the threshold optimizer
    optimizer = MultiLabelThresholdOptimizer(random_state=SEED)
    results = {}
    results_threshold = {}

    # Train each model, then tune its per-label decision thresholds
    for model_name, model in models.items():
        print(f"\n⏳ Training {model_name} model...")
        model.fit(X_train, y_train)
        print(f"Finding optimal thresholds for {model_name}...")
        # The optimizer indexes rows per fold, so pass a dense array when X_train is sparse
        thresholds = optimizer.fit(
            X_train.toarray() if not isinstance(X_train, np.ndarray) else X_train,
            y_train, model, model_name
        )
        results_threshold[model_name] = thresholds
        y_pred = optimizer.predict(model, X_test, model_name)
        results[model_name] = evaluate_model_related(y_test, y_pred, model_name)
print("\nSelecting best model...") | |
best_model_name, best_model_metrics = compare_models(results) | |
save_best_model_info(best_model_name, best_model_metrics, results_threshold[best_model_name]) | |
trained_best_model = models[best_model_name] | |
# If it's a GridSearchCV model, extract the best estimator | |
if isinstance(trained_best_model, GridSearchCV): | |
trained_best_model = trained_best_model.best_estimator_ | |
joblib.dump(trained_best_model, "best_related_topics_model.pkl") | |
print(f"โ Best trained model saved as best_related_topics_model.pkl") | |
# Display sample predictions with fixed indices | |
print("\n๐ Sample Predictions with Optimized Thresholds:") | |
num_samples = 5 | |
# Use fixed indices instead of random sampling | |
sample_indices = list(range(min(5, len(X_test.toarray())))) | |
for idx in sample_indices: | |
print(f"\nDescription: {desc_test.iloc[idx][:100]}...") | |
print(f"โ True Topics: {', '.join(mlb.inverse_transform(np.array([y_test[idx]]))[0])}") | |
for model_name in models.keys(): | |
y_pred = optimizer.predict(models[model_name], X_test[idx], model_name) | |
predicted_labels = mlb.inverse_transform(y_pred)[0] | |
print(f"๐ฎ Predicted ({model_name}): {', '.join(predicted_labels) if predicted_labels else 'None'}") | |
print("\nโ Training and evaluation completed. Models and thresholds saved.") | |
if __name__ == "__main__":
    related_topics_prediction()