from transformers import RobertaTokenizer, T5Config, T5EncoderModel
from statement_t5 import StatementT5
import torch
import pickle
import numpy as np
import onnxruntime

def to_numpy(tensor):
    """ get np input for onnx runtime model """
    return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()

def predict_vul_lines(code: list, gpu: bool = False) -> dict:
    """Generate statement-level and function-level vulnerability prediction probabilities.
    Parameters
    ----------
    code : :obj:`list`
        A list of function source strings.
    gpu : bool
        Whether CUDA inference is enabled
    Returns
    -------
    :obj:`dict`
        A dictionary with four keys: "batch_func_pred", "batch_func_pred_prob",
        "batch_statement_pred", and "batch_statement_pred_prob".
        "batch_func_pred" stores a list of function-level vulnerability predictions: [0, 1, ...] where 0 means non-vulnerable and 1 means vulnerable
        "batch_func_pred_prob" stores a list of function-level vulnerability prediction probabilities [0.89, 0.75, ...] corresponding to "batch_func_pred"
        "batch_statement_pred" stores a list of statement-level vulnerability predictions: [0, 1, ...] where 0 means non-vulnerable and 1 means vulnerable
        "batch_statement_pred_prob" stores a list of statement-level vulnerability prediction probabilities [0.89, 0.75, ...] corresponding to "batch_statement_pred"
    """
    MAX_STATEMENTS = 155
    MAX_STATEMENT_LENGTH = 20
    DEVICE = 'cuda' if gpu else 'cpu'
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/statement_t5_tokenizer")
    # load model
    config = T5Config.from_pretrained("./utils/t5_config.json")
    model = T5EncoderModel(config=config)    
    model = StatementT5(model, tokenizer, device=DEVICE)
    output_dir = "./models/statement_t5_model.bin"
    model.load_state_dict(torch.load(output_dir, map_location=DEVICE))
    model.to(DEVICE)
    model.eval()
    input_ids, statement_mask = statement_tokenization(code, MAX_STATEMENTS, MAX_STATEMENT_LENGTH, tokenizer)
    # move inputs onto the same device as the model
    input_ids, statement_mask = input_ids.to(DEVICE), statement_mask.to(DEVICE)
    with torch.no_grad():
        statement_probs, func_probs = model(input_ids=input_ids, statement_mask=statement_mask)
    func_preds = torch.argmax(func_probs, dim=-1)
    statement_preds = torch.where(statement_probs > 0.5, 1, 0)
    return {"batch_func_pred": func_preds, "batch_func_pred_prob": func_probs,
            "batch_statement_pred": statement_preds, "batch_statement_pred_prob": statement_probs}

def statement_tokenization(code: list, max_statements: int, max_statement_length: int, tokenizer):
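    """Tokenize a batch of functions into fixed-size statement grids.

    Each function is split on newlines; every non-empty statement is encoded to
    exactly max_statement_length token ids, and the statement list is truncated
    or padded to max_statements rows. Returns (input_ids, statement_mask)
    tensors of shape [batch, max_statements, max_statement_length] and
    [batch, max_statements].
    """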
    batch_input_ids = []
    batch_statement_mask = []
    for c in code:
        source = [statement for statement in c.split("\n") if statement != ""]
        
        source = source[:max_statements]
        padding_statement = [tokenizer.pad_token_id] * max_statement_length
        
        input_ids = []
        for stat in source:
            ids_ = tokenizer.encode(str(stat),
                                    truncation=True,
                                    max_length=max_statement_length,
                                    padding='max_length',
                                    add_special_tokens=False)
            input_ids.append(ids_)
        if len(input_ids) < max_statements:
            for _ in range(max_statements-len(input_ids)):
                input_ids.append(padding_statement)
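        # mark real statement rows (1) vs. fully padded rows (0)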
        statement_mask = []
        for statement in input_ids:
            if statement == padding_statement:
                statement_mask.append(0)
            else:
                statement_mask.append(1)
        batch_input_ids.append(input_ids)
        batch_statement_mask.append(statement_mask)
    return torch.tensor(batch_input_ids), torch.tensor(batch_statement_mask)
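
# Shape sketch for statement_tokenization: for a batch of B functions it returns
#   input_ids      -> LongTensor [B, max_statements, max_statement_length]
#   statement_mask -> LongTensor [B, max_statements] (1 = real statement, 0 = padding)
# e.g. statement_tokenization(["a = 1\nb = 2"], 155, 20, tokenizer) yields tensors
# of shape [1, 155, 20] and [1, 155], with only the first two mask entries set to 1.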

def predict_cweid(code: list, gpu: bool = False) -> dict:
    """Generate CWE-IDs and CWE Abstract Types Predictions.
    Parameters
    ----------
    code : :obj:`list`
        A list of function source strings.
    gpu : bool
        Whether CUDA inference is enabled
    Returns
    -------
    :obj:`dict`
        A dictionary with four keys: "cwe_id", "cwe_id_prob", "cwe_type", and "cwe_type_prob".
        "cwe_id" stores a list of CWE-ID predictions: [CWE-787, CWE-119, ...]
        "cwe_id_prob" stores a list of confidence scores for the CWE-ID predictions [0.9, 0.7, ...]
        "cwe_type" stores a list of CWE abstract type predictions: ["Base", "Class", ...]
        "cwe_type_prob" stores a list of confidence scores for the CWE abstract type predictions [0.9, 0.7, ...]
    """
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    with open("./utils/label_map.pkl", "rb") as f:
        cwe_id_map, cwe_type_map = pickle.load(f)
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/tokenizer")
    tokenizer.add_tokens(["<cls_type>"])
    tokenizer.cls_type_token = "<cls_type>"
    model_input = []
    for c in code:
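        # sequence layout: cls + code tokens + <cls_type> + sep, right-padded to 512 ids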
        code_tokens = tokenizer.tokenize(str(c))[:512 - 3]
        source_tokens = [tokenizer.cls_token] + code_tokens + [tokenizer.cls_type_token] + [tokenizer.sep_token]
        input_ids = tokenizer.convert_tokens_to_ids(source_tokens)
        padding_length = 512 - len(input_ids)
        input_ids += [tokenizer.pad_token_id] * padding_length
        model_input.append(input_ids)
    device = "cuda" if gpu else "cpu"
    model_input = torch.tensor(model_input, device=device)
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/cwe_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(model_input)}
    cwe_id_prob, cwe_type_prob = ort_session.run(None, ort_inputs)
    # batch_cwe_id_pred (1D list with shape of [batch size]): [pred_1, pred_2, ..., pred_n]
    batch_cwe_id = np.argmax(cwe_id_prob, axis=-1).tolist()
    # map predicted idx back to CWE-ID
    batch_cwe_id_pred = [cwe_id_map[str(idx)] for idx in batch_cwe_id]
    # batch_cwe_id_pred_prob (1D list with shape of [batch_size]): [prob_1, prob_2, ..., prob_n]
    batch_cwe_id_pred_prob = []
    for i in range(len(cwe_id_prob)):
        batch_cwe_id_pred_prob.append(cwe_id_prob[i][batch_cwe_id[i]].item())
    # batch_cwe_type_pred (1D list with shape of [batch size]): [pred_1, pred_2, ..., pred_n]
    batch_cwe_type = np.argmax(cwe_type_prob, axis=-1).tolist()
    # map predicted idx back to CWE-Type
    batch_cwe_type_pred = [cwe_type_map[str(idx)] for idx in batch_cwe_type]
    # batch_cwe_type_pred_prob (1D list with shape of [batch_size]): [prob_1, prob_2, ..., prob_n]
    batch_cwe_type_pred_prob = []
    for i in range(len(cwe_type_prob)):
        batch_cwe_type_pred_prob.append(cwe_type_prob[i][batch_cwe_type[i]].item())
    return {"cwe_id": batch_cwe_id_pred,
            "cwe_id_prob": batch_cwe_id_pred_prob,
            "cwe_type": batch_cwe_type_pred,
            "cwe_type_prob": batch_cwe_type_pred_prob}
    
def predict_sev(code: list, gpu: bool = False) -> dict:
    """Generate CVSS severity score predictions.
    Parameters
    ----------
    code : :obj:`list`
        A list of function source strings.
    gpu : bool
        Whether CUDA inference is enabled
    Returns
    -------
    :obj:`dict`
        A dictionary with two keys: "batch_sev_score" and "batch_sev_class".
        "batch_sev_score" stores a list of severity score predictions: [1.0, 5.0, 9.0, ...]
        "batch_sev_class" stores a list of severity classes derived from the predicted severity scores: ["Medium", "Critical", ...]
    """
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/tokenizer")
    model_input = tokenizer(code, truncation=True, max_length=512, padding='max_length',
                            return_tensors="pt").input_ids
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/sev_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(model_input)}
    cvss_score = ort_session.run(None, ort_inputs)
    batch_sev_score = cvss_score[0].flatten().tolist()
    # map each score to its CVSS v3.x qualitative rating band
    batch_sev_class = []
    for score in batch_sev_score:
        if score == 0:
            batch_sev_class.append("None")
        elif score < 4:
            batch_sev_class.append("Low")
        elif score < 7:
            batch_sev_class.append("Medium")
        elif score < 9:
            batch_sev_class.append("High")
        else:
            batch_sev_class.append("Critical")
    return {"batch_sev_score": batch_sev_score, "batch_sev_class": batch_sev_class}
 
if __name__ == "__main__":
    import pandas as pd
    df = pd.read_csv("./data/processed_test.csv")
    funcs = df["func_before"].tolist()
    for code in funcs:
        out = predict_vul_lines([code])
        print(out["batch_func_pred"][0])