# Snapshot metadata: last commit by SIN-Deploy-Bot (ee92f4d)
# "feat: mount Gradio UI at /ui instead of root"
import json
import os
import subprocess
import threading
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict
from uuid import uuid4

import gradio as gr
from fastapi import FastAPI, Request, HTTPException
from fastapi.responses import JSONResponse

# Import our coder agent
from app.coder_agent import CloudCoderAgent
# Directory that contains this module; agent.json is looked up next to it.
ROOT = Path(__file__).resolve().parents[0]
AGENT_META_PATH = ROOT.joinpath("agent.json")

# Task registry shared by the HTTP handlers, the Gradio callbacks and the
# worker threads, keyed by task ID. Insertions happen under _tasks_lock.
_tasks_lock = threading.Lock()
_tasks: dict[str, dict[str, Any]] = dict()

# Global agent instance, created on first use by _ensure_agent().
_agent: CloudCoderAgent | None = None
def _load_agent_meta() -> dict[str, Any]:
    """Load the agent metadata from ``AGENT_META_PATH``.

    Returns:
        The parsed JSON object. If the file cannot be read, is not valid
        UTF-8/JSON, or does not contain a JSON object, a minimal built-in
        placeholder record is returned instead so the metadata endpoints
        keep responding.
    """
    fallback: dict[str, Any] = {
        "id": "a2a-sin-code-unknown",
        "name": "A2A-SIN-Code Agent",
        "version": "1.0.0",
    }
    try:
        meta = json.loads(AGENT_META_PATH.read_text(encoding="utf-8"))
    except (OSError, ValueError):
        # OSError: missing or unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError.
        return fallback
    # Callers treat the metadata as a mapping; a JSON list/string/number
    # would make them fail later, so fall back in that case as well.
    return meta if isinstance(meta, dict) else fallback
def _build_agent_card(base_url: str) -> dict[str, Any]:
    """Return the agent metadata extended with absolute endpoint URLs.

    Args:
        base_url: Public base URL of this service, without a trailing slash.

    Returns:
        A shallow copy of the loaded metadata whose ``endpoints`` mapping
        has ``gradio_ui``, ``a2a_api`` and ``agent_card`` set relative to
        *base_url*. Other endpoint entries present in the metadata are kept.
    """
    card = dict(_load_agent_meta())
    declared = card.get("endpoints")
    # Copy so the loaded metadata is never mutated; ignore a malformed
    # (non-object) "endpoints" value instead of failing on it.
    endpoints = dict(declared) if isinstance(declared, dict) else {}
    # The Gradio UI is mounted under /ui; the bare root only serves a JSON
    # index, so advertising base_url itself would point clients at the
    # wrong page.
    endpoints["gradio_ui"] = f"{base_url}/ui"
    endpoints["a2a_api"] = f"{base_url}/a2a/v1"
    endpoints["agent_card"] = f"{base_url}/.well-known/agent-card.json"
    card["endpoints"] = endpoints
    return card
def _ensure_agent() -> CloudCoderAgent:
    """Return the module-wide agent, constructing it on the first call."""
    global _agent
    if _agent is not None:
        return _agent
    _agent = CloudCoderAgent()
    return _agent
def _public_base_url(request: Request) -> str:
    """Return the request's base URL as text, minus one trailing slash."""
    return str(request.base_url).removesuffix("/")
# A2A JSON-RPC 2.0 endpoint
async def handle_a2a_message(request: Request) -> JSONResponse:
    """Handle an A2A JSON-RPC 2.0 ``message/send`` request.

    The first ``data`` or ``text`` part of the user message becomes the task
    payload (a text part is wrapped as ``{"description": <text>}``). The task
    is registered as ``queued`` and handed to a daemon thread running
    ``agent.process_task``; the response is returned without waiting for it.

    Args:
        request: Incoming HTTP request whose body is the JSON-RPC envelope.

    Returns:
        A JSON-RPC result carrying the new ``taskId`` and status ``queued``.

    Raises:
        HTTPException: 400 when the body is not valid JSON, is not a
            JSON-RPC 2.0 object, names a method other than ``message/send``,
            has a malformed or non-user message, or contains no usable task
            payload.
    """
    try:
        data = await request.json()
    except Exception as exc:
        raise HTTPException(status_code=400, detail="Invalid JSON") from exc

    # Valid JSON is not necessarily an object; anything else cannot be a
    # JSON-RPC request and would otherwise fail on .get() with a 500.
    if not isinstance(data, dict):
        raise HTTPException(status_code=400, detail="Invalid JSON-RPC request")
    # Validate JSON-RPC structure
    if data.get("jsonrpc") != "2.0":
        raise HTTPException(status_code=400, detail="Invalid JSON-RPC version")
    method = data.get("method")
    if method != "message/send":
        raise HTTPException(status_code=400, detail=f"Unsupported method: {method}")
    msg_id = data.get("id")

    params = data.get("params", {})
    message = params.get("message", {}) if isinstance(params, dict) else None
    if not isinstance(message, dict):
        raise HTTPException(
            status_code=400, detail="Invalid params: 'message' must be an object"
        )
    if message.get("role") != "user":
        raise HTTPException(status_code=400, detail="Only user messages supported")

    # Extract task data from the first data/text part.
    parts = message.get("parts", [])
    task_data = None
    for part in parts if isinstance(parts, list) else []:
        if not isinstance(part, dict):
            continue
        # Newer A2A revisions name the part discriminator "kind" rather than
        # "type"; accept either spelling.
        part_type = part.get("type") or part.get("kind")
        if part_type == "data":
            task_data = part.get("data", {})
            break
        if part_type == "text":
            # Plain text request
            task_data = {"description": part.get("text", "")}
            break
    # The rest of the module reads task["data"] as a mapping, so a non-object
    # "data" payload is rejected here rather than stored.
    if not task_data or not isinstance(task_data, dict):
        raise HTTPException(status_code=400, detail="No valid task data found")

    # Get agent and queue task
    agent = _ensure_agent()
    task_id = str(uuid4())
    with _tasks_lock:
        _tasks[task_id] = {
            "status": "queued",
            "data": task_data,
            "result": None,
            "error": None,
            # Same record shape as tasks submitted through the Gradio UI.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }

    # Process the task in the background so the caller gets its ID right away.
    worker = threading.Thread(
        target=agent.process_task, args=(task_id, task_data, _tasks), daemon=True
    )
    worker.start()

    # Return immediate response with task ID
    return JSONResponse(
        {"jsonrpc": "2.0", "id": msg_id, "result": {"taskId": task_id, "status": "queued"}}
    )
# Gradio interface functions
def get_status() -> str:
    """Build the Markdown status summary shown in the Gradio UI.

    Returns:
        Markdown text with the agent's own status string, the number of
        queued, running and total tasks, and a short list of recent tasks.
    """
    agent = _ensure_agent()
    status = agent.get_status()
    # Snapshot the statuses under the lock: request handlers insert into the
    # registry from other threads, and iterating a dict while it grows raises
    # RuntimeError. The lock is released before _format_recent_tasks() runs.
    with _tasks_lock:
        states = [task["status"] for task in _tasks.values()]
    # Previously the "Queued Tasks" line displayed the count of *running*
    # tasks; report both states under their correct labels.
    queued = states.count("queued")
    running = states.count("running")
    return f"""**Agent Status:** {status}
**Queued Tasks:** {queued}
**Running Tasks:** {running}
**Total Tasks:** {len(states)}
**Recent Tasks:**
{_format_recent_tasks()}
"""
def _format_recent_tasks(limit: int = 5) -> str:
    """Format the most recently registered tasks as a Markdown bullet list.

    Args:
        limit: Maximum number of tasks to include.

    Returns:
        One bullet per task, newest first, showing the first eight characters
        of the task ID, its status and up to 50 characters of its
        description; ``"No tasks yet"`` when there is nothing to show.
    """
    # Dicts preserve insertion order, so walking the registry backwards gives
    # newest-first ordering for every task. Sorting on the "timestamp" field
    # is not reliable: some records have none and others hold a random hex
    # string rather than a time.
    with _tasks_lock:
        newest_first = list(reversed(_tasks.items()))[:limit]

    lines = []
    for task_id, task in newest_first:
        data = task.get("data")
        raw_desc = data.get("description") if isinstance(data, dict) else None
        # API clients may send a non-string (or null) description.
        desc = "No description" if raw_desc is None else str(raw_desc)
        # Only mark the text as elided when something was actually cut off.
        summary = f"{desc[:50]}..." if len(desc) > 50 else desc
        lines.append(f"- `{task_id[:8]}`: {task['status']} - {summary}")
    return "\n".join(lines) if lines else "No tasks yet"
def submit_task(description: str, task_type: str, target_branch: str) -> str:
    """Submit a new task via the Gradio UI.

    Registers the task as ``queued`` and starts a daemon thread running
    ``agent.process_task`` for it.

    Args:
        description: Free-text description of the work to do.
        task_type: Task category chosen in the UI dropdown.
        target_branch: Branch name entered in the UI.

    Returns:
        A Markdown confirmation containing the new task ID, or a prompt to
        enter a description when *description* is blank.
    """
    # A blank description gives the agent nothing to act on; do not spend a
    # worker thread on it.
    if not description or not description.strip():
        return "Please enter a task description before submitting."

    agent = _ensure_agent()
    task_id = str(uuid4())
    task_data = {
        "description": description,
        "type": task_type,
        "target_branch": target_branch,
        "source": "gradio_ui",
    }
    with _tasks_lock:
        _tasks[task_id] = {
            "status": "queued",
            "data": task_data,
            "result": None,
            "error": None,
            # A real submission time (ISO 8601, UTC). The previous value was
            # a random uuid hex, which carries no ordering information.
            "timestamp": datetime.now(timezone.utc).isoformat(),
        }
    worker = threading.Thread(
        target=agent.process_task, args=(task_id, task_data, _tasks), daemon=True
    )
    worker.start()
    return f"Task submitted! Task ID: `{task_id}`\nCheck status with 'Get Status' button."
def get_task_log(task_id: str) -> str:
    """Render one task's status, description, result and error as Markdown.

    Args:
        task_id: The full task ID, or a prefix that identifies exactly one
            task (the status list only shows the first eight characters).
            Surrounding whitespace and Markdown backticks are ignored so IDs
            can be pasted straight from the UI output.

    Returns:
        The formatted task details, or a "not found" message when no task
        (or more than one task) matches.
    """
    wanted = (task_id or "").strip().strip("`").strip()
    with _tasks_lock:
        task = _tasks.get(wanted)
        if task is None and wanted:
            # Fall back to an unambiguous prefix match.
            matches = [tid for tid in _tasks if tid.startswith(wanted)]
            if len(matches) == 1:
                wanted = matches[0]
                task = _tasks[wanted]
    if not task:
        return f"Task ID `{wanted}` not found."

    result = task.get("result")
    error = task.get("error")
    status = task["status"]
    data = task.get("data")
    description = data.get("description", "N/A") if isinstance(data, dict) else "N/A"
    # default=str: this dump is for display only, so a value json cannot
    # encode is shown as its string form instead of breaking the log view.
    rendered = json.dumps(result, indent=2, default=str) if result else "No result yet"
    output = f"""**Task ID:** {wanted}
**Status:** {status}
**Description:** {description}
**Result:**
```json
{rendered}
```
"""
    if error:
        output += f"\n**Error:**\n```\n{error}\n```"
    return output
# Build Gradio UI
def build_gradio_ui() -> gr.Blocks:
    """Build the Gradio Blocks UI for the agent.

    The page offers three actions, each wired to a module-level callback:
    a status button (``get_status``), a manual task form (``submit_task``)
    and a task log viewer (``get_task_log``).

    Returns:
        The assembled, not yet launched, ``gr.Blocks`` instance.
    """
    with gr.Blocks(title="A2A Cloud Coder", theme=gr.themes.Soft()) as demo:
        gr.Markdown("# 🤖 A2A Cloud Coder Agent")
        gr.Markdown(
            "This agent receives tasks via A2A protocol and executes them using the opencode CLI."
        )
        with gr.Row():
            with gr.Column():
                # Status section
                status_btn = gr.Button("🔄 Get Status")
                status_out = gr.Markdown("")

                # Manual task submission form
                gr.Markdown("### Submit Manual Task")
                desc_input = gr.Textbox(
                    label="Task Description",
                    placeholder="e.g., Create a new plugin for...",
                    lines=3,
                )
                type_select = gr.Dropdown(
                    choices=["plugin", "command", "tool", "backend", "fullstack", "frontend"],
                    label="Task Type",
                    value="plugin",
                )
                branch_input = gr.Textbox(
                    label="Target Branch", placeholder="e.g., feat/issue-1065-auto-fix"
                )
                submit_btn = gr.Button("🚀 Submit Task")
                submit_out = gr.Markdown("")

                # Task log viewer
                gr.Markdown("### Task Log")
                task_id_input = gr.Textbox(
                    label="Task ID", placeholder="Enter task ID to view log"
                )
                log_btn = gr.Button("📋 View Log")
                log_out = gr.Markdown("")

        # Wire up events
        status_btn.click(fn=get_status, outputs=status_out)
        submit_btn.click(
            fn=submit_task, inputs=[desc_input, type_select, branch_input], outputs=submit_out
        )
        log_btn.click(fn=get_task_log, inputs=task_id_input, outputs=log_out)
    return demo
# FastAPI app: the ASGI application object that all routes below attach to.
app = FastAPI(title="A2A Cloud Coder Agent")
# A2A endpoint: register handle_a2a_message for POST /a2a/v1. The decorator
# returned by app.post() is applied by hand because the handler is defined
# before `app` exists.
app.post("/a2a/v1")(handle_a2a_message)
# Health check endpoints (/health and /healthz). HF expects 200 on /, which is served by the root endpoint further down.
@app.get("/health")
@app.get("/healthz")
async def health() -> JSONResponse:
    """Report the agent's health flag, its status string and the task count."""
    agent = _ensure_agent()
    state = "healthy" if agent.is_healthy() else "unhealthy"
    payload = {
        "status": state,
        "agent": agent.get_status(),
        "tasks": len(_tasks),
    }
    return JSONResponse(payload)
# Agent card endpoint
@app.get("/.well-known/agent-card.json")
async def agent_card(request: Request) -> JSONResponse:
    """Serve the agent card, with endpoint URLs built from the request's base URL."""
    return JSONResponse(_build_agent_card(_public_base_url(request)))
@app.get("/.well-known/agent.json")
async def agent_json() -> JSONResponse:
    """Serve the agent metadata exactly as ``_load_agent_meta`` returns it."""
    return JSONResponse(_load_agent_meta())
@app.get("/.well-known/oauth-client.json")
async def oauth_client() -> JSONResponse:
    """Serve a minimal OAuth client metadata document.

    The client ID comes from the ``OAUTH_CLIENT_ID`` environment variable,
    defaulting to ``a2a-sin-code-agent``; the other fields are fixed.
    """
    # CIMD/SEP-991 identity - for now return minimal
    client_id = os.environ.get("OAUTH_CLIENT_ID", "a2a-sin-code-agent")
    document = {
        "client_id": client_id,
        "redirect_uris": ["https://delqhi-a2a-sin-code-agent.hf.space/auth/callback"],
        "grant_types": ["authorization_code", "refresh_token"],
        "token_endpoint_auth_method": "client_secret_post",
    }
    return JSONResponse(document)
# Build Gradio UI and mount under /ui
gradio_app = build_gradio_ui()
# gr.mount_gradio_app is Gradio's supported way to attach a Blocks UI to an
# existing FastAPI app: it creates the ASGI sub-app for the Blocks and mounts
# it at the given path. Reading `gradio_app.app` directly relies on an
# attribute Gradio is not documented to set before the Blocks is served.
app = gr.mount_gradio_app(app, gradio_app, path="/ui")
# Root endpoint - returns a small JSON index of the service's endpoints (no redirect)
@app.get("/")
async def root_info() -> JSONResponse:
    """Return a static JSON index naming the service and its main routes."""
    routes = {
        "health": "/health",
        "a2a_api": "/a2a/v1",
        "agent_card": "/.well-known/agent-card.json",
        "ui": "/ui",
    }
    body = {"service": "A2A Cloud Coder", "status": "running", "endpoints": routes}
    return JSONResponse(body)
if __name__ == "__main__":
    import uvicorn

    # Resolve the port once (PORT env var, default 7860) so the log line and
    # the server can never disagree.
    port = int(os.getenv("PORT", 7860))
    print("🚀 Starting uvicorn on port", port)
    # Bind to all interfaces so the server is reachable from outside the host.
    uvicorn.run(app, host="0.0.0.0", port=port)