Spaces:
Sleeping
Sleeping
| """Typed action tool dispatcher for the generated app sandbox.""" | |
| from __future__ import annotations | |
| import json | |
| from dataclasses import dataclass | |
| try: | |
| from ..models import CyberSecurityOWASPAction, CyberSecurityOWASPState | |
| from .app_sandbox import AppSandbox | |
| except ImportError: # pragma: no cover | |
| from models import CyberSecurityOWASPAction, CyberSecurityOWASPState | |
| from server.app_sandbox import AppSandbox | |
@dataclass
class ToolResult:
    """Outcome of a single tool invocation.

    Instances are built positionally (``ToolResult("some message")``), which
    relies on the ``__init__`` generated by ``@dataclass``. Without the
    decorator the class only carries annotations and rejects every
    constructor argument with ``TypeError``.
    """

    # Text handed back to the caller for the executed tool.
    message: str
    # Optional visible-test output; defaults to None when not supplied.
    visible_test_result: str | None = None
class ActionTools:
    """Runs named tool actions against the sandbox of a single episode.

    NOTE(review): these tools are described as phase-gated and safe, but no
    gating happens inside this class; presumably the caller or ``AppSandbox``
    enforces it -- confirm.
    """

    def __init__(
        self,
        state: CyberSecurityOWASPState,
        visible_policy_hint: dict,
        workspace_summary: dict,
    ):
        """Keep the episode inputs and create a sandbox bound to ``state``.

        Args:
            state: Episode state; stored and passed to ``AppSandbox``.
            visible_policy_hint: Mapping returned (as JSON) by the
                ``inspect_policy_graph`` tool.
            workspace_summary: Mapping whose ``"routes"`` entry is returned
                (as JSON) by the ``list_routes`` tool.
        """
        self.state = state
        self.visible_policy_hint = visible_policy_hint
        self.workspace_summary = workspace_summary
        self.sandbox = AppSandbox(state)

    def execute(self, action: CyberSecurityOWASPAction) -> ToolResult:
        """Run the tool named by ``action.tool_name`` and wrap its output.

        Args:
            action: Carries ``tool_name`` and an optional ``arguments``
                mapping; a falsy ``arguments`` is treated as empty.

        Returns:
            A ``ToolResult`` whose ``message`` is the tool's text output
            (JSON-encoded for the structured tools).

        Raises:
            ValueError: If ``tool_name`` matches none of the handled tools.
        """
        params = action.arguments or {}
        tool = action.tool_name
        sandbox = self.sandbox

        if tool == "noop":
            message = "No operation."
        elif tool == "inspect_policy_graph":
            message = json.dumps(self.visible_policy_hint, indent=2, sort_keys=True)
        elif tool == "list_routes":
            # Route order is kept as stored, so no sort_keys here.
            routes = self.workspace_summary["routes"]
            message = json.dumps(routes, indent=2)
        elif tool == "read_openapi":
            message = sandbox.read_openapi()
        elif tool == "read_file":
            target = str(params.get("path", ""))
            message = sandbox.read_file(target)
        elif tool == "search_code":
            needle = str(params.get("query", ""))
            message = sandbox.search_code(needle)
        elif tool == "send_local_request":
            verb = str(params.get("method", "GET"))
            route = str(params.get("path", ""))
            # user_id is forwarded untouched (may be None), not stringified.
            reply = sandbox.send_local_request(verb, route, params.get("user_id"))
            message = json.dumps(reply, indent=2, sort_keys=True)
        elif tool == "compare_identities":
            verb = str(params.get("method", "GET"))
            route = str(params.get("path", ""))
            first = str(params.get("first_user_id", ""))
            second = str(params.get("second_user_id", ""))
            reply = sandbox.compare_identities(verb, route, first, second)
            message = json.dumps(reply, indent=2, sort_keys=True)
        elif tool == "patch_file":
            target = str(params.get("path", ""))
            # Full replacement content wins over a diff; exactly one of the
            # two keyword arguments is non-None.
            if "content" in params:
                new_content = str(params["content"])
                new_diff = None
            else:
                new_content = None
                new_diff = str(params.get("diff", ""))
            outcome = sandbox.patch_file(target, content=new_content, diff=new_diff)
            status = "diff recorded" if outcome["diff"].strip() else "no diff"
            message = f"Patched {outcome['path']} ({status})."
        else:
            raise ValueError(f"Unhandled tool {tool}")

        return ToolResult(message)