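"""
Wordifier core: TF-IDF text encoding, repeated L1-penalised logistic regression
over stratified bootstrap samples, and tabulation of the words most frequently
selected as positive or negative indicators of each label.
"""
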
from typing import Dict, List, Tuple

import numpy as np
import pandas as pd
import streamlit as st
from pandas.core.frame import DataFrame
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.linear_model import LogisticRegression
from sklearn.preprocessing import LabelEncoder
from sklearn.utils import resample

from .configs import InputTransformConfigs, ModelConfigs


def input_transform(
    text: pd.Series, labels: pd.Series, configs=InputTransformConfigs
) -> Dict[str, np.ndarray]:
    """
    Encodes text into a mathematical object amenable to the training algorithm
    """
    tfidf_vectorizer = TfidfVectorizer(
        input="content",  # default: file already in memory
        encoding="utf-8",  # default
        decode_error="strict",  # default
        strip_accents=None,  # do nothing
        lowercase=False,  # do nothing
        preprocessor=None,  # do nothing - default
        tokenizer=None,  # default
        stop_words=None,  # do nothing
        analyzer="word",
        ngram_range=configs.NGRAM_RANGE.value,  # maximum 3-ngrams
        min_df=configs.MIN_DF.value,
        max_df=configs.MAX_DF.value,
        sublinear_tf=configs.SUBLINEAR.value,
    )
    label_encoder = LabelEncoder()

    X = tfidf_vectorizer.fit_transform(text.values)
    y = label_encoder.fit_transform(labels.values)

    return {
        "X": X,
        "y": y,
        "X_names": np.array(tfidf_vectorizer.get_feature_names_out()),
        "y_names": label_encoder.classes_,
    }


def wordifier(
    X: np.ndarray,
    y: np.ndarray,
    X_names: List[str],
    y_names: List[str],
    configs=ModelConfigs,
) -> Tuple[List[Tuple[str, float, str]], List[Tuple[str, float, str]]]:
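    """
    Repeatedly fits an L1-penalised logistic regression on stratified bootstrap
    samples and counts, per class, how often each feature receives a positive or
    a negative coefficient. Returns the (word, frequency, label) tuples whose
    selection frequency exceeds the configured threshold.
    """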

    n_instances, n_features = X.shape
    n_classes = len(y_names)

    # NOTE: the * 10 / 10 trick is to have "nice" round-ups
    sample_fraction = np.ceil((n_features / n_instances) * 10) / 10

    sample_size = min(
        # this is the maximum supported
        configs.MAX_SELECTION.value,
        # at minimum you want MIN_SELECTION but in general you want
        # n_instances * sample_fraction
        max(configs.MIN_SELECTION.value, int(n_instances * sample_fraction)),
        # however, if the previous value exceeds the number of available
        # instances, fall back to the number of available instances
        n_instances,
    )

    # TODO: might want to try out something to subsample features at each iteration

    # initialize coefficient matrices
    pos_scores = np.zeros((n_classes, n_features), dtype=int)
    neg_scores = np.zeros((n_classes, n_features), dtype=int)

    pbar = st.progress(0)
    for i in range(configs.NUM_ITERS.value):

        # run randomized regression
        clf = LogisticRegression(
            penalty="l1",
            C=configs.PENALTIES.value[np.random.randint(len(configs.PENALTIES.value))],
            solver="liblinear",
            multi_class="auto",
            max_iter=500,
            class_weight="balanced",
            random_state=42,
        )

        # sample indices to subsample matrix
        selection = resample(
            np.arange(n_instances), replace=True, stratify=y, n_samples=sample_size
        )

        # fit
        try:
            clf.fit(X[selection], y[selection])
        except ValueError:
            continue

        # record coefficients
        if n_classes == 2:
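            # in the binary case clf.coef_ has a single row (the positive
            # class), so the sign is flipped to score class 0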
            pos_scores[1] = pos_scores[1] + (clf.coef_ > 0.0)
            neg_scores[1] = neg_scores[1] + (clf.coef_ < 0.0)
            pos_scores[0] = pos_scores[0] + (clf.coef_ < 0.0)
            neg_scores[0] = neg_scores[0] + (clf.coef_ > 0.0)
        else:
            pos_scores += clf.coef_ > 0
            neg_scores += clf.coef_ < 0

        pbar.progress(round(i / configs.NUM_ITERS.value, 1))

    # normalize
    pos_scores = pos_scores / configs.NUM_ITERS.value
    neg_scores = neg_scores / configs.NUM_ITERS.value

    # get only active features
    pos_positions = np.where(
        pos_scores >= configs.SELECTION_THRESHOLD.value, pos_scores, 0
    )
    neg_positions = np.where(
        neg_scores >= configs.SELECTION_THRESHOLD.value, neg_scores, 0
    )

    # prepare DataFrame
    pos = [
        (X_names[i], pos_scores[c, i], y_names[c])
        for c, i in zip(*pos_positions.nonzero())
    ]
    neg = [
        (X_names[i], neg_scores[c, i], y_names[c])
        for c, i in zip(*neg_positions.nonzero())
    ]

    return pos, neg


def output_transform(
    pos: List[Tuple[str, float, str]], neg: List[Tuple[str, float, str]]
) -> DataFrame:
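    """
    Combines the positive and negative (word, score, label) tuples into a single
    DataFrame, sorted by label and score, with a column marking whether each
    word correlates positively or negatively with its label.
    """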
    posdf = pd.DataFrame(pos, columns="word score label".split()).sort_values(
        ["label", "score"], ascending=False
    )
    posdf["correlation"] = "positive"
    negdf = pd.DataFrame(neg, columns="word score label".split()).sort_values(
        ["label", "score"], ascending=False
    )
    negdf["correlation"] = "negative"

    output = pd.concat([posdf, negdf], ignore_index=False, axis=0)
    output.columns = output.columns.str.title()

    return output
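

# ---------------------------------------------------------------------------
# Minimal usage sketch (illustrative only: the file name "data.csv" and the
# column names "text" / "label" are assumptions, not part of this module).
# The pipeline chains input_transform -> wordifier -> output_transform:
#
#     df = pd.read_csv("data.csv")
#     inputs = input_transform(df["text"], df["label"])
#     pos, neg = wordifier(
#         inputs["X"], inputs["y"], inputs["X_names"], inputs["y_names"]
#     )
#     table = output_transform(pos, neg)
# ---------------------------------------------------------------------------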