Spaces:
Sleeping
Sleeping
| """ | |
| Task 3: Identify the function that violates a specified property. | |
| """ | |
| import json | |
| from typing import Any, Dict, Tuple | |
| from data.data_loader import ( | |
| get_function_by_name, | |
| list_function_names, | |
| list_state_variable_names, | |
| get_state_variable_by_name | |
| ) | |
| from env.schemas import Reward, ActionType | |
def list_functions(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]:
    """Handle the LIST_FUNCTIONS action.

    Args:
        ctx: Environment context; this handler reads ``ctx._contract`` and
            calls ``ctx._is_repeated``.
        qkey: Key handed to ``ctx._is_repeated`` to detect a repeated query.
        params: Action parameters (not used by this handler).

    Returns:
        A ``(message, Reward)`` pair. When the query is reported as repeated
        the reward carries ``ActionType.REPEATED.cost``; otherwise the message
        lists the names returned by ``list_function_names`` and the reward
        carries ``ActionType.LIST_FUNCTIONS.cost``.
    """
    if not ctx._is_repeated(qkey):
        fn_names = list_function_names(ctx._contract)
        message = f"Functions in {ctx._contract['contract_name']}: {', '.join(fn_names)}"
        charge = Reward(value=ActionType.LIST_FUNCTIONS.cost, reason="list_functions cost")
        return message, charge

    return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query")
def get_function_metadata(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]:
    """Handle the GET_FUNCTION_METADATA action.

    Args:
        ctx: Environment context; this handler reads ``ctx._contract`` and
            calls ``ctx._is_repeated``.
        qkey: Key handed to ``ctx._is_repeated`` to detect a repeated query.
        params: Action parameters; ``"function_name"`` selects the function
            (defaults to an empty string when absent).

    Returns:
        A ``(message, Reward)`` pair. Repeated queries are charged
        ``ActionType.REPEATED.cost``. Every other outcome, including an
        unknown function name, is charged
        ``ActionType.GET_FUNCTION_METADATA.cost``.
    """
    fn_name = params.get("function_name", "")

    if ctx._is_repeated(qkey):
        return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query")

    entry = get_function_by_name(ctx._contract, fn_name)
    if entry is None:
        # Unknown name: report what is available, still charged the normal cost.
        not_found = (
            f"Function '{fn_name}' not found. "
            f"Available: {list_function_names(ctx._contract)}"
        )
        return not_found, Reward(value=ActionType.GET_FUNCTION_METADATA.cost, reason="Unknown function")

    arg_specs = entry.get("parameters", [])
    modifier_names = entry.get("modifiers", [])

    report = [
        f"Function : {entry.get('signature', fn_name)}",
        f"Visibility : {entry.get('visibility', 'unknown')}",
        f"Modifiers : {', '.join(modifier_names) if modifier_names else 'none'}",
    ]

    if not arg_specs:
        report.append("Parameters : none")
    else:
        report.append("Parameters :")
        # One line per parameter; 'type' and 'name' are required keys here.
        report.extend(
            f" {spec['type']} {spec['name']} — {spec.get('description','')}"
            for spec in arg_specs
        )

    report.append(f"Returns : {entry.get('returns','') or 'void'}")
    report.append(f"Summary : {entry.get('comment','')}")

    charge = Reward(value=ActionType.GET_FUNCTION_METADATA.cost, reason="get_function_metadata cost")
    return "\n".join(report), charge
def get_function_code(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]:
    """Handle the GET_FUNCTION_CODE action.

    Args:
        ctx: Environment context; this handler reads ``ctx._contract`` and
            calls ``ctx._is_repeated``.
        qkey: Key handed to ``ctx._is_repeated`` to detect a repeated query.
        params: Action parameters; ``"function_name"`` selects the function
            (defaults to an empty string when absent).

    Returns:
        A ``(message, Reward)`` pair. Repeated queries are charged
        ``ActionType.REPEATED.cost``; both the found and the not-found
        outcomes are charged ``ActionType.GET_FUNCTION_CODE.cost``.
    """
    fn_name = params.get("function_name", "")

    if ctx._is_repeated(qkey):
        return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query")

    entry = get_function_by_name(ctx._contract, fn_name)

    if entry is not None:
        # Falls back to a placeholder comment when the entry has no "code" key.
        source_text = entry.get("code", "// no code available")
        listing = f"// {fn_name}\n{source_text}"
        return listing, Reward(value=ActionType.GET_FUNCTION_CODE.cost, reason="get_function_code cost")

    not_found = (
        f"Function '{fn_name}' not found. "
        f"Available: {list_function_names(ctx._contract)}"
    )
    # NOTE(review): the reason text mentions an extra penalty, but the value
    # used is the regular GET_FUNCTION_CODE cost — confirm which is intended.
    return not_found, Reward(
        value=ActionType.GET_FUNCTION_CODE.cost,
        reason="Unknown function — extra penalty",
    )
def get_state_variable(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]:
    """Handle the GET_STATE_VARIABLE action.

    Args:
        ctx: Environment context; this handler reads ``ctx._contract`` and
            calls ``ctx._is_repeated``.
        qkey: Key handed to ``ctx._is_repeated`` to detect a repeated query.
        params: Action parameters; ``"variable_name"`` selects the variable.
            When it is missing or empty, all variable names are listed.

    Returns:
        A ``(message, Reward)`` pair. Repeated queries are charged
        ``ActionType.REPEATED.cost``; every other outcome is charged
        ``ActionType.GET_STATE_VARIABLE.cost``.
    """
    var_name = params.get("variable_name", "")

    if ctx._is_repeated(qkey):
        return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query")

    if var_name:
        record = get_state_variable_by_name(ctx._contract, var_name)
        if record is not None:
            description = (
                f"{record['type']} {record['visibility']} {record['name']}: "
                f"{record.get('description','')}"
            )
            return description, Reward(
                value=ActionType.GET_STATE_VARIABLE.cost,
                reason="get_state_variable cost",
            )
        return f"Variable '{var_name}' not found.", Reward(
            value=ActionType.GET_STATE_VARIABLE.cost,
            reason="Unknown state variable",
        )

    # No variable requested: list every state variable name instead.
    all_names = list_state_variable_names(ctx._contract)
    return f"State variables: {', '.join(all_names)}", Reward(
        value=ActionType.GET_STATE_VARIABLE.cost,
        reason="Listed state variables",
    )
def get_call_graph(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]:
    """Handle the GET_CALL_GRAPH action.

    Args:
        ctx: Environment context; this handler reads ``ctx._contract`` and
            calls ``ctx._is_repeated``.
        qkey: Key handed to ``ctx._is_repeated`` to detect a repeated query.
        params: Action parameters (not used by this handler).

    Returns:
        A ``(message, Reward)`` pair. Repeated queries are charged
        ``ActionType.REPEATED.cost``; otherwise the message renders each
        ``"call_graph"`` entry as ``caller → [callee, ...]`` joined by ``"; "``
        and the reward carries ``ActionType.GET_CALL_GRAPH.cost``.
    """
    if ctx._is_repeated(qkey):
        return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query")

    # A contract without a "call_graph" key yields an empty rendering.
    graph = ctx._contract.get("call_graph", {})

    edges = []
    for caller, callees in graph.items():
        edges.append(f"{caller} → [{', '.join(callees)}]")
    rendered = "; ".join(edges)

    charge = Reward(value=ActionType.GET_CALL_GRAPH.cost, reason="get_call_graph cost")
    return f"Call graph: {rendered}", charge
def get_property_specification(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]:
    """Handle the GET_PROPERTY_SPECIFICATION action.

    Args:
        ctx: Environment context; this handler reads ``ctx._target_fn`` and
            calls ``ctx._is_repeated``.
        qkey: Key handed to ``ctx._is_repeated`` to detect a repeated query.
        params: Action parameters (not used by this handler).

    Returns:
        A ``(message, Reward)`` pair. Repeated queries are charged
        ``ActionType.REPEATED.cost``; otherwise the message contains the
        target's ``"property_specification"`` (JSON-encoded when it is a dict,
        a fixed fallback sentence when it is missing or empty) and the reward
        carries ``ActionType.GET_PROPERTY_SPECIFICATION.cost``.
    """
    if ctx._is_repeated(qkey):
        return "Repeated query.", Reward(value=ActionType.REPEATED.cost, reason="Repeated query")

    # Any falsy specification (missing, {}, "", None) is replaced by the fallback text.
    spec = (
        ctx._target_fn.get("property_specification", {})
        or "No rule specification available for this property."
    )

    if isinstance(spec, dict):
        rendered = json.dumps(spec)
    else:
        rendered = spec

    charge = Reward(
        value=ActionType.GET_PROPERTY_SPECIFICATION.cost,
        reason="get_property_specification cost",
    )
    return f"Formal property:\n{rendered}", charge
def submit_function(ctx: Any, qkey: str, params: Dict) -> Tuple[str, Reward]:
    """Handle SUBMIT_FUNCTION action for Task 3.

    Expected params
    ---------------
    function_name : str – name of the function that violates the given property

    Behaviour
    ---------
    * If ``ctx._done`` is already set, the submission is rejected and charged
      ``ActionType.SUBMIT.cost``.
    * If ``function_name`` is missing, blank, or not a string, the submission
      is reported as malformed and charged ``ActionType.SUBMIT.cost``;
      ``ctx._done`` is left untouched.
    * Otherwise ``ctx._done`` is set, the answer is scored through
      ``ctx._grader.grade`` and the canonical answer is returned in the message.

    ``qkey`` is not used by this handler.
    """
    if ctx._done:
        return (
            "Only ONE submission is allowed. Reset environment to start again.",
            Reward(value=ActionType.SUBMIT.cost, reason="Second submit_function attempt"),
        )

    # A None / non-string value is handled like a missing name rather than
    # crashing on ``.strip()``.
    raw_name = params.get("function_name", "")
    fn_name = raw_name.strip() if isinstance(raw_name, str) else ""
    if not fn_name:
        return (
            "submit_function requires 'function_name' in params.",
            Reward(value=ActionType.SUBMIT.cost, reason="Malformed submission"),
        )

    ctx._done = True
    score = ctx._grader.grade(fn_name, ctx._step_count, ctx._cum_reward)

    # ``get_canonical_answer`` may be a method or a property on the grader;
    # call it when callable so the message shows the answer itself and not a
    # bound-method repr.
    canonical = ctx._grader.get_canonical_answer
    if callable(canonical):
        canonical = canonical()

    return (
        f"Correct Answer: {canonical}",
        Reward(value=score, reason=f"submit_function score={score:.1f}"),
    )
def unknown_action(ctx: Any, qkey: str, params: Dict, action_type: str) -> Tuple[str, Reward]:
    """Fallback handler for an action type that matches no known handler.

    Args:
        ctx: Environment context (not used by this handler).
        qkey: Query key (not used by this handler).
        params: Action parameters (not used by this handler).
        action_type: The unrecognised action name, echoed back in the message.

    Returns:
        A ``(message, Reward)`` pair: the message lists the ``value`` of every
        ``ActionType`` member, and the reward is a fixed ``-0.10``.
    """
    valid_actions = [member.value for member in ActionType]
    message = f"Unknown action '{action_type}'. Valid: {valid_actions}"
    penalty = Reward(value=-0.10, reason="Unknown action")
    return message, penalty