Spaces:
Sleeping
Sleeping
| """ | |
| Deterministic graders for each task. | |
| All graders return a score in [0.0, 1.0] and a feedback message. | |
| """ | |
| from typing import List, Tuple | |
| from environment.models import CodeReviewAction, Issue | |
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
| def _keyword_hit(text: str, keywords: List[str]) -> bool: | |
| """Case-insensitive check β does `text` contain any keyword?""" | |
| text_lower = text.lower() | |
| return any(kw.lower() in text_lower for kw in keywords) | |
| def _keyword_score(text: str, keywords: List[str]) -> float: | |
| """Fraction of keywords found in text (0.0 β 1.0).""" | |
| if not keywords: | |
| return 0.0 | |
| hits = sum(1 for kw in keywords if kw.lower() in text.lower()) | |
| return hits / len(keywords) | |
| def _issue_text(issues: List[Issue]) -> str: | |
| """Concatenate all issue fields into a single string for matching.""" | |
| parts = [] | |
| for issue in issues: | |
| parts.append(issue.issue_type) | |
| parts.append(issue.description) | |
| if issue.line_number is not None: | |
| parts.append(str(issue.line_number)) | |
| return " ".join(parts).lower() | |
# ---------------------------------------------------------------------------
# Easy grader: syntax error detection
# ---------------------------------------------------------------------------
def grade_easy(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade the easy task (syntax error detection).

    Rubric (total 1.0):
      0.35 - some identified issue has issue_type == "syntax_error"
      0.35 - issue text plus explanation mention ground_truth["keywords"]
             (full marks once half of the keywords are matched)
      0.30 - suggested_fix contains any of ground_truth["fix_keywords"]

    Returns a (score, feedback) pair; the score is capped at 1.0 and rounded
    to 4 decimal places, the feedback is one line per rubric item.
    """
    # NOTE(review): the status markers in the feedback literals (e.g. 'β')
    # look like mis-decoded glyphs - confirm the file encoding before
    # changing them; they are reproduced here exactly as found.
    reported_types = [item.issue_type.lower() for item in action.identified_issues]
    searchable = _issue_text(action.identified_issues) + " " + (action.explanation or "")
    notes = []
    total = 0.0

    # 1) Issue type check
    if "syntax_error" not in reported_types:
        notes.append(
            f"β Expected issue_type='syntax_error', got {reported_types} (+0.00)"
        )
    else:
        total += 0.35
        notes.append("β Correctly identified as a syntax error (+0.35)")

    # 2) Description keyword check: the match ratio is doubled, then capped at 1.
    match_ratio = _keyword_score(searchable, ground_truth["keywords"])
    desc_points = round(0.35 * min(match_ratio * 2, 1.0), 3)
    total += desc_points
    marker = "β " if desc_points > 0.1 else "β"
    notes.append(
        f"{marker} Description accuracy: "
        f"{desc_points:.2f}/0.35 (keyword match {match_ratio:.0%})"
    )

    # 3) Fix quality check
    proposed_fix = action.suggested_fix or ""
    if _keyword_hit(proposed_fix, ground_truth["fix_keywords"]):
        total += 0.30
        notes.append("β Suggested fix contains the correct patch (+0.30)")
    else:
        notes.append("β Suggested fix missing or incorrect (+0.00)")

    return round(min(total, 1.0), 4), "\n".join(notes)
# ---------------------------------------------------------------------------
# Medium grader: logic bug detection
# ---------------------------------------------------------------------------
| def _run_is_palindrome(code: str) -> List[Tuple[str, bool, bool]]: | |
| """ | |
| Execute the patched `is_palindrome` function in a subprocess-safe sandbox. | |
| Returns list of (input, expected, actual). | |
| """ | |
| import subprocess, sys, json, textwrap | |
| test_driver = textwrap.dedent(f""" | |
| {code} | |
| import json, sys | |
| cases = [ | |
| ("racecar", True), | |
| ("hello", False), | |
| ("amanaplanacanalpanama", True), | |
| ("abba", True), | |
| ("abc", False), | |
| ] | |
| results = [] | |
| for inp, exp in cases: | |
| try: | |
| got = is_palindrome(inp) | |
| results.append([inp, exp, bool(got)]) | |
| except Exception as e: | |
| results.append([inp, exp, None]) | |
| print(json.dumps(results)) | |
| """) | |
| try: | |
| out = subprocess.run( | |
| [sys.executable, "-c", test_driver], | |
| capture_output=True, text=True, timeout=5 | |
| ) | |
| if out.returncode != 0: | |
| return [] | |
| return [tuple(r) for r in json.loads(out.stdout.strip())] | |
| except Exception: | |
| return [] | |
def grade_medium(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade the medium task (logic bug detection).

    Rubric (total 1.0):
      0.25 - some identified issue has issue_type == "logic_bug"
      0.25 - issue text plus explanation mention ground_truth["keywords"]
             (full marks once 40% of the keywords are matched)
      0.50 - suggested fix, run through `_run_is_palindrome`, split evenly
             across the test cases it reports

    Returns a (score, feedback) pair; the score is capped at 1.0 and rounded
    to 4 decimal places.
    """
    # NOTE(review): the status markers in the feedback literals (e.g. 'β')
    # look like mis-decoded glyphs - confirm the file encoding before
    # changing them; they are reproduced here exactly as found.
    reported_types = [item.issue_type.lower() for item in action.identified_issues]
    searchable = _issue_text(action.identified_issues) + " " + (action.explanation or "")
    notes = []
    total = 0.0

    # 1) Issue type
    if "logic_bug" not in reported_types:
        notes.append(f"β Expected 'logic_bug', got {reported_types} (+0.00)")
    else:
        total += 0.25
        notes.append("β Correctly identified as a logic bug (+0.25)")

    # 2) Description accuracy: the match ratio is scaled by 2.5, then capped at 1.
    match_ratio = _keyword_score(searchable, ground_truth["keywords"])
    desc_pts = round(0.25 * min(match_ratio * 2.5, 1.0), 3)
    total += desc_pts
    desc_marker = "β " if desc_pts > 0.08 else "β"
    notes.append(
        f"{desc_marker} Description accuracy: "
        f"{desc_pts:.2f}/0.25 (keyword match {match_ratio:.0%})"
    )

    # 3) Fix execution test
    candidate = action.suggested_fix or ""
    if not candidate.strip():
        notes.append("β No suggested fix provided (+0.00)")
    else:
        outcomes = _run_is_palindrome(candidate)
        if not outcomes:
            notes.append("β Fix code could not be executed (+0.00)")
        else:
            n_cases = len(outcomes)
            passed = sum(1 for _inp, exp, got in outcomes if got == exp)
            pts = round(0.50 * (passed / n_cases), 3)
            total += pts
            run_marker = "β " if passed == n_cases else "β οΈ"
            notes.append(
                f"{run_marker} Fix passed "
                f"{passed}/{n_cases} test cases β +{pts:.2f}/0.50"
            )
            for inp, exp, got in outcomes:
                status = "β " if got == exp else "β"
                notes.append(f" {status} is_palindrome({inp!r}) β {got} (expected {exp})")

    return round(min(total, 1.0), 4), "\n".join(notes)
# ---------------------------------------------------------------------------
# Hard grader: security vulnerability detection
# ---------------------------------------------------------------------------
def grade_hard(action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """
    Grade the hard task (security vulnerability detection).

    Each entry of ground_truth["vulnerabilities"] is worth an equal share
    (1 / count) of the score, split as:
      45% - at least one identified issue has "security" in its issue_type
      30% - issue text plus explanation mention the entry's "keywords"
            (full marks once a third of them are matched)
      25% - suggested_fix plus explanation mention any of its "fix_keywords"
    With three vulnerabilities that is roughly 0.15 / 0.10 / 0.08 each.

    A bonus of 0.05 is added when every vulnerability was matched by
    description keywords. The final score is capped at 1.0 and rounded to
    4 decimal places.

    Returns a (score, feedback) pair. An empty vulnerability list returns
    (0.0, message) instead of dividing by zero.
    """
    vulns = ground_truth["vulnerabilities"]
    if not vulns:
        return 0.0, "No vulnerabilities defined in ground truth; nothing to grade"

    # NOTE(review): the status markers in the feedback literals (e.g. 'β',
    # 'π') look like mis-decoded glyphs - confirm the file encoding before
    # changing them; they are reproduced here exactly as found.
    per_vuln = 1.0 / len(vulns)
    all_issue_text = _issue_text(action.identified_issues) + " " + (action.explanation or "")
    fix_text = (action.suggested_fix or "") + " " + (action.explanation or "")

    # This check does not depend on the vulnerability, so evaluate it once.
    # NOTE(review): a single security-typed issue therefore earns the
    # "flagged" share for every vulnerability - confirm that is intended.
    flagged_as_security = any(
        "security" in i.issue_type.lower() for i in action.identified_issues
    )

    total_score = 0.0
    feedback_parts = []
    found_count = 0
    for vuln in vulns:
        v_score = 0.0
        v_name = vuln["name"]
        feedback_parts.append(f"\nπ Checking: {v_name}")

        # a) issue_type contains "security"
        if flagged_as_security:
            v_score += per_vuln * 0.45
            feedback_parts.append(f" β Flagged as security vulnerability (+{per_vuln*0.45:.3f})")
        else:
            feedback_parts.append(" β Not flagged as security vulnerability (+0.00)")

        # b) description keyword match; a positive score means at least one
        #    keyword was found, so a separate hit test is unnecessary.
        kw_score_val = _keyword_score(all_issue_text, vuln["keywords"])
        if kw_score_val > 0:
            pts = round(per_vuln * 0.30 * min(kw_score_val * 3, 1.0), 4)
            v_score += pts
            feedback_parts.append(f" β Identified '{v_name}' in description (+{pts:.3f})")
            found_count += 1
        else:
            feedback_parts.append(f" β '{v_name}' not mentioned in description (+0.00)")

        # c) fix keyword match
        if _keyword_hit(fix_text, vuln["fix_keywords"]):
            v_score += per_vuln * 0.25
            feedback_parts.append(f" β Fix addresses '{v_name}' (+{per_vuln*0.25:.3f})")
        else:
            feedback_parts.append(f" β Fix doesn't address '{v_name}' (+0.00)")

        total_score += v_score

    # Bonus: every vulnerability was matched in the description.
    if found_count == len(vulns):
        total_score = min(total_score + 0.05, 1.0)
        feedback_parts.append(
            f"\nπ― Bonus: All {len(vulns)} vulnerabilities identified! (+0.05)"
        )

    total_score = round(min(total_score, 1.0), 4)
    return total_score, "\n".join(feedback_parts)
# ---------------------------------------------------------------------------
# Dispatcher
# ---------------------------------------------------------------------------
def grade(task_id: str, action: CodeReviewAction, ground_truth: dict) -> Tuple[float, str]:
    """Route `action` to the grader registered for `task_id`.

    Known task ids are "easy_syntax", "medium_logic" and "hard_security".

    Returns the (score, feedback) pair produced by the selected grader.
    Raises ValueError for any other task id.
    """
    # Guard first so an unknown id fails before any grader is looked up.
    if task_id not in ("easy_syntax", "medium_logic", "hard_security"):
        raise ValueError(f"No grader for task: {task_id}")

    graders = {
        "easy_syntax": grade_easy,
        "medium_logic": grade_medium,
        "hard_security": grade_hard,
    }
    return graders[task_id](action, ground_truth)