Spaces:
Running
Running
feat(agent): add Claude Code-style agent, skills, slash-commands, hooks, todos, sandboxed workspace, and full-stack scaffolding
"""File system tools: read, write, edit, list, glob, grep.

All tools are sandboxed to a configurable workspace root (default: ./workspace).
They return JSON-serializable dicts so they can be exposed via the API.
"""
| from __future__ import annotations | |
| import fnmatch | |
| import os | |
| import re | |
| import shutil | |
| from pathlib import Path | |
| from typing import Any | |
# ─── Workspace sandbox ──────────────────────────────────────────────────
# Workspace location: the SONICODER_WORKSPACE environment variable when it is
# set, otherwise a "workspace" directory two levels above this module.
try:
    _DEFAULT_WORKSPACE = os.environ["SONICODER_WORKSPACE"]
except KeyError:
    _DEFAULT_WORKSPACE = os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, "workspace")
    )
def get_workspace_root() -> str:
    """Return the absolute path of the agent's workspace root.

    The directory is created if it does not exist yet.

    The configured value is normalised with ``os.path.abspath`` because the
    ``SONICODER_WORKSPACE`` override may be relative or carry a trailing
    separator. The sandbox check compares absolute paths against this root,
    so an un-normalised root would make every path look like an escape.

    Raises:
        ValueError: If the configured workspace is an empty string.
    """
    if not _DEFAULT_WORKSPACE:
        # abspath("") would silently resolve to the current directory, which
        # must never become the workspace by accident.
        raise ValueError("Workspace root is not configured (empty path).")
    root = os.path.abspath(_DEFAULT_WORKSPACE)
    os.makedirs(root, exist_ok=True)
    return root
def _resolve_safe(path: str) -> str:
    """Resolve *path* to an absolute path inside the workspace root.

    Args:
        path: Path relative to the workspace root, or an absolute path that
            already lies within it.

    Returns:
        The normalised absolute path. Symlinks are not expanded in the
        returned value.

    Raises:
        ValueError: If the path lies outside the workspace, either lexically
            or after following symlinks.
    """
    root = os.path.abspath(get_workspace_root())
    if os.path.isabs(path):
        full = os.path.abspath(path)
    else:
        full = os.path.abspath(os.path.join(root, path))

    def _contains(parent: str, child: str) -> bool:
        # commonpath compares whole components, so "/ws-evil" is not treated
        # as being inside "/ws", and a filesystem-root workspace works too.
        try:
            return os.path.commonpath([parent, child]) == parent
        except ValueError:
            # Raised e.g. for paths on different drives on Windows.
            return False

    lexically_inside = _contains(root, full)
    # A symlink that lives inside the workspace may still point outside it;
    # compare the fully resolved locations as well.
    physically_inside = _contains(os.path.realpath(root), os.path.realpath(full))

    if not (lexically_inside and physically_inside):
        raise ValueError(
            f"Path '{path}' resolves outside the workspace root ({root}). "
            "Agent tools are sandboxed."
        )
    return full
# ─── read_file ──────────────────────────────────────────────────────────
def read_file(path: str, offset: int = 0, limit: int | None = None) -> dict[str, Any]:
    """Read a text file from the workspace and return it with line numbers.

    Args:
        path: Relative path inside the workspace, or absolute within it.
        offset: 1-indexed line to start reading from. Zero or negative values
            start at the first line.
        limit: Maximum number of lines to return. ``None``, zero and negative
            values all mean "no limit".

    Returns:
        On success, a dict with ``success``, ``path``, ``content`` (each line
        prefixed by its line number and a tab), ``line_count`` (total lines in
        the file), ``returned_lines`` and ``truncated`` (True when lines remain
        after the returned window). On failure, ``success`` is False and
        ``error`` holds the message.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}
        if os.path.isdir(full):
            return {"success": False, "error": f"Path is a directory: {path}"}

        start = max(offset - 1, 0)
        # A negative limit used to yield a nonsensical slice; treat it like
        # "no limit", the same as None and 0.
        end = start + limit if limit is not None and limit > 0 else None

        selected: list[str] = []
        total = 0
        # Stream the file so only the requested window is kept in memory;
        # the loop still runs to the end to obtain the total line count.
        with open(full, "r", encoding="utf-8", errors="replace") as f:
            for total, line in enumerate(f, 1):
                index = total - 1
                if index >= start and (end is None or index < end):
                    selected.append(line)

        numbered = "".join(
            f"{start + i + 1:6}\t{line}" for i, line in enumerate(selected)
        )
        return {
            "success": True,
            "path": path,
            "content": numbered,
            "line_count": total,
            "returned_lines": len(selected),
            "truncated": end is not None and end < total,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── write_file ─────────────────────────────────────────────────────────
def write_file(path: str, content: str) -> dict[str, Any]:
    """Create or overwrite a workspace file with *content*.

    Missing parent directories are created first.

    Returns:
        ``{"success": True, "path": ..., "bytes_written": ...}`` where
        ``bytes_written`` is the UTF-8 encoded size of *content*, or
        ``{"success": False, "error": ...}`` if anything goes wrong.
    """
    try:
        target = _resolve_safe(path)
        parent_dir = os.path.dirname(target)
        os.makedirs(parent_dir, exist_ok=True)

        with open(target, "w", encoding="utf-8") as handle:
            handle.write(content)

        result: dict[str, Any] = {"success": True, "path": path}
        result["bytes_written"] = len(content.encode("utf-8"))
        return result
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── edit_file ──────────────────────────────────────────────────────────
def edit_file(
    path: str,
    old_str: str,
    new_str: str,
    replace_all: bool = False,
) -> dict[str, Any]:
    """Replace occurrences of ``old_str`` with ``new_str`` in a workspace file.

    Args:
        path: Relative path inside the workspace, or absolute within it.
        old_str: Exact text to search for. Must be non-empty.
        new_str: Replacement text. Must differ from ``old_str``.
        replace_all: When False (default) ``old_str`` must occur exactly once
            in the file; when True every occurrence is replaced.

    Returns:
        ``{"success": True, "path": ..., "replacements": n}`` on success,
        otherwise ``{"success": False, "error": ...}``. The file is left
        untouched on failure.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}

        with open(full, "r", encoding="utf-8") as f:
            content = f.read()

        if not old_str:
            # "" is contained in every string; replacing it would insert
            # new_str at arbitrary positions instead of editing anything.
            return {"success": False, "error": "old_str must not be empty."}

        # Always count real occurrences. The previous code forced the count
        # to 1 when replace_all was False, so the uniqueness check below
        # could never trigger.
        occurrences = content.count(old_str)
        if occurrences == 0:
            return {
                "success": False,
                "error": f"old_str not found in {path}. Edit aborted.",
            }
        if old_str == new_str:
            return {"success": False, "error": "old_str and new_str are identical."}
        if not replace_all and occurrences > 1:
            return {
                "success": False,
                "error": (
                    f"old_str is not unique ({occurrences} matches) in {path}. "
                    "Provide more context or use replace_all=true."
                ),
            }

        if replace_all:
            new_content = content.replace(old_str, new_str)
            replacements = occurrences
        else:
            new_content = content.replace(old_str, new_str, 1)
            replacements = 1

        with open(full, "w", encoding="utf-8") as f:
            f.write(new_content)

        return {
            "success": True,
            "path": path,
            "replacements": replacements,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def multi_edit(path: str, edits: list[dict[str, Any]]) -> dict[str, Any]:
    """Apply several edits to one file, all-or-nothing.

    Edits are applied in order to an in-memory copy of the file, so each edit
    sees the result of the previous ones. The file is written only after
    every edit has succeeded.

    Args:
        path: Relative path inside the workspace, or absolute within it.
        edits: Dicts with ``old_str`` and ``new_str`` and an optional
            ``replace_all`` flag. With ``replace_all`` false only the first
            occurrence of ``old_str`` is replaced.

    Returns:
        ``{"success": True, "path": ..., "applied": n}`` on success. On
        failure ``success`` is False, ``error`` names the failing edit, and
        ``applied`` is the number of edits that had been processed in memory
        before it (nothing is written to disk).
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}

        with open(full, "r", encoding="utf-8") as f:
            content = f.read()

        applied = 0
        for number, edit in enumerate(edits, 1):
            old_str = edit.get("old_str", "")
            new_str = edit.get("new_str", "")
            replace_all = edit.get("replace_all", False)

            if not old_str:
                # A missing or empty old_str is contained in every string and
                # would silently insert new_str instead of editing.
                return {
                    "success": False,
                    "error": f"old_str must not be empty in edit #{number}.",
                    "applied": applied,
                }
            if old_str not in content:
                return {
                    "success": False,
                    "error": f"old_str not found in {path} for edit #{number}.",
                    "applied": applied,
                }
            if old_str == new_str:
                return {
                    "success": False,
                    "error": f"old_str and new_str identical in edit #{number}.",
                    "applied": applied,
                }

            if replace_all:
                content = content.replace(old_str, new_str)
            else:
                content = content.replace(old_str, new_str, 1)
            applied += 1

        with open(full, "w", encoding="utf-8") as f:
            f.write(content)

        return {"success": True, "path": path, "applied": applied}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── list_dir ───────────────────────────────────────────────────────────
def list_dir(path: str = ".") -> dict[str, Any]:
    """List the entries of a workspace directory, sorted by name.

    Args:
        path: Directory to list, relative to the workspace root (default: the
            root itself) or absolute within it.

    Returns:
        ``{"success": True, "path": ..., "entries": [...]}`` where each entry
        has ``name``, ``type`` (``"dir"`` or ``"file"``), ``size`` and ``path``
        (relative to the workspace root), or
        ``{"success": False, "error": ...}`` on failure.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"Path not found: {path}"}
        if not os.path.isdir(full):
            return {"success": False, "error": f"Not a directory: {path}"}

        # Resolve the root once; get_workspace_root() touches the filesystem.
        workspace = get_workspace_root()
        entries = []
        for name in sorted(os.listdir(full)):
            entry_path = os.path.join(full, name)
            try:
                size = os.stat(entry_path).st_size
            except OSError:
                # Broken symlink: report the link itself rather than failing
                # the whole listing.
                try:
                    size = os.lstat(entry_path).st_size
                except OSError:
                    # Entry disappeared between listdir() and stat().
                    continue
            entries.append({
                "name": name,
                "type": "dir" if os.path.isdir(entry_path) else "file",
                "size": size,
                "path": os.path.relpath(entry_path, workspace),
            })

        return {
            "success": True,
            "path": path,
            "entries": entries,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── glob ───────────────────────────────────────────────────────────────
def glob_paths(pattern: str, path: str = ".") -> dict[str, Any]:
    """Recursively find files under *path* whose name or path matches *pattern*.

    A file matches when the ``fnmatch`` pattern matches either its base name
    or its path relative to *path*. Patterns containing ``**/`` are also
    tried with that segment removed, so ``**/*.py`` finds files directly in
    the search root as well as files in subdirectories.

    Args:
        pattern: ``fnmatch``-style pattern (``*`` also matches path separators).
        path: Directory to search, relative to the workspace root or absolute
            within it.

    Returns:
        ``{"success": True, "pattern": ..., "matches": [...]}`` with sorted
        paths relative to the workspace root, or
        ``{"success": False, "error": ...}`` on failure.
    """
    try:
        full = _resolve_safe(path)
        # Resolve once instead of per match; the call touches the filesystem.
        workspace = get_workspace_root()

        patterns = [pattern]
        collapsed = pattern.replace("**/", "")
        if collapsed != pattern:
            # fnmatch has no notion of "zero or more directories": "**/x"
            # demands at least one "/" and so misses files at the top level.
            patterns.append(collapsed)

        matches: list[str] = []
        for root_dir, _dirs, files in os.walk(full):
            for fname in files:
                fpath = os.path.join(root_dir, fname)
                candidates = (fname, os.path.relpath(fpath, full))
                if any(
                    fnmatch.fnmatch(candidate, pat)
                    for pat in patterns
                    for candidate in candidates
                ):
                    matches.append(os.path.relpath(fpath, workspace))

        matches.sort()
        return {"success": True, "pattern": pattern, "matches": matches}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── grep ───────────────────────────────────────────────────────────────
def grep_search(
    pattern: str,
    path: str = ".",
    include: str | None = None,
    ignore_case: bool = False,
    max_results: int = 100,
) -> dict[str, Any]:
    """Search file contents line by line with a regular expression.

    Args:
        pattern: Regular expression applied to each line with ``re.search``.
        path: Directory to search recursively, or a single file, relative to
            the workspace root or absolute within it.
        include: Optional ``fnmatch`` pattern; only files whose base name
            matches are searched.
        ignore_case: Match case-insensitively.
        max_results: Stop after this many matching lines.

    Returns:
        ``{"success": True, "pattern": ..., "matches": [...], "truncated": bool}``
        where each match has ``file`` (relative to the workspace root),
        ``line`` (1-indexed) and ``text`` (right-stripped, at most 500 chars).
        ``truncated`` is True when the search stopped at ``max_results``.
        On failure ``success`` is False and ``error`` holds the message.
    """
    try:
        full = _resolve_safe(path)
        flags = re.IGNORECASE if ignore_case else 0
        regex = re.compile(pattern, flags)
        # Resolve once instead of per match; the call touches the filesystem.
        workspace = get_workspace_root()

        def _candidate_files():
            # os.walk() yields nothing for a regular file, so a file path
            # has to be handled explicitly.
            if os.path.isfile(full):
                yield full
                return
            for root_dir, _dirs, files in os.walk(full):
                for fname in files:
                    yield os.path.join(root_dir, fname)

        matches: list[dict[str, Any]] = []
        for fpath in _candidate_files():
            if include and not fnmatch.fnmatch(os.path.basename(fpath), include):
                continue
            try:
                with open(fpath, "r", encoding="utf-8", errors="replace") as f:
                    for lineno, line in enumerate(f, 1):
                        if regex.search(line):
                            matches.append({
                                "file": os.path.relpath(fpath, workspace),
                                "line": lineno,
                                "text": line.rstrip()[:500],
                            })
                            if len(matches) >= max_results:
                                return {
                                    "success": True,
                                    "pattern": pattern,
                                    "matches": matches,
                                    "truncated": True,
                                }
            except (OSError, UnicodeDecodeError):
                # Skip unreadable files (permissions, broken symlinks, files
                # removed mid-walk) instead of aborting the whole search.
                continue

        return {
            "success": True,
            "pattern": pattern,
            "matches": matches,
            "truncated": False,
        }
    except re.error as exc:
        return {"success": False, "error": f"Invalid regex: {exc}"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── Workspace management ───────────────────────────────────────────────
def list_workspace_tree(max_depth: int = 3) -> dict[str, Any]:
    """Return a nested tree view of the workspace.

    Args:
        max_depth: Deepest directory level to expand; the workspace root is
            level 0. Directories below this level are returned as
            ``{"name", "type": "dir", "truncated": True}`` without children.

    Returns:
        ``{"success": True, "tree": node}`` where a directory node has
        ``name``, ``type`` and ``children``, and a file node has ``name``,
        ``type`` and ``size``; or ``{"success": False, "error": ...}``.
    """
    try:
        root = get_workspace_root()

        def _walk(path: str, depth: int) -> dict[str, Any]:
            node_name = os.path.basename(path)
            if depth > max_depth:
                return {"name": node_name, "type": "dir", "truncated": True}

            try:
                names = sorted(os.listdir(path))
            except OSError:
                # Unreadable or vanished directory: show it as empty.
                names = []

            children = []
            for name in names:
                child = os.path.join(path, name)
                if os.path.isdir(child):
                    children.append(_walk(child, depth + 1))
                    continue
                try:
                    size = os.path.getsize(child)
                except OSError:
                    # Broken symlink: fall back to the size of the link so
                    # one bad entry does not abort the whole tree.
                    try:
                        size = os.lstat(child).st_size
                    except OSError:
                        continue
                children.append({"name": name, "type": "file", "size": size})

            return {"name": node_name, "type": "dir", "children": children}

        tree = _walk(root, 0)
        return {"success": True, "tree": tree}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def reset_workspace() -> dict[str, Any]:
    """Delete every entry in the workspace root (used by the /new command).

    The root directory itself is kept. Symlinks are unlinked rather than
    followed, so nothing outside the workspace is removed.

    Returns:
        ``{"success": True, "message": "Workspace cleared"}`` or
        ``{"success": False, "error": ...}`` if removal fails part-way.
    """
    try:
        root = get_workspace_root()
        if os.path.exists(root):
            for entry in os.listdir(root):
                full = os.path.join(root, entry)
                # isdir() follows symlinks, but shutil.rmtree() refuses to
                # operate on a symlink; remove links with os.remove().
                if os.path.isdir(full) and not os.path.islink(full):
                    shutil.rmtree(full)
                else:
                    os.remove(full)
        return {"success": True, "message": "Workspace cleared"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def snapshot_workspace() -> dict[str, str]:
    """Return ``{relative_path: content}`` for the text files in the workspace.

    Used to package workspace files for ZIP/HF deploy.

    Hidden files, hidden directories, and ``node_modules`` / ``__pycache__`` /
    ``venv`` directories are excluded. Files that cannot be read or are not
    valid UTF-8 are skipped.
    """
    root = get_workspace_root()
    skipped_dirs = {"node_modules", "__pycache__", ".venv", "venv"}
    files: dict[str, str] = {}

    for dirpath, dirnames, fnames in os.walk(root):
        # Prune excluded directories in place so os.walk never descends into
        # them. The earlier filter on relpath(dirpath, root) also rejected
        # the root itself, whose relative path "." starts with a dot, so
        # every top-level file was dropped.
        dirnames[:] = [
            d for d in dirnames
            if not d.startswith(".") and d not in skipped_dirs
        ]
        for fname in fnames:
            if fname.startswith("."):
                continue
            full = os.path.join(dirpath, fname)
            try:
                with open(full, "r", encoding="utf-8") as f:
                    files[os.path.relpath(full, root)] = f.read()
            except (UnicodeDecodeError, OSError):
                # Binary or unreadable file (including broken symlinks).
                continue
    return files