Spaces:
Runtime error
Runtime error
from transformers import RobertaTokenizer, T5Config, T5EncoderModel | |
from statement_t5 import StatementT5 | |
import torch | |
import pickle | |
import numpy as np | |
import onnxruntime | |
def to_numpy(tensor):
    """Convert a torch tensor into a host-side NumPy array for ONNX Runtime."""
    # A tensor that tracks gradients has to be detached before ``.numpy()``.
    if tensor.requires_grad:
        tensor = tensor.detach()
    return tensor.cpu().numpy()
def predict_vul_lines(code: list, gpu: bool = False) -> dict:
    """Generate statement-level and function-level vulnerability predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys; every value is a ``torch.Tensor`` whose
        first dimension is presumably the batch (one entry per item of ``code``):
        "batch_func_pred" stores the function-level prediction, computed as the
        argmax over the last dimension of "batch_func_pred_prob"
        (0 means non-vulnerable and 1 means vulnerable)
        "batch_func_pred_prob" stores the function-level probabilities exactly
        as returned by the model
        "batch_statement_pred" stores the statement-level prediction: 1 where
        the statement probability is greater than 0.5, otherwise 0
        "batch_statement_pred_prob" stores the statement-level probabilities
        exactly as returned by the model
    """
    MAX_STATEMENTS = 155
    MAX_STATEMENT_LENGTH = 20
    DEVICE = 'cuda' if gpu else 'cpu'
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./statement_t5_tokenizer")
    # load model
    # NOTE(review): tokenizer and weights are reloaded on every call; callers
    # looping over many inputs should pass them as a single batch.
    config = T5Config.from_pretrained("./t5_config.json")
    model = T5EncoderModel(config=config)
    model = StatementT5(model, tokenizer, device=DEVICE)
    output_dir = "./models/statement_t5_model.bin"
    model.load_state_dict(torch.load(output_dir, map_location=DEVICE))
    model.to(DEVICE)
    model.eval()
    input_ids, statement_mask = statement_tokenization(code, MAX_STATEMENTS, MAX_STATEMENT_LENGTH, tokenizer)
    # The tokenized tensors are created on the CPU; move them to the device the
    # model lives on so CUDA inference does not hit a device mismatch.
    input_ids = input_ids.to(DEVICE)
    statement_mask = statement_mask.to(DEVICE)
    with torch.no_grad():
        statement_probs, func_probs = model(input_ids=input_ids, statement_mask=statement_mask)
    func_preds = torch.argmax(func_probs, dim=-1)
    statement_preds = torch.where(statement_probs > 0.5, 1, 0)
    return {"batch_func_pred": func_preds, "batch_func_pred_prob": func_probs,
            "batch_statement_pred": statement_preds, "batch_statement_pred_prob": statement_probs}
def statement_tokenization(code: list, max_statements: int, max_statement_length: int, tokenizer):
    """Tokenize each function statement-by-statement into fixed-size id grids.

    Each function is split on newlines, empty lines are dropped, and at most
    ``max_statements`` statements are kept. Every statement is encoded to
    exactly ``max_statement_length`` ids; missing statements are filled with
    all-pad rows.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    max_statements : int
        Number of statement rows produced per function.
    max_statement_length : int
        Number of token ids produced per statement.
    tokenizer
        Object exposing ``pad_token_id`` and an ``encode`` method accepting
        ``truncation``, ``max_length``, ``padding`` and ``add_special_tokens``.

    Returns
    -------
    tuple
        ``(input_ids, statement_mask)`` as ``torch.Tensor`` objects built from
        nested lists of shape [len(code), max_statements, max_statement_length]
        and [len(code), max_statements]. The mask is 0 for rows that consist
        only of pad ids and 1 otherwise.
    """
    # Sized from ``max_statement_length`` (not a fixed 20) so padding rows always
    # match the encoded rows; built once because it does not depend on the input.
    padding_statement = [tokenizer.pad_token_id] * max_statement_length
    batch_input_ids = []
    batch_statement_mask = []
    for c in code:
        statements = [s for s in c.split("\n") if s != ""][:max_statements]
        input_ids = [
            tokenizer.encode(str(s),
                             truncation=True,
                             max_length=max_statement_length,
                             padding='max_length',
                             add_special_tokens=False)
            for s in statements
        ]
        # Fill the remaining rows so every function has max_statements rows.
        input_ids.extend(padding_statement for _ in range(max_statements - len(input_ids)))
        # A row equal to the padding row carries no tokens and is masked out.
        statement_mask = [0 if row == padding_statement else 1 for row in input_ids]
        batch_input_ids.append(input_ids)
        batch_statement_mask.append(statement_mask)
    return torch.tensor(batch_input_ids), torch.tensor(batch_statement_mask)
def predict_cweid(code: list, gpu: bool = False) -> dict:
    """Generate CWE-IDs and CWE Abstract Types Predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with four keys, "cwe_id", "cwe_id_prob", "cwe_type", "cwe_type_prob"
        "cwe_id" stores a list of CWE-ID predictions: [CWE-787, CWE-119, ...]
        "cwe_id_prob" stores a list of confidence scores of CWE-ID predictions [0.9, 0.7, ...]
        "cwe_type" stores a list of CWE abstract types predictions: ["Base", "Class", ...]
        "cwe_type_prob" stores a list of confidence scores of CWE abstract types predictions [0.9, 0.7, ...]
        All four lists are empty when ``code`` is empty.
    """
    # Nothing to predict: skip loading the tokenizer/model and avoid feeding an
    # empty, wrongly-shaped batch to the ONNX session.
    if not code:
        return {"cwe_id": [], "cwe_id_prob": [], "cwe_type": [], "cwe_type_prob": []}
    max_length = 512
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # NOTE(security): pickle.load can execute arbitrary code; this file must
    # only ever be a trusted artifact shipped with the model.
    with open("./inference-common/label_map.pkl", "rb") as f:
        cwe_id_map, cwe_type_map = pickle.load(f)
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./inference-common/tokenizer")
    tokenizer.add_tokens(["<cls_type>"])
    tokenizer.cls_type_token = "<cls_type>"
    model_input = []
    for c in code:
        # Reserve three positions for the cls, cls_type and sep tokens.
        code_tokens = tokenizer.tokenize(str(c))[:max_length - 3]
        source_tokens = [tokenizer.cls_token] + code_tokens + [tokenizer.cls_type_token] + [tokenizer.sep_token]
        input_ids = tokenizer.convert_tokens_to_ids(source_tokens)
        input_ids += [tokenizer.pad_token_id] * (max_length - len(input_ids))
        model_input.append(input_ids)
    # ONNX Runtime consumes NumPy arrays, so build the int64 batch directly
    # instead of allocating a (possibly CUDA) torch tensor and copying it back.
    model_input = np.asarray(model_input, dtype=np.int64)
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/cwe_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: model_input}
    cwe_id_prob, cwe_type_prob = ort_session.run(None, ort_inputs)
    # batch_cwe_id (1D list with shape of [batch size]): [pred_1, pred_2, ..., pred_n]
    batch_cwe_id = np.argmax(cwe_id_prob, axis=-1).tolist()
    # map predicted idx back to CWE-ID
    batch_cwe_id_pred = [cwe_id_map[str(idx)] for idx in batch_cwe_id]
    # confidence of the predicted CWE-ID for every sample
    batch_cwe_id_pred_prob = [cwe_id_prob[i][idx].item() for i, idx in enumerate(batch_cwe_id)]
    # batch_cwe_type (1D list with shape of [batch size]): [pred_1, pred_2, ..., pred_n]
    batch_cwe_type = np.argmax(cwe_type_prob, axis=-1).tolist()
    # map predicted idx back to CWE-Type
    batch_cwe_type_pred = [cwe_type_map[str(idx)] for idx in batch_cwe_type]
    # confidence of the predicted CWE abstract type for every sample
    batch_cwe_type_pred_prob = [cwe_type_prob[i][idx].item() for i, idx in enumerate(batch_cwe_type)]
    return {"cwe_id": batch_cwe_id_pred,
            "cwe_id_prob": batch_cwe_id_pred_prob,
            "cwe_type": batch_cwe_type_pred,
            "cwe_type_prob": batch_cwe_type_pred_prob}
def _severity_class(score: float) -> str:
    """Map one predicted severity score to its severity class name."""
    # NOTE(review): only an exact 0 maps to "None"; a slightly negative
    # prediction falls through to "Low" - confirm this is intended.
    if score == 0:
        return "None"
    if score < 4:
        return "Low"
    if score < 7:
        return "Medium"
    if score < 9:
        return "High"
    return "Critical"


def predict_sev(code: list, gpu: bool = False) -> dict:
    """Generate CVSS severity score predictions.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled

    Returns
    -------
    :obj:`dict`
        A dictionary with two keys, "batch_sev_score", "batch_sev_class"
        "batch_sev_score" stores a list of severity score prediction: [1.0, 5.0, 9.0 ...]
        "batch_sev_class" stores a list of severity class based on predicted severity score ["Medium", "Critical"...]
        Both lists are empty when ``code`` is empty.
    """
    # Nothing to predict: skip loading the tokenizer/model entirely.
    if not code:
        return {"batch_sev_score": [], "batch_sev_class": []}
    provider = ["CUDAExecutionProvider", "CPUExecutionProvider"] if gpu else ["CPUExecutionProvider"]
    # load tokenizer
    tokenizer = RobertaTokenizer.from_pretrained("./inference-common/tokenizer")
    model_input = tokenizer(code, truncation=True, max_length=512, padding='max_length',
                            return_tensors="pt").input_ids
    # onnx runtime session
    ort_session = onnxruntime.InferenceSession("./models/sev_model.onnx", providers=provider)
    # compute ONNX Runtime output prediction
    ort_inputs = {ort_session.get_inputs()[0].name: to_numpy(model_input)}
    cvss_score = ort_session.run(None, ort_inputs)
    # The first model output holds the scores; flatten to one float per sample.
    batch_sev_score = cvss_score[0].flatten().tolist()
    batch_sev_class = [_severity_class(score) for score in batch_sev_score]
    return {"batch_sev_score": batch_sev_score, "batch_sev_class": batch_sev_class}
def predict(code: list, gpu: bool = False) -> dict:
    """Run every predictor on the same batch and return the combined results.

    Parameters
    ----------
    code : :obj:`list`
        A list of String functions.
    gpu : bool
        Defines if CUDA inference is enabled; forwarded to each predictor.

    Returns
    -------
    :obj:`dict`
        The union of the dictionaries returned by ``predict_vul_lines``,
        ``predict_cweid`` and ``predict_sev`` (their key sets do not overlap).
    """
    vul_preds = predict_vul_lines(code, gpu)
    cwe_preds = predict_cweid(code, gpu)
    sev_preds = predict_sev(code, gpu)
    # Previously the three results were computed and then discarded.
    return {**vul_preds, **cwe_preds, **sev_preds}
if __name__ == "__main__":
    import pandas as pd

    # Smoke run: predict each test-set function on its own and print the
    # function-level label of the single-item batch.
    test_frame = pd.read_csv("./data/processed_test.csv")
    for func_source in test_frame["func_before"].tolist():
        result = predict_vul_lines([func_source])
        print(result["batch_func_pred"][0])