|
import os |
|
import joblib |
|
import torch |
|
import numpy as np |
|
from transformers import DistilBertTokenizerFast, DistilBertForSequenceClassification |
|
import torch.nn.functional as F |
|
|
|
class EndpointHandler:
    """Classify a text with a DistilBERT model, then refine it with keyword heuristics.

    The handler loads a sequence-classification model, its tokenizer and a
    ``label_mapping.joblib`` file (label name -> class index) from one
    directory. Predictions labelled ``INJECTION`` or ``JAILBREAK`` are
    post-processed with simple substring keyword scores.
    """

    # Labels the keyword heuristics are allowed to refine.
    _ATTACK_LABELS = ("INJECTION", "JAILBREAK")
    # Keywords are matched as plain substrings of the lower-cased text,
    # so e.g. "matter" also matches "matters".
    _INJECTION_KEYWORDS = ("ignore", "previous", "instructions", "don't", "matter")
    _JAILBREAK_KEYWORDS = ("bypass", "restrictions", "override", "security")

    def __init__(self, model_dir):
        """Load model, tokenizer and label mapping from ``model_dir``.

        Args:
            model_dir: Directory holding the pretrained model/tokenizer files
                and ``label_mapping.joblib``.
        """
        self.model = DistilBertForSequenceClassification.from_pretrained(model_dir)
        # Inference only: make sure dropout stays disabled.
        self.model.eval()
        self.tokenizer = DistilBertTokenizerFast.from_pretrained(model_dir)
        # SECURITY: joblib.load unpickles the file and can execute arbitrary
        # code, so model_dir must only ever point at trusted artefacts.
        self.label_mapping = joblib.load(os.path.join(model_dir, "label_mapping.joblib"))
        # Inverted view (class index -> label name) used to decode predictions.
        self.labels = {index: name for name, index in self.label_mapping.items()}

    def __call__(self, inputs):
        """Run a prediction on a request payload.

        Args:
            inputs: Either the text itself or a dict carrying it under the
                ``'inputs'`` key.

        Returns:
            The dict produced by :meth:`predict`.
        """
        if isinstance(inputs, dict) and 'inputs' in inputs:
            return self.predict(inputs['inputs'])
        return self.predict(inputs)

    @staticmethod
    def _keyword_score(lowered_text, keywords):
        """Return the fraction of ``keywords`` occurring as substrings of ``lowered_text``."""
        return sum(keyword in lowered_text for keyword in keywords) / len(keywords)

    def predict(self, text):
        """Classify a single text.

        Args:
            text: The string to classify.

        Returns:
            A dict with ``"label"`` (predicted label name), ``"score"``
            (entropy-adjusted confidence as a float) and ``"raw_scores"``
            (label name -> softmax probability).

        Raises:
            TypeError: If ``text`` is not a string.
            KeyError: If the predicted class index is missing from the
                label mapping.
        """
        if not isinstance(text, str):
            # The keyword heuristics and the single-row decoding below only
            # make sense for one string; fail early with a clear message.
            raise TypeError(
                f"predict() expects a single string, got {type(text).__name__}"
            )

        encoded_input = self.tokenizer(
            text, return_tensors='pt', truncation=True, max_length=512
        )

        with torch.no_grad():
            logits = self.model(**encoded_input).logits

        # Drop only the batch axis; a bare squeeze() would also collapse the
        # class axis of a single-class model into a 0-d array.
        probabilities = F.softmax(logits, dim=-1).squeeze(0).cpu().numpy()

        predicted_class_idx = int(np.argmax(probabilities))
        predicted_label = self.labels[predicted_class_idx]
        confidence = probabilities[predicted_class_idx]

        # Shannon entropy of the distribution; the epsilon avoids log(0).
        entropy = -np.sum(probabilities * np.log(probabilities + 1e-9))
        # Normalise by the maximum possible entropy, log(num_classes). With a
        # single class that maximum is 0, so skip the division.
        max_entropy = np.log(len(probabilities))
        normalized_entropy = entropy / max_entropy if max_entropy > 0 else 0.0
        adjusted_confidence = confidence * (1 - normalized_entropy)

        lowered = text.lower()
        injection_score = self._keyword_score(lowered, self._INJECTION_KEYWORDS)
        jailbreak_score = self._keyword_score(lowered, self._JAILBREAK_KEYWORDS)

        if predicted_label in self._ATTACK_LABELS:
            # Let the stronger keyword signal choose between the two attack
            # labels; on a tie the model's own label is kept.
            if injection_score > jailbreak_score:
                predicted_label = 'INJECTION'
            elif jailbreak_score > injection_score:
                predicted_label = 'JAILBREAK'

            # Keyword evidence acts as a floor on the reported score.
            # NOTE(review): the floor is applied to attack labels only --
            # confirm this matches the intended/deployed behaviour.
            adjusted_confidence = max(adjusted_confidence, injection_score, jailbreak_score)

        # Key each probability by its class index; iterating
        # self.labels.values() would follow dict insertion order, which need
        # not match the model's class order.
        raw_scores = {
            self.labels.get(index, str(index)): float(prob)
            for index, prob in enumerate(probabilities)
        }

        return {
            "label": predicted_label,
            "score": float(adjusted_confidence),
            "raw_scores": raw_scores,
        }
|
|
|
def get_pipeline():
    """Return the ``EndpointHandler`` class itself (not an instance).

    The caller is responsible for instantiating it.
    """
    handler_cls = EndpointHandler
    return handler_cls