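"""Train a dialect classifier on top of frozen transformer features.

The pipeline extracts the first-token hidden state of each tweet from a
pretrained model (AraBART in main() below) and fits a scikit-learn
logistic regression on those embeddings. The TSV column names
("#2_tweet", "#3_country_label") and file names (DA_train_labeled.tsv,
DA_dev_labeled.tsv) suggest the NADI Arabic dialect-identification data,
but any TSVs with these columns will work.

Note: because of the relative import below, run this file as a module
from the package root (python -m <package>.<this_module>) rather than as
a standalone script.
"""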
from datasets import DatasetDict, Dataset
import numpy as np
import pandas as pd
import torch
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import accuracy_score, f1_score
from transformers import AutoModel, AutoTokenizer

from .utils import serialize_data, load_data
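# `serialize_data` and `load_data` live in the project's own utils module,
# which is not shown here. Judging from how they are called below, they are
# presumably thin pickle wrappers, roughly like this (a sketch under that
# assumption, not the actual implementation):
#
#     import pickle
#
#     def serialize_data(obj, output_path: str) -> None:
#         with open(output_path, "wb") as f:
#             pickle.dump(obj, f)
#
#     def load_data(input_path: str):
#         with open(input_path, "rb") as f:
#             return pickle.load(f)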


class PreProcessor:
    def __init__(self, model_name: str, train_path: str, test_path: str, output_path: str):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        self.model = AutoModel.from_pretrained(model_name).to(self.device)
        self.tokenizer = AutoTokenizer.from_pretrained(model_name)
        self.df_train = pd.read_csv(train_path, sep="\t")
        self.df_test = pd.read_csv(test_path, sep="\t")
        self.output_path = output_path

    def _get_datasetdict_object(self):
        # Rename the raw TSV columns to the names expected downstream and
        # drop all other columns.
        mapper = {"#2_tweet": "tweet", "#3_country_label": "label"}
        columns_to_keep = ["tweet", "label"]

        df_train = self.df_train.rename(columns=mapper)[columns_to_keep]
        df_test = self.df_test.rename(columns=mapper)[columns_to_keep]

        train_dataset = Dataset.from_pandas(df_train)
        test_dataset = Dataset.from_pandas(df_test)
        data = DatasetDict({'train': train_dataset, 'test': test_dataset})

        return data
    
    def _tokenize(self, batch):
        # Pad to the longest sequence in the batch and truncate anything
        # beyond the model's maximum input length.
        return self.tokenizer(batch["tweet"], padding=True, truncation=True)

    def _encode_data(self, data):
        # batch_size=None processes each split as a single batch, so
        # padding=True pads every example to the split's longest sequence.
        data_encoded = data.map(self._tokenize, batched=True, batch_size=None)
        return data_encoded

    def _extract_hidden_states(self, batch):
        # Keep only the tensors the model expects (e.g. input_ids, attention_mask).
        inputs = {k: v.to(self.device) for k, v in batch.items()
                  if k in self.tokenizer.model_input_names}
        with torch.no_grad():
            last_hidden_state = self.model(**inputs).last_hidden_state

        # Use the hidden state of the first token as the sequence embedding.
        return {"hidden_state": last_hidden_state[:, 0].cpu().numpy()}
    
    def _get_features(self, data_encoded):
        # Expose the model inputs as PyTorch tensors so they can be fed to
        # the model directly, then collect the embeddings in small batches.
        data_encoded.set_format("torch", columns=["input_ids", "attention_mask", "label"])
        data_hidden = data_encoded.map(self._extract_hidden_states, batched=True, batch_size=50)
        return data_hidden
        
    def preprocess_data(self):
        # End-to-end preprocessing: build the DatasetDict, tokenize, extract
        # hidden states, and serialize the result to self.output_path.
        data = self._get_datasetdict_object()
        data_encoded = self._encode_data(data)
        data_hidden = self._get_features(data_encoded)
        serialize_data(data_hidden, output_path=self.output_path)


class Model:
    def __init__(self, data_input_path: str, model_name: str):
        self.model_name = model_name
        self.model = None
        self.data = load_data(input_path=data_input_path)
        self.X_train = np.array(self.data["train"]["hidden_state"])
        self.X_test = np.array(self.data["test"]["hidden_state"])
        self.y_train = np.array(self.data["train"]["label"])
        self.y_test = np.array(self.data["test"]["label"])

    def _train_logistic_regression(self, X_train, y_train):
        # multi_class="multinomial" is already the default with the lbfgs
        # solver and is deprecated in recent scikit-learn releases; it is
        # kept here for compatibility with older versions.
        lr_model = LogisticRegression(multi_class="multinomial",
                                      class_weight="balanced",
                                      max_iter=1000,
                                      random_state=2024)
        lr_model.fit(X_train, y_train)
        return lr_model

    def train_model(self, output_path):
        if self.model_name != "lr":
            raise ValueError(f"Unknown model name '{self.model_name}'. Please use 'lr'.")

        lr_model = self._train_logistic_regression(self.X_train, self.y_train)
        self.model = lr_model
        serialize_data(lr_model, output_path)
    
    def _get_metrics(self, y_true, y_preds):
        accuracy = accuracy_score(y_true, y_preds)
        f1_macro = f1_score(y_true, y_preds, average="macro")
        f1_weighted = f1_score(y_true, y_preds, average="weighted")
        print(f"Accuracy: {accuracy:.4f}")
        print(f"F1 macro average: {f1_macro:.4f}")
        print(f"F1 weighted average: {f1_weighted:.4f}")

    def evaluate_predictions(self):
        if self.model is None:
            raise RuntimeError("No trained model found. Call train_model() first.")

        train_preds = self.model.predict(self.X_train)
        test_preds = self.model.predict(self.X_test)

        print(self.model_name)
        print("\nTrain set:")
        self._get_metrics(self.y_train, train_preds)
        print("-"*50)
        print("Test set:")
        self._get_metrics(self.y_test, test_preds)


def main():
    file_path = "../data/data_hidden.pkl"
    preprocessor = PreProcessor(model_name="moussaKam/AraBART",
                                train_path="../data/DA_train_labeled.tsv",
                                test_path="../data/DA_dev_labeled.tsv",
                                output_path=file_path)
    preprocessor.preprocess_data()
    model = Model(data_input_path=file_path, model_name="lr")
    model.train_model("../models/logistic_regression.pkl")
    model.evaluate_predictions()

if __name__ == "__main__":
    main()