File size: 4,778 Bytes
4e38daf
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
from cybersecurity_knowledge_graph.event_arg_role_dataloader import EventArgumentRoleDataset
from cybersecurity_knowledge_graph.utils import arg_2_role

import os
from transformers import AutoTokenizer
import optuna
from sklearn.model_selection import StratifiedKFold
from sklearn.model_selection import cross_val_score
from sklearn.metrics import make_scorer, f1_score
from sklearn.ensemble import VotingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.neural_network import MLPClassifier
from sklearn.svm import SVC
from joblib import dump, load
from sentence_transformers import SentenceTransformer
import numpy as np

# Sentence-embedding model; get_arg_roles() uses it to embed the sentence and
# the argument text that are fed to the role classifiers.
embed_model = SentenceTransformer('all-MiniLM-L6-v2')

model_checkpoint = "ehsanaghaei/SecureBERT"

# Tokenizer passed to EventArgumentRoleDataset by fit().
tokenizer = AutoTokenizer.from_pretrained(model_checkpoint, add_prefix_space=True)

# Maps an argument subtype name to its trained role classifier.
classifiers = {}
# The leading slash is intentional: this suffix is appended to the current
# working directory, so models are resolved relative to where the process runs.
folder_path = '/cybersecurity_knowledge_graph/arg_role_models'

_models_dir = os.getcwd() + folder_path

# A missing directory only means no classifier has been trained yet. Skipping
# the scan keeps the module importable so fit() can be run to create the models;
# get_arg_roles() will still fail for subtypes that have no loaded classifier.
if os.path.isdir(_models_dir):
    for filename in os.listdir(_models_dir):
        if filename.endswith('.joblib'):
            file_path = os.path.join(_models_dir, filename)
            # NOTE: joblib.load unpickles the file, so this directory must only
            # contain trusted model files.
            clf = load(file_path)
            # Strip only the '.joblib' suffix (the inverse of how fit() names
            # its output) so argument names containing dots are not truncated.
            arg = filename[:-len('.joblib')]
            classifiers[arg] = clf

"""
Function: fit()
Description: This function performs a machine learning task to train and evaluate classifiers for multiple argument roles.
             It utilizes Optuna for hyperparameter optimization and creates a Voting Classifier.
             The trained classifiers are saved as joblib files.
"""
def _build_voting_classifier(svc_c, svc_kernel):
    """Return the untrained hard-voting ensemble used for both tuning and final training."""
    return VotingClassifier(estimators=[
        ('Logistic Regression', LogisticRegression()),
        ('Neural Network', MLPClassifier(max_iter=500)),
        ('Support Vector Machine', SVC(C=svc_c, kernel=svc_kernel)),
    ], voting='hard')


def fit(*, n_trials=20, data_path="./data/annotation/", output_dir=None):
    """Train and save one role classifier per argument type that has several roles.

    For every entry of ``arg_2_role`` with more than one role, the annotated
    dataset for that argument is loaded, the SVC regularisation strength is
    tuned with Optuna (maximising the mean weighted F1 of a 5-fold stratified
    cross-validation), and a hard-voting ensemble (logistic regression, MLP,
    SVC) is refit on ``dataset.data`` with the best parameters and dumped with
    joblib as ``<arg>.joblib``. The best cross-validation score is printed per
    argument. Labels are encoded as the index of the role in the role list.

    Args:
        n_trials: Number of Optuna trials per argument type.
        data_path: Directory passed to ``EventArgumentRoleDataset`` as ``path``.
        output_dir: Directory to write the ``.joblib`` files into; it is
            created if missing. ``None`` (default) writes them relative to the
            current working directory, as before. The module-level loader reads
            models from ``os.getcwd() + folder_path``, so pass that directory
            here (or move the files) for them to be loaded on the next import.

    Returns:
        None. The results are the files written to disk.
    """
    if output_dir is not None:
        os.makedirs(output_dir, exist_ok=True)

    for arg, roles in arg_2_role.items():
        if len(roles) <= 1:
            # A single possible role needs no classifier.
            continue

        dataset = EventArgumentRoleDataset(path=data_path, tokenizer=tokenizer, arg=arg)
        dataset.load_data()
        # NOTE(review): the split is requested but features are read from
        # dataset.data below; whether that attribute is affected by the split
        # cannot be seen from this file -- confirm against the dataloader.
        dataset.train_val_test_split()

        X = [datapoint["embedding"] for datapoint in dataset.data]
        y = [roles.index(datapoint["label"]) for datapoint in dataset.data]

        # FYI: Objective functions can take additional arguments
        # (https://optuna.readthedocs.io/en/stable/faq.html#objective-func-additional-args).
        def objective(trial):
            # "voting" is the only candidate; the suggestion is kept so the
            # recorded trial parameters stay the same as before.
            trial.suggest_categorical("classifier", ["voting"])
            svc_c = trial.suggest_float("svc_c", 1e-3, 1e3, log=True)
            svc_kernel = trial.suggest_categorical("kernel", ['rbf'])
            classifier_obj = _build_voting_classifier(svc_c, svc_kernel)

            f1_scorer = make_scorer(f1_score, average="weighted")
            stratified_kfold = StratifiedKFold(n_splits=5, shuffle=True, random_state=42)
            cv_scores = cross_val_score(classifier_obj, X, y, cv=stratified_kfold, scoring=f1_scorer)
            return cv_scores.mean()

        study = optuna.create_study(direction="maximize")
        study.optimize(objective, n_trials=n_trials)
        print(f"{arg} : {study.best_trial.values[0]}")

        best_params = study.best_trial.params
        best_clf = _build_voting_classifier(best_params["svc_c"], best_params["kernel"])
        best_clf.fit(X, y)

        model_name = f'{arg}.joblib'
        model_path = model_name if output_dir is None else os.path.join(output_dir, model_name)
        dump(best_clf, model_path)

"""
Function: get_arg_roles(event_args, doc)
Description: This function assigns argument roles to a list of event arguments within a document.
Inputs:
    - event_args: A list of event argument dictionaries, each containing information about an argument.
    - doc: A spaCy document representing the analyzed text.
Output:
    - The input 'event_args' list with updated 'role' values assigned to each argument.
"""
def _sentence_for_argument(arg, sentences):
    """Return the sentence to use as context for *arg*.

    Prefers the first sentence that fully contains the argument's character
    span; if none does (e.g. the span straddles a sentence boundary), falls
    back to the first sentence that overlaps the span.
    """
    start, end = arg["startOffset"], arg["endOffset"]
    first_overlap = None
    for sent in sentences:
        if start >= sent.start_char and end <= sent.end_char:
            return sent
        if first_overlap is None and start < sent.end_char and end > sent.start_char:
            first_overlap = sent
    if first_overlap is None:
        # Previously this surfaced as a bare StopIteration from next().
        raise ValueError(
            f"no sentence in the document covers argument offsets {start}-{end}"
        )
    return first_overlap


def get_arg_roles(event_args, doc):
    """Assign a role to every event argument and return the same list.

    Arguments whose subtype has a single possible role in ``arg_2_role`` get
    that role directly. For the others, the embedding of the surrounding
    sentence is concatenated with the embedding of the argument text and the
    subtype's classifier from ``classifiers`` predicts the role index.

    Args:
        event_args: List of dicts. Each needs a "subtype" key; arguments with
            several candidate roles also need "text", "startOffset" and
            "endOffset". The dicts are modified in place ("role" is set).
        doc: Document whose ``sents`` yield spans exposing ``start_char``,
            ``end_char`` and ``text`` (a spaCy document according to the
            file's own description). Only accessed when at least one argument
            needs classification.

    Returns:
        The input ``event_args`` list, with "role" set on every argument.

    Raises:
        KeyError: If a subtype is unknown to ``arg_2_role`` or no classifier
            is loaded for it.
        ValueError: If no sentence of ``doc`` covers an argument's offsets.
    """
    sentences = None  # materialised once, on first use, instead of per argument
    for arg in event_args:
        subtype = arg["subtype"]
        roles = arg_2_role[subtype]
        if len(roles) <= 1:
            arg["role"] = roles[0]
            continue

        if sentences is None:
            sentences = list(doc.sents)
        sent = _sentence_for_argument(arg, sentences)

        # Feature layout: sentence embedding followed by argument embedding.
        sent_embed = embed_model.encode(sent.text)
        arg_embed = embed_model.encode(arg["text"])
        embed = np.concatenate((sent_embed, arg_embed))

        try:
            arg_clf = classifiers[subtype]
        except KeyError:
            raise KeyError(
                f"no trained role classifier loaded for subtype {subtype!r}"
            ) from None
        # predict() expects a 2-D batch; a single row is passed.
        role_id = arg_clf.predict(embed.reshape(1, -1))
        arg["role"] = roles[role_id[0]]
    return event_args