import json import subprocess from datetime import datetime from pathlib import Path from typing import Any, Dict from uuid import uuid4 import gradio as gr from fastapi import FastAPI, Request from fastapi.responses import HTMLResponse, JSONResponse import uvicorn ROOT = Path(__file__).resolve().parent META_PATH = ROOT / "agent.json" def load_meta() -> Dict[str, Any]: try: return json.loads(META_PATH.read_text(encoding="utf-8")) except Exception: return {"id": "sin-code-base", "name": "A2A SIN-Code Base", "version": "1.0.0"} def build_card(base_url: str) -> Dict[str, Any]: meta = load_meta() meta = dict(meta) meta["endpoints"] = dict(meta.get("endpoints") or {}) meta["endpoints"]["gradio_ui"] = base_url meta["endpoints"]["a2a_api"] = f"{base_url}/a2a/v1" meta["endpoints"]["agent_card"] = f"{base_url}/.well-known/agent-card.json" return meta status = {"running": False, "last_task": None} def call_opencode(prompt: str, timeout: int = 300) -> str: try: result = subprocess.run( ["opencode", "run", prompt, "--format", "json"], capture_output=True, text=True, timeout=timeout, ) parts = [] for line in result.stdout.splitlines(): try: ev = json.loads(line) if ev.get("type") == "text": parts.append(ev.get("part", {}).get("text", "")) except json.JSONDecodeError: pass return "".join(parts).strip() except subprocess.TimeoutExpired: return "Error: opencode call timed out" except Exception as e: return f"Error: {str(e)}" app = FastAPI() @app.get("/health") async def health() -> Dict[str, Any]: meta = load_meta() return { "ok": True, "agent": meta.get("id"), "running": status["running"], "version": meta.get("version"), } @app.get("/", response_class=HTMLResponse) async def root() -> str: return "
See UI
" @app.get("/.well-known/agent-card.json") @app.get("/.well-known/agent.json") async def agent_card(request: Request) -> JSONResponse: base_url = f"{'https' if request.url.scheme == 'https' else 'http'}://{request.headers.get('host', '')}" return JSONResponse(build_card(base_url)) @app.post("/a2a/v1") async def a2a(request: Request) -> JSONResponse: rpc = await request.json() rpc_id = rpc.get("id") method = rpc.get("method") if rpc.get("jsonrpc") != "2.0" or not isinstance(method, str): return JSONResponse( { "jsonrpc": "2.0", "id": rpc_id, "error": {"code": -32600, "message": "invalid_request"}, }, status_code=400, ) if method == "agent/getCard": base_url = f"{'https' if request.url.scheme == 'https' else 'http'}://{request.headers.get('host', '')}" return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": build_card(base_url)}) if method == "message/send": params = rpc.get("params", {}) message = params.get("message", {}) parts = message.get("parts", []) data = None for part in parts: if part.get("type") == "data": data = part.get("data", {}) break action = data.get("action", "agent.help") if isinstance(data, dict) else "agent.help" if action == "agent.help": result = {"help": "Specialized coding agent", "skills": load_meta().get("skills", [])} elif action.startswith("sin.code."): prompt = f"You are {load_meta().get('name')}. {data.get('description', '')}" generated = call_opencode(prompt) result = {"generated_code": generated, "action": action, "status": "completed"} else: result = {"error": f"Unknown action: {action}"} task = { "id": str(uuid4()), "kind": "task", "status": { "state": "completed", "timestamp": datetime.utcnow().isoformat() + "Z", "message": {"role": "agent", "parts": [{"type": "text", "text": "completed"}]}, }, "artifacts": [ { "id": str(uuid4()), "name": "result", "description": "Task result", "parts": [{"type": "data", "data": result}], } ], "metadata": {"action": action}, } return JSONResponse({"jsonrpc": "2.0", "id": rpc_id, "result": task}) return JSONResponse( {"jsonrpc": "2.0", "id": rpc_id, "error": {"code": -32601, "message": "method_not_found"}}, status_code=404, ) demo = gr.Blocks(title="A2A SIN-Code Agent") with demo: meta = load_meta() gr.Markdown(f"# {meta.get('name', 'A2A SIN-Code Agent')}") gr.Textbox(label="Agent ID", value=meta.get("id"), interactive=False) gr.Textbox(label="Status", value="Running", interactive=False) app = gr.mount_gradio_app(app, demo, path="/ui") if __name__ == "__main__": uvicorn.run(app, host="0.0.0.0", port=7860)