# NOTE: upload-page residue preserved from the original file (not code):
# SimranShaikh's picture
# commit bdbb08b verified
"""
Deterministic graders for each task.
All graders return a score in [0.0, 1.0] and a feedback message.
"""
from typing import List, Tuple
from environment.models import CodeReviewAction, Issue
# ─────────────────────────────────────────────────────────────
# Helpers
# ─────────────────────────────────────────────────────────────
def _keyword_hit(text: str, keywords: List[str]) -> bool:
"""Case-insensitive check β€” does `text` contain any keyword?"""
text_lower = text.lower()
return any(kw.lower() in text_lower for kw in keywords)
def _keyword_score(text: str, keywords: List[str]) -> float:
"""Fraction of keywords found in text (0.0 – 1.0)."""
if not keywords:
return 0.0
hits = sum(1 for kw in keywords if kw.lower() in text.lower())
return hits / len(keywords)
def _issue_text(issues: List[Issue]) -> str:
    """Flatten every issue's fields into one lowercase string for keyword matching.

    Each issue contributes its type, its description, and (when present)
    its line number, all space-joined.
    """
    fragments = []
    for item in issues:
        pieces = [item.issue_type, item.description]
        if item.line_number is not None:
            pieces.append(str(item.line_number))
        fragments.extend(pieces)
    return " ".join(fragments).lower()
# ─────────────────────────────────────────────────────────────
# Easy grader: syntax error detection
# ─────────────────────────────────────────────────────────────
def grade_easy(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade the easy (syntax-error) task.

    Rubric (total 1.0):
      0.35 β€” identified issue_type == "syntax_error"
      0.35 β€” description mentions the relevant keywords (colon / if / syntax)
      0.30 β€” suggested_fix contains the corrected line
    """
    notes = []
    total = 0.0
    issue_types = [issue.issue_type.lower() for issue in action.identified_issues]
    combined_text = _issue_text(action.identified_issues) + " " + (action.explanation or "")

    # 1) Classification: flat 0.35 for the correct issue type.
    if "syntax_error" in issue_types:
        total += 0.35
        notes.append("βœ… Correctly identified as a syntax error (+0.35)")
    else:
        notes.append(
            f"❌ Expected issue_type='syntax_error', got {issue_types} (+0.00)"
        )

    # 2) Description keywords: doubled then capped, so matching half of the
    #    ground-truth keywords already earns the full 0.35.
    kw_score = _keyword_score(combined_text, ground_truth["keywords"])
    desc_points = round(0.35 * min(kw_score * 2, 1.0), 3)
    total += desc_points
    notes.append(
        f"{'βœ…' if desc_points > 0.1 else '❌'} Description accuracy: "
        f"{desc_points:.2f}/0.35 (keyword match {kw_score:.0%})"
    )

    # 3) Fix quality: all-or-nothing 0.30 on the suggested patch text.
    if _keyword_hit(action.suggested_fix or "", ground_truth["fix_keywords"]):
        total += 0.30
        notes.append("βœ… Suggested fix contains the correct patch (+0.30)")
    else:
        notes.append("❌ Suggested fix missing or incorrect (+0.00)")

    return round(min(total, 1.0), 4), "\n".join(notes)
# ─────────────────────────────────────────────────────────────
# Medium grader: logic bug detection
# ─────────────────────────────────────────────────────────────
def _run_is_palindrome(code: str) -> List[Tuple[str, bool, bool]]:
"""
Execute the patched `is_palindrome` function in a subprocess-safe sandbox.
Returns list of (input, expected, actual).
"""
import subprocess, sys, json, textwrap
test_driver = textwrap.dedent(f"""
{code}
import json, sys
cases = [
("racecar", True),
("hello", False),
("amanaplanacanalpanama", True),
("abba", True),
("abc", False),
]
results = []
for inp, exp in cases:
try:
got = is_palindrome(inp)
results.append([inp, exp, bool(got)])
except Exception as e:
results.append([inp, exp, None])
print(json.dumps(results))
""")
try:
out = subprocess.run(
[sys.executable, "-c", test_driver],
capture_output=True, text=True, timeout=5
)
if out.returncode != 0:
return []
return [tuple(r) for r in json.loads(out.stdout.strip())]
except Exception:
return []
def grade_medium(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade the medium (logic-bug) task.

    Rubric (total 1.0):
      0.25 β€” identified issue_type == "logic_bug"
      0.25 β€” description mentions index / off-by-one keywords
      0.50 β€” suggested fix passes all 5 test cases (0.10 each)
    """
    notes = []
    total = 0.0
    issue_types = [issue.issue_type.lower() for issue in action.identified_issues]
    combined_text = _issue_text(action.identified_issues) + " " + (action.explanation or "")

    # 1) Classification: flat 0.25 for labelling it a logic bug.
    if "logic_bug" in issue_types:
        total += 0.25
        notes.append("βœ… Correctly identified as a logic bug (+0.25)")
    else:
        notes.append(f"❌ Expected 'logic_bug', got {issue_types} (+0.00)")

    # 2) Description keywords: scaled by 2.5 then capped, so 40% keyword
    #    coverage already earns the full 0.25.
    kw_score = _keyword_score(combined_text, ground_truth["keywords"])
    desc_pts = round(0.25 * min(kw_score * 2.5, 1.0), 3)
    total += desc_pts
    notes.append(
        f"{'βœ…' if desc_pts > 0.08 else '❌'} Description accuracy: "
        f"{desc_pts:.2f}/0.25 (keyword match {kw_score:.0%})"
    )

    # 3) Execute the suggested fix against the fixed palindrome test cases.
    fix_code = action.suggested_fix or ""
    if not fix_code.strip():
        notes.append("❌ No suggested fix provided (+0.00)")
    else:
        results = _run_is_palindrome(fix_code)
        if not results:
            notes.append("❌ Fix code could not be executed (+0.00)")
        else:
            passed = sum(1 for _, exp, got in results if got == exp)
            pts = round(0.50 * (passed / len(results)), 3)
            total += pts
            notes.append(
                f"{'βœ…' if passed == len(results) else '⚠️'} Fix passed "
                f"{passed}/{len(results)} test cases β†’ +{pts:.2f}/0.50"
            )
            for inp, exp, got in results:
                status = "βœ…" if got == exp else "❌"
                notes.append(f" {status} is_palindrome({inp!r}) β†’ {got} (expected {exp})")

    return round(min(total, 1.0), 4), "\n".join(notes)
# ─────────────────────────────────────────────────────────────
# Hard grader: security vulnerability detection
# ─────────────────────────────────────────────────────────────
def grade_hard(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade the hard (security-vulnerability) task.

    Rubric (total 1.0) β€” each vulnerability is worth 1/len(vulns); within
    that share:
      45% β€” identified as security_vulnerability
      30% β€” description mentions relevant keywords (scaled, capped)
      25% β€” fix mentions remediation keywords
    Bonus 0.05 for mentioning every vulnerability in the description.
    """
    vulns = ground_truth["vulnerabilities"]
    per_vuln = 1.0 / len(vulns)
    all_issue_text = _issue_text(action.identified_issues) + " " + (action.explanation or "")
    fix_text = (action.suggested_fix or "") + " " + (action.explanation or "")
    # Hoisted loop-invariant: the original rebuilt this list on every
    # iteration even though it never changes inside the loop.
    # NOTE(review): the flag is credited once per vulnerability, so a single
    # security-typed issue earns the classification share for ALL of them β€”
    # preserved as-is to keep scoring behavior unchanged.
    has_security_flag = any(
        "security" in i.issue_type.lower() for i in action.identified_issues
    )
    total_score = 0.0
    feedback_parts = []
    found_count = 0
    for vuln in vulns:
        v_score = 0.0
        v_name = vuln["name"]
        feedback_parts.append(f"\nπŸ” Checking: {v_name}")
        # a) issue_type contains "security"
        if has_security_flag:
            v_score += per_vuln * 0.45
            feedback_parts.append(f" βœ… Flagged as security vulnerability (+{per_vuln*0.45:.3f})")
        else:
            feedback_parts.append(f" ❌ Not flagged as security vulnerability (+0.00)")
        # b) description keyword match (scaled by 3 then capped, so one third
        #    of the keywords already earns the full description share)
        kw_hit = _keyword_hit(all_issue_text, vuln["keywords"])
        if kw_hit:
            kw_score_val = _keyword_score(all_issue_text, vuln["keywords"])
            pts = round(per_vuln * 0.30 * min(kw_score_val * 3, 1.0), 4)
            v_score += pts
            feedback_parts.append(f" βœ… Identified '{v_name}' in description (+{pts:.3f})")
            found_count += 1
        else:
            feedback_parts.append(f" ❌ '{v_name}' not mentioned in description (+0.00)")
        # c) fix keyword match
        fix_hit = _keyword_hit(fix_text, vuln["fix_keywords"])
        if fix_hit:
            v_score += per_vuln * 0.25
            feedback_parts.append(f" βœ… Fix addresses '{v_name}' (+{per_vuln*0.25:.3f})")
        else:
            feedback_parts.append(f" ❌ Fix doesn't address '{v_name}' (+0.00)")
        total_score += v_score
    # Bonus: every vulnerability found. Fix: the message previously
    # hard-coded "All 3" while the condition is generic over len(vulns).
    if found_count == len(vulns):
        total_score = min(total_score + 0.05, 1.0)
        feedback_parts.append(f"\n🎯 Bonus: All {len(vulns)} vulnerabilities identified! (+0.05)")
    total_score = round(min(total_score, 1.0), 4)
    return total_score, "\n".join(feedback_parts)
# ─────────────────────────────────────────────────────────────
# Dispatcher
# ─────────────────────────────────────────────────────────────
def grade(task_id: str, action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """Dispatch *task_id* to its grader; raise ValueError for unknown tasks."""
    dispatch = {
        "easy_syntax": grade_easy,
        "medium_logic": grade_medium,
        "hard_security": grade_hard,
    }
    grader = dispatch.get(task_id)
    if grader is None:
        raise ValueError(f"No grader for task: {task_id}")
    return grader(action, ground_truth)