NLP_Homework_1 / src /classical_classifiers.py
Kolesnikov Dmitry
feat: Попытка навайбкодить 3 и 4 лабораторные
68545bc
"""
Классические методы классификации текстов: логистическая регрессия, SVM,
случайный лес, градиентный бустинг, ансамбли и AutoML подходы.
"""
from __future__ import annotations
import time
from dataclasses import dataclass
from typing import List, Dict, Any, Optional, Tuple, Union
import numpy as np
import pandas as pd
from sklearn.linear_model import LogisticRegression
from sklearn.svm import SVC
from sklearn.ensemble import RandomForestClassifier, VotingClassifier, BaggingClassifier
from sklearn.tree import DecisionTreeClassifier
from sklearn.model_selection import cross_val_score, StratifiedKFold
from sklearn.multioutput import MultiOutputClassifier
from sklearn.multiclass import OneVsRestClassifier
from sklearn.metrics import (
accuracy_score, precision_score, recall_score, f1_score,
roc_auc_score, classification_report, confusion_matrix,
precision_recall_curve, roc_curve
)
try:
import xgboost as xgb
XGBOOST_AVAILABLE = True
except ImportError:
XGBOOST_AVAILABLE = False
try:
import lightgbm as lgb
LIGHTGBM_AVAILABLE = True
except ImportError:
LIGHTGBM_AVAILABLE = False
try:
import catboost as cb
CATBOOST_AVAILABLE = True
except ImportError:
CATBOOST_AVAILABLE = False
try:
import autosklearn.classification
AUTOSKLEARN_AVAILABLE = True
except ImportError:
AUTOSKLEARN_AVAILABLE = False
try:
from tpot import TPOTClassifier
TPOT_AVAILABLE = True
except ImportError:
TPOT_AVAILABLE = False
try:
import h2o
from h2o.automl import H2OAutoML
H2O_AVAILABLE = True
except ImportError:
H2O_AVAILABLE = False
@dataclass
class ClassifierConfig:
"""Конфигурация классификатора."""
name: str
model_type: str # lr, svm, rf, xgb, lgb, catboost, ensemble, autosklearn, tpot, h2o
params: Dict[str, Any] = None
use_class_weight: bool = True
multilabel: bool = False # Использовать MultiOutputClassifier для multilabel
class ClassicalClassifiers:
"""Класс для работы с классическими методами классификации."""
def __init__(self, config: ClassifierConfig):
self.config = config
self.model = self._create_model()
self.train_time = 0.0
self.predict_time = 0.0
def _create_model(self):
"""Создает модель на основе конфигурации."""
model_type = self.config.model_type.lower()
params = self.config.params or {}
base_model = None
if model_type == "lr":
base_model = LogisticRegression(
max_iter=1000,
random_state=42,
class_weight="balanced" if self.config.use_class_weight else None,
**params
)
elif model_type == "svm":
base_model = SVC(
probability=True,
random_state=42,
class_weight="balanced" if self.config.use_class_weight else None,
**params
)
elif model_type == "rf":
base_model = RandomForestClassifier(
n_estimators=100,
random_state=42,
class_weight="balanced" if self.config.use_class_weight else None,
**params
)
# Обертываем в MultiOutputClassifier для multilabel
if self.config.multilabel and base_model is not None:
return MultiOutputClassifier(base_model)
if base_model is not None:
return base_model
if model_type == "xgb" and XGBOOST_AVAILABLE:
model = xgb.XGBClassifier(
random_state=42,
eval_metric='mlogloss',
**params
)
return MultiOutputClassifier(model) if self.config.multilabel else model
if model_type == "lgb" and LIGHTGBM_AVAILABLE:
model = lgb.LGBMClassifier(
random_state=42,
verbose=-1,
**params
)
return MultiOutputClassifier(model) if self.config.multilabel else model
if model_type == "catboost" and CATBOOST_AVAILABLE:
model = cb.CatBoostClassifier(
random_state=42,
verbose=False,
**params
)
return MultiOutputClassifier(model) if self.config.multilabel else model
if model_type == "ensemble":
# Voting Classifier
estimators = [
('lr', LogisticRegression(max_iter=1000, random_state=42)),
('svm', SVC(probability=True, random_state=42)),
('rf', RandomForestClassifier(n_estimators=50, random_state=42))
]
model = VotingClassifier(estimators=estimators, voting='soft')
return MultiOutputClassifier(model) if self.config.multilabel else model
if model_type == "bagging":
base = DecisionTreeClassifier(random_state=42)
model = BaggingClassifier(
base_estimator=base,
n_estimators=10,
random_state=42,
**params
)
return MultiOutputClassifier(model) if self.config.multilabel else model
if model_type == "autosklearn" and AUTOSKLEARN_AVAILABLE:
model = autosklearn.classification.AutoSklearnClassifier(
time_left_for_this_task=300, # 5 минут
memory_limit=4096,
**params
)
# AutoSklearn может не поддерживать multilabel напрямую
return model
if model_type == "tpot" and TPOT_AVAILABLE:
model = TPOTClassifier(
generations=5,
population_size=20,
verbosity=2,
random_state=42,
**params
)
# TPOT может не поддерживать multilabel напрямую
return model
raise ValueError(f"Неизвестный тип модели: {model_type} или библиотека недоступна")
def fit(self, X, y):
"""Обучение модели."""
start = time.time()
self.model.fit(X, y)
self.train_time = time.time() - start
return self
def predict(self, X):
"""Предсказание классов."""
start = time.time()
predictions = self.model.predict(X)
self.predict_time = time.time() - start
return predictions
def predict_proba(self, X):
"""Предсказание вероятностей."""
if hasattr(self.model, 'predict_proba'):
return self.model.predict_proba(X)
return None
def get_feature_importance(self):
"""Получение важности признаков (если доступно)."""
if hasattr(self.model, 'feature_importances_'):
return self.model.feature_importances_
elif hasattr(self.model, 'coef_'):
return np.abs(self.model.coef_[0]) if len(self.model.coef_.shape) > 1 else np.abs(self.model.coef_)
return None
def evaluate_classifier(y_true, y_pred, y_proba=None,
task_type: str = "multiclass") -> Dict[str, Any]:
"""
Оценка качества классификатора.
Args:
y_true: Истинные метки
y_pred: Предсказанные метки
y_proba: Вероятности классов (опционально)
task_type: Тип задачи (binary, multiclass, multilabel)
Returns:
Словарь с метриками
"""
metrics = {
"accuracy": accuracy_score(y_true, y_pred),
"precision_macro": precision_score(y_true, y_pred, average='macro', zero_division=0),
"recall_macro": recall_score(y_true, y_pred, average='macro', zero_division=0),
"f1_macro": f1_score(y_true, y_pred, average='macro', zero_division=0),
"precision_micro": precision_score(y_true, y_pred, average='micro', zero_division=0),
"recall_micro": recall_score(y_true, y_pred, average='micro', zero_division=0),
"f1_micro": f1_score(y_true, y_pred, average='micro', zero_division=0),
}
# ROC-AUC для бинарной классификации
if task_type == "binary" and y_proba is not None and y_proba.shape[1] == 2:
try:
metrics["roc_auc"] = roc_auc_score(y_true, y_proba[:, 1])
except:
metrics["roc_auc"] = np.nan
# ROC-AUC для многоклассовой (macro)
elif task_type == "multiclass" and y_proba is not None:
try:
metrics["roc_auc_macro"] = roc_auc_score(y_true, y_proba, average='macro', multi_class='ovr')
except:
metrics["roc_auc_macro"] = np.nan
# Метрики для многометочной классификации
elif task_type == "multilabel":
# Для multilabel используем специальные метрики
from sklearn.metrics import hamming_loss, jaccard_score
try:
metrics["hamming_loss"] = hamming_loss(y_true, y_pred)
metrics["jaccard_score"] = jaccard_score(y_true, y_pred, average='macro', zero_division=0)
# ROC-AUC для multilabel (каждый класс отдельно, затем усреднение)
if y_proba is not None:
try:
metrics["roc_auc_macro"] = roc_auc_score(y_true, y_proba, average='macro')
except:
metrics["roc_auc_macro"] = np.nan
except Exception as e:
print(f"Ошибка при вычислении метрик multilabel: {e}")
return metrics
def cross_validate_classifier(model, X, y, cv=5, scoring='f1_macro'):
"""Кросс-валидация классификатора."""
cv_scores = cross_val_score(model, X, y, cv=StratifiedKFold(n_splits=cv, shuffle=True, random_state=42),
scoring=scoring)
return {
"mean": cv_scores.mean(),
"std": cv_scores.std(),
"scores": cv_scores.tolist()
}
def compare_classifiers(X_train, y_train, X_test, y_test,
configs: List[ClassifierConfig],
task_type: str = "multiclass",
cv: Optional[int] = None) -> pd.DataFrame:
"""
Сравнение нескольких классификаторов.
Args:
X_train: Обучающие признаки
y_train: Обучающие метки
X_test: Тестовые признаки
y_test: Тестовые метки
configs: Список конфигураций классификаторов
task_type: Тип задачи (binary, multiclass, multilabel)
cv: Количество фолдов для кросс-валидации (опционально)
Returns:
DataFrame с результатами сравнения
"""
# Определяем, является ли задача multilabel
is_multilabel = task_type == "multilabel"
if is_multilabel:
# Обновляем конфигурации для multilabel
for cfg in configs:
cfg.multilabel = True
results = []
for cfg in configs:
try:
classifier = ClassicalClassifiers(cfg)
# Обучение
classifier.fit(X_train, y_train)
# Предсказания
y_pred = classifier.predict(X_test)
y_proba = classifier.predict_proba(X_test)
# Для multilabel y_pred может быть 2D, нужно преобразовать
if is_multilabel and len(y_pred.shape) == 2:
# y_pred уже в правильном формате для multilabel
pass
elif is_multilabel:
# Если модель вернула 1D, преобразуем
y_pred = y_pred.reshape(-1, 1) if len(y_pred.shape) == 1 else y_pred
# Метрики
metrics = evaluate_classifier(y_test, y_pred, y_proba, task_type)
# Кросс-валидация (если запрошена)
cv_results = None
if cv:
cv_results = cross_validate_classifier(classifier.model, X_train, y_train, cv=cv)
result = {
"Модель": cfg.name,
"Тип": cfg.model_type,
"Точность": round(metrics["accuracy"], 4),
"Precision (macro)": round(metrics["precision_macro"], 4),
"Recall (macro)": round(metrics["recall_macro"], 4),
"F1 (macro)": round(metrics["f1_macro"], 4),
"F1 (micro)": round(metrics["f1_micro"], 4),
"Время обучения (с)": round(classifier.train_time, 2),
"Время предсказания (с)": round(classifier.predict_time, 4),
}
if "roc_auc" in metrics:
result["ROC-AUC"] = round(metrics["roc_auc"], 4)
elif "roc_auc_macro" in metrics:
result["ROC-AUC (macro)"] = round(metrics["roc_auc_macro"], 4)
# Дополнительные метрики для multilabel
if task_type == "multilabel":
if "hamming_loss" in metrics:
result["Hamming Loss"] = round(metrics["hamming_loss"], 4)
if "jaccard_score" in metrics:
result["Jaccard Score"] = round(metrics["jaccard_score"], 4)
if cv_results:
result["CV F1 (mean)"] = round(cv_results["mean"], 4)
result["CV F1 (std)"] = round(cv_results["std"], 4)
results.append(result)
except Exception as e:
print(f"Ошибка при обучении {cfg.name}: {e}")
results.append({
"Модель": cfg.name,
"Тип": cfg.model_type,
"Ошибка": str(e)
})
return pd.DataFrame(results)
if __name__ == "__main__":
# Тестирование
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
X, y = make_classification(n_samples=1000, n_features=20, n_classes=3, random_state=42)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
configs = [
ClassifierConfig(name="Logistic Regression", model_type="lr"),
ClassifierConfig(name="SVM", model_type="svm", params={"kernel": "linear"}),
ClassifierConfig(name="Random Forest", model_type="rf"),
]
if XGBOOST_AVAILABLE:
configs.append(ClassifierConfig(name="XGBoost", model_type="xgb"))
results_df = compare_classifiers(X_train, y_train, X_test, y_test, configs)
print(results_df)