# sonicoder/code/tools/fs.py
# R-Kentaren's picture
# feat(agent): add Claude Code-style agent, skills, slash-commands, hooks, todos, sandboxed workspace, and full-stack scaffolding
# 81aa0b5 verified
# Raw
# History Blame Contribute Delete
# 14.1 kB
"""File system tools: read, write, edit, list, glob, grep.
All tools are sandboxed to a configurable workspace root (default: ./workspace).
They return JSON-serializable dicts so they can be exposed via the API.
"""
from __future__ import annotations
import fnmatch
import os
import re
import shutil
from pathlib import Path
from typing import Any
# ─── Workspace sandbox ──────────────────────────────────────────────────
# Workspace location: the SONICODER_WORKSPACE environment variable when it is
# set, otherwise the "workspace" directory two levels above this module's
# own directory.
_DEFAULT_WORKSPACE = os.environ.get(
    "SONICODER_WORKSPACE",
    os.path.abspath(
        os.path.join(os.path.dirname(__file__), os.pardir, os.pardir, "workspace")
    ),
)
def get_workspace_root() -> str:
    """Return the configured workspace root, creating the directory if needed.

    The returned value is ``_DEFAULT_WORKSPACE`` as configured at import time;
    the directory (including missing parents) is created on every call so
    callers can rely on it existing.
    """
    os.makedirs(_DEFAULT_WORKSPACE, exist_ok=True)
    return _DEFAULT_WORKSPACE
def _resolve_safe(path: str) -> str:
    """Resolve *path* to an absolute path inside the workspace root.

    Relative paths are joined onto the workspace root; absolute paths are
    accepted only when they already point inside it. Containment is checked
    twice: lexically on the normalised path, and again after resolving
    symbolic links, so a link stored inside the workspace cannot be used to
    reach a target outside of it.

    Args:
        path: Relative path inside the workspace, or an absolute path within it.

    Returns:
        The normalised absolute path. Symlinks are not expanded in the
        returned value.

    Raises:
        ValueError: If the path, or the location it resolves to through
            symlinks, lies outside the workspace root.
    """
    # Normalise the root so that a relative value or one with a trailing
    # separator still compares correctly against absolute candidates.
    root = os.path.abspath(get_workspace_root())
    candidate = path if os.path.isabs(path) else os.path.join(root, path)
    full = os.path.abspath(candidate)

    def _is_inside(child: str, parent: str) -> bool:
        """Return True when *child* equals *parent* or lies beneath it."""
        try:
            common = os.path.commonpath([child, parent])
        except ValueError:
            # commonpath refuses e.g. paths on different drives; such a
            # child can never be inside the parent.
            return False
        return os.path.normcase(common) == os.path.normcase(parent)

    inside = _is_inside(full, root) and _is_inside(
        os.path.realpath(full), os.path.realpath(root)
    )
    if not inside:
        raise ValueError(
            f"Path '{path}' resolves outside the workspace root ({root}). "
            "Agent tools are sandboxed."
        )
    return full
# ─── read_file ──────────────────────────────────────────────────────────
def read_file(path: str, offset: int = 0, limit: int | None = None) -> dict[str, Any]:
    """Read a text file from the workspace and return it with line numbers.

    Args:
        path: Relative path inside the workspace, or absolute within it.
        offset: 1-indexed line to start reading from. Values of 1 or less
            start at the first line.
        limit: Maximum number of lines to return. ``None``, ``0`` and
            negative values all mean "no limit".

    Returns:
        On success, a dict with ``success`` (True), ``path``, ``content``
        (each selected line prefixed with its right-aligned line number and
        a tab), ``line_count`` (total lines in the file), ``returned_lines``
        and ``truncated`` (True when lines remain after the returned range).
        On failure, ``{"success": False, "error": <message>}``.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}
        if os.path.isdir(full):
            return {"success": False, "error": f"Path is a directory: {path}"}
        # Undecodable bytes are replaced rather than raising.
        with open(full, "r", encoding="utf-8", errors="replace") as f:
            lines = f.readlines()
        total = len(lines)
        start = max(0, offset - 1)
        # A negative limit would otherwise turn into a from-the-end slice
        # and a wrong `truncated` flag, so only positive limits bound the read.
        if limit is not None and limit > 0:
            end = start + limit
        else:
            end = total
        selected = lines[start:end]
        # Number lines by their position in the file, not in the selection.
        numbered = "".join(
            f"{start + i + 1:6}\t{line}" for i, line in enumerate(selected)
        )
        return {
            "success": True,
            "path": path,
            "content": numbered,
            "line_count": total,
            "returned_lines": len(selected),
            "truncated": end < total,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── write_file ─────────────────────────────────────────────────────────
def write_file(path: str, content: str) -> dict[str, Any]:
    """Create or overwrite a workspace file with *content*.

    Missing parent directories are created first. The result dict carries
    ``success``, the caller-supplied ``path`` and ``bytes_written`` (the
    UTF-8 encoded size of *content*); any exception is reported as
    ``{"success": False, "error": <message>}``.
    """
    try:
        target = _resolve_safe(path)
        parent_dir = os.path.dirname(target)
        os.makedirs(parent_dir, exist_ok=True)
        with open(target, mode="w", encoding="utf-8") as handle:
            handle.write(content)
        encoded_size = len(content.encode("utf-8"))
        result: dict[str, Any] = {"success": True}
        result["path"] = path
        result["bytes_written"] = encoded_size
        return result
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── edit_file ──────────────────────────────────────────────────────────
def edit_file(
    path: str,
    old_str: str,
    new_str: str,
    replace_all: bool = False,
) -> dict[str, Any]:
    """Replace occurrences of ``old_str`` with ``new_str`` in a workspace file.

    Args:
        path: Relative path inside the workspace, or absolute within it.
        old_str: Exact text to search for. Must be non-empty.
        new_str: Replacement text. Must differ from ``old_str``.
        replace_all: When True every occurrence is replaced. When False the
            text must occur exactly once, otherwise the edit is rejected.

    Returns:
        On success ``{"success": True, "path": path, "replacements": n}``.
        On failure ``{"success": False, "error": <message>}``; the file is
        left untouched in that case.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}
        if not old_str:
            # An empty needle "matches" everywhere and would insert new_str
            # between every character.
            return {"success": False, "error": "old_str must not be empty."}
        with open(full, "r", encoding="utf-8") as f:
            content = f.read()
        # Always count real occurrences so the uniqueness check below works.
        occurrences = content.count(old_str)
        if occurrences == 0:
            return {
                "success": False,
                "error": f"old_str not found in {path}. Edit aborted.",
            }
        if old_str == new_str:
            return {"success": False, "error": "old_str and new_str are identical."}
        if not replace_all and occurrences > 1:
            return {
                "success": False,
                "error": (
                    f"old_str is not unique ({occurrences} matches) in {path}. "
                    "Provide more context or use replace_all=true."
                ),
            }
        if replace_all:
            new_content = content.replace(old_str, new_str)
        else:
            new_content = content.replace(old_str, new_str, 1)
        with open(full, "w", encoding="utf-8") as f:
            f.write(new_content)
        return {
            "success": True,
            "path": path,
            "replacements": occurrences if replace_all else 1,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def multi_edit(path: str, edits: list[dict[str, Any]]) -> dict[str, Any]:
    """Apply several edits to one file, writing only if every edit succeeds.

    Edits are applied in order to an in-memory copy of the file, each one
    seeing the result of the previous ones. The file on disk is rewritten
    only after all edits have been applied; if any edit fails nothing is
    written.

    Args:
        path: Relative path inside the workspace, or absolute within it.
        edits: Dicts with keys ``old_str`` (required, non-empty), ``new_str``
            (default ``""``) and ``replace_all`` (default False). Without
            ``replace_all`` only the first occurrence is replaced.

    Returns:
        On success ``{"success": True, "path": path, "applied": n}``.
        On failure ``{"success": False, "error": <message>}``; failures of an
        individual edit also carry ``"applied"``, the number of edits that
        were processed in memory before the failing one (none of them are
        persisted).
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"File not found: {path}"}
        with open(full, "r", encoding="utf-8") as f:
            content = f.read()
        applied = 0
        for edit in edits:
            old_str = edit.get("old_str", "")
            new_str = edit.get("new_str", "")
            replace_all = edit.get("replace_all", False)
            if not old_str:
                # A missing or empty old_str is always "found" and would
                # silently insert new_str at the start of the file.
                return {
                    "success": False,
                    "error": f"old_str must not be empty in edit #{applied + 1}.",
                    "applied": applied,
                }
            if old_str not in content:
                return {
                    "success": False,
                    "error": f"old_str not found in {path} for edit #{applied + 1}.",
                    "applied": applied,
                }
            if old_str == new_str:
                return {
                    "success": False,
                    "error": f"old_str and new_str identical in edit #{applied + 1}.",
                    "applied": applied,
                }
            if replace_all:
                content = content.replace(old_str, new_str)
            else:
                content = content.replace(old_str, new_str, 1)
            applied += 1
        # Reached only when every edit succeeded.
        with open(full, "w", encoding="utf-8") as f:
            f.write(content)
        return {"success": True, "path": path, "applied": applied}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── list_dir ───────────────────────────────────────────────────────────
def list_dir(path: str = ".") -> dict[str, Any]:
    """List the entries of a workspace directory, sorted by name.

    Args:
        path: Directory to list, relative to the workspace root (default: the
            root itself) or absolute within it.

    Returns:
        On success ``{"success": True, "path": path, "entries": [...]}`` where
        each entry has ``name``, ``type`` (``"dir"`` or ``"file"``), ``size``
        and ``path`` (relative to the workspace root). On failure
        ``{"success": False, "error": <message>}``.
    """
    try:
        full = _resolve_safe(path)
        if not os.path.exists(full):
            return {"success": False, "error": f"Path not found: {path}"}
        if not os.path.isdir(full):
            return {"success": False, "error": f"Not a directory: {path}"}
        workspace_root = get_workspace_root()
        entries = []
        for name in sorted(os.listdir(full)):
            entry_path = os.path.join(full, name)
            try:
                info = os.stat(entry_path)
            except OSError:
                # e.g. a dangling symlink: describe the link itself instead
                # of failing the whole listing.
                try:
                    info = os.lstat(entry_path)
                except OSError:
                    # Entry disappeared or is inaccessible; leave it out.
                    continue
            entries.append({
                "name": name,
                "type": "dir" if os.path.isdir(entry_path) else "file",
                "size": info.st_size,
                "path": os.path.relpath(entry_path, workspace_root),
            })
        return {
            "success": True,
            "path": path,
            "entries": entries,
        }
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── glob ───────────────────────────────────────────────────────────────
def glob_paths(pattern: str, path: str = ".") -> dict[str, Any]:
    """Find files under *path* whose name or relative path matches *pattern*.

    The directory tree is walked recursively. A file matches when either its
    bare file name or its path relative to *path* satisfies the ``fnmatch``
    pattern. Matches are returned sorted, as paths relative to the workspace
    root, in ``{"success": True, "pattern": ..., "matches": [...]}``; errors
    yield ``{"success": False, "error": <message>}``.
    """
    try:
        base = _resolve_safe(path)
        found: list[str] = []
        for current_dir, _subdirs, filenames in os.walk(base):
            for filename in filenames:
                absolute = os.path.join(current_dir, filename)
                if not fnmatch.fnmatch(filename, pattern):
                    # Name alone did not match; try the path relative to the
                    # search base before giving up on this file.
                    relative_to_base = os.path.relpath(absolute, base)
                    if not fnmatch.fnmatch(relative_to_base, pattern):
                        continue
                found.append(os.path.relpath(absolute, get_workspace_root()))
        return {"success": True, "pattern": pattern, "matches": sorted(found)}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── grep ───────────────────────────────────────────────────────────────
def grep_search(
    pattern: str,
    path: str = ".",
    include: str | None = None,
    ignore_case: bool = False,
    max_results: int = 100,
) -> dict[str, Any]:
    """Search file contents under *path* with a regular expression.

    Args:
        pattern: Regular expression, applied to each line with ``re.search``.
        path: Directory to walk, relative to the workspace root or absolute
            within it.
        include: Optional ``fnmatch`` pattern; only files whose name matches
            are searched.
        ignore_case: Match case-insensitively when True.
        max_results: Stop as soon as this many matching lines were collected.

    Returns:
        On success ``{"success": True, "pattern": ..., "matches": [...],
        "truncated": bool}``; each match has ``file`` (relative to the
        workspace root), ``line`` (1-indexed) and ``text`` (the line without
        trailing whitespace, capped at 500 characters). ``truncated`` is True
        when the search stopped because ``max_results`` was reached. On
        failure ``{"success": False, "error": <message>}``.
    """
    try:
        full = _resolve_safe(path)
        flags = re.IGNORECASE if ignore_case else 0
        regex = re.compile(pattern, flags)
        workspace_root = get_workspace_root()
        matches: list[dict[str, Any]] = []
        for root_dir, _dirs, files in os.walk(full):
            for fname in files:
                if include and not fnmatch.fnmatch(fname, include):
                    continue
                fpath = os.path.join(root_dir, fname)
                try:
                    with open(fpath, "r", encoding="utf-8", errors="replace") as f:
                        for lineno, line in enumerate(f, 1):
                            if regex.search(line):
                                matches.append({
                                    "file": os.path.relpath(fpath, workspace_root),
                                    "line": lineno,
                                    "text": line.rstrip()[:500],
                                })
                                if len(matches) >= max_results:
                                    return {
                                        "success": True,
                                        "pattern": pattern,
                                        "matches": matches,
                                        "truncated": True,
                                    }
                except (OSError, UnicodeDecodeError):
                    # Best effort: an unreadable file (permissions, dangling
                    # symlink, vanished mid-walk, ...) is skipped instead of
                    # aborting the whole search.
                    continue
        return {
            "success": True,
            "pattern": pattern,
            "matches": matches,
            "truncated": False,
        }
    except re.error as exc:
        return {"success": False, "error": f"Invalid regex: {exc}"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
# ─── Workspace management ───────────────────────────────────────────────
def list_workspace_tree(max_depth: int = 3) -> dict[str, Any]:
    """Return a nested tree view of the workspace.

    Args:
        max_depth: Deepest directory level that is expanded; the workspace
            root is depth 0. Directories below that level are reported with
            ``"truncated": True`` and no children.

    Returns:
        On success ``{"success": True, "tree": <node>}``. Directory nodes
        have ``name``, ``type`` (``"dir"``) and ``children``; file nodes have
        ``name``, ``type`` (``"file"``) and ``size``. Children are sorted by
        name. On failure ``{"success": False, "error": <message>}``.
    """
    try:
        root = get_workspace_root()

        def _walk(path: str, depth: int) -> dict[str, Any]:
            if depth > max_depth:
                return {"name": os.path.basename(path), "type": "dir", "truncated": True}
            entries = []
            try:
                names = sorted(os.listdir(path))
            except OSError:
                # Unreadable directory: show it with no children.
                names = []
            for name in names:
                full = os.path.join(path, name)
                if os.path.isdir(full):
                    entries.append(_walk(full, depth + 1))
                    continue
                try:
                    size = os.path.getsize(full)
                except OSError:
                    # e.g. a dangling symlink: fall back to the link's own
                    # size so one bad entry does not fail the whole tree.
                    try:
                        size = os.lstat(full).st_size
                    except OSError:
                        continue
                entries.append({
                    "name": name,
                    "type": "file",
                    "size": size,
                })
            return {"name": os.path.basename(path), "type": "dir", "children": entries}

        tree = _walk(root, 0)
        return {"success": True, "tree": tree}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def reset_workspace() -> dict[str, Any]:
    """Clear all files in the workspace (used by /new command).

    Every direct child of the workspace root is removed; the root directory
    itself is kept. Symbolic links are unlinked rather than followed, so the
    link target is not deleted.

    Returns:
        ``{"success": True, "message": "Workspace cleared"}`` or
        ``{"success": False, "error": <message>}`` if removal failed (entries
        removed before the failure stay removed).
    """
    try:
        root = get_workspace_root()
        if os.path.exists(root):
            for entry in os.listdir(root):
                full = os.path.join(root, entry)
                # isdir() follows symlinks but rmtree() refuses to operate on
                # one, so links to directories must go through os.remove().
                if os.path.isdir(full) and not os.path.islink(full):
                    shutil.rmtree(full)
                else:
                    os.remove(full)
        return {"success": True, "message": "Workspace cleared"}
    except Exception as exc:
        return {"success": False, "error": str(exc)}
def snapshot_workspace() -> dict[str, str]:
    """Return a dict of {relative_path: content} for all text files in the workspace.

    Used to package workspace files for ZIP/HF deploy.

    Hidden files and directories (names starting with ".") and the
    ``node_modules``, ``__pycache__``, ``.venv`` and ``venv`` directories are
    left out, as are files that cannot be read or are not valid UTF-8. Keys
    are paths relative to the workspace root.
    """
    root = get_workspace_root()
    excluded_dirs = {"node_modules", "__pycache__", ".venv", "venv"}
    files: dict[str, str] = {}
    for dirpath, dirnames, fnames in os.walk(root):
        # Prune in place so os.walk never descends into excluded directories.
        # Filtering on the relative path of `dirpath` instead would also drop
        # the root itself, whose relative path is ".".
        dirnames[:] = [
            d for d in dirnames if not d.startswith(".") and d not in excluded_dirs
        ]
        for fname in fnames:
            if fname.startswith("."):
                continue
            full = os.path.join(dirpath, fname)
            try:
                with open(full, "r", encoding="utf-8") as f:
                    files[os.path.relpath(full, root)] = f.read()
            except (UnicodeDecodeError, OSError):
                # Binary or unreadable file (permissions, dangling symlink,
                # ...): not part of a text snapshot.
                continue
    return files