wordify / src /wordifier.py
Pietro Lesci
reproducibility
11bd087
raw
history blame
No virus
4.98 kB
from typing import Dict, List, Tuple
import numpy as np
import pandas as pd
import streamlit as st
from pandas.core.frame import DataFrame
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import resample
from .configs import InputTransformConfigs, ModelConfigs
def input_transform(
    text: pd.Series, labels: pd.Series, configs=InputTransformConfigs
) -> Dict[str, np.ndarray]:
    """
    Encode raw texts and labels into the numeric objects used for training.

    Texts are vectorized with a word-level TF-IDF vectorizer whose n-gram
    range, document-frequency bounds and sublinear-tf flag are read from
    ``configs``; labels are integer-encoded with a ``LabelEncoder``.

    Returns a dictionary with four entries:
    ``"X"`` (the fitted TF-IDF matrix), ``"y"`` (the encoded labels),
    ``"X_names"`` (array of feature names, aligned with the columns of X)
    and ``"y_names"`` (the label classes known to the encoder).
    """
    vectorizer_options = dict(
        # text is passed directly, already in memory (default)
        input="content",
        encoding="utf-8",  # default
        decode_error="strict",  # default
        # the options below leave the text untouched: no accent stripping,
        # no lowercasing, no custom preprocessing, no stop-word removal
        strip_accents=None,
        lowercase=False,
        preprocessor=None,  # default
        tokenizer=None,  # default
        stop_words=None,
        analyzer="word",
        # values below are driven by the configuration object
        ngram_range=configs.NGRAM_RANGE.value,
        min_df=configs.MIN_DF.value,
        max_df=configs.MAX_DF.value,
        sublinear_tf=configs.SUBLINEAR.value,
    )
    vectorizer = TfidfVectorizer(**vectorizer_options)
    encoder = LabelEncoder()

    features = vectorizer.fit_transform(text.values)
    targets = encoder.fit_transform(labels.values)
    feature_names = np.array(vectorizer.get_feature_names_out())

    return {
        "X": features,
        "y": targets,
        "X_names": feature_names,
        "y_names": encoder.classes_,
    }
def wordifier(
    X: np.ndarray,
    y: np.ndarray,
    X_names: List[str],
    y_names: List[str],
    configs=ModelConfigs,
) -> Tuple[List[Tuple[str, float, str]], List[Tuple[str, float, str]]]:
    """
    Score features by how consistently they receive a signed coefficient.

    For ``configs.NUM_ITERS`` iterations an L1-penalized logistic regression
    (penalty strength drawn at random from ``configs.PENALTIES``) is fitted on
    a stratified sample, drawn with replacement, of the rows of ``X``. For
    every class the function counts in how many iterations each feature had a
    strictly positive / strictly negative coefficient, divides the counts by
    the number of iterations and keeps the features whose frequency is at
    least ``configs.SELECTION_THRESHOLD``.

    A Streamlit progress bar is created and advanced once per iteration.

    Parameters
    ----------
    X : feature matrix with one row per instance and one column per feature.
    y : class index of each instance; the indices are used to address rows
        of the score matrices, so they are assumed to match ``y_names``.
    X_names : feature names, aligned with the columns of ``X``.
    y_names : class names, aligned with the class indices in ``y``.
    configs : enum-like object providing ``NUM_ITERS``, ``PENALTIES``,
        ``SELECTION_THRESHOLD``, ``MIN_SELECTION`` and ``MAX_SELECTION``.

    Returns
    -------
    ``(pos, neg)``: two lists of ``(feature name, score, class name)`` tuples
    for positively and negatively associated features respectively.
    """
    n_instances, n_features = X.shape
    n_classes = len(y_names)

    # configuration values are loop invariants: read them once
    num_iters = configs.NUM_ITERS.value
    penalties = configs.PENALTIES.value
    threshold = configs.SELECTION_THRESHOLD.value

    # NOTE: the * 10 / 10 trick is to have "nice" round-ups
    sample_fraction = np.ceil((n_features / n_instances) * 10) / 10

    sample_size = min(
        # this is the maximum supported
        configs.MAX_SELECTION.value,
        # at minimum you want MIN_SELECTION but in general you want
        # n_instances * sample_fraction
        max(configs.MIN_SELECTION.value, int(n_instances * sample_fraction)),
        # however if the previous one is bigger than the available instances
        # take the number of available instances
        n_instances,
    )

    # TODO: might want to try out something to subsample features at each iteration

    # per-class counters of how often each feature got a positive / negative
    # coefficient
    pos_scores = np.zeros((n_classes, n_features), dtype=int)
    neg_scores = np.zeros((n_classes, n_features), dtype=int)

    pbar = st.progress(0)
    for iteration in range(num_iters):
        # run randomized regression: the penalty strength changes at random
        # from one iteration to the next
        clf = LogisticRegression(
            penalty="l1",
            C=penalties[np.random.randint(len(penalties))],
            solver="liblinear",
            multi_class="auto",
            max_iter=500,
            class_weight="balanced",
            random_state=42,
        )

        # sample indices to subsample matrix
        selection = resample(
            np.arange(n_instances), replace=True, stratify=y, n_samples=sample_size
        )

        try:
            clf.fit(X[selection], y[selection])
        except ValueError:
            # best effort: a sample the model cannot be fitted on contributes
            # nothing to the counters, but the loop (and the bar) moves on
            pass
        else:
            is_positive = clf.coef_ > 0.0
            is_negative = clf.coef_ < 0.0
            if n_classes == 2:
                # binary models expose a single coefficient row, relative to
                # class 1; class 0 gets the mirrored signs
                pos_scores[1] += is_positive.ravel()
                neg_scores[1] += is_negative.ravel()
                pos_scores[0] += is_negative.ravel()
                neg_scores[0] += is_positive.ravel()
            else:
                pos_scores += is_positive
                neg_scores += is_negative

        # advance on every iteration, failed fits included, and use the
        # number of completed iterations so the bar can reach 100%
        pbar.progress(round((iteration + 1) / num_iters, 1))

    # normalize; skipped iterations still count in the denominator
    pos_scores = pos_scores / num_iters
    neg_scores = neg_scores / num_iters

    # get only active features: scores below the threshold are zeroed so that
    # nonzero() below returns just the selected (class, feature) positions
    pos_positions = np.where(pos_scores >= threshold, pos_scores, 0)
    neg_positions = np.where(neg_scores >= threshold, neg_scores, 0)

    # prepare the records later turned into a DataFrame
    pos = [
        (X_names[i], pos_scores[c, i], y_names[c])
        for c, i in zip(*pos_positions.nonzero())
    ]
    neg = [
        (X_names[i], neg_scores[c, i], y_names[c])
        for c, i in zip(*neg_positions.nonzero())
    ]

    return pos, neg
def output_transform(
    pos: List[Tuple[str, float, str]], neg: List[Tuple[str, float, str]]
) -> DataFrame:
    """
    Assemble the positive and negative records into a single table.

    Each list of ``(word, score, label)`` tuples becomes a frame sorted by
    label and then score, both descending, and tagged with a ``correlation``
    column (``"positive"`` or ``"negative"``). The two frames are stacked,
    positives first, each keeping its own row index, and the column names
    are title-cased.
    """
    tagged_frames = []
    for records, direction in ((pos, "positive"), (neg, "negative")):
        frame = pd.DataFrame(records, columns="word score label".split())
        frame = frame.sort_values(["label", "score"], ascending=False)
        frame["correlation"] = direction
        tagged_frames.append(frame)

    output = pd.concat(tagged_frames, ignore_index=False, axis=0)
    output.columns = output.columns.str.title()
    return output