|
|
|
|
|
|
|
|
|
|
|
|
|
|
import gc
import json
import os
import re
import tempfile

import torch
from human_eval.data import read_problems, write_jsonl
from human_eval.evaluation import evaluate_functional_correctness
from peft import PeftModel
from tqdm import tqdm
from transformers import AutoModelForCausalLM, AutoTokenizer, BitsAndBytesConfig
|
|
|
|
|
# Start-up banner, emitted once when the module is loaded.
print(
    "=" * 60,
    "EVALUATION v2: Base vs Fine-tuned on HumanEval",
    "Using correct Instruct format for fine-tuned model",
    "=" * 60,
    sep="\n",
)

# Model identifiers handed to the `from_pretrained` loaders.
BASE_MODEL = "mistralai/Devstral-Small-2505"
FINETUNED_MODEL = "stmasson/alizee-coder-devstral-1-small"

# Generation settings shared by the evaluation helpers.
NUM_SAMPLES = 1        # completions generated per problem
TEMPERATURE = 0.1      # sampling is enabled whenever this is > 0
MAX_NEW_TOKENS = 1024  # generation budget for the fine-tuned (Instruct) path

# Quantization settings applied to every model load: 4-bit NF4 weights with
# double quantization, computing in bfloat16.
bnb_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_use_double_quant=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.bfloat16,
)
|
|
|
|
|
def load_model(model_name, adapter_name=None):
    """Load a quantized causal LM plus tokenizer, optionally merging an adapter.

    Args:
        model_name: Identifier passed to the tokenizer and model
            ``from_pretrained`` loaders.
        adapter_name: Optional PEFT adapter identifier. When given, the
            adapter is loaded on top of the model and merged into it.

    Returns:
        A ``(model, tokenizer)`` tuple; the model is switched to eval mode.
    """
    print(f"\nLoading model: {model_name}")
    if adapter_name:
        print(f"With adapter: {adapter_name}")

    tok = AutoTokenizer.from_pretrained(model_name, trust_remote_code=True)
    # generate() is later given pad_token_id, so make sure one exists.
    if tok.pad_token is None:
        tok.pad_token = tok.eos_token

    load_kwargs = {
        "quantization_config": bnb_config,
        "device_map": "auto",
        "trust_remote_code": True,
        "torch_dtype": torch.bfloat16,
    }
    net = AutoModelForCausalLM.from_pretrained(model_name, **load_kwargs)

    if adapter_name:
        # Fold the adapter weights into the base model so that inference
        # runs on a plain model object.
        net = PeftModel.from_pretrained(net, adapter_name).merge_and_unload()

    net.eval()
    return net, tok
|
|
|
|
|
def _trim_blank_edges(block):
    """Drop leading blank lines and trailing whitespace, keeping indentation.

    Unlike ``str.strip`` this leaves the first code line's leading spaces
    untouched, so an indented function body stays consistently indented.
    """
    lines = block.rstrip().split("\n")
    while lines and not lines[0].strip():
        del lines[0]
    return "\n".join(lines)


def _drop_language_tag(block):
    """Remove a language tag (e.g. ``py``) that sits alone on the fence line."""
    return re.sub(r"\A[ \t]*[A-Za-z][\w+#.-]*[ \t]*\r?\n", "", block, count=1)


def extract_python_code(text):
    """Extract Python code from model output.

    Strategies are tried in order and the first one that yields something
    wins:

    1. The last closed ```` ```python ```` fenced block.
    2. The last closed generic fenced block (a language tag on the fence
       line is dropped rather than returned as code).
    3. The text after the last ``**Solution:**`` / ``Solution:`` marker or
       after an unterminated fence (e.g. output cut off by the token limit).
    4. The whole text, stripped.

    For strategies 1-3 the indentation of the first code line is preserved,
    so a body-only answer remains valid when appended to a function header.

    Args:
        text: Raw decoded model response.

    Returns:
        The extracted code as a string (possibly empty).
    """
    tagged = re.findall(r"```python[ \t]*(.*?)```", text, re.DOTALL)
    if tagged:
        return _trim_blank_edges(tagged[-1])

    generic = re.findall(r"```[ \t]*(.*?)```", text, re.DOTALL)
    if generic:
        return _trim_blank_edges(_drop_language_tag(generic[-1]))

    # No closed fence: fall back to textual markers, most specific first.
    for marker in ("**Solution:**", "Solution:", "```"):
        if marker not in text:
            continue
        tail = text.split(marker)[-1]
        after_fence = marker == "```"
        if "```" in tail:
            # An unterminated fence opens after the marker; the code starts
            # right behind it.
            tail = tail.split("```")[-1]
            after_fence = True
        if after_fence:
            tail = _drop_language_tag(tail)
        # Spaces between the marker and same-line text are separators, not
        # indentation.
        candidate = _trim_blank_edges(tail.lstrip(" \t"))
        if candidate:
            return candidate

    return text.strip()
|
|
|
|
|
def generate_completion_base(model, tokenizer, prompt):
    """Generate a raw continuation of *prompt* with the base model.

    The prompt is fed to the model unchanged (no chat/instruct wrapping) and
    only the newly generated tokens are decoded. The text is then cut at the
    earliest stop sequence that signals the end of the function body.

    Args:
        model: Model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        prompt: Function header/docstring to be completed.

    Returns:
        The truncated completion text.
    """
    encoded = tokenizer(prompt, return_tensors="pt").to(model.device)
    prompt_len = encoded["input_ids"].shape[1]
    sampling = TEMPERATURE > 0

    with torch.no_grad():
        generated = model.generate(
            **encoded,
            max_new_tokens=512,
            temperature=TEMPERATURE,
            do_sample=sampling,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )

    # Skip the echoed prompt tokens; decode only what the model added.
    new_tokens = generated[0][prompt_len:]
    completion = tokenizer.decode(new_tokens, skip_special_tokens=True)

    # Cutting at each stop sequence in turn leaves the text before the
    # earliest one.
    for stop in ("\ndef ", "\nclass ", "\nif __name__", "\n\n\n"):
        completion = completion.partition(stop)[0]

    return completion
|
|
|
|
|
def _entry_point_name(prompt):
    """Return the name of the last top-level function declared in *prompt*."""
    names = re.findall(r"^def\s+(\w+)\s*\(", prompt, re.MULTILINE)
    return names[-1] if names else None


def _completion_from_code(code, prompt):
    """Turn extracted code into text that is valid when appended to *prompt*."""
    entry_point = _entry_point_name(prompt)
    if entry_point is not None:
        redefinition = re.compile(
            rf"^def\s+{re.escape(entry_point)}\s*\(", re.MULTILINE
        )
        if redefinition.search(code):
            # The snippet defines the target function itself. Appending it
            # whole at module level re-defines the stub from the prompt and
            # keeps any imports, helper functions and nested defs intact.
            return "\n" + code

    # Otherwise treat everything after the first `def` line as the body.
    # Only that one signature line is removed, so nested defs survive.
    lines = code.split("\n")
    for index, line in enumerate(lines):
        if line.strip().startswith("def "):
            body = lines[index + 1:]
            if body:
                return "\n".join(body)
            break

    return code


def generate_completion_finetuned(model, tokenizer, prompt, problem_text):
    """Generate a HumanEval completion with the fine-tuned model (Instruct format).

    The problem is wrapped in an ``[INST] ... [/INST]`` prompt, the response
    is decoded, its code is pulled out with ``extract_python_code`` and then
    converted into text that can be appended to *prompt*.

    Args:
        model: Model exposing ``generate`` and ``device``.
        tokenizer: Tokenizer used to encode the prompt and decode the output.
        prompt: HumanEval function header/docstring to complete.
        problem_text: Problem statement placed ahead of the function header.

    Returns:
        Completion text to be appended to *prompt*.
    """
    # NOTE(review): the literal "<s>" is kept byte-for-byte. If the tokenizer
    # also prepends its own BOS this may produce a duplicated BOS token --
    # confirm against the format used during fine-tuning.
    instruct_prompt = (
        "<s>[INST] Solve this programming problem with detailed reasoning:\n\n"
        f"{problem_text}\n\n"
        "Complete the following function:\n"
        f"{prompt}\n"
        "[/INST]"
    )

    inputs = tokenizer(instruct_prompt, return_tensors="pt").to(model.device)
    prompt_len = inputs["input_ids"].shape[1]

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=MAX_NEW_TOKENS,
            temperature=TEMPERATURE,
            do_sample=TEMPERATURE > 0,
            pad_token_id=tokenizer.pad_token_id,
            eos_token_id=tokenizer.eos_token_id,
        )

    # Decode only the newly generated tokens, not the echoed prompt.
    full_response = tokenizer.decode(
        outputs[0][prompt_len:], skip_special_tokens=True
    )

    code = extract_python_code(full_response)
    if "def " not in code:
        # Body-only answer: nothing to unwrap.
        return code
    return _completion_from_code(code, prompt)
|
|
|
|
|
def evaluate_model(model, tokenizer, problems, model_name, is_finetuned=False):
    """Generate completions for every problem and score them on HumanEval.

    Args:
        model: Model used for generation.
        tokenizer: Tokenizer matching *model*.
        problems: Mapping of task id to problem dict; each dict must have a
            ``"prompt"`` entry.
        model_name: Label used in progress and log output.
        is_finetuned: When true, use the Instruct-format generation path;
            otherwise use direct completion.

    Returns:
        The result of ``evaluate_functional_correctness`` for ``k=[1]``
        (callers read its ``"pass@1"`` entry).
    """
    print(f"\nEvaluating {model_name}...")
    samples = []

    for task_id, problem in tqdm(problems.items(), desc=f"Generating ({model_name})"):
        prompt = problem["prompt"]

        for _ in range(NUM_SAMPLES):
            if is_finetuned:
                # The prompt doubles as the problem text for the Instruct
                # wrapper.
                completion = generate_completion_finetuned(
                    model, tokenizer, prompt, prompt
                )
            else:
                completion = generate_completion_base(model, tokenizer, prompt)

            samples.append({"task_id": task_id, "completion": completion})

    # A scratch directory is removed even when evaluation raises, and it
    # also takes the harness's "<sample_file>_results.jsonl" output with it.
    with tempfile.TemporaryDirectory() as work_dir:
        sample_file = os.path.join(work_dir, "samples.jsonl")
        write_jsonl(sample_file, samples)
        results = evaluate_functional_correctness(sample_file, k=[1])

    return results
|
|
|
|
|
def main():
    """Evaluate the base and fine-tuned models on HumanEval and report pass@1.

    Runs both evaluations one after the other (freeing the base model in
    between), prints a comparison table and writes the scores to
    ``eval_results_v2.json`` in the current working directory.
    """
    print("\nLoading HumanEval problems...")
    problems = read_problems()
    print(f"Total problems: {len(problems)}")

    results = {}

    # --- Base model: direct completion ------------------------------------
    print("\n" + "="*60)
    print("EVALUATING BASE MODEL (direct completion)")
    print("="*60)
    base_model, base_tokenizer = load_model(BASE_MODEL)
    results["base"] = evaluate_model(base_model, base_tokenizer, problems, "Devstral-Small (Base)", is_finetuned=False)
    print(f"\nBase Model Results: {results['base']}")

    # Release the base model before loading the second one. Collect garbage
    # first so unreachable tensors are actually freed before the CUDA cache
    # is emptied.
    del base_model, base_tokenizer
    gc.collect()
    torch.cuda.empty_cache()

    # --- Fine-tuned model: Instruct format --------------------------------
    print("\n" + "="*60)
    print("EVALUATING FINE-TUNED MODEL (Instruct format)")
    print("="*60)
    ft_model, ft_tokenizer = load_model(BASE_MODEL, FINETUNED_MODEL)
    results["finetuned"] = evaluate_model(ft_model, ft_tokenizer, problems, "Alizee-Coder (Fine-tuned)", is_finetuned=True)
    print(f"\nFine-tuned Model Results: {results['finetuned']}")

    # --- Summary ------------------------------------------------------------
    base_score = results['base']['pass@1']
    finetuned_score = results['finetuned']['pass@1']

    print("\n" + "="*60)
    print("COMPARISON SUMMARY (v2 - Correct Prompt Format)")
    print("="*60)
    print(f"\n{'Model':<45} {'pass@1':>10}")
    print("-"*57)
    print(f"{'Devstral-Small-2505 (Base)':<45} {base_score*100:>9.2f}%")
    print(f"{'Alizee-Coder-Devstral (Fine-tuned+Instruct)':<45} {finetuned_score*100:>9.2f}%")

    # Difference in percentage points. The "+" format flag keeps the sign
    # attached to the number so the column lines up with the rows above.
    improvement = (finetuned_score - base_score) * 100
    print(f"\n{'Improvement:':<45} {improvement:>+9.2f}%")

    with open("eval_results_v2.json", "w", encoding="utf-8") as f:
        json.dump({
            "base_pass@1": float(base_score),
            "finetuned_pass@1": float(finetuned_score),
            "improvement": float(improvement)
        }, f, indent=2)
    print("\nResults saved to eval_results_v2.json")
|
|
|
|
|
# Run the comparison only when this file is executed as a script.
if __name__ == "__main__":
    main()
|
|
|