from transformers import RobertaTokenizer, T5Config, T5EncoderModel
from statement_t5 import StatementT5
import torch
import pickle
import numpy as np
import onnxruntime


def to_numpy(tensor):
    """Convert a torch tensor to a NumPy array for ONNX Runtime."""
    return tensor.detach().cpu().numpy() if tensor.requires_grad else tensor.cpu().numpy()


def predict_vul_lines(code: list, gpu: bool = False) -> dict:
    """Generate statement-level and function-level vulnerability prediction probabilities.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys, "batch_func_pred", "batch_func_pred_prob",
        "batch_statement_pred", and "batch_statement_pred_prob"
        "batch_func_pred" stores the function-level vulnerability predictions:
        [0, 1, ...] where 0 means non-vulnerable and 1 means vulnerable
        "batch_func_pred_prob" stores the function-level vulnerability prediction
        probabilities [0.89, 0.75, ...] corresponding to "batch_func_pred"
        "batch_statement_pred" stores the statement-level vulnerability predictions:
        [0, 1, ...] where 0 means non-vulnerable and 1 means vulnerable
        "batch_statement_pred_prob" stores the statement-level vulnerability prediction
        probabilities [0.89, 0.75, ...] corresponding to "batch_statement_pred"
    """
    MAX_STATEMENTS = 155
    MAX_STATEMENT_LENGTH = 20
    DEVICE = 'cuda' if gpu else 'cpu'
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/statement_t5_tokenizer")
    # load model
    config = T5Config.from_pretrained("./utils/t5_config.json")
    model = T5EncoderModel(config=config)
    model = StatementT5(model, tokenizer, device=DEVICE)
    output_dir = "./models/statement_t5_model.bin"
    model.load_state_dict(torch.load(output_dir, map_location=DEVICE))
    model.to(DEVICE)
    model.eval()
    input_ids, statement_mask = statement_tokenization(code, MAX_STATEMENTS, MAX_STATEMENT_LENGTH, tokenizer)
    # keep the inputs on the same device as the model
    input_ids = input_ids.to(DEVICE)
    statement_mask = statement_mask.to(DEVICE)
    with torch.no_grad():
        statement_probs, func_probs = model(input_ids=input_ids, statement_mask=statement_mask)
    func_preds = torch.argmax(func_probs, dim=-1)
    # a statement is flagged as vulnerable when its probability exceeds 0.5
    statement_preds = torch.where(statement_probs > 0.5, 1, 0)
    return {"batch_func_pred": func_preds,
            "batch_func_pred_prob": func_probs,
            "batch_statement_pred": statement_preds,
            "batch_statement_pred_prob": statement_probs}


def statement_tokenization(code: list, max_statements: int, max_statement_length: int, tokenizer):
    """Tokenize each function statement by statement and pad to a fixed shape.

    Returns the input ids with shape [batch, max_statements, max_statement_length]
    and a statement mask with shape [batch, max_statements], where 0 marks a
    padding statement and 1 marks a real one.
    """
    batch_input_ids = []
    batch_statement_mask = []
    # an all-padding statement; its length must match the encoded statements
    padding_statement = [tokenizer.pad_token_id for _ in range(max_statement_length)]
    for c in code:
        # one statement per non-empty line, truncated to max_statements
        source = c.split("\n")
        source = [statement for statement in source if statement != ""]
        source = source[:max_statements]
        input_ids = []
        for stat in source:
            ids_ = tokenizer.encode(str(stat),
                                    truncation=True,
                                    max_length=max_statement_length,
                                    padding='max_length',
                                    add_special_tokens=False)
            input_ids.append(ids_)
        # pad the function up to max_statements statements
        if len(input_ids) < max_statements:
            for _ in range(max_statements - len(input_ids)):
                input_ids.append(padding_statement)
        statement_mask = []
        for statement in input_ids:
            if statement == padding_statement:
                statement_mask.append(0)
            else:
                statement_mask.append(1)
        batch_input_ids.append(input_ids)
        batch_statement_mask.append(statement_mask)
    return torch.tensor(batch_input_ids), torch.tensor(batch_statement_mask)


def predict_cweid(code: list, gpu: bool = False) -> dict:
    """Generate CWE-IDs and CWE Abstract Types Predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys, "cwe_id", "cwe_id_prob", "cwe_type", "cwe_type_prob"
        "cwe_id" stores a list of CWE-ID predictions: [CWE-787, CWE-119, ...]
        "cwe_id_prob" stores a list of confidence scores of CWE-ID predictions [0.9, 0.7, ...]
        "cwe_type" stores a list of CWE abstract types predictions: ["Base", "Class", ...]
        "cwe_type_prob" stores a list of confidence scores of CWE abstract types predictions [0.9, 0.7, ...]
    """
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    with open("./utils/label_map.pkl", "rb") as f:
        cwe_id_map, cwe_type_map = pickle.load(f)
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/tokenizer")
    tokenizer.add_tokens([""])
    tokenizer.cls_type_token = ""
    model_input = []
    for c in code:
        # reserve 3 positions for the cls, cls_type, and sep tokens
        code_tokens = tokenizer.tokenize(str(c))[:512 - 3]
        source_tokens = [tokenizer.cls_token] + code_tokens + [tokenizer.cls_type_token] + [tokenizer.sep_token]
        input_ids = tokenizer.convert_tokens_to_ids(source_tokens)
        padding_length = 512 - len(input_ids)
        input_ids += [tokenizer.pad_token_id] * padding_length
        model_input.append(input_ids)
    # ONNX Runtime consumes NumPy arrays, so the tensor can stay on the CPU
    model_input = torch.tensor(model_input)
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/cwe_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(model_input)}
    cwe_id_prob, cwe_type_prob = ort_session.run(None, ort_inputs)
    # batch_cwe_id (1D list with shape of [batch_size]): [pred_1, pred_2, ..., pred_n]
    batch_cwe_id = np.argmax(cwe_id_prob, axis=-1).tolist()
    # map predicted idx back to CWE-ID
    batch_cwe_id_pred = [cwe_id_map[str(idx)] for idx in batch_cwe_id]
    # batch_cwe_id_pred_prob (1D list with shape of [batch_size]): [prob_1, prob_2, ..., prob_n]
    batch_cwe_id_pred_prob = []
    for i in range(len(cwe_id_prob)):
        batch_cwe_id_pred_prob.append(cwe_id_prob[i][batch_cwe_id[i]].item())
    # batch_cwe_type (1D list with shape of [batch_size]): [pred_1, pred_2, ..., pred_n]
    batch_cwe_type = np.argmax(cwe_type_prob, axis=-1).tolist()
    # map predicted idx back to CWE-Type
    batch_cwe_type_pred = [cwe_type_map[str(idx)] for idx in batch_cwe_type]
    # batch_cwe_type_pred_prob (1D list with shape of [batch_size]): [prob_1, prob_2, ..., prob_n]
    batch_cwe_type_pred_prob = []
    for i in range(len(cwe_type_prob)):
        batch_cwe_type_pred_prob.append(cwe_type_prob[i][batch_cwe_type[i]].item())
    return {"cwe_id": batch_cwe_id_pred,
            "cwe_id_prob": batch_cwe_id_pred_prob,
            "cwe_type": batch_cwe_type_pred,
            "cwe_type_prob": batch_cwe_type_pred_prob}


def predict_sev(code: list, gpu: bool = False) -> dict:
    """Generate CVSS severity score predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with two keys, "batch_sev_score", "batch_sev_class"
        "batch_sev_score" stores a list of severity score predictions: [1.0, 5.0, 9.0, ...]
        "batch_sev_class" stores a list of severity classes based on the predicted
        severity scores ["Medium", "Critical", ...]
    """
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./utils/tokenizer")
    model_input = tokenizer(code, truncation=True, max_length=512, padding='max_length',
                            return_tensors="pt").input_ids
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/sev_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(model_input)}
    cvss_score = ort_session.run(None, ort_inputs)
    batch_sev_score = cvss_score[0].flatten().tolist()
    # map each predicted score to its severity class
    batch_sev_class = []
    for score in batch_sev_score:
        if score == 0:
            batch_sev_class.append("None")
        elif score < 4:
            batch_sev_class.append("Low")
        elif score < 7:
            batch_sev_class.append("Medium")
        elif score < 9:
            batch_sev_class.append("High")
        else:
            batch_sev_class.append("Critical")
    return {"batch_sev_score": batch_sev_score, "batch_sev_class": batch_sev_class}


if __name__ == "__main__":
    import pandas as pd
    df = pd.read_csv("./data/processed_test.csv")
    funcs = df["func_before"].tolist()
    for code in funcs:
        out = predict_vul_lines([code])
        print(out["batch_func_pred"][0])