| | """Meta-classifier for Financial PhraseBank (FPB) datasets. |
| | |
| | The script expects the following pre-computed artifacts inside ``outputs/`` |
| | (or custom paths can be supplied): |
| | |
| | * ``FinSent_<split>_raw_probs_prob_features.csv`` – base probabilities and |
| | probability-derived features for FinBERT/RoBERTa |
| | * ``FPB_MultiLLM_<split>.csv`` – expert-signal metrics (KL, L1, agreement) |
| | * ``Sentences_<split>_semantics.csv`` – structured semantics flags |
| | |
| | Example command (50Agree subset):: |
| | |
| | python "FPB Meta Classifier.py" \\ |
| | --dataset 50Agree \\ |
| | --folds 5 \\ |
| | --models logreg xgboost \\ |
| | --artifact_prefix outputs/FinSent_50Agree_meta \\ |
| | --save_predictions --save_models --verbose |
| | |
| | """ |
| |
|
| | from __future__ import annotations |
| |
|
| | import argparse |
| | import os |
| | from dataclasses import dataclass |
| | from typing import Dict, Iterable, List, Optional |
| |
|
| | import joblib |
| | import numpy as np |
| | import pandas as pd |
| | from sklearn.compose import ColumnTransformer |
| | from sklearn.linear_model import LogisticRegression |
| | from sklearn.metrics import classification_report, confusion_matrix |
| | from sklearn.model_selection import StratifiedKFold, cross_val_predict, cross_validate |
| | from sklearn.pipeline import Pipeline |
| | from sklearn.preprocessing import OneHotEncoder, StandardScaler |
| |
|
| | |
| | try: |
| | from tqdm import tqdm |
| | except ImportError: |
| | tqdm = None |
| |
|
| | try: |
| | from xgboost import XGBClassifier |
| | except ImportError: |
| | XGBClassifier = None |
| |
|
| | try: |
| | import torch |
| | from transformers import AutoTokenizer, AutoModelForSequenceClassification |
| | from scipy.stats import entropy |
| | TRANSFORMERS_AVAILABLE = True |
| | except ImportError: |
| | TRANSFORMERS_AVAILABLE = False |
| | print("[!] transformers or torch not available. FinSentLLM feature engineering will be disabled.") |
| |
|
| | from sklearn.base import BaseEstimator, TransformerMixin |
| |
|
| |
|
| | |
| | |
| | |
| |
|
@dataclass
class DatasetPaths:
    """Resolved input CSV locations for one FPB dataset split."""

    # Dataset tag, e.g. "50Agree" | "66Agree" | "75Agree" | "AllAgree".
    dataset: str
    # Probability-derived features for FinBERT/RoBERTa (stage-2 output).
    prob_features_csv: str
    # Expert-signal metrics (KL, L1, agreement) from the Multi-LLM stage.
    multi_llm_csv: str
    # Structured financial-semantics flags per sentence.
    semantics_csv: str
| |
|
| |
|
def infer_paths(dataset: str, base_dir: str = "outputs") -> DatasetPaths:
    """Build the default artifact paths for *dataset* under *base_dir*."""
    tag = dataset.strip()
    join = os.path.join
    return DatasetPaths(
        dataset=tag,
        prob_features_csv=join(base_dir, "prob features", f"FinSent_{tag}_raw_probs_prob_features.csv"),
        multi_llm_csv=join(base_dir, "MultiLLM", f"FPB_MultiLLM_{tag}.csv"),
        semantics_csv=join(base_dir, "Structures Financial Semantics", f"Sentences_{tag}_semantics.csv"),
    )
| |
|
| |
|
| | def _merge_features(left: pd.DataFrame, right: pd.DataFrame, key: str = "doc_id") -> pd.DataFrame: |
| | """Merge two DataFrames on ``doc_id`` while dropping duplicate feature columns.""" |
| | overlap = [c for c in right.columns if c in left.columns and c != key] |
| | right_clean = right.drop(columns=overlap, errors="ignore") |
| | merged = left.merge(right_clean, on=key, how="left", validate="one_to_one") |
| | return merged |
| |
|
| |
|
def load_feature_table(paths: DatasetPaths) -> pd.DataFrame:
    """Assemble the per-split feature table by merging the three stage CSVs on doc_id."""
    if not os.path.exists(paths.multi_llm_csv):
        raise FileNotFoundError(f"Missing Multi-LLM feature CSV: {paths.multi_llm_csv}")
    table = pd.read_csv(paths.multi_llm_csv)
    if "doc_id" not in table.columns:
        raise KeyError("Expected 'doc_id' column in Multi-LLM CSV. Re-run stage 3 feature extraction.")

    # Probability features are optional: warn and continue when the file is absent.
    if os.path.exists(paths.prob_features_csv):
        prob_df = pd.read_csv(paths.prob_features_csv)
        if "doc_id" not in prob_df.columns:
            raise KeyError("Probability features CSV must contain 'doc_id'.")
        table = _merge_features(table, prob_df, key="doc_id")
    else:
        print(f"[!] Probability feature CSV not found ({paths.prob_features_csv}); proceeding without extra columns.")

    # Structured semantics are mandatory; tolerate an 'id' column as the join key.
    if not os.path.exists(paths.semantics_csv):
        raise FileNotFoundError(f"Missing semantics CSV: {paths.semantics_csv}")
    semantics = pd.read_csv(paths.semantics_csv)
    if "doc_id" not in semantics.columns:
        if "id" not in semantics.columns:
            raise KeyError("Semantics CSV must contain 'doc_id' or 'id' column.")
        semantics = semantics.rename(columns={"id": "doc_id"})

    # Drop text/label duplicates so the merge keeps the Multi-LLM versions.
    redundant = [c for c in ("label", "sentence", "text") if c in semantics.columns]
    semantics = semantics.drop(columns=redundant, errors="ignore")
    merged = _merge_features(table, semantics, key="doc_id")

    # Guard against partial joins: every row must carry its sem_* flags.
    sem_cols = [c for c in merged.columns if c.startswith("sem_")]
    if sem_cols:
        rows_missing = merged[sem_cols].isna().any(axis=1)
        if rows_missing.any():
            raise ValueError(
                f"{int(rows_missing.sum())} rows lack structured semantics after merging. Make sure the semantics file"
                " matches the dataset split."
            )

    return merged
| |
|
| |
|
def load_best_iterations(results_dir: str = "results") -> Dict[str, int]:
    """Load previously computed best iterations for XGBoost models.

    Returns:
        Dictionary mapping dataset names to best iteration counts.
        Returns empty dict if file not found.
    """
    csv_path = os.path.join(results_dir, "xgb_meta_best_iterations.csv")

    if not os.path.exists(csv_path):
        print(f"[!] Best iterations file not found: {csv_path}")
        return {}

    try:
        table = pd.read_csv(csv_path)
        # One row per dataset tag; 'meta' holds the tag, 'best_iteration' the count.
        best_iters = {row["meta"]: int(row["best_iteration"]) for _, row in table.iterrows()}

        print(f"[✓] Loaded best iterations for {len(best_iters)} datasets:")
        for tag, count in best_iters.items():
            print(f"   {tag}: {count} iterations")

        return best_iters
    except Exception as exc:
        # Deliberate best-effort: a malformed file degrades to the default
        # iteration count downstream instead of aborting the run.
        print(f"[!] Error loading best iterations: {exc}")
        return {}
| |
|
| |
|
| | |
| | |
| | |
| |
|
class FinSentLLMFeatureEngineering(BaseEstimator, TransformerMixin):
    """End-to-end feature-engineering transformer: raw texts -> 36 FinSentLLM features.

    Runs FinBERT/RoBERTa inference, then derives probability features,
    Multi-LLM agreement/divergence metrics and heuristic semantic flags.
    """

    def __init__(self,
                 finbert_model_id="ProsusAI/finbert",
                 roberta_model_id="cardiffnlp/twitter-roberta-base-sentiment",
                 batch_size=16,
                 max_length=128,
                 device=None):
        self.finbert_model_id = finbert_model_id
        self.roberta_model_id = roberta_model_id
        self.batch_size = batch_size
        self.max_length = max_length
        self.device = device
        self.class_names = ["negative", "neutral", "positive"]

        # Lazily populated by _load_models() during fit().
        self.finbert_tokenizer = None
        self.finbert_model = None
        self.roberta_tokenizer = None
        self.roberta_model = None
        self._device = None

    def _load_models(self):
        """Load the FinBERT and RoBERTa tokenizers/models onto the target device."""
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("transformers and torch are required for FinSentLLM feature engineering")

        print("[📥] Loading FinBERT and RoBERTa models...")

        # Auto-select CUDA when available unless the caller pinned a device.
        if self.device is None:
            self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        else:
            self._device = torch.device(self.device)

        self.finbert_tokenizer = AutoTokenizer.from_pretrained(self.finbert_model_id)
        self.finbert_model = AutoModelForSequenceClassification.from_pretrained(self.finbert_model_id)
        self.finbert_model.to(self._device).eval()

        self.roberta_tokenizer = AutoTokenizer.from_pretrained(self.roberta_model_id)
        self.roberta_model = AutoModelForSequenceClassification.from_pretrained(self.roberta_model_id)
        self.roberta_model.to(self._device).eval()

        print(f"[✅] Models loaded on {self._device}")

    def _get_probabilities(self, texts, tokenizer, model):
        """Return softmax class probabilities for *texts*, batched, with a tqdm bar when useful.

        Fix: the original used the ``@torch.no_grad()`` decorator, which is
        evaluated at class-definition time and raises NameError when torch is
        not installed — defeating the module's TRANSFORMERS_AVAILABLE fallback.
        Using the context manager defers the torch dependency to call time.
        """
        all_probs = []
        total = len(texts)
        batch_iter = range(0, total, self.batch_size)
        use_tqdm = tqdm is not None and total > self.batch_size
        iterator = tqdm(batch_iter, desc="[tqdm] Encoding & inference", unit="batch") if use_tqdm else batch_iter
        with torch.no_grad():
            for i in iterator:
                batch = texts[i:i + self.batch_size]

                encoding = tokenizer(
                    batch,
                    return_tensors="pt",
                    truncation=True,
                    padding=True,
                    max_length=self.max_length
                )

                # Move input tensors to the same device as the model.
                encoding = {k: v.to(self._device) for k, v in encoding.items()}

                logits = model(**encoding).logits
                probs = torch.softmax(logits, dim=-1).cpu().numpy()
                all_probs.append(probs)
        return np.vstack(all_probs)

    def _build_features(self, finbert_probs, roberta_probs):
        """Derive the full 36-feature frame from the two (n, 3) probability arrays."""
        eps = 1e-12  # guards the log-odds against exact 0/1 probabilities
        features = {}

        # Raw per-class probabilities and top-class scores for both models.
        for i, cls in enumerate(self.class_names):
            features[f"fin_p_{cls[:3]}"] = finbert_probs[:, i]
            features[f"rob_p_{cls[:3]}"] = roberta_probs[:, i]

        features["fin_score"] = finbert_probs.max(axis=1)
        features["rob_score"] = roberta_probs.max(axis=1)

        # Hard labels: argmax index into class_names (0=neg, 1=neu, 2=pos).
        features["fin_label"] = finbert_probs.argmax(axis=1)
        features["rob_label"] = roberta_probs.argmax(axis=1)

        # Per-class log-odds.
        for i, cls in enumerate(self.class_names):
            features[f"fin_logit_{cls[:3]}"] = np.log((finbert_probs[:, i] + eps) / (1 - finbert_probs[:, i] + eps))
            features[f"rob_logit_{cls[:3]}"] = np.log((roberta_probs[:, i] + eps) / (1 - roberta_probs[:, i] + eps))

        # Confidence proxies: max probability and top-1/top-2 margin.
        features["fin_max_prob"] = finbert_probs.max(axis=1)
        features["rob_max_prob"] = roberta_probs.max(axis=1)

        fin_sorted = np.sort(finbert_probs, axis=1)
        rob_sorted = np.sort(roberta_probs, axis=1)
        features["fin_margin"] = fin_sorted[:, -1] - fin_sorted[:, -2]
        features["rob_margin"] = rob_sorted[:, -1] - rob_sorted[:, -2]

        # Predictive-distribution entropy (scipy works along axis 0, hence .T).
        features["fin_entropy"] = entropy(finbert_probs.T)
        features["rob_entropy"] = entropy(roberta_probs.T)

        # Multi-LLM disagreement metrics between the two models.
        l1_dist = np.abs(finbert_probs - roberta_probs).sum(axis=1)
        features["MultiLLM_L1_distance"] = l1_dist
        features["MultiLLM_L1_similarity"] = 1 / (1 + l1_dist)

        features["MultiLLM_KL_F_to_R"] = entropy(finbert_probs.T, roberta_probs.T)
        features["MultiLLM_KL_R_to_F"] = entropy(roberta_probs.T, finbert_probs.T)

        fin_pred = finbert_probs.argmax(axis=1)
        rob_pred = roberta_probs.argmax(axis=1)
        features["MultiLLM_agree"] = (fin_pred == rob_pred).astype(int)

        # Heuristic semantic flags derived from probability thresholds.
        # NOTE(review): these approximate the offline sem_* columns; the
        # thresholds are hard-coded and unvalidated against the structured
        # semantics stage — confirm before trusting them in production.
        features["sem_compared"] = ((finbert_probs[:, 1] > 0.4) & (roberta_probs[:, 1] > 0.4)).astype(int)
        features["sem_loss_improve"] = ((finbert_probs[:, 2] > 0.6) & (roberta_probs[:, 2] > 0.5)).astype(int)
        features["sem_loss_worsen"] = ((finbert_probs[:, 0] > 0.6) & (roberta_probs[:, 0] > 0.5)).astype(int)
        features["sem_profit_up"] = ((finbert_probs[:, 2] > 0.7) & (l1_dist < 0.3)).astype(int)
        features["sem_cost_down"] = ((finbert_probs[:, 2] > 0.5) & (features["MultiLLM_agree"] == 1)).astype(int)
        features["sem_contract_fin"] = ((finbert_probs[:, 1] > 0.8)).astype(int)
        features["sem_uncertainty"] = ((features["fin_entropy"] > 1.0) | (features["rob_entropy"] > 1.0)).astype(int)
        features["sem_stable_guidance"] = ((l1_dist < 0.2) & (finbert_probs[:, 1] > 0.5)).astype(int)
        features["sem_operational"] = ((finbert_probs[:, 1] > 0.3) & (roberta_probs[:, 1] > 0.3)).astype(int)

        return pd.DataFrame(features)

    def fit(self, X, y=None):
        """Load the underlying transformer models; no statistics are learned from X."""
        self._load_models()
        return self

    def transform(self, X):
        """Convert raw texts into the engineered 36-column feature frame."""
        if self.finbert_model is None:
            raise RuntimeError("Models not loaded. Call fit() first.")

        # Accept a DataFrame (with a 'text' or single column), list, or array.
        if isinstance(X, pd.DataFrame):
            if 'text' in X.columns:
                texts = X['text'].tolist()
            elif len(X.columns) == 1:
                texts = X.iloc[:, 0].tolist()
            else:
                raise ValueError("DataFrame must have 'text' column or single column")
        elif isinstance(X, (list, np.ndarray)):
            texts = list(X)
        else:
            raise ValueError("X must be DataFrame, list, or array")

        print(f"[🔮] Processing {len(texts)} texts...")

        finbert_probs = self._get_probabilities(texts, self.finbert_tokenizer, self.finbert_model)
        roberta_probs = self._get_probabilities(texts, self.roberta_tokenizer, self.roberta_model)

        features_df = self._build_features(finbert_probs, roberta_probs)

        print(f"[✅] Generated {len(features_df.columns)} features")
        return features_df
| |
|
| |
|
| | |
| | |
| | |
| |
|
def build_preprocessor(numeric_cols: List[str], categorical_cols: List[str]) -> ColumnTransformer:
    """Standard-scale numeric columns and one-hot encode categorical ones; drop the rest."""
    steps = []
    if numeric_cols:
        steps.append(("num", StandardScaler(), numeric_cols))
    if categorical_cols:
        steps.append(("cat", OneHotEncoder(handle_unknown="ignore", sparse_output=False), categorical_cols))
    if not steps:
        raise ValueError("No feature columns selected – check your dataset.")
    return ColumnTransformer(transformers=steps, remainder="drop")
| |
|
| |
|
def build_pipelines(
    numeric_cols: List[str],
    categorical_cols: List[str],
    num_classes: int,
    random_state: int,
    models_requested: Iterable[str],
    dataset: str = "",
    best_iterations: Optional[Dict[str, int]] = None,
    include_feature_engineering: bool = False,
) -> Dict[str, Pipeline]:
    """Construct the requested meta-classifier pipelines.

    Args:
        numeric_cols: Numeric feature columns (precomputed-feature mode).
        categorical_cols: Categorical feature columns (precomputed-feature mode).
        num_classes: Number of target classes (forwarded to XGBoost).
        random_state: Seed for the XGBoost booster.
        models_requested: Any subset of {"logreg", "xgboost"}.
        dataset: Dataset tag used to look up a pre-tuned XGBoost tree count.
        best_iterations: Optional mapping of dataset tag -> tuned n_estimators.
        include_feature_engineering: When True, prepend the end-to-end
            FinSentLLMFeatureEngineering step (pipeline then expects raw text).

    Returns:
        Mapping of model name -> sklearn Pipeline.

    Raises:
        ImportError: If "xgboost" is requested but the package is missing.

    Fix: the previous version contained an unreachable duplicate of this whole
    body after its first ``return pipelines``; the dead code has been removed.
    """
    pipelines: Dict[str, Pipeline] = {}

    # Column layout emitted by FinSentLLMFeatureEngineering (end-to-end mode).
    end2end_categorical_features = ["fin_label", "rob_label"]
    end2end_numeric_features = [
        'fin_p_neg', 'fin_p_neu', 'fin_p_pos', 'fin_score',
        'rob_p_neg', 'rob_p_neu', 'rob_p_pos', 'rob_score',
        'fin_logit_neg', 'fin_logit_neu', 'fin_logit_pos',
        'rob_logit_neg', 'rob_logit_neu', 'rob_logit_pos',
        'fin_max_prob', 'rob_max_prob', 'fin_margin', 'rob_margin',
        'fin_entropy', 'rob_entropy',
        'MultiLLM_L1_distance', 'MultiLLM_L1_similarity',
        'MultiLLM_KL_F_to_R', 'MultiLLM_KL_R_to_F', 'MultiLLM_agree',
        'sem_compared', 'sem_loss_improve', 'sem_loss_worsen',
        'sem_profit_up', 'sem_cost_down', 'sem_contract_fin',
        'sem_uncertainty', 'sem_stable_guidance', 'sem_operational'
    ]

    def _make_preprocessor() -> ColumnTransformer:
        # End-to-end mode scales/encodes the engineered columns; otherwise the
        # caller-supplied precomputed columns.
        if include_feature_engineering:
            return build_preprocessor(end2end_numeric_features, end2end_categorical_features)
        return build_preprocessor(numeric_cols, categorical_cols)

    if "logreg" in models_requested:
        logreg = LogisticRegression(max_iter=1000, solver="lbfgs")
        steps = [("preprocess", _make_preprocessor()), ("clf", logreg)]
        if include_feature_engineering:
            steps.insert(0, ("feature_engineering", FinSentLLMFeatureEngineering()))
        pipelines["logreg"] = Pipeline(steps)

    if "xgboost" in models_requested:
        if XGBClassifier is None:
            raise ImportError(
                "xgboost is not installed. Install it with 'pip install xgboost' or remove 'xgboost' from --models."
            )
        # Prefer a pre-tuned tree count (from a prior early-stopping run).
        if best_iterations and dataset in best_iterations:
            n_estimators = best_iterations[dataset]
            print(f"[✓] Using pre-computed best iterations for {dataset}: {n_estimators}")
        else:
            n_estimators = 1000
            print(f"[!] No pre-computed iterations found for {dataset}, using default: {n_estimators}")
        xgb = XGBClassifier(
            objective="multi:softprob",
            num_class=num_classes,
            learning_rate=0.05,
            max_depth=6,
            subsample=0.8,
            colsample_bytree=0.8,
            n_estimators=n_estimators,
            min_child_weight=2,
            reg_lambda=1.0,
            reg_alpha=0.0,
            tree_method="hist",
            eval_metric="mlogloss",
            random_state=random_state,
            n_jobs=0,
            verbosity=0,
        )
        steps = [("preprocess", _make_preprocessor()), ("clf", xgb)]
        if include_feature_engineering:
            steps.insert(0, ("feature_engineering", FinSentLLMFeatureEngineering()))
            print(f"[🤖] Created end-to-end XGBoost pipeline with feature engineering")
        pipelines["xgboost"] = Pipeline(steps)

    return pipelines
| |
|
| |
|
def evaluate_model(
    name: str,
    pipeline: Pipeline,
    X: pd.DataFrame,
    y_train: pd.Series,
    y_eval: pd.Series,
    cv: StratifiedKFold,
    class_labels: List[str],
    label_decoder: Optional[Dict[int, str]] = None,
) -> Dict[str, object]:
    """Cross-validate *pipeline*, refit it on all data, and collect evaluation metrics.

    Args:
        name: Model key ("logreg" or "xgboost"); triggers XGBoost-specific extras.
        pipeline: Unfitted sklearn pipeline (preprocess + classifier).
        X: Feature frame fed to the pipeline.
        y_train: Target used for CV fitting (may be int-encoded, e.g. for XGBoost).
        y_eval: Human-readable target used for the report/confusion matrix.
        cv: Stratified fold splitter shared by all models.
        class_labels: Canonical label order for reports and probability columns.
        label_decoder: Optional int->label map to decode predictions when
            *y_train* is encoded; None when *y_train* already holds label strings.

    Returns:
        Dict with CV accuracy/macro-F1 means and stds, out-of-fold predictions
        and probabilities (columns ordered by *class_labels*), the text report,
        the confusion matrix, and the pipeline refit on the full dataset.
    """
    scoring = {"accuracy": "accuracy", "macro_f1": "f1_macro"}

    # Fold-level scores for mean/std reporting.
    scores = cross_validate(
        pipeline,
        X,
        y_train,
        scoring=scoring,
        cv=cv,
        n_jobs=None,
        return_estimator=False,
    )

    # Out-of-fold predictions/probabilities: every row is predicted by a model
    # that did not see it during fitting.
    preds = cross_val_predict(pipeline, X, y_train, cv=cv, method="predict")
    probas = cross_val_predict(pipeline, X, y_train, cv=cv, method="predict_proba")

    # Refit on the full dataset to produce the deployable model.
    fitted = pipeline.fit(X, y_train)

    clf_raw_classes = list(fitted.named_steps["clf"].classes_)

    # Decode integer predictions back to label strings when an encoder was used.
    if label_decoder:
        preds_decoded = np.array([label_decoder[int(p)] for p in preds])
        proba_labels = [label_decoder[int(c)] for c in clf_raw_classes]
    else:
        preds_decoded = preds
        proba_labels = [str(c) for c in clf_raw_classes]

    # The classifier may order classes differently from class_labels; realign
    # the probability columns to the canonical order.
    if proba_labels != class_labels:
        reorder_idx = [proba_labels.index(lbl) for lbl in class_labels]
        probas = probas[:, reorder_idx]
        proba_labels = class_labels

    y_eval_array = y_eval.to_numpy()

    # Report/confusion matrix use out-of-fold predictions, not the refit model.
    report = classification_report(y_eval_array, preds_decoded, labels=class_labels, digits=4)
    cm = confusion_matrix(y_eval_array, preds_decoded, labels=class_labels)

    metrics = {
        "name": name,
        "accuracy_mean": float(np.mean(scores["test_accuracy"])),
        "accuracy_std": float(np.std(scores["test_accuracy"])),
        "macro_f1_mean": float(np.mean(scores["test_macro_f1"])),
        "macro_f1_std": float(np.std(scores["test_macro_f1"])),
        "classification_report": report,
        "confusion_matrix": cm,
        "classes": class_labels,
        "preds": preds_decoded,
        "probas": probas,
        "final_model": fitted,
    }

    # Record tree-count metadata so later inference can reuse the best iteration.
    if name == "xgboost":
        if hasattr(fitted.named_steps["clf"], "best_iteration"):
            metrics["best_iteration"] = fitted.named_steps["clf"].best_iteration
        elif hasattr(fitted.named_steps["clf"], "n_estimators"):
            metrics["best_iteration"] = fitted.named_steps["clf"].n_estimators
        # best_ntree_limit is 1-based, hence the +1.
        metrics["best_ntree_limit"] = metrics.get("best_iteration", 0) + 1

    return metrics
| |
|
| |
|
def print_metrics(metrics: Dict[str, object], verbose: bool = False) -> None:
    """Print the CV summary; with *verbose*, also the report and confusion matrix."""
    model_name = metrics["name"]
    print(f"\n=== {model_name.upper()} meta-classifier ===")
    summary = (
        f"Accuracy: {metrics['accuracy_mean']*100:.2f}% ± {metrics['accuracy_std']*100:.2f}%\n"
        f"Macro-F1: {metrics['macro_f1_mean']*100:.2f}% ± {metrics['macro_f1_std']*100:.2f}%"
    )
    print(summary)
    if not verbose:
        return
    print("\nClassification report:\n", metrics["classification_report"], sep="")
    print("Confusion matrix (rows=true, cols=pred):")
    classes = metrics["classes"]
    print(" " + " ".join(f"{c[:7]:>7}" for c in classes))
    for cls, counts in zip(classes, metrics["confusion_matrix"]):
        joined = " ".join(f"{int(v):>7}" for v in counts)
        print(f"{cls[:7]:>4} {joined}")
| |
|
| |
|
def save_predictions(base: pd.DataFrame, metrics: Dict[str, object], path: str) -> None:
    """Write out-of-fold predictions plus per-class probabilities to *path* as CSV."""
    out = base[["doc_id"]].copy()
    out["true_label"] = base["label"]
    out["meta_pred"] = metrics["preds"]
    # One probability column per class, in the canonical class order.
    for pos, cls_name in enumerate(metrics["classes"]):
        out[f"meta_proba_{cls_name}"] = metrics["probas"][:, pos]
    out.to_csv(path, index=False)
    print(f"Saved predictions: {path}")
| |
|
| |
|
| | |
| | |
| | |
| |
|
def parse_args() -> argparse.Namespace:
    """Parse command-line options for the meta-classifier trainer."""
    cli = argparse.ArgumentParser(description="Train FinSentLLM meta-classifiers (LogReg/XGBoost).")
    # Dataset selection and optional per-stage CSV overrides.
    cli.add_argument("--dataset", required=True, help="Dataset tag, e.g. 50Agree | 66Agree | 75Agree | AllAgree")
    cli.add_argument("--prob_features_csv", help="Override path to probability feature CSV")
    cli.add_argument("--multi_llm_csv", help="Override path to Multi-LLM feature CSV")
    cli.add_argument("--semantics_csv", help="Override path to structured semantics CSV")
    # Cross-validation setup.
    cli.add_argument("--folds", type=int, default=5, help="Number of stratified CV folds (default: 5)")
    cli.add_argument("--seed", type=int, default=7, help="Random seed for CV shuffling (default: 7)")
    cli.add_argument(
        "--models",
        nargs="+",
        default=["logreg", "xgboost"],
        choices=["logreg", "xgboost"],
        help="Which meta-models to evaluate (default: both)",
    )
    # Artifact output locations and toggles.
    cli.add_argument("--artifact_prefix", help="If set, saves artifacts using this filepath prefix")
    cli.add_argument("--out_dir", default="outputs", help="Base output directory")
    cli.add_argument("--meta_xgb_dir", default="Meta-Classifier_XG_boost_es_optimized", help="Subdir for xgboost artifacts")
    cli.add_argument("--meta_logreg_dir", default="Meta-Classifier-log_regression", help="Subdir for logreg artifacts")
    cli.add_argument("--save_predictions", action="store_true", help="Write out-of-fold predictions per model")
    cli.add_argument("--save_models", action="store_true", help="Persist fitted pipelines per model")
    cli.add_argument("--verbose", action="store_true", help="Print full reports and confusion matrices")
    cli.add_argument("--end_to_end", action="store_true", default=False, help="[慢] 用大模型重新生成特征 (不建议,除非你需要全流程推理)")
    return cli.parse_args()
| |
|
| |
|
def main() -> None:
    """CLI entry point: load features, cross-validate meta-classifiers, save artifacts."""

    args = parse_args()

    # Pre-tuned XGBoost tree counts from a previous early-stopping run, if any.
    best_iterations = load_best_iterations()

    # End-to-end mode re-runs all inference, so CSV overrides would be ignored.
    if args.end_to_end and (args.prob_features_csv or args.multi_llm_csv or args.semantics_csv):
        print("[警告] --end_to_end 模式下会忽略所有预处理特征文件,全部重新推理,速度极慢!")

    if args.end_to_end:
        # Slow path: regenerate all features from raw text with the LLMs.
        print("[🤖] Creating end-to-end pipelines with feature engineering... (速度极慢,仅用于全流程推理)")
        if not TRANSFORMERS_AVAILABLE:
            raise ImportError("transformers and torch are required for end-to-end feature engineering. Install with: pip install transformers torch")

        # NOTE(review): overrides from --*_csv are not applied in this branch
        # (only the default inferred paths are used) — see the warning above.
        paths = infer_paths(args.dataset)
        data = load_feature_table(paths)

        # Find the raw-text column the feature-engineering step will consume.
        text_col = None
        for col in ['text', 'sentence', 'content']:
            if col in data.columns:
                text_col = col
                break

        if text_col is None:
            raise ValueError("End-to-end mode requires text data, but no text column found in dataset")

        X_text = data[[text_col]]
        target_col = "label"
        if target_col not in data.columns:
            raise KeyError("Target column 'label' not found after merging.")

        y = data[target_col].astype(str)

        # Canonical label order first, then any unexpected labels appended.
        default_order = ["negative", "neutral", "positive"]
        observed = list(pd.unique(y))
        class_labels = [lbl for lbl in default_order if lbl in observed]
        class_labels += [lbl for lbl in observed if lbl not in class_labels]

        # Integer encoding for XGBoost; the inverse map decodes predictions.
        label_to_int = {lbl: idx for idx, lbl in enumerate(class_labels)}
        int_to_label = {idx: lbl for lbl, idx in label_to_int.items()}
        y_encoded = y.map(label_to_int).astype(int)

        # Column lists are empty here: the feature-engineering step defines them.
        pipelines = build_pipelines(
            numeric_cols=[],
            categorical_cols=[],
            num_classes=len(class_labels),
            random_state=args.seed,
            models_requested=args.models,
            dataset=args.dataset,
            best_iterations=best_iterations,
            include_feature_engineering=True,
        )
        X = X_text
    else:
        # Fast path: use the precomputed feature CSVs (with optional overrides).
        paths = infer_paths(args.dataset)
        if args.prob_features_csv:
            paths.prob_features_csv = args.prob_features_csv
        if args.multi_llm_csv:
            paths.multi_llm_csv = args.multi_llm_csv
        if args.semantics_csv:
            paths.semantics_csv = args.semantics_csv

        data = load_feature_table(paths)

        target_col = "label"
        if target_col not in data.columns:
            raise KeyError("Target column 'label' not found after merging.")

        # fin_label/rob_label are treated as categorical; every other numeric
        # column (except the join key) is a numeric feature.
        categorical_cols = [c for c in ["fin_label", "rob_label"] if c in data.columns]
        numeric_cols = [
            c for c in data.select_dtypes(include=[np.number]).columns
            if c not in {"doc_id"}
        ]

        X = data[numeric_cols + categorical_cols]
        y = data[target_col].astype(str)

        # Canonical label order first, then any unexpected labels appended.
        default_order = ["negative", "neutral", "positive"]
        observed = list(pd.unique(y))
        class_labels = [lbl for lbl in default_order if lbl in observed]
        class_labels += [lbl for lbl in observed if lbl not in class_labels]

        # Integer encoding for XGBoost; the inverse map decodes predictions.
        label_to_int = {lbl: idx for idx, lbl in enumerate(class_labels)}
        int_to_label = {idx: lbl for lbl, idx in label_to_int.items()}
        y_encoded = y.map(label_to_int).astype(int)

        pipelines = build_pipelines(
            numeric_cols=numeric_cols,
            categorical_cols=categorical_cols,
            num_classes=len(class_labels),
            random_state=args.seed,
            models_requested=args.models,
            dataset=args.dataset,
            best_iterations=best_iterations,
            include_feature_engineering=False,
        )

    # One shared splitter so both models see identical folds.
    cv = StratifiedKFold(n_splits=args.folds, shuffle=True, random_state=args.seed)

    results = {}
    for name, pipeline in pipelines.items():
        # XGBoost trains on the int-encoded target and needs the decoder;
        # logistic regression handles label strings directly.
        if name == "xgboost":
            metrics = evaluate_model(
                name,
                pipeline,
                X,
                y_encoded,
                y,
                cv=cv,
                class_labels=class_labels,
                label_decoder=int_to_label,
            )
        else:
            metrics = evaluate_model(
                name,
                pipeline,
                X,
                y,
                y,
                cv=cv,
                class_labels=class_labels,
            )
        print_metrics(metrics, verbose=args.verbose)
        results[name] = metrics

        # Artifact saving, variant 1: explicit --artifact_prefix file prefix.
        if args.artifact_prefix and args.save_predictions:
            pred_path = f"{args.artifact_prefix}_{name}_predictions.csv"
            save_predictions(data, metrics, pred_path)

        if args.artifact_prefix and args.save_models:
            model_path = f"{args.artifact_prefix}_{name}_model.joblib"

            # XGBoost bundles are wrapped with metadata needed at inference time.
            if name == "xgboost":
                model_dict = {
                    "pipeline": metrics["final_model"],
                    "feature_columns": list(X.columns),
                    "label_map": label_to_int,
                    "labels": class_labels,
                    "best_iteration": metrics.get("best_iteration", 0),
                    "best_ntree_limit": metrics.get("best_ntree_limit", 1),
                }
                joblib.dump(model_dict, model_path)
            else:
                joblib.dump(metrics["final_model"], model_path)
            print(f"Saved model: {model_path}")

        # Artifact saving, variant 2: default per-model subdirectories under --out_dir.
        if not args.artifact_prefix and args.save_predictions:
            if name == "xgboost":
                save_dir = os.path.join(args.out_dir, args.meta_xgb_dir)
            else:
                save_dir = os.path.join(args.out_dir, args.meta_logreg_dir)
            os.makedirs(save_dir, exist_ok=True)
            pred_path = os.path.join(save_dir, f"FinSent_{args.dataset}_meta_{name}_predictions.csv")
            save_predictions(data, metrics, pred_path)

        if not args.artifact_prefix and args.save_models:
            if name == "xgboost":
                save_dir = os.path.join(args.out_dir, args.meta_xgb_dir)
            else:
                save_dir = os.path.join(args.out_dir, args.meta_logreg_dir)
            os.makedirs(save_dir, exist_ok=True)
            model_path = os.path.join(save_dir, f"FinSent_{args.dataset}_meta_{name}_model.joblib")

            # Same metadata wrapping as the prefix variant above.
            if name == "xgboost":
                model_dict = {
                    "pipeline": metrics["final_model"],
                    "feature_columns": list(X.columns),
                    "label_map": label_to_int,
                    "labels": class_labels,
                    "best_iteration": metrics.get("best_iteration", 0),
                    "best_ntree_limit": metrics.get("best_ntree_limit", 1),
                }
                joblib.dump(model_dict, model_path)
            else:
                joblib.dump(metrics["final_model"], model_path)
            print(f"Saved model: {model_path}")
| |
|
| |
|
# Script guard: run the CLI only when executed directly, not on import.
if __name__ == "__main__":
    main()
| |
|