Spaces:
Running
Running
| import json | |
| import os | |
| import subprocess | |
| import threading | |
| from pathlib import Path | |
| from typing import Any, Dict | |
| from uuid import uuid4 | |
| import gradio as gr | |
| from fastapi import FastAPI, Request, HTTPException | |
| from fastapi.responses import JSONResponse | |
| # Import our coder agent | |
| from app.coder_agent import CloudCoderAgent | |
| ROOT = Path(__file__).resolve().parent | |
| AGENT_META_PATH = ROOT / "agent.json" | |
| # Global agent instance | |
| _agent: CloudCoderAgent | None = None | |
| _tasks: dict[str, dict[str, Any]] = {} | |
| _tasks_lock = threading.Lock() | |
| def _load_agent_meta() -> dict[str, Any]: | |
| try: | |
| return json.loads(AGENT_META_PATH.read_text(encoding="utf-8")) | |
| except Exception: | |
| return {"id": "a2a-sin-code-unknown", "name": "A2A-SIN-Code Agent", "version": "1.0.0"} | |
| def _build_agent_card(base_url: str) -> dict[str, Any]: | |
| meta = _load_agent_meta() | |
| meta = dict(meta) | |
| meta["endpoints"] = dict(meta.get("endpoints") or {}) | |
| meta["endpoints"]["gradio_ui"] = base_url | |
| meta["endpoints"]["a2a_api"] = f"{base_url}/a2a/v1" | |
| meta["endpoints"]["agent_card"] = f"{base_url}/.well-known/agent-card.json" | |
| return meta | |
| def _ensure_agent() -> CloudCoderAgent: | |
| global _agent | |
| if _agent is None: | |
| _agent = CloudCoderAgent() | |
| return _agent | |
| def _public_base_url(request: Request) -> str: | |
| """Extract the public base URL from the request.""" | |
| base_url = str(request.base_url) | |
| if base_url.endswith("/"): | |
| base_url = base_url[:-1] | |
| return base_url | |
| # A2A JSON-RPC 2.0 endpoint | |
| async def handle_a2a_message(request: Request) -> JSONResponse: | |
| """Handle A2A JSON-RPC 2.0 message/send requests.""" | |
| try: | |
| data = await request.json() | |
| except Exception: | |
| raise HTTPException(status_code=400, detail="Invalid JSON") | |
| # Validate JSON-RPC structure | |
| if data.get("jsonrpc") != "2.0": | |
| raise HTTPException(status_code=400, detail="Invalid JSON-RPC version") | |
| method = data.get("method") | |
| params = data.get("params", {}) | |
| msg_id = data.get("id") | |
| if method != "message/send": | |
| raise HTTPException(status_code=400, detail=f"Unsupported method: {method}") | |
| message = params.get("message", {}) | |
| role = message.get("role") | |
| parts = message.get("parts", []) | |
| if role != "user": | |
| raise HTTPException(status_code=400, detail="Only user messages supported") | |
| # Extract task data from parts | |
| task_data = None | |
| for part in parts: | |
| if part.get("type") == "data": | |
| task_data = part.get("data", {}) | |
| break | |
| elif part.get("type") == "text": | |
| # Plain text request | |
| task_data = {"description": part.get("text", "")} | |
| break | |
| if not task_data: | |
| raise HTTPException(status_code=400, detail="No valid task data found") | |
| # Get agent and queue task | |
| agent = _ensure_agent() | |
| task_id = str(uuid4()) | |
| with _tasks_lock: | |
| _tasks[task_id] = {"status": "queued", "data": task_data, "result": None, "error": None} | |
| # Process task async | |
| thread = threading.Thread( | |
| target=agent.process_task, args=(task_id, task_data, _tasks), daemon=True | |
| ) | |
| thread.start() | |
| # Return immediate response with task ID | |
| return JSONResponse( | |
| {"jsonrpc": "2.0", "id": msg_id, "result": {"taskId": task_id, "status": "queued"}} | |
| ) | |
| # Gradio interface functions | |
| def get_status() -> str: | |
| """Get current agent status for Gradio UI.""" | |
| agent = _ensure_agent() | |
| status = agent.get_status() | |
| task_count = len([t for t in _tasks.values() if t["status"] == "running"]) | |
| return f"""**Agent Status:** {status} | |
| **Queued Tasks:** {task_count} | |
| **Total Tasks:** {len(_tasks)} | |
| **Recent Tasks:** | |
| {_format_recent_tasks()} | |
| """ | |
| def _format_recent_tasks(limit: int = 5) -> str: | |
| recent = sorted(_tasks.items(), key=lambda x: x[1].get("timestamp", ""), reverse=True)[:limit] | |
| lines = [] | |
| for task_id, task in recent: | |
| status = task["status"] | |
| desc = task["data"].get("description", "No description")[:50] | |
| lines.append(f"- `{task_id[:8]}`: {status} - {desc}...") | |
| return "\n".join(lines) if lines else "No tasks yet" | |
| def submit_task(description: str, task_type: str, target_branch: str) -> str: | |
| """Submit a new task via Gradio UI.""" | |
| agent = _ensure_agent() | |
| task_id = str(uuid4()) | |
| task_data = { | |
| "description": description, | |
| "type": task_type, | |
| "target_branch": target_branch, | |
| "source": "gradio_ui", | |
| } | |
| with _tasks_lock: | |
| _tasks[task_id] = { | |
| "status": "queued", | |
| "data": task_data, | |
| "result": None, | |
| "error": None, | |
| "timestamp": str(uuid4().hex), # simplified timestamp | |
| } | |
| thread = threading.Thread( | |
| target=agent.process_task, args=(task_id, task_data, _tasks), daemon=True | |
| ) | |
| thread.start() | |
| return f"Task submitted! Task ID: `{task_id}`\nCheck status with 'Get Status' button." | |
| def get_task_log(task_id: str) -> str: | |
| """Get detailed log for a specific task.""" | |
| task = _tasks.get(task_id) | |
| if not task: | |
| return f"Task ID `{task_id}` not found." | |
| result = task.get("result", {}) | |
| error = task.get("error") | |
| status = task["status"] | |
| output = f"""**Task ID:** {task_id} | |
| **Status:** {status} | |
| **Description:** {task["data"].get("description", "N/A")} | |
| **Result:** | |
| ```json | |
| {json.dumps(result, indent=2) if result else "No result yet"} | |
| ``` | |
| """ | |
| if error: | |
| output += f"\n**Error:**\n```\n{error}\n```" | |
| return output | |
| # Build Gradio UI | |
| def build_gradio_ui() -> gr.Blocks: | |
| with gr.Blocks(title="A2A Cloud Coder", theme=gr.themes.Soft()) as demo: | |
| gr.Markdown("# π€ A2A Cloud Coder Agent") | |
| gr.Markdown( | |
| "This agent receives tasks via A2A protocol and executes them using the opencode CLI." | |
| ) | |
| with gr.Row(): | |
| with gr.Column(): | |
| status_btn = gr.Button("π Get Status") | |
| status_out = gr.Markdown("") | |
| gr.Markdown("### Submit Manual Task") | |
| desc_input = gr.Textbox( | |
| label="Task Description", | |
| placeholder="e.g., Create a new plugin for...", | |
| lines=3, | |
| ) | |
| type_select = gr.Dropdown( | |
| choices=["plugin", "command", "tool", "backend", "fullstack", "frontend"], | |
| label="Task Type", | |
| value="plugin", | |
| ) | |
| branch_input = gr.Textbox( | |
| label="Target Branch", placeholder="e.g., feat/issue-1065-auto-fix" | |
| ) | |
| submit_btn = gr.Button("π Submit Task") | |
| submit_out = gr.Markdown("") | |
| gr.Markdown("### Task Log") | |
| task_id_input = gr.Textbox(label="Task ID", placeholder="Enter task ID to view log") | |
| log_btn = gr.Button("π View Log") | |
| log_out = gr.Markdown("") | |
| # Wire up events | |
| status_btn.click(fn=get_status, outputs=status_out) | |
| submit_btn.click( | |
| fn=submit_task, inputs=[desc_input, type_select, branch_input], outputs=submit_out | |
| ) | |
| log_btn.click(fn=get_task_log, inputs=task_id_input, outputs=log_out) | |
| return demo | |
| # FastAPI app | |
| app = FastAPI(title="A2A Cloud Coder Agent") | |
| # A2A endpoint | |
| app.post("/a2a/v1")(handle_a2a_message) | |
| # Health check (HF expects 200 on /) | |
| async def health() -> JSONResponse: | |
| agent = _ensure_agent() | |
| return JSONResponse( | |
| { | |
| "status": "healthy" if agent.is_healthy() else "unhealthy", | |
| "agent": agent.get_status(), | |
| "tasks": len(_tasks), | |
| } | |
| ) | |
| # Agent card endpoint | |
| async def agent_card(request: Request) -> JSONResponse: | |
| base_url = _public_base_url(request) | |
| card = _build_agent_card(base_url) | |
| return JSONResponse(card) | |
| async def agent_json() -> JSONResponse: | |
| meta = _load_agent_meta() | |
| return JSONResponse(meta) | |
| async def oauth_client() -> JSONResponse: | |
| # CIMD/SEP-991 identity - for now return minimal | |
| return JSONResponse( | |
| { | |
| "client_id": os.getenv("OAUTH_CLIENT_ID", "a2a-sin-code-agent"), | |
| "redirect_uris": ["https://delqhi-a2a-sin-code-agent.hf.space/auth/callback"], | |
| "grant_types": ["authorization_code", "refresh_token"], | |
| "token_endpoint_auth_method": "client_secret_post", | |
| } | |
| ) | |
| # Build Gradio UI and mount under /ui | |
| gradio_app = build_gradio_ui() | |
| app.mount("/ui", gradio_app.app) | |
| # Root endpoint - simple redirect/info | |
| async def root_info() -> JSONResponse: | |
| return JSONResponse( | |
| { | |
| "service": "A2A Cloud Coder", | |
| "status": "running", | |
| "endpoints": { | |
| "health": "/health", | |
| "a2a_api": "/a2a/v1", | |
| "agent_card": "/.well-known/agent-card.json", | |
| "ui": "/ui", | |
| }, | |
| } | |
| ) | |
| if __name__ == "__main__": | |
| import uvicorn | |
| print("π Starting uvicorn on port", int(os.getenv("PORT", 7860))) | |
| uvicorn.run(app, host="0.0.0.0", port=int(os.getenv("PORT", 7860))) | |