# inference.py — ensemble paradigm classifier (upstream commit 44c3a8f)
"""
Length-aware gating ensemble: CodeBERTfinetune + XGBoost predictor
"""
import torch
import joblib
import numpy as np
import re
import warnings
from transformers import AutoTokenizer, AutoModelForSequenceClassification
from sklearn.feature_extraction.text import TfidfVectorizer
from scipy.sparse import hstack
from pathlib import Path
warnings.filterwarnings("ignore")
# Paradigm classes in index order; both mappings are derived from this tuple
# so they can never drift out of sync.
_PARADIGMS = ('Functional', 'Non-Paradigm', 'Oop', 'Procedural')
LABELS = {name: idx for idx, name in enumerate(_PARADIGMS)}   # name  -> class id
LABEL_TO_NAME = dict(enumerate(_PARADIGMS))                   # class id -> name
class FeatureExtractor:
    """Handcrafted keyword/structure features, identical to the training-time extractor."""

    def __init__(self):
        # Keyword lists are frozen: they must match the training pipeline exactly,
        # otherwise the XGBoost feature columns shift.
        self.oop_kw = ['class', 'object', 'this', 'self', 'extends', 'implements', 'interface',
                       'public', 'private', 'protected', 'static', 'virtual', 'override']
        self.fp_kw = ['map', 'filter', 'reduce', 'fold', 'lambda', 'closure', '=>',
                      'monad', 'functor', 'pure', 'immutable', 'const', 'let']
        self.proc_kw = ['void', 'int', 'char', 'float', 'struct', 'malloc', 'free',
                        'pointer', 'goto', 'scanf', 'printf']

    def extract(self, text):
        """Return the 10 handcrafted features for *text*.

        Insertion order is significant: downstream code turns ``dict.values()``
        into a fixed-length feature vector.
        """
        lowered = text.lower()

        def keyword_hits(words):
            # Raw substring counts (not word-boundary matches) — same as training.
            return sum(lowered.count(word) for word in words)

        return {
            'oop_score': keyword_hits(self.oop_kw),
            'fp_score': keyword_hits(self.fp_kw),
            'proc_score': keyword_hits(self.proc_kw),
            'length': len(text),
            'num_lines': text.count('\n') + 1,
            'has_class': int(bool(re.search(r'\bclass\s+\w+', lowered))),
            'has_lambda': int('lambda' in lowered or '=>' in text),
            'num_dots': text.count('.'),
            'num_arrows': text.count('->') + text.count('=>'),
            'num_braces': text.count('{') + text.count('}'),
        }
class EnsemblePredictor:
    """Length-aware gating ensemble over a fine-tuned CodeBERT classifier and an
    XGBoost model trained on TF-IDF + handcrafted features.

    Shorter inputs lean on CodeBERT; longer inputs give progressively more
    weight to the XGBoost branch (see :meth:`predict`).
    """

    def __init__(self, codebert_path, xgb_model_path, tfidf_path=None):
        """
        Initialize the ensemble predictor.

        Args:
            codebert_path: Path to CodeBERT model directory.
            xgb_model_path: Path to saved XGBoost model (REQUIRED).
            tfidf_path: Path to saved TF-IDF vectorizer (REQUIRED for XGBoost
                features). Defaults to ``tfidf_vectorizer.pkl`` next to this file.

        Raises:
            FileNotFoundError: If the TF-IDF vectorizer or XGBoost model file is missing.
            RuntimeError: If the XGBoost model fails to deserialize.
        """
        self.feature_extractor = FeatureExtractor()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        print(f"Using device: {self.device}")

        print("Loading CodeBERT model...")
        self.tokenizer = AutoTokenizer.from_pretrained(codebert_path)
        self.codebert = AutoModelForSequenceClassification.from_pretrained(codebert_path)
        self.codebert.eval()  # inference only: disable dropout etc.
        self.codebert.to(self.device)
        print("CodeBERT fine-tuned model loaded successfully\n")

        if tfidf_path is None:
            tfidf_path = Path(__file__).parent / "tfidf_vectorizer.pkl"
        tfidf_path = Path(tfidf_path)
        if not tfidf_path.exists():
            raise FileNotFoundError(f"TF-IDF vectorizer NOT FOUND: {tfidf_path}\n")
        print(f"Loading TF-IDF vectorizer from {tfidf_path}...")
        self.tfidf = joblib.load(str(tfidf_path))
        print("TF-IDF vectorizer loaded successfully\n")

        # Load XGBoost
        xgb_path = Path(xgb_model_path)
        if not xgb_path.exists():
            raise FileNotFoundError(f"XGBoost model NOT FOUND: {xgb_model_path}")
        print(f"Loading XGBoost model from {xgb_model_path}...")
        try:
            self.xgb_model = joblib.load(str(xgb_path))
            print("XGBoost model loaded successfully\n")
        except Exception as e:
            # Fix: chain the original exception so the root cause stays visible.
            raise RuntimeError(f"Failed to load XGBoost model: {e}") from e

    def get_codebert_proba(self, text):
        """Get probability predictions from CodeBERT (softmax over 4 classes)."""
        with torch.no_grad():
            inputs = self.tokenizer(
                text,
                return_tensors="pt",
                padding=True,
                truncation=True,
                max_length=256
            )
            inputs = {k: v.to(self.device) for k, v in inputs.items()}
            outputs = self.codebert(**inputs)
            logits = outputs.logits
            # [0]: single-example batch -> 1-D probability vector.
            proba = torch.softmax(logits, dim=-1).cpu().numpy()[0]
        return proba

    def get_xgb_proba(self, text):
        """Get probability predictions from XGBoost."""
        features = self._extract_features(text)
        proba = self.xgb_model.predict_proba(features)[0]
        return proba

    def _extract_features(self, text):
        """
        Extract features using same pipeline as training:
        TF-IDF (1000 features) + Handcrafted Features (10 features) = 1010 total
        NO CodeBERT embeddings
        """
        tfidf_vec = self.tfidf.transform([text])  # Returns sparse matrix [1, 1000]
        handcrafted_feats = self.feature_extractor.extract(text)
        # Relies on dict insertion order matching the training column order.
        handcrafted_vec = np.array(list(handcrafted_feats.values()), dtype=np.float32).reshape(1, -1)
        # Stack: TF-IDF + handcrafted
        combined = hstack([tfidf_vec, handcrafted_vec])
        return combined

    def predict(self, text):
        """
        Predict using length-aware gating ensemble.

        Args:
            text: Input code/text string

        Returns:
            Dictionary with probabilities, ensembled prediction, and paradigm label.
            All values are plain Python types (JSON-serializable).
        """
        tokens = self.tokenizer.tokenize(text)
        length = len(tokens)

        # Get CodeBERT probabilities
        codebert_probas = self.get_codebert_proba(text)
        cb_pred_class = int(np.argmax(codebert_probas))

        # Get XGBoost probabilities
        xgb_probas = self.get_xgb_proba(text)
        xgb_pred_class = int(np.argmax(xgb_probas))

        # Length-aware gating: trust CodeBERT more on short snippets, split
        # evenly on long ones (thresholds are in tokenizer tokens, not chars).
        if length < 60:
            weight_info = "Short (CodeBERT 80% + XGB 20%)"
            cb_weight = 0.8
            xgb_weight = 0.2
        elif length > 150:
            weight_info = "Long (CodeBERT 50% + XGB 50%)"
            cb_weight = 0.5
            xgb_weight = 0.5
        else:
            weight_info = "Medium (CodeBERT 65% + XGB 35%)"
            cb_weight = 0.65
            xgb_weight = 0.35

        # Weighted average of two probability distributions
        ensemble_probas = cb_weight * codebert_probas + xgb_weight * xgb_probas
        ensemble_probas = ensemble_probas / ensemble_probas.sum()  # Normalize
        # Fix: cast np.int64 -> int so the returned dict is JSON-serializable.
        predicted_class = int(np.argmax(ensemble_probas))
        predicted_label = LABEL_TO_NAME[predicted_class]

        # Debug prints
        print("\n" + "="*60)
        print("DEBUG: Model Outputs")
        print("="*60)
        print(f"Token length: {length}")
        print(f"Weights: CB={cb_weight:.2f}, XGB={xgb_weight:.2f}\n")
        print("CodeBERT class probabilities:")
        for i, prob in enumerate(codebert_probas):
            print(f" {LABEL_TO_NAME[i]:15s}: {prob:.4f}")
        print(f" → Predicted: {LABEL_TO_NAME[cb_pred_class]}\n")
        print("XGBoost class probabilities:")
        for i, prob in enumerate(xgb_probas):
            print(f" {LABEL_TO_NAME[i]:15s}: {prob:.4f}")
        print(f" → Predicted: {LABEL_TO_NAME[xgb_pred_class]}\n")
        print("Ensemble class probabilities:")
        for i, prob in enumerate(ensemble_probas):
            marker = " ← FINAL" if i == predicted_class else ""
            print(f" {LABEL_TO_NAME[i]:15s}: {prob:.4f}{marker}")
        print("="*60 + "\n")

        return {
            "length": length,
            "weight_info": weight_info,
            "codebert_class_probas": {LABEL_TO_NAME[i]: round(float(codebert_probas[i]), 4) for i in range(len(codebert_probas))},
            "codebert_pred_class": LABEL_TO_NAME[cb_pred_class],
            "xgb_class_probas": {LABEL_TO_NAME[i]: round(float(xgb_probas[i]), 4) for i in range(len(xgb_probas))},
            "xgb_pred_class": LABEL_TO_NAME[xgb_pred_class],
            "ensemble_class_probas": {LABEL_TO_NAME[i]: round(float(ensemble_probas[i]), 4) for i in range(len(ensemble_probas))},
            "predicted_class": predicted_class,
            "predicted_label": predicted_label,
            "confidence": round(float(ensemble_probas[predicted_class]), 4)
        }
# ============================================================================
# USAGE EXAMPLE
# ============================================================================
def _run_demo():
    """Load the bundled model artifacts and classify the sample text below."""
    # ===== INPUT: Modify this variable for querying =====
    text_input = """Increment the value of a pointer when sent as a parameter I am stuck in the following pointer problem: Say you have a function: void Function (unsigned char *ubPointer) { ubPointer++; } int main (void) { unsigned char *PointerX; Function( PointerX ); } What I want is that the ++ is reflected in PointerX, without declaring it as a global variable. Thank you very much.
"""
    # =======================================

    # Model artifacts are expected to sit next to this script.
    base_dir = Path(__file__).parent
    codebert_path = base_dir / "codebert_model"
    xgb_model_path = base_dir / "xgboost_model.pkl"
    tfidf_path = base_dir / "tfidf_vectorizer.pkl"

    try:
        banner = "=" * 60
        print(banner)
        print("ENSEMBLE PREDICTOR - CodeBERT + XGBoost")
        print("(Training Pipeline: TF-IDF + Handcrafted Features)")
        print(banner + "\n")

        predictor = EnsemblePredictor(
            codebert_path=str(codebert_path),
            xgb_model_path=str(xgb_model_path),
            tfidf_path=str(tfidf_path)
        )

        print(f"Input text: {repr(text_input)}\n")
        result = predictor.predict(text_input)

        print(banner)
        print("FINAL RESULTS")
        print(banner)
        print(f"Weighting: {result['weight_info']}")
        print(f"\nFINAL PREDICTION: {result['predicted_label'].upper()}")
        print(f"Confidence: {result['confidence']:.4f}")
        print(banner)
    except Exception as e:
        print(f"Error: {e}")
        import traceback
        traceback.print_exc()


if __name__ == "__main__":
    _run_demo()