AIBugHunter / utils.py
MickyMike's picture
Upload 14 files
f513a95
raw
history blame
9.24 kB
from transformers import RobertaTokenizer, T5Config, T5EncoderModel
from statement_t5 import StatementT5
import torch
import pickle
import numpy as np
import onnxruntime
def to_numpy(tensor):
    """Convert a torch tensor into a NumPy array for ONNX Runtime.

    Tensors that track gradients are detached first; the result is then
    copied to host memory and exposed as a NumPy array.
    """
    if tensor.requires_grad:
        # .numpy() refuses tensors that are still attached to autograd.
        tensor = tensor.detach()
    return tensor.cpu().numpy()
def predict_vul_lines(code: list, gpu: bool = False) -> dict:
    """Generate statement-level and function-level vulnerability predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys. Every value is a ``torch.Tensor`` whose
        first dimension indexes the functions in ``code``.

        "batch_func_pred" stores the function-level prediction, computed as
        the argmax over the last dimension of "batch_func_pred_prob"
        (per the original documentation 0 means non-vulnerable and 1 means
        vulnerable).
        "batch_func_pred_prob" stores the function-level output of the model
        (presumably per-class probabilities -- confirm against StatementT5).
        "batch_statement_pred" stores the statement-level prediction: 1 where
        the corresponding entry of "batch_statement_pred_prob" is > 0.5,
        otherwise 0.
        "batch_statement_pred_prob" stores the statement-level output of the
        model.
    """
    MAX_STATEMENTS = 155
    MAX_STATEMENT_LENGTH = 20
    DEVICE = 'cuda' if gpu else 'cpu'
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./statement_t5_tokenizer")
    # load model
    config = T5Config.from_pretrained("./t5_config.json")
    model = T5EncoderModel(config=config)
    model = StatementT5(model, tokenizer, device=DEVICE)
    checkpoint_path = "./models/statement_t5_model.bin"
    # NOTE: torch.load unpickles the checkpoint; only load weights from a
    # trusted source.
    model.load_state_dict(torch.load(checkpoint_path, map_location=DEVICE))
    model.to(DEVICE)
    model.eval()
    input_ids, statement_mask = statement_tokenization(
        code, MAX_STATEMENTS, MAX_STATEMENT_LENGTH, tokenizer)
    # The tokenization helper builds CPU tensors; put them on the same device
    # as the model so CUDA inference does not mix devices.
    input_ids = input_ids.to(DEVICE)
    statement_mask = statement_mask.to(DEVICE)
    with torch.no_grad():
        statement_probs, func_probs = model(input_ids=input_ids,
                                            statement_mask=statement_mask)
    func_preds = torch.argmax(func_probs, dim=-1)
    statement_preds = torch.where(statement_probs > 0.5, 1, 0)
    return {"batch_func_pred": func_preds,
            "batch_func_pred_prob": func_probs,
            "batch_statement_pred": statement_preds,
            "batch_statement_pred_prob": statement_probs}
def statement_tokenization(code: list, max_statements: int, max_statement_length: int, tokenizer):
    """Tokenize every function statement-by-statement into fixed-size grids.

    Each function is split on newlines, empty lines are dropped and at most
    ``max_statements`` statements are kept. Every statement is encoded to
    exactly ``max_statement_length`` token ids, and functions with fewer
    statements are filled up with all-padding rows.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    max_statements : int
        Number of statement rows produced per function.
    max_statement_length : int
        Number of token ids produced per statement.
    tokenizer
        Object exposing ``pad_token_id`` and
        ``encode(text, truncation, max_length, padding, add_special_tokens)``
        returning a list of ids.

    Returns
    -------
    tuple
        ``(input_ids, statement_mask)`` as ``torch.Tensor`` objects.
        ``statement_mask`` holds 0 for rows made only of padding ids and 1 for
        every other row.
    """
    # Built once, and sized from the argument: a fixed length of 20 produces
    # ragged rows (and a torch.tensor failure) for any other statement length.
    padding_statement = [tokenizer.pad_token_id] * max_statement_length
    batch_input_ids = []
    batch_statement_mask = []
    for c in code:
        statements = [line for line in c.split("\n") if line != ""]
        statements = statements[:max_statements]
        input_ids = [
            tokenizer.encode(str(stat),
                             truncation=True,
                             max_length=max_statement_length,
                             padding='max_length',
                             add_special_tokens=False)
            for stat in statements
        ]
        # Fill up to max_statements rows (a no-op when already full).
        input_ids.extend([padding_statement] * (max_statements - len(input_ids)))
        # A row equal to the padding row is masked out, whether it was added
        # above or is a statement that encoded to nothing but padding.
        statement_mask = [0 if row == padding_statement else 1 for row in input_ids]
        batch_input_ids.append(input_ids)
        batch_statement_mask.append(statement_mask)
    return torch.tensor(batch_input_ids), torch.tensor(batch_statement_mask)
def predict_cweid(code: list, gpu: bool = False) -> dict:
    """Generate CWE-IDs and CWE Abstract Types Predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys, "cwe_id", "cwe_id_prob", "cwe_type", "cwe_type_prob"
        "cwe_id" stores a list of CWE-ID predictions: [CWE-787, CWE-119, ...]
        "cwe_id_prob" stores a list of confidence scores of CWE-ID predictions [0.9, 0.7, ...]
        "cwe_type" stores a list of CWE abstract types predictions: ["Base", "Class", ...]
        "cwe_type_prob" stores a list of confidence scores of CWE abstract types predictions [0.9, 0.7, ...]
        All four lists are empty when ``code`` is empty.
    """
    # Nothing to classify: skip loading the label map, tokenizer and model.
    if not code:
        return {"cwe_id": [], "cwe_id_prob": [], "cwe_type": [], "cwe_type_prob": []}

    max_length = 512
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # NOTE: pickle.load can execute arbitrary code; label_map.pkl must come
    # from a trusted source.
    with open("./inference-common/label_map.pkl", "rb") as f:
        cwe_id_map, cwe_type_map = pickle.load(f)
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./inference-common/tokenizer")
    tokenizer.add_tokens(["<cls_type>"])
    tokenizer.cls_type_token = "<cls_type>"
    model_input = []
    for c in code:
        # Reserve three positions for <cls>, <cls_type> and <sep>.
        code_tokens = tokenizer.tokenize(str(c))[:max_length - 3]
        source_tokens = [tokenizer.cls_token] + code_tokens + [tokenizer.cls_type_token] + [tokenizer.sep_token]
        input_ids = tokenizer.convert_tokens_to_ids(source_tokens)
        input_ids += [tokenizer.pad_token_id] * (max_length - len(input_ids))
        model_input.append(input_ids)
    # ONNX Runtime is fed a NumPy array, so build it directly (int64, the
    # dtype torch.tensor gives Python ints) instead of creating a torch tensor
    # on "cuda" -- which raises when torch has no CUDA support -- only to copy
    # it straight back to the host.
    model_input = np.asarray(model_input, dtype=np.int64)
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/cwe_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: model_input}
    cwe_id_prob, cwe_type_prob = ort_session.run(None, ort_inputs)
    # predicted class index per function
    batch_cwe_id = np.argmax(cwe_id_prob, axis=-1).tolist()
    # map predicted idx back to CWE-ID
    batch_cwe_id_pred = [cwe_id_map[str(idx)] for idx in batch_cwe_id]
    # score of the predicted class for each function
    batch_cwe_id_pred_prob = [row[idx].item() for row, idx in zip(cwe_id_prob, batch_cwe_id)]
    # predicted abstract-type index per function
    batch_cwe_type = np.argmax(cwe_type_prob, axis=-1).tolist()
    # map predicted idx back to CWE-Type
    batch_cwe_type_pred = [cwe_type_map[str(idx)] for idx in batch_cwe_type]
    # score of the predicted type for each function
    batch_cwe_type_pred_prob = [row[idx].item() for row, idx in zip(cwe_type_prob, batch_cwe_type)]
    return {"cwe_id": batch_cwe_id_pred,
            "cwe_id_prob": batch_cwe_id_pred_prob,
            "cwe_type": batch_cwe_type_pred,
            "cwe_type_prob": batch_cwe_type_pred_prob}
def _severity_class(score: float) -> str:
    """Map a predicted severity score onto its severity class name."""
    # NOTE(review): only an exact 0 maps to "None", so a negative prediction
    # is reported as "Low" -- confirm this is the intended handling.
    if score == 0:
        return "None"
    if score < 4:
        return "Low"
    if score < 7:
        return "Medium"
    if score < 9:
        return "High"
    return "Critical"


def predict_sev(code: list, gpu: bool = False) -> dict:
    """Generate CVSS severity score predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with two keys, "batch_sev_score", "batch_sev_class"
        "batch_sev_score" stores a list of severity score prediction: [1.0, 5.0, 9.0 ...]
        "batch_sev_class" stores a list of severity class based on predicted severity score ["Medium", "Critical"...]
        Both lists are empty when ``code`` is empty.
    """
    # Nothing to score: skip loading the tokenizer and the ONNX model.
    if not code:
        return {"batch_sev_score": [], "batch_sev_class": []}

    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./inference-common/tokenizer")
    model_input = tokenizer(code, truncation=True, max_length=512, padding='max_length',
                            return_tensors="pt").input_ids
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/sev_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(model_input)}
    cvss_score = ort_session.run(None, ort_inputs)
    # First model output flattened to one score per entry.
    batch_sev_score = cvss_score[0].flatten().tolist()
    batch_sev_class = [_severity_class(score) for score in batch_sev_score]
    return {"batch_sev_score": batch_sev_score, "batch_sev_class": batch_sev_class}
def predict(code: list) -> dict:
    """Run every predictor on ``code`` and return the combined results.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.

    Returns
    -------
    :obj:`dict`
        The dictionaries returned by ``predict_vul_lines``, ``predict_cweid``
        and ``predict_sev`` merged into one; their key sets do not overlap.
    """
    vul_preds = predict_vul_lines(code)
    cwe_preds = predict_cweid(code)
    sev_preds = predict_sev(code)
    # Previously the three results were computed and then discarded.
    return {**vul_preds, **cwe_preds, **sev_preds}
if __name__ == "__main__":
    # Script entry point: print the function-level prediction for every
    # function of the processed test set, one function per call.
    import pandas as pd

    test_frame = pd.read_csv("./data/processed_test.csv")
    for func_source in test_frame["func_before"].tolist():
        result = predict_vul_lines([func_source])
        print(result["batch_func_pred"][0])