matchaSentiment / scripts /train_classical.py
seedflora's picture
Initial Space upload from matchaSentiment repo
4ce2b3e verified
from __future__ import annotations
import argparse
import json
import sys
from pathlib import Path
import joblib
import numpy as np
import pandas as pd
from sklearn.base import clone
from sklearn.ensemble import ExtraTreesClassifier, GradientBoostingClassifier, RandomForestClassifier
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import StratifiedKFold
from sklearn.naive_bayes import MultinomialNB
from sklearn.pipeline import Pipeline
from sklearn.svm import LinearSVC
sys.path.append(str(Path(__file__).resolve().parents[1] / "src"))
from matcha_sentiment.classical import DenseTransformer, W2VBundle, tfidf_tokenizer, vectorize_with_w2v
from matcha_sentiment.config import ARTIFACT_DIR, DATA_PATH, ID2LABEL, MODEL_DIR, STOPWORDS
from matcha_sentiment.data import load_binary_dataset
from matcha_sentiment.metrics import binary_metrics, report_dict
from matcha_sentiment.plots import plot_confusion, plot_roc_curve, plot_top_words, write_wordcloud
from matcha_sentiment.text import tokenize, tokenized_documents
ROOT = Path(__file__).resolve().parents[1]
KEYWORD_SEEDS = [
"enak",
"lezat",
"nikmat",
"nyaman",
"ramah",
"bagus",
"terbaik",
"mantap",
"autentik",
"direkomendasikan",
"murah",
"mahal",
"harga",
"harganya",
"buruk",
"kecewa",
"lama",
"antrean",
"menunggu",
"kurang",
"tidak",
"biasa",
"pahit",
"manis",
]
def make_tfidf(max_features: int, min_df: int) -> TfidfVectorizer:
return TfidfVectorizer(
tokenizer=tfidf_tokenizer,
token_pattern=None,
lowercase=False,
ngram_range=(1, 2),
min_df=min_df,
max_features=max_features,
sublinear_tf=True,
)
def classifiers(random_state: int) -> dict[str, object]:
return {
"logistic_regression": LogisticRegression(
max_iter=2500,
class_weight="balanced",
solver="liblinear",
random_state=random_state,
),
"linear_svm": LinearSVC(class_weight="balanced", random_state=random_state),
"random_forest": RandomForestClassifier(
n_estimators=450,
min_samples_leaf=2,
class_weight="balanced_subsample",
n_jobs=-1,
random_state=random_state,
),
"extra_trees": ExtraTreesClassifier(
n_estimators=500,
min_samples_leaf=2,
class_weight="balanced",
n_jobs=-1,
random_state=random_state,
),
"gradient_boosting": GradientBoostingClassifier(random_state=random_state),
}
def positive_score(model, features):
if hasattr(model, "predict_proba"):
proba = model.predict_proba(features)
if proba.shape[1] == 2:
return proba[:, 1]
if hasattr(model, "decision_function"):
return model.decision_function(features)
return None
def import_word2vec():
try:
from gensim.models import Word2Vec
except Exception as exc: # pragma: no cover - dependency/runtime guard
raise RuntimeError(
"Word2Vec needs gensim and compatible scipy. Install requirements.txt or use the Docker image."
) from exc
return Word2Vec
def train_word2vec(texts: list[str], *, vector_size: int, random_state: int):
Word2Vec = import_word2vec()
docs = tokenized_documents(texts, remove_stopwords=False)
return Word2Vec(
sentences=docs,
vector_size=vector_size,
window=5,
min_count=1,
workers=1,
sg=1,
seed=random_state,
epochs=60,
)
def make_tfidf_pipeline(estimator, model_name: str, max_features: int, min_df: int) -> Pipeline:
steps = [("tfidf", make_tfidf(max_features=max_features, min_df=min_df))]
if model_name == "gradient_boosting":
steps.append(("dense", DenseTransformer()))
steps.append(("classifier", estimator))
return Pipeline(steps)
def evaluate_feature_model(
*,
feature_name: str,
model_name: str,
estimator,
texts: np.ndarray,
y: np.ndarray,
folds: int,
random_state: int,
max_features: int,
min_df: int,
w2v_size: int,
out_dir: Path,
) -> tuple[dict, pd.DataFrame]:
splitter = StratifiedKFold(n_splits=folds, shuffle=True, random_state=random_state)
oof_pred = np.zeros_like(y)
oof_score = np.full(len(y), np.nan, dtype=float)
fold_rows: list[dict] = []
for fold, (train_idx, valid_idx) in enumerate(splitter.split(texts, y), start=1):
train_texts = texts[train_idx].tolist()
valid_texts = texts[valid_idx].tolist()
y_train, y_valid = y[train_idx], y[valid_idx]
clf = clone(estimator)
if feature_name == "tfidf":
model = make_tfidf_pipeline(clf, model_name, max_features, min_df)
model.fit(train_texts, y_train)
pred = model.predict(valid_texts)
score = positive_score(model, valid_texts)
elif feature_name == "word2vec":
w2v = train_word2vec(train_texts, vector_size=w2v_size, random_state=random_state + fold)
x_train = vectorize_with_w2v(w2v, train_texts, w2v_size)
x_valid = vectorize_with_w2v(w2v, valid_texts, w2v_size)
clf.fit(x_train, y_train)
pred = clf.predict(x_valid)
score = positive_score(clf, x_valid)
else:
raise ValueError(f"Unsupported feature: {feature_name}")
oof_pred[valid_idx] = pred
if score is not None:
oof_score[valid_idx] = score
fold_metrics = binary_metrics(
y_valid,
pred,
None if score is None else score,
)
fold_metrics.update(
{
"feature": feature_name,
"model": model_name,
"fold": fold,
"n_valid": int(len(valid_idx)),
}
)
fold_rows.append(fold_metrics)
usable_score = None if np.isnan(oof_score).any() else oof_score
aggregate = binary_metrics(y, oof_pred, usable_score)
aggregate.update(
{
"feature": feature_name,
"model": model_name,
"folds": folds,
"n": int(len(y)),
}
)
pred_frame = pd.DataFrame(
{
"text": texts,
"label": y,
"label_name": [ID2LABEL[int(v)] for v in y],
"prediction": oof_pred,
"prediction_name": [ID2LABEL[int(v)] for v in oof_pred],
"score": oof_score,
}
)
pred_frame.to_csv(out_dir / f"oof_{feature_name}_{model_name}.csv", index=False)
(out_dir / f"report_{feature_name}_{model_name}.json").write_text(
json.dumps(report_dict(y, oof_pred), indent=2, ensure_ascii=False),
encoding="utf-8",
)
return aggregate, pd.DataFrame(fold_rows)
def fit_final_model(
*,
feature_name: str,
model_name: str,
estimator,
texts: list[str],
y: np.ndarray,
random_state: int,
max_features: int,
min_df: int,
w2v_size: int,
):
if feature_name == "tfidf":
model = make_tfidf_pipeline(clone(estimator), model_name, max_features, min_df)
model.fit(texts, y)
return model
w2v = train_word2vec(texts, vector_size=w2v_size, random_state=random_state)
features = vectorize_with_w2v(w2v, texts, w2v_size)
clf = clone(estimator)
clf.fit(features, y)
return W2VBundle(word2vec=w2v, classifier=clf, vector_size=w2v_size)
def extract_tfidf_top_words(texts: list[str], y: np.ndarray, max_features: int, min_df: int, out_dir: Path) -> pd.DataFrame:
vectorizer = make_tfidf(max_features=max_features, min_df=min_df)
x = vectorizer.fit_transform(texts)
model = LogisticRegression(
max_iter=2500,
class_weight="balanced",
solver="liblinear",
random_state=42,
)
model.fit(x, y)
terms = np.array(vectorizer.get_feature_names_out())
weights = model.coef_[0]
def meaningful(term: str) -> bool:
parts = term.split()
return any(part not in STOPWORDS and len(part) >= 3 for part in parts)
candidates = pd.DataFrame({"term": terms, "weight": weights})
candidates = candidates[candidates["term"].map(meaningful)].copy()
pos = candidates.sort_values("weight", ascending=False).head(60)
pos["label_name"] = "Positif"
neg = candidates.sort_values("weight", ascending=True).head(60)
neg["label_name"] = "Negatif"
top_words = pd.concat([pos, neg], ignore_index=True)
top_words.to_csv(out_dir / "top_words_tfidf.csv", index=False)
return top_words
def write_keyword_counts(df: pd.DataFrame, out_dir: Path) -> pd.DataFrame:
token_sets = df["text"].map(lambda text: set(tokenize(text, remove_stopwords=False, min_len=2)))
rows: list[dict] = []
pos_total = int(df["label"].eq(1).sum())
neg_total = int(df["label"].eq(0).sum())
for term in KEYWORD_SEEDS:
pos_count = int(((df["label"].eq(1)) & token_sets.map(lambda tokens: term in tokens)).sum())
neg_count = int(((df["label"].eq(0)) & token_sets.map(lambda tokens: term in tokens)).sum())
if pos_count == 0 and neg_count == 0:
continue
pos_rate = pos_count / pos_total if pos_total else 0.0
neg_rate = neg_count / neg_total if neg_total else 0.0
dominant = "Positif" if pos_rate >= neg_rate else "Negatif"
rows.append(
{
"term": term,
"positif_docs": pos_count,
"negatif_docs": neg_count,
"positif_rate": pos_rate,
"negatif_rate": neg_rate,
"dominant_label": dominant,
"lift": (pos_rate + 1e-9) / (neg_rate + 1e-9),
}
)
result = pd.DataFrame(rows).sort_values(
["dominant_label", "lift", "positif_docs", "negatif_docs"],
ascending=[False, False, False, False],
)
result.to_csv(out_dir / "keyword_counts.csv", index=False)
return result
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Train TF-IDF and Word2Vec classical baselines with 10-fold CV.")
parser.add_argument("--data", default=str(DATA_PATH), help="Prepared binary CSV.")
parser.add_argument("--out-dir", default=str(ARTIFACT_DIR / "classical"), help="Output artifact directory.")
parser.add_argument("--fig-dir", default=str(ARTIFACT_DIR / "figures"), help="Shared figure directory.")
parser.add_argument("--folds", type=int, default=10, help="Number of stratified CV folds.")
parser.add_argument("--random-state", type=int, default=42)
parser.add_argument("--max-features", type=int, default=12000)
parser.add_argument("--min-df", type=int, default=2)
parser.add_argument("--w2v-size", type=int, default=150)
parser.add_argument(
"--features",
nargs="+",
default=["tfidf", "word2vec"],
choices=["tfidf", "word2vec"],
)
return parser.parse_args()
def main() -> None:
args = parse_args()
out_dir = Path(args.out_dir)
fig_dir = Path(args.fig_dir)
model_dir = MODEL_DIR / "classical"
out_dir.mkdir(parents=True, exist_ok=True)
fig_dir.mkdir(parents=True, exist_ok=True)
model_dir.mkdir(parents=True, exist_ok=True)
df = load_binary_dataset(args.data)
texts = df["text"].to_numpy()
text_list = df["text"].tolist()
y = df["label"].to_numpy(dtype=int)
estimators = classifiers(args.random_state)
results: list[dict] = []
fold_frames: list[pd.DataFrame] = []
for feature in args.features:
for model_name, estimator in estimators.items():
if feature == "tfidf":
model_estimator = estimator if model_name != "gradient_boosting" else estimator
if model_name == "multinomial_nb":
model_estimator = MultinomialNB()
else:
model_estimator = estimator
print(f"Training {feature} + {model_name} with {args.folds}-fold CV")
aggregate, folds = evaluate_feature_model(
feature_name=feature,
model_name=model_name,
estimator=model_estimator,
texts=texts,
y=y,
folds=args.folds,
random_state=args.random_state,
max_features=args.max_features,
min_df=args.min_df,
w2v_size=args.w2v_size,
out_dir=out_dir,
)
results.append(aggregate)
fold_frames.append(folds)
results_df = pd.DataFrame(results).sort_values(["f1", "roc_auc", "accuracy"], ascending=False)
results_df.to_csv(out_dir / "results.csv", index=False)
pd.concat(fold_frames, ignore_index=True).to_csv(out_dir / "fold_metrics.csv", index=False)
best = results_df.iloc[0].to_dict()
best_feature = best["feature"]
best_model_name = best["model"]
best_estimator = estimators[best_model_name]
final_model = fit_final_model(
feature_name=best_feature,
model_name=best_model_name,
estimator=best_estimator,
texts=text_list,
y=y,
random_state=args.random_state,
max_features=args.max_features,
min_df=args.min_df,
w2v_size=args.w2v_size,
)
joblib.dump(final_model, model_dir / "best_model.joblib")
(model_dir / "metadata.json").write_text(
json.dumps(best, indent=2, ensure_ascii=False),
encoding="utf-8",
)
best_oof = pd.read_csv(out_dir / f"oof_{best_feature}_{best_model_name}.csv")
score = None if best_oof["score"].isna().any() else best_oof["score"].to_numpy()
plot_confusion(
best_oof["label"].to_numpy(),
best_oof["prediction"].to_numpy(),
fig_dir / "classical_best_confusion_matrix.png",
title=f"Classical best: {best_feature} + {best_model_name}",
)
plot_roc_curve(
best_oof["label"].to_numpy(),
score,
fig_dir / "classical_best_roc_auc.png",
title=f"Classical ROC AUC: {best_feature} + {best_model_name}",
)
top_words = extract_tfidf_top_words(text_list, y, args.max_features, args.min_df, out_dir)
write_keyword_counts(df, out_dir)
plot_top_words(top_words, fig_dir / "top_words_tfidf.png", title="Top words TF-IDF")
write_wordcloud(
df.loc[df["label"].eq(1), "text"].tolist(),
fig_dir / "wordcloud_positif.png",
title="Word cloud Positif",
)
write_wordcloud(
df.loc[df["label"].eq(0), "text"].tolist(),
fig_dir / "wordcloud_negatif.png",
title="Word cloud Negatif",
)
print("Best classical model:")
print(json.dumps(best, indent=2, ensure_ascii=False))
if __name__ == "__main__":
main()