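"""Model-training pipeline step.

Label-encodes a user-specified target column, drops unused label columns,
tunes a classifier (XGBoost, SVM, or random forest) with BayesSearchCV over
StratifiedGroupKFold splits, and saves the best model plus a JSON metadata
file and classification report to disk.
"""
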
import json

import joblib
import numpy as np
import pandas as pd
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import classification_report, accuracy_score
from sklearn.model_selection import StratifiedGroupKFold
from sklearn.preprocessing import LabelEncoder
from sklearn.svm import SVC
from skopt import BayesSearchCV
from skopt.space import Real, Integer, Categorical
from xgboost import XGBClassifier

#from _config import config

class TrainModel(BaseEstimator, TransformerMixin):
    """Pipeline step that label-encodes the target, tunes a classifier with
    BayesSearchCV, and saves the best model and its metadata to disk."""

    def __init__(self, classifier, train_label, target):
        #self.config = config
        #self.target = config.get("target_label", None)  # User-defined target label in config
        self.classifier = classifier
        self.train_label = train_label  # comma-separated label columns to drop before training
        self.target = target
        self.label_encoder = LabelEncoder()
        #self.selected_domains = self.config.get("selected_domains", "All domains")  # Default to all domains if None
        #if not self.target:
        #    raise ValueError("No target label specified in the config. Please set 'target_label'.")

    def get_default_param_space(self, classifier):
        """Returns the default hyperparameter search space for a given classifier."""
        if classifier == 'xgboost':
            return {
                'learning_rate': Real(0.01, 0.3, prior='log-uniform'),
                'n_estimators': Integer(100, 1000),
                'max_depth': Integer(3, 10),
                'min_child_weight': Integer(1, 10),
                'subsample': Real(0.5, 1.0),
                'colsample_bytree': Real(0.5, 1.0),
                'gamma': Real(0.0, 10.0),
                'reg_alpha': Real(0.0, 10.0),
                'reg_lambda': Real(0.0, 10.0),
            }
        elif classifier == 'svm':
            return {
                'C': Real(0.1, 10, prior='log-uniform'),
                'kernel': Categorical(['linear', 'rbf'])
            }
        elif classifier == 'randomforest':
            return {
                'n_estimators': Integer(100, 1000),
                'max_depth': Integer(3, 10)
            }
        else:
            raise ValueError(f"Unsupported classifier type: {classifier}")

    def fit(self, X, y=None):
        # Work on a copy so the caller's DataFrame is not mutated in place
        X = X.copy()
        # Ensure the target column exists in the dataset
        if self.target not in X.columns:
            raise ValueError(f"Target label '{self.target}' not found in the dataset.")
        # Fit the label encoder on the target column
        print(f"Encoding the target labels for '{self.target}'...")
        self.label_encoder.fit(X[self.target])
        # Print the mapping between encoded labels and original labels
        original_labels = list(self.label_encoder.classes_)
        encoded_labels = list(range(len(original_labels)))
        label_mapping = dict(zip(encoded_labels, original_labels))
        print(f"Label encoding complete. Mapping: {label_mapping}")
        # Transform the target column and add it as 'encoded_target'
        X['encoded_target'] = self.label_encoder.transform(X[self.target])
        # Value counts for the encoded target, cast to plain ints so the
        # metadata stays JSON-serializable later on
        value_counts = {int(k): int(v) for k, v in X['encoded_target'].value_counts().items()}
        print(f"Value counts for encoded target: {value_counts}")
        # Pop unnecessary columns (groupid, emotion labels not being used, etc.)
        groups = X.pop('groupid')
        print("Group IDs popped from the dataset.")
        # Pop the label columns which aren't used; train_label is a
        # comma-separated string, so split it into a local list (mutating
        # self.train_label would break a second call to fit())
        train_labels = [label.strip() for label in self.train_label.split(",")]
        for label in train_labels:
            X.pop(label)
        print("Label columns popped from the dataset.")
        # Pop the encoded target as y
        y = X.pop('encoded_target')
        print("Encoded target column popped from the dataset.")
        print(f"Feature columns: {X.columns.tolist()}")
        # Store the feature names for later use
        feature_names = X.columns.tolist()
        # Choose classifier
        classifier = self.classifier
        if classifier == 'xgboost':
            model = XGBClassifier(objective='multi:softmax', random_state=42)
        elif classifier == 'svm':
            model = SVC(probability=True)
        elif classifier == 'randomforest':
            model = RandomForestClassifier(random_state=42)
        else:
            raise ValueError(f"Unsupported classifier type: {classifier}")
        print(f"Training the model using {classifier}...")
        # Use the default search space for the chosen classifier
        param_space = self.get_default_param_space(classifier)
        print(f"Parameter space being used: {param_space}")
        # Hyperparameter tuning using Bayesian optimization, with grouped
        # stratified cross-validation so samples from the same group never
        # appear in both the training and validation folds
        sgkf = StratifiedGroupKFold(n_splits=5)
        opt = BayesSearchCV(
            estimator=model,
            search_spaces=param_space,
            cv=sgkf,
            n_iter=5,
            n_jobs=-1,
            n_points=1,
            verbose=1,
            scoring='accuracy'
        )
        print("Hyperparameter tuning in progress...")
        # Fit the model using the encoded target
        opt.fit(X, y, groups=groups)
        self.best_model = opt.best_estimator_
        print(f"Best parameters found: {opt.best_params_}")
        # Print classification metrics (note: computed on the training data,
        # so these are optimistic relative to held-out performance)
        y_pred = self.best_model.predict(X)
        accuracy = accuracy_score(y, y_pred)
        report = classification_report(y, y_pred, target_names=self.label_encoder.classes_, output_dict=True)
        # Save the classification report
        with open(f'classification_report_{self.target}.json', 'w') as f:
            json.dump(report, f, indent=4)
        print(f"Accuracy: {accuracy}")
        print(f"Classification Report:\n{report}")
        # Save the best model with the target label in the file name
        model_name = f"{classifier}_best_model_{self.target}.pkl"
        joblib.dump(self.best_model, model_name)
        print("Model saved successfully.")
        # Save model metadata; best_params_ values may be numpy scalars, so
        # cast them to native Python types for JSON serialization
        best_params = {k: (v.item() if hasattr(v, 'item') else v) for k, v in opt.best_params_.items()}
        model_metadata = {
            "best_params": best_params,
            "accuracy": accuracy,
            "classification_report": report,
            "label_mapping": label_mapping,
            "model_name": model_name,
            "value_counts": value_counts,
            #"selected_domains": self.selected_domains,
            #"include_magnitude": self.config.get("include_magnitude", True)
        }
        if hasattr(self.best_model, "feature_importances_"):
            feature_importances = self.best_model.feature_importances_
            # Convert feature importances to native Python floats
            feature_importance_dict = {feature: float(importance) for feature, importance in zip(feature_names, feature_importances)}
            model_metadata["feature_importances"] = feature_importance_dict
            print("Feature Importances:")
            for feature, importance in feature_importance_dict.items():
                print(f"{feature}: {importance:.4f}")
        # Save metadata with the target name in the file name
        metadata_file = f"{classifier}_model_metadata_{self.target}.json"
        with open(metadata_file, "w") as f:
            json.dump(model_metadata, f, indent=4)
        print(f"Model metadata saved to {metadata_file}.")
        # Store file paths internally for later retrieval
        self.model_file = model_name
        self.metadata_file = metadata_file
        return self

    def get_output_files(self):
        return self.model_file, self.metadata_file

    def transform(self, X):
        return X  # Placeholder for the transform step (not needed for training)