"""FastAPI / Gradio Server routes. Defines all HTTP and API endpoints: - GET / → serves the index.html frontend - GET /api/model-status → model loading status - GET /images/{f} → serve generated plot images - GET /download/{f} → serve project ZIP downloads - API web_search → Google search scraping - API chat → streaming chat with code execution - API push_hf → push to HuggingFace Hub - API switch_model → switch between loaded models - API upload_image → upload image for VLM inference - API hf_auth → get HF OAuth profile & organizations - API agent_run → Claude Code-style agent loop with tools - API list_skills → list available skills - API list_commands→ list available slash commands - API list_hooks → list configured hooks - API workspace_tree→ list workspace files - API workspace_read→ read a workspace file - API workspace_write→ write a workspace file - API workspace_bash→ run a bash command in workspace - API todo_read → read current todo list - API todo_write → update todo list - API import_github → clone a GitHub repo into the workspace - API github_url_examples → return accepted GitHub URL formats - API push_github → push the current workspace to a GitHub repo """ from __future__ import annotations import base64 import json import logging import os import tempfile from pathlib import Path from typing import Any, Optional import gradio as gr from fastapi.responses import HTMLResponse, FileResponse try: from gradio import Server except ImportError: # Fallback for older/newer Gradio versions where Server may not be exposed # at the top level. We provide a minimal shim so the module can still be # imported for testing purposes. 
class Server: # type: ignore """Minimal shim for Gradio Server when not available.""" def __init__(self, *args, **kwargs): from fastapi import FastAPI self._fastapi = FastAPI() def get(self, path: str, **kwargs): return self._fastapi.get(path, **kwargs) def api(self, name: str = None, concurrency_limit: int = 1): def decorator(fn): # Store as attribute so it can be inspected fn._api_name = name fn._concurrency_limit = concurrency_limit return fn return decorator from code.config.constants import ( APP_TITLE, DEFAULT_MODEL_KEY, EXAMPLE_PROMPTS, LANGUAGE_OPTIONS, MODEL_CONFIGS, MODEL_URL, PY_TIMEOUT_S, ) from code.execution.code_extractor import ( build_iframe, extract_code, extract_multi_file, is_gradio_code, normalize_language, strip_thinking_blocks, ) from code.execution.gradio_runner import run_gradio_app, stop_gradio_app from code.execution.python_runner import run_python from code.huggingface.push import create_project_zip, push_to_huggingface from code.model.loader import ( get_model_status, is_model_loaded, get_current_model_key, get_current_model_type, switch_model, ) from code.model.inference import call_model from code.server.chat_helpers import chat_history_to_messages, targeted_prompt from code.websearch.google_scraper import web_search_google, format_search_results logger = logging.getLogger(__name__) # ─── Served Files Registry ────────────────────────────────────────────── _served_files: dict[str, str] = {} # ─── Uploaded Images Registry ─────────────────────────────────────────── _uploaded_images: dict[str, str] = {} # ─── Server Instance ──────────────────────────────────────────────────── app = Server() # ─── HTTP Routes ──────────────────────────────────────────────────────── @app.get("/", response_class=HTMLResponse) async def homepage(): """Serve the index.html frontend with runtime config injected.""" html_path = os.path.join(os.path.dirname(os.path.abspath(__file__)), "..", "..", "index.html") with open(html_path, "r", encoding="utf-8") as f: 
content = f.read() # Load skills, commands, hooks for the frontend try: from code.skills import list_skills skills_list = list_skills() except Exception: skills_list = [] try: from code.commands import list_commands commands_list = list_commands() except Exception: commands_list = [] try: from code.hooks import list_hooks hooks_list = list_hooks() except Exception: hooks_list = [] try: from code.agents import list_agents, get_active_agent agents_list = list_agents() active_agent = get_active_agent() except Exception: agents_list = [] active_agent = None config = json.dumps({ "app_title": APP_TITLE, "model_id": MODEL_CONFIGS[DEFAULT_MODEL_KEY]["id"], "model_configs": {k: {"name": v["name"], "type": v["type"], "description": v["description"]} for k, v in MODEL_CONFIGS.items()}, "model_url": MODEL_URL, "languages": LANGUAGE_OPTIONS, "examples": [ {"label": label, "prompt": prompt, "language": lang, "framework": fw} for label, prompt, lang, fw in EXAMPLE_PROMPTS ], "default_model": "minicpm5-1b", "skills": skills_list, "commands": commands_list, "hooks": hooks_list, "agents": agents_list, "active_agent": active_agent, }) content = content.replace("__RUNTIME_CONFIG__", config) return content @app.get("/api/model-status") async def model_status_endpoint(): """Return the current model loading status.""" return get_model_status() @app.get("/images/{filename}") async def serve_image(filename: str): """Serve a generated plot image by filename.""" path = _served_files.get(f"img:{filename}") if path and os.path.exists(path): return FileResponse(path, media_type="image/png") return HTMLResponse("Not found", status_code=404) @app.get("/download/{filename}") async def serve_download(filename: str): """Serve a project ZIP download by filename.""" path = _served_files.get(f"dl:{filename}") if path and os.path.exists(path): return FileResponse(path, filename=filename, media_type="application/octet-stream") return HTMLResponse("Not found", status_code=404) 
@app.get("/uploaded-images/{image_id}")
async def serve_uploaded_image(image_id: str):
    """Serve an uploaded image by its ID.

    Looks the ID up in ``_uploaded_images``. Responds with the file (always
    labelled ``image/png``) when it is registered and still on disk,
    otherwise with a 404.
    """
    path = _uploaded_images.get(image_id)
    if path and os.path.exists(path):
        return FileResponse(path, media_type="image/png")
    return HTMLResponse("Not found", status_code=404)


# ─── Gradio API Endpoints ──────────────────────────────────────────────
# Every handler below is a generator: it yields JSON-encoded strings.

@app.api(name="switch_model", concurrency_limit=1)
def handle_switch_model(model_key: str) -> str:
    """Switch to a different model.

    Yields the JSON-encoded result of ``switch_model(model_key)``.
    """
    result = switch_model(model_key)
    yield json.dumps(result)


@app.api(name="upload_image", concurrency_limit=4)
def handle_upload_image(image_data: str) -> str:
    """Upload a base64-encoded image for VLM inference.

    Accepts either raw base64 or a ``data:`` URI. The decoded bytes are
    written to ``<image_id>.png`` in a fresh temp directory (the ``.png``
    suffix is used regardless of the real format) and the path is registered
    in ``_uploaded_images``.

    Yields one JSON object: on success ``success``, ``image_id``,
    ``image_url`` (server-relative), ``file_url`` (``file://`` path) and
    ``message``; on any exception ``success: False`` and ``message``.
    """
    try:
        if not image_data:
            yield json.dumps({"success": False, "message": "No image data provided"})
            return

        # Handle data URI format: data:image/png;base64,...
        if image_data.startswith("data:"):
            # Extract the base64 part (everything after the first comma)
            parts = image_data.split(",", 1)
            if len(parts) == 2:
                image_data = parts[1]

        # Decode base64
        image_bytes = base64.b64decode(image_data)

        # Save to temp file. Nothing in this function removes the directory.
        img_dir = tempfile.mkdtemp(prefix="uploaded_img_")
        # ID = process id + 32 random bits rendered as a decimal integer.
        image_id = f"img_{os.getpid()}_{int(os.urandom(4).hex(), 16)}"
        img_path = os.path.join(img_dir, f"{image_id}.png")
        Path(img_path).write_bytes(image_bytes)

        # Register for serving
        _uploaded_images[image_id] = img_path

        # Create a URL for the image that the VLM can access
        image_url = f"/uploaded-images/{image_id}"
        # Also save as a file:// URL for local VLM access
        file_url = f"file://{img_path}"

        yield json.dumps({
            "success": True,
            "image_id": image_id,
            "image_url": image_url,
            "file_url": file_url,
            "message": "Image uploaded successfully",
        })
    except Exception as exc:
        logger.exception("Image upload failed")
        yield json.dumps({
            "success": False,
            "message": f"Upload failed: {str(exc)}",
        })


@app.api(name="web_search", concurrency_limit=4)
def handle_web_search(query: str) -> str:
    """Search the web using Google scraping. No API key needed.

    Yields one JSON object with ``success``, ``results`` and ``message``,
    plus ``formatted`` on success. An empty or whitespace-only query and any
    exception from the search both produce ``success: False``.
    """
    query = (query or "").strip()
    if not query:
        yield json.dumps({"success": False, "results": [], "message": "Empty search query"})
        return
    try:
        results = web_search_google(query, num_results=8)
        formatted = format_search_results(results)
        yield json.dumps({
            "success": True,
            "results": results,
            "formatted": formatted,
            "message": f"Found {len(results)} results",
        })
    except Exception as exc:
        logger.exception("Web search failed")
        yield json.dumps({
            "success": False,
            "results": [],
            "message": f"Search failed: {str(exc)}",
        })


@app.api(name="chat", concurrency_limit=2)
def handle_chat(
    prompt: str,
    target_language: str,
    target_framework: str,
    history_json: str,
    exec_context_json: str,
    search_enabled: str = "false",
    image_url: str = "",
) -> str:
    """Stream chat responses with code execution. Yields JSON strings.

    Args:
        prompt: User message; an empty or whitespace-only value ends the call
            with an ``error`` event.
        target_language: Requested language, passed to ``normalize_language``
            and ``targeted_prompt``.
        target_framework: Requested framework; the exact value ``"Gradio"``
            forces the Gradio runner for Python code.
        history_json: JSON list of prior chat messages (empty string = none).
        exec_context_json: JSON object with the previous execution context
            (empty string = none).
        search_enabled: ``"true"`` (case-insensitive) runs a web search first.
        image_url: Optional image URL forwarded to ``call_model``.

    Yields:
        JSON objects carrying ``type`` (``status``, ``search_results``,
        ``streaming``, ``complete`` or ``error``), ``status_text``,
        ``status_state``, ``history`` and ``execution``.
    """
    history = json.loads(history_json) if history_json else []
    execution_context = json.loads(exec_context_json) if exec_context_json else {}
    prompt = (prompt or "").strip()
    if not prompt:
        yield json.dumps({
            "type": "error",
            "status_text": "Enter a prompt to get started.",
            "status_state": "info",
            "history": history,
            "execution": execution_context,
        })
        return

    # Check model status: "loading" and any other non-"ready" state both stop
    # the request; they differ only in the status_state that is reported.
    model_status = get_model_status()
    if model_status["status"] == "loading":
        yield json.dumps({
            "type": "error",
            "status_text": model_status["message"],
            "status_state": "working",
            "history": history,
            "execution": execution_context,
        })
        return
    if model_status["status"] != "ready":
        yield json.dumps({
            "type": "error",
            "status_text": model_status["message"],
            "status_state": "error",
            "history": history,
            "execution": execution_context,
        })
        return

    # Add user message and placeholder assistant message
    history = list(history) + [
        {"role": "user", "content": prompt},
        {"role": "assistant", "content": ""},
    ]
    yield json.dumps({
        "type": "status",
        "status_text": "Thinking...",
        "status_state": "working",
        "history": history,
        "execution": execution_context,
    })

    # Web search if enabled
    # NOTE(review): unlike handle_agent_run, this search call is not wrapped
    # in try/except, so a search failure propagates out of the generator.
    search_context = ""
    if search_enabled.lower() == "true":
        yield json.dumps({
            "type": "status",
            "status_text": "Searching the web...",
            "status_state": "working",
            "history": history,
            "execution": execution_context,
        })
        search_results = web_search_google(prompt, num_results=6)
        if search_results:
            search_context = format_search_results(search_results)
            yield json.dumps({
                "type": "search_results",
                "status_text": f"Found {len(search_results)} results, generating code...",
                "status_state": "working",
                "history": history,
                "execution": execution_context,
                "search_results": search_results,
            })

    # Build messages for model: drop the assistant placeholder, then replace
    # the trailing user message with the expanded, targeted prompt.
    model_history = list(history[:-1])
    model_history[-1] = {
        "role": "user",
        "content": targeted_prompt(
            prompt, target_language, target_framework, execution_context, search_context
        ),
    }
    messages = chat_history_to_messages(model_history)

    # Determine image URL for VLM
    vlm_image_url = image_url.strip() if image_url else None

    final_response = ""
    for partial in call_model(messages, image_url=vlm_image_url):
        # Each item overwrites (not appends to) the response so far.
        final_response = partial
        # Strip thinking blocks so chat only shows clean output
        clean_partial = strip_thinking_blocks(partial)
        history[-1]["content"] = clean_partial
        yield json.dumps({
            "type": "streaming",
            "status_text": "Generating...",
            "status_state": "working",
            "history": history,
            "execution": execution_context,
        })

    if not final_response:
        history[-1]["content"] = "The model did not return a response."
        yield json.dumps({
            "type": "error",
            "status_text": "No model response.",
            "status_state": "error",
            "history": history,
            "execution": execution_context,
        })
        return

    # Extract code from response (use cleaned version)
    clean_response = strip_thinking_blocks(final_response)
    code, fence_lang = extract_code(clean_response)
    target = normalize_language(target_language, fence_lang)

    # Also try multi-file extraction
    multi_files = extract_multi_file(clean_response)

    if not code and not multi_files:
        yield json.dumps({
            "type": "complete",
            "status_text": "Answered without running code.",
            "status_state": "info",
            "history": history,
            "execution": execution_context,
        })
        return

    yield json.dumps({
        "type": "status",
        "status_text": "Running...",
        "status_state": "working",
        "history": history,
        "execution": execution_context,
    })

    # Execute code. Only Python is run server-side; for every other target
    # the defaults below ("Preview ready" / "success") are reported as-is.
    stdout, stderr, image_path, status_text, status_state = "", "", None, "Preview ready", "success"
    is_gradio = False
    gradio_url = None
    if target == "python" and code:
        if is_gradio_code(code) or target_framework == "Gradio":
            is_gradio = True
            gradio_result = run_gradio_app(code)
            if gradio_result["success"]:
                gradio_url = gradio_result["url"]
                status_text = f"Gradio app running at {gradio_url}"
                status_state = "success"
                # The launch notice is reported through the stderr field.
                stderr = f"Gradio app launched successfully at {gradio_url}"
            else:
                status_text = "Gradio launch failed"
                status_state = "error"
                stderr = gradio_result.get("stderr", gradio_result.get("message", "Launch failed"))
        else:
            result = run_python(code)
            # All three outcomes copy the same output fields; only the status differs.
            if result.timed_out:
                stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
                status_text = f"Timed out after {PY_TIMEOUT_S}s"
                status_state = "error"
            elif result.returncode:
                stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
                status_text = "Finished with errors"
                status_state = "error"
            else:
                stdout, stderr, image_path = result.stdout, result.stderr, result.image_path
                status_text = "Ran successfully"
                status_state = "success"

    # Register image for serving
    image_url_out = None
    if image_path:
        filename = os.path.basename(image_path)
        _served_files[f"img:{filename}"] = image_path
        image_url_out = f"/images/{filename}"

    # Register code for download
    download_url = None
    project_files = dict(multi_files) if multi_files else {}

    # Rename main.py → app.py for Python/Gradio projects (HF Spaces expects app.py)
    if project_files and "main.py" in project_files and "app.py" not in project_files:
        if target == "python" or is_gradio:
            project_files["app.py"] = project_files.pop("main.py")

    # If project_files is empty but we have single code, add it
    if not project_files and code:
        if target == "python":
            fname = "app.py" if (is_gradio or is_gradio_code(code)) else "main.py"
        elif target in {"web", "html", "javascript"}:
            fname = "index.html"
        else:
            fname = f"main.{fence_lang or 'txt'}"
        project_files = {fname: code}

    if project_files:
        # NOTE(review): the registry key is always "dl:generated-project.zip",
        # so each run replaces the entry written by the previous one.
        project_name = "generated-project"
        zip_path = create_project_zip(project_files, project_name)
        zip_filename = f"{project_name}.zip"
        _served_files[f"dl:{zip_filename}"] = zip_path
        download_url = f"/download/{zip_filename}"
    elif code:
        # NOTE(review): appears unreachable — whenever `code` is truthy the
        # block above has already made `project_files` non-empty.
        ext = "py" if target == "python" else "html"
        dl_filename = f"generated.{ext}"
        dl_dir = tempfile.mkdtemp(prefix="fullstack_dl_")
        dl_path = os.path.join(dl_dir, dl_filename)
        Path(dl_path).write_text(code, encoding="utf-8")
        _served_files[f"dl:{dl_filename}"] = dl_path
        download_url = f"/download/{dl_filename}"

    # Determine if this is web previewable
    is_web = target in {"web", "javascript", "typescript", "html"} or (fence_lang or "") in {"html", "web"}
    web_code = code if is_web else None

    # Replace (not merge into) the incoming execution context.
    execution_context = {
        "code": code,
        "target": target,
        "fence_lang": fence_lang or target,
        "stdout": stdout,
        "stderr": stderr,
        "image_url": image_url_out,
        "image_path": image_path,
        "status": status_text,
        "language": fence_lang or target,
        "suggested_tab": "preview" if (image_path or is_web or is_gradio) else "console",
        "download_url": download_url,
        "project_files": project_files,
        "is_web": is_web,
        "web_code": web_code,
        "is_gradio": is_gradio,
        "gradio_url": gradio_url,
    }

    yield json.dumps({
        "type": "complete",
        "status_text": status_text,
        "status_state": status_state,
        "history": history,
        "execution": execution_context,
    })


@app.api(name="hf_auth", concurrency_limit=4)
def handle_hf_auth(
    oauth_token: str = "",
    token: Optional[gr.OAuthToken] = None,
) -> str:
    """Get HuggingFace OAuth profile and list of organizations.

    Reads the OAuth token from (in priority order):

    1. The `oauth_token` parameter (explicit pass-in from the client).
    2. The injected `token: gr.OAuthToken` — per the original author's note,
       Gradio auto-injects this from the user's session IF they have
       completed the "Sign in with HuggingFace" OAuth flow.
    3. The `HF_TOKEN` env var (for local dev / when running outside a Space).

    Yields one JSON object with ``authenticated``, ``username``, ``name``,
    ``picture``, ``organizations`` and ``message``. On success it also
    carries ``token`` so the frontend can use it for `push_hf` calls
    (deploying to user's HF account). Any exception is logged and reported as
    ``authenticated: False``.
    """
    try:
        from huggingface_hub import whoami

        resolved = oauth_token.strip() if oauth_token else ""

        # ── Fall back to Gradio-injected OAuthToken (from session) ───────
        if not resolved and token is not None:
            try:
                resolved = (getattr(token, "token", "") or "").strip()
            except Exception:
                pass

        # ── Last-resort fallback: HF_TOKEN env var (local dev) ──────────
        if not resolved:
            resolved = (os.getenv("HF_TOKEN") or "").strip()

        if not resolved:
            yield json.dumps({
                "authenticated": False,
                "username": "",
                "name": "",
                "picture": "",
                "organizations": [],
                "message": "Not signed in. Click Sign In to authenticate with HuggingFace.",
            })
            return

        # Get user info using the OAuth token
        user_info = whoami(token=resolved)
        username = user_info.get("name", "")
        fullname = user_info.get("fullname", username)

        # Get avatar
        avatar_url = ""
        avatar_info = user_info.get("avatarUrl", "")
        if avatar_info:
            avatar_url = avatar_info

        # Get organizations
        orgs = []
        for org in user_info.get("orgs", []):
            orgs.append({
                "name": org.get("name", ""),
                "avatar": org.get("avatarUrl", ""),
            })

        # Also check orgRoles for role info
        org_roles = user_info.get("orgRoles", [])
        for role_info in org_roles:
            org_name = role_info.get("org", "")
            role = role_info.get("role", "member")
            # Add role info to existing org if found
            for org in orgs:
                if org["name"] == org_name:
                    org["role"] = role
                    break

        # NOTE(review): `resolved` is echoed back even when it came from the
        # server's HF_TOKEN env var, which hands that token to any caller —
        # confirm this is intended outside local development.
        yield json.dumps({
            "authenticated": True,
            "username": username,
            "name": fullname,
            "picture": avatar_url,
            "organizations": orgs,
            "token": resolved,
            "message": f"Signed in as {username}",
        })
    except Exception as exc:
        logger.exception("HF auth check failed")
        yield json.dumps({
            "authenticated": False,
            "username": "",
            "name": "",
            "picture": "",
            "organizations": [],
            "message": f"Auth check failed: {str(exc)}",
        })


@app.api(name="push_hf", concurrency_limit=1)
def handle_push_hf(
    exec_context_json: str,
    repo_name: str,
    hf_token: str,
    space_sdk: str = "auto",
    is_space: str = "true",
) -> str:
    """Push generated project to HuggingFace Hub."""
    try:
        execution_context = json.loads(exec_context_json) if exec_context_json else {}
        project_files = dict(execution_context.get("project_files", {}) or {})
        code = execution_context.get("code", "")

        # If project_files is empty but we have code, build files from code
        if not project_files and code:
            lang = execution_context.get("language", "python")
            is_gradio = execution_context.get("is_gradio", False)

            # Map language to entry file — JS/TS single-files get wrapped for Docker
            if lang in ("javascript", "js", "typescript", "ts"):
                # For single-file JS/TS code that is HTML (vanilla), keep as index.html
                # NOTE(review): the source text is truncated at this point.
                # Everything between the opening quote below and " Server:" is
                # missing, i.e. the rest of handle_push_hf and the
                # `def get_app() ->` header. The damaged fragment is kept
                # verbatim and must be restored from the real file; this span
                # will not parse until it is.
                if " Server: """Return the configured Gradio Server app instance.""" return app


# ─── Agent / Skills / Commands / Hooks / Workspace Endpoints ──────────

@app.api(name="agent_run", concurrency_limit=2)
def handle_agent_run(
    prompt: str,
    target_language: str = "",
    target_framework: str = "",
    history_json: str = "[]",
    skills_json: str = "[]",
    search_enabled: str = "false",
    image_url: str = "",
    agent_name: str = "",
) -> str:
    """Run the Claude Code-style agent loop with tools.

    Yields JSON events: status, tool_call, tool_result, streaming, complete,
    error.

    `agent_name` (optional) overrides the session-active agent for this run.
    The `/agent use`, `/agent reset`, `/agent delete` and `/agent list` slash
    commands are intercepted here and dispatched to the agents module before
    the model runs. A bare `/agent` is treated as a reset. Any other
    `/agent ...` form, including `/agent use` or `/agent delete` without an
    argument, falls through to the agent loop.
    """
    from code.agent import run_agent

    history = json.loads(history_json) if history_json else []
    skills = json.loads(skills_json) if skills_json else []
    prompt = (prompt or "").strip()
    if not prompt:
        yield json.dumps({
            "type": "error",
            "message": "Empty prompt",
        })
        return

    # ── Intercept /agent use|reset|delete (session-state mutations) ────
    # These need to happen server-side BEFORE the model runs so the very
    # next prompt reflects the change.
    stripped = prompt.lstrip()
    if stripped.startswith("/agent ") or stripped == "/agent":
        from code.agents import (
            set_active_agent,
            delete_agent as _delete_agent,
            list_agents as _list_agents,
            get_active_agent,
        )
        parts = stripped.split(None, 2)  # ["/agent", <sub>, <arg>]
        sub = parts[1] if len(parts) > 1 else ""
        arg = parts[2].strip() if len(parts) > 2 else ""

        if sub == "use" and arg:
            result = set_active_agent(arg)
            # Reported as "complete" even on failure; in that case the
            # content carries the failure message instead of the usage hint.
            yield json.dumps({
                "type": "complete",
                "content": (
                    f"**Agent activated: `{result.get('active_agent')}`**\n\n"
                    + (result.get("message", "") if not result.get("success")
                       else "Subsequent prompts will use this agent's persona and tool whitelist.")
                ),
                "agent": result.get("active_agent"),
                "agent_op": "use",
            })
            return

        if sub == "reset" or (sub == "" and arg == ""):
            # The result of the reset is not inspected.
            result = set_active_agent(None)
            yield json.dumps({
                "type": "complete",
                "content": "**Active agent reset.** Subsequent prompts will use the default SoniCoder persona.",
                "agent": None,
                "agent_op": "reset",
            })
            return

        if sub == "delete" and arg:
            result = _delete_agent(arg)
            if result.get("success"):
                yield json.dumps({
                    "type": "complete",
                    "content": f"**Agent `{arg}` deleted.**",
                    "agent": None,
                    "agent_op": "delete",
                })
            else:
                yield json.dumps({
                    "type": "error",
                    "message": result.get("error", f"Failed to delete agent '{arg}'"),
                    "agent_op": "delete",
                })
            return

        if sub == "list":
            agents_list = _list_agents()
            if not agents_list:
                content = "_No agents available._ Create one with `/agent create `."
            else:
                # Render the agents as a Markdown table.
                lines = ["| Name | Description | Author | Tools |", "|------|-------------|--------|-------|"]
                for a in agents_list:
                    tools = ", ".join(a.get("tools", [])) or "(all)"
                    active_marker = " **(active)**" if a.get("active") else ""
                    lines.append(f"| `{a['name']}`{active_marker} | {a.get('description', '')[:80]} | {a.get('author', '')} | {tools} |")
                content = "\n".join(lines)
            yield json.dumps({
                "type": "complete",
                "content": content,
                "agents": agents_list,
                "agent_op": "list",
            })
            return
        # /agent create|show → fall through to the model (handled by slash command expansion)

    # Optional web search; a failure is logged and the run continues without it.
    search_context = ""
    if search_enabled.lower() == "true":
        try:
            search_results = web_search_google(prompt, num_results=6)
            if search_results:
                search_context = format_search_results(search_results)
                yield json.dumps({
                    "type": "search_results",
                    "results": search_results,
                    "status_text": f"Found {len(search_results)} results, running agent...",
                })
        except Exception as exc:
            logger.warning("Web search failed: %s", exc)

    try:
        for event in run_agent(
            user_input=prompt,
            history=history,
            target_language=target_language,
            target_framework=target_framework,
            skills=skills,
            search_context=search_context,
            image_url=image_url.strip() or None,
            agent_name=agent_name.strip() or None,
        ):
            # default=str stringifies any value json cannot encode natively.
            yield json.dumps(event, default=str)
    except Exception as exc:
        logger.exception("Agent run failed")
        yield json.dumps({
            "type": "error",
            "message": str(exc),
        })


@app.api(name="list_skills", concurrency_limit=4)
def handle_list_skills() -> str:
    """List all available skills.

    Yields ``{"success": true, "skills": [...]}`` as a JSON string.
    """
    from code.skills import list_skills
    skills = list_skills()
    yield json.dumps({"success": True, "skills": skills})


@app.api(name="list_commands", concurrency_limit=4)
def handle_list_commands() -> str:
    """List all available slash commands.

    Yields ``{"success": true, "commands": [...]}`` as a JSON string.
    """
    from code.commands import list_commands
    commands = list_commands()
    yield json.dumps({"success": True, "commands": commands})


@app.api(name="list_hooks", concurrency_limit=4)
def handle_list_hooks() -> str:
    """List all configured hooks.

    Yields ``{"success": true, "hooks": [...]}`` as a JSON string.
    """
    from code.hooks import list_hooks
    hooks = list_hooks()
    yield json.dumps({"success": True, "hooks": hooks})


@app.api(name="workspace_tree", concurrency_limit=4)
def handle_workspace_tree() -> str:
    """Return the workspace file tree.

    Yields the JSON-encoded result of ``list_workspace_tree()``.
    """
    from code.tools.fs import list_workspace_tree
    result = list_workspace_tree()
    yield json.dumps(result, default=str)


@app.api(name="workspace_read", concurrency_limit=4)
def handle_workspace_read(path: str, offset: int = 0, limit: int = 0) -> str:
    """Read a file from the workspace.

    ``offset`` and ``limit`` are forwarded to ``read_file`` only when
    non-zero, so its own defaults apply otherwise. Yields the JSON-encoded
    result.
    """
    from code.tools.fs import read_file
    args = {"path": path}
    if offset:
        args["offset"] = offset
    if limit:
        args["limit"] = limit
    result = read_file(**args)
    yield json.dumps(result, default=str)


@app.api(name="workspace_write", concurrency_limit=1)
def handle_workspace_write(path: str, content: str) -> str:
    """Write a file to the workspace.

    Yields the JSON-encoded result of ``write_file``.
    """
    from code.tools.fs import write_file
    result = write_file(path=path, content=content)
    yield json.dumps(result, default=str)


@app.api(name="workspace_bash", concurrency_limit=1)
def handle_workspace_bash(command: str, timeout: int = 30) -> str:
    """Run a bash command in the workspace.

    Yields the JSON-encoded result of ``run_bash``.
    """
    from code.tools.bash import run_bash
    result = run_bash(command=command, timeout=timeout)
    yield json.dumps(result, default=str)


@app.api(name="workspace_edit", concurrency_limit=1)
def handle_workspace_edit(
    path: str,
    old_str: str,
    new_str: str,
    replace_all: str = "false",
) -> str:
    """Edit a file in the workspace.

    ``replace_all`` is a string flag; only ``"true"`` (case-insensitive)
    enables it. Yields the JSON-encoded result of ``edit_file``.
    """
    from code.tools.fs import edit_file
    result = edit_file(
        path=path,
        old_str=old_str,
        new_str=new_str,
        replace_all=replace_all.lower() == "true",
    )
    yield json.dumps(result, default=str)


@app.api(name="workspace_glob", concurrency_limit=4)
def handle_workspace_glob(pattern: str, path: str = ".") -> str:
    """Glob files in the workspace.

    Yields the JSON-encoded result of ``glob_paths``.
    """
    from code.tools.fs import glob_paths
    result = glob_paths(pattern=pattern, path=path)
    yield json.dumps(result, default=str)


@app.api(name="workspace_grep", concurrency_limit=4)
def handle_workspace_grep(
    pattern: str,
    path: str = ".",
    include: str = "",
    ignore_case: str = "false",
) -> str:
    """Grep file contents in the workspace.

    An empty ``include`` is passed on as ``None``; ``ignore_case`` is a
    string flag where only ``"true"`` (case-insensitive) enables it. Yields
    the JSON-encoded result of ``grep_search``.
    """
    from code.tools.fs import grep_search
    result = grep_search(
        pattern=pattern,
        path=path,
        include=include or None,
        ignore_case=ignore_case.lower() == "true",
    )
    yield json.dumps(result, default=str)


@app.api(name="todo_read", concurrency_limit=4)
def handle_todo_read(session_id: str = "default") -> str:
    """Read the current todo list.

    Yields the JSON-encoded result of ``todo_read`` for ``session_id``.
    """
    from code.tools.todos import todo_read
    result = todo_read(session_id=session_id)
    yield json.dumps(result, default=str)


@app.api(name="todo_write", concurrency_limit=1)
def handle_todo_write(todos_json: str, session_id: str = "default") -> str:
    """Replace the todo list.

    ``todos_json`` is a JSON-encoded list; an empty string means an empty
    list. Yields the JSON-encoded result of ``todo_write``.
    """
    from code.tools.todos import todo_write
    todos = json.loads(todos_json) if todos_json else []
    result = todo_write(todos=todos, session_id=session_id)
    yield json.dumps(result, default=str)


@app.api(name="workspace_snapshot", concurrency_limit=2)
def handle_workspace_snapshot() -> str:
    """Return all workspace files for ZIP/deploy.

    Yields ``{"success": true, "files": ..., "count": len(files)}`` as JSON.
    """
    from code.tools.fs import snapshot_workspace
    files = snapshot_workspace()
    yield json.dumps({"success": True, "files": files, "count": len(files)})


@app.api(name="workspace_reset", concurrency_limit=1)
def handle_workspace_reset() -> str:
    """Clear the workspace.

    Yields the JSON-encoded result of ``reset_workspace()``.
    """
    from code.tools.fs import reset_workspace
    result = reset_workspace()
    yield json.dumps(result, default=str)


@app.api(name="create_hook", concurrency_limit=1)
def handle_create_hook(
    name: str,
    event: str,
    pattern: str,
    action: str = "warn",
    message: str = "",
    enabled: str = "true",
) -> str:
    """Create a new user hook.

    ``enabled`` is a string flag; only ``"true"`` (case-insensitive) enables
    the hook. Yields the JSON-encoded result of ``create_hook``.
    """
    from code.hooks import create_hook
    result = create_hook(
        name=name,
        event=event,
        pattern=pattern,
        action=action,
        message=message,
        enabled=enabled.lower() == "true",
    )
    yield json.dumps(result, default=str)


@app.api(name="delete_hook", concurrency_limit=1)
def handle_delete_hook(name: str) -> str:
    """Delete a user hook by name.

    Yields the JSON-encoded result of ``delete_hook``.
    """
    from code.hooks import delete_hook
    result = delete_hook(name)
    yield json.dumps(result, default=str)


# ─── Custom Agent Endpoints ────────────────────────────────────────────

@app.api(name="list_agents", concurrency_limit=4)
def handle_list_agents() -> str:
    """List all available custom agents (builtins + user).

    Yields ``{"success": true, "agents": ..., "active_agent": ...}`` as JSON.
    """
    from code.agents import list_agents, get_active_agent
    agents = list_agents()
    active = get_active_agent()
    yield json.dumps({
        "success": True,
        "agents": agents,
        "active_agent": active,
    }, default=str)


@app.api(name="get_agent", concurrency_limit=4)
def handle_get_agent(name: str) -> str:
    """Get the full definition of a single agent.

    Yields ``{"success": false, "error": ...}`` when ``get_agent`` returns a
    falsy value, otherwise the agent's fields without its ``path`` entry.
    """
    from code.agents import get_agent
    agent = get_agent(name)
    if not agent:
        yield json.dumps({"success": False, "error": f"Agent not found: {name}"})
        return
    # Strip non-serializable path
    agent_serializable = {k: v for k, v in agent.items() if k != "path"}
    yield json.dumps({"success": True, "agent": agent_serializable}, default=str)


@app.api(name="save_agent", concurrency_limit=1)
def handle_save_agent(
    name: str,
    description: str,
    body: str,
    tools: str = "",
    skills: str = "",
    temperature: str = "",
    max_iterations: str = "",
    tags: str = "",
    author: str = "user",
) -> str:
    """Create or overwrite a custom agent definition (manual save, no AI).

    `tools`, `skills`, `tags` are comma-separated strings; blank entries are
    dropped, and an empty `tools` list means every tool in ``ALL_TOOLS``.
    `temperature` and `max_iterations` are strings that will be parsed if
    non-empty; a value that does not parse yields
    ``{"success": false, "error": ...}`` and nothing is saved.

    Yields the JSON-encoded result of ``save_agent``.
    """
    from code.agents import save_agent, ALL_TOOLS

    def _split(s: str) -> list[str]:
        # Comma-split, trim each item, drop empties; tolerates None.
        return [x.strip() for x in (s or "").split(",") if x.strip()]

    tools_list = _split(tools) or list(ALL_TOOLS)
    skills_list = _split(skills)
    tags_list = _split(tags)

    temp_val = None
    if temperature.strip():
        try:
            temp_val = float(temperature)
        except ValueError:
            yield json.dumps({"success": False, "error": f"Invalid temperature: {temperature}"})
            return

    iter_val = None
    if max_iterations.strip():
        try:
            iter_val = int(max_iterations)
        except ValueError:
            yield json.dumps({"success": False, "error": f"Invalid max_iterations: {max_iterations}"})
            return

    result = save_agent(
        name=name,
        description=description,
        body=body,
        tools=tools_list,
        skills=skills_list,
        temperature=temp_val,
        max_iterations=iter_val,
        tags=tags_list,
        author=author,
    )
    yield json.dumps(result, default=str)


@app.api(name="delete_agent", concurrency_limit=1)
def handle_delete_agent(name: str) -> str:
    """Delete a user-defined agent by name.

    Yields the JSON-encoded result of ``delete_agent``.
    """
    from code.agents import delete_agent
    result = delete_agent(name)
    yield json.dumps(result, default=str)


@app.api(name="set_active_agent", concurrency_limit=1)
def handle_set_active_agent(name: str = "") -> str:
    """Set the active agent for subsequent prompts. Empty string resets.

    On failure the raw ``set_active_agent`` result is yielded. On success
    the result is extended with the current ``agents`` list and
    ``active_agent``.
    """
    from code.agents import set_active_agent, list_agents, get_active_agent
    result = set_active_agent(name.strip() or None)
    if not result.get("success"):
        yield json.dumps(result, default=str)
        return
    # Return fresh list + active agent so frontend can re-render
    yield json.dumps({
        **result,
        "agents": list_agents(),
        "active_agent": get_active_agent(),
    }, default=str)


# ─── GitHub Import Endpoint ────────────────────────────────────────────

@app.api(name="import_github", concurrency_limit=1)
def handle_import_github(
    url: str,
    branch: str = "",
    subdir: str = "",
    target_subdir: str = "",
    depth: str = "1",
    timeout: str = "120",
) -> str:
    """Clone a GitHub repo into the sandboxed workspace.

    Parameters
    ----------
    url : str
        GitHub URL. Accepts:
          - https://github.com/<owner>/<repo>[.git]
          - https://github.com/<owner>/<repo>/tree/<branch>[/<subdir>]
          - git@github.com:<owner>/<repo>.git
    branch : str
        Optional branch/tag override. If empty, uses URL's branch or
        the repo's default branch.
    subdir : str
        Optional sub-directory inside the repo to import.
    target_subdir : str
        Where inside the workspace to place the import. Empty = root.
    depth : str
        Git clone depth (default "1" for shallow clone). Clamped to 1..50;
        a value that is not an integer falls back to 1.
    timeout : str
        Git clone timeout in seconds (default "120"). Clamped to 10..600;
        a value that is not an integer falls back to 120.

    Yields
    ------
    JSON dict with keys: success, message, url, owner, repo, branch,
    subdir, files_imported, dirs_skipped, workspace_path, tree_preview.
    """
    from code.tools.github import import_github_repo

    try:
        depth_int = int(depth) if str(depth).strip() else 1
        depth_int = max(1, min(50, depth_int))
    except (ValueError, TypeError):
        depth_int = 1

    try:
        timeout_int = int(timeout) if str(timeout).strip() else 120
        timeout_int = max(10, min(600, timeout_int))
    except (ValueError, TypeError):
        timeout_int = 120

    result = import_github_repo(
        url=url,
        branch=branch,
        subdir=subdir,
        target_subdir=target_subdir,
        depth=depth_int,
        timeout=timeout_int,
    )
    yield json.dumps(result, default=str)


@app.api(name="github_url_examples", concurrency_limit=4)
def handle_github_url_examples() -> str:
    """Return example GitHub URL formats accepted by import_github.

    Yields the JSON-encoded result of ``list_github_url_examples()``.
    """
    from code.tools.github import list_github_url_examples
    result = list_github_url_examples()
    yield json.dumps(result, default=str)


@app.api(name="push_github", concurrency_limit=1)
def handle_push_github(
    repo_name: str,
    github_token: str,
    username: str,
    branch: str = "main",
    commit_message: str = "",
    timeout: str = "120",
) -> str:
    """Push the current workspace to a GitHub repo.

    Requires only 3 user inputs (repo_name, github_token, username) plus
    optional branch / commit_message / timeout. An empty ``branch`` becomes
    ``"main"``. ``timeout`` is clamped to 10..600 seconds and falls back to
    120 when it is not an integer.

    The push itself is implemented by ``code.tools.github.push_to_github``,
    which is not visible here. The original documentation of this endpoint
    describes it as follows: the workspace is snapshotted (via
    `snapshot_workspace`), written into a fresh git repo in a temp dir,
    committed, and pushed to `https://github.com/<username>/<repo_name>.git`
    using HTTPS basic auth with the token. The push uses
    `--force-with-lease` so it replaces the remote tip with the SoniCoder
    workspace contents. If the remote doesn't exist yet (no refs to lease
    against), it retries with a plain push.

    Yields
    ------
    JSON dict with keys: success, message, repo_full_name, branch,
    commit_sha, commit_url, repo_url, files_pushed, error (on failure).
    """
    from code.tools.github import push_to_github

    try:
        timeout_int = int(timeout) if str(timeout).strip() else 120
        timeout_int = max(10, min(600, timeout_int))
    except (ValueError, TypeError):
        timeout_int = 120

    result = push_to_github(
        repo_name=repo_name,
        github_token=github_token,
        username=username,
        branch=branch or "main",
        commit_message=commit_message or "",
        timeout=timeout_int,
    )
    yield json.dumps(result, default=str)