Spaces:
Running
Running
File size: 11,195 Bytes
8e2543b | 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 | """
Cloud Coder Agent - Uses opencode CLI exclusively for code generation.
This agent receives A2A tasks, uses the opencode CLI to generate code,
commits to GitHub, and optionally creates PRs.
"""
import contextlib
import json
import os
import subprocess
import sys
from datetime import datetime, timezone
from pathlib import Path
from typing import Any, Dict, Optional

from app.github_ops import GitHubOps
from app.prompt_templates import get_agent_prompt
from app.utils.logger import logger
# Configure paths - will be set by HF Space or environment
def _resolve_repo_dir() -> Path:
    """Return the repository checkout directory.

    The ``REPO_DIR`` environment variable (default
    ``/workspace/OpenSIN-Code``) wins when it points at an existing path.
    Otherwise the first existing fallback location is used; when none of
    them exists either, the configured path is returned unchanged.
    """
    configured = Path(os.getenv("REPO_DIR", "/workspace/OpenSIN-Code"))
    if configured.exists():
        return configured

    # Fallbacks are only evaluated when the configured path is missing.
    fallbacks = (
        Path("/home/user/project/OpenSIN-Code"),
        Path("/home/user/OpenSIN-Code"),
        Path.cwd().parent / "OpenSIN-Code",
    )
    return next(
        (candidate for candidate in fallbacks if candidate.exists()),
        configured,
    )


REPO_DIR = _resolve_repo_dir()
class CloudCoderAgent:
    """Autonomous cloud coder using opencode CLI.

    Tracks a coarse ``status`` string and a short description of the task in
    progress, and drives a task through prompt generation, the ``opencode``
    CLI, a git commit in ``REPO_DIR`` and an optional branch push.
    """

    # Prefix of the ``id`` field in agent.json; what follows is the agent type.
    _AGENT_ID_PREFIX = "a2a-sin-code-"

    def __init__(self):
        self.status = "idle"
        self.current_task: Optional[str] = None
        self.agent_type = self._detect_agent_type()

    def _detect_agent_type(self) -> str:
        """Detect agent type from agent.json.

        Looks for ``agent.json`` next to this module, one directory above it,
        and in the current working directory (in that order).  The type is
        the part of the manifest's ``id`` after ``a2a-sin-code-``.

        Returns:
            The detected type, or ``"backend"`` when no usable manifest is
            found.
        """
        # NOTE(review): the manifest location is an assumption (module
        # directory, its parent, cwd) — confirm where agent.json is deployed.
        module_dir = Path(__file__).resolve().parent
        search_dirs = [module_dir, module_dir.parent]
        with contextlib.suppress(OSError):
            search_dirs.append(Path.cwd())

        for base in search_dirs:
            try:
                meta = json.loads((base / "agent.json").read_text())
            except (OSError, ValueError):
                # Missing, unreadable or malformed manifest: try the next one.
                continue
            agent_id = meta.get("id", "") if isinstance(meta, dict) else ""
            # Extract type from id: a2a-sin-code-{type}
            if isinstance(agent_id, str) and self._AGENT_ID_PREFIX in agent_id:
                return agent_id.split(self._AGENT_ID_PREFIX)[-1]
        return "backend"  # default fallback

    def get_status(self) -> str:
        """Return human-readable status."""
        if self.current_task:
            return f"working on {self.current_task[:20]}..."
        return "idle"

    def is_healthy(self) -> bool:
        """Check if agent is healthy.

        Healthy means ``opencode --version`` can be started and exits with
        status 0 within 5 seconds.
        """
        try:
            result = subprocess.run(
                ["opencode", "--version"], capture_output=True, text=True, timeout=5
            )
        except (OSError, ValueError, subprocess.SubprocessError):
            # Binary missing, not executable, or the call timed out.
            return False
        return result.returncode == 0

    def process_task(self, task_id: str, task_data: Dict[str, Any], tasks_store: Dict):
        """Process a single A2A task.

        Marks the task as running, generates a prompt, runs opencode, commits
        the result and optionally pushes ``target_branch``.  The outcome is
        written back to ``tasks_store[task_id]`` as ``status`` plus either
        ``result`` or ``error``.

        Args:
            task_id: Key of the task entry inside ``tasks_store``.
            task_data: Task parameters; the keys read are ``description``,
                ``type``, ``target_branch`` and ``issue_number``.
            tasks_store: Mapping of task id to a mutable task record.  When
                it exposes a ``_lock`` attribute, that lock is held while the
                record is updated.
        """
        # A plain dict has no ``_lock``; nullcontext keeps the ``with`` blocks
        # valid instead of failing on ``with None``.
        lock = getattr(tasks_store, "_lock", None)
        guard = lock if lock is not None else contextlib.nullcontext()

        try:
            with guard:
                tasks_store[task_id]["status"] = "running"
                # Computed in-process so starting a task never depends on an
                # external ``date`` binary being available.
                tasks_store[task_id]["timestamp"] = datetime.now(timezone.utc).isoformat()
            self.status = "processing"

            # Extract task parameters
            description = task_data.get("description") or ""
            self.current_task = description[:50]
            task_type = task_data.get("type", self.agent_type)  # Use detected type as fallback
            target_branch = task_data.get("target_branch")
            issue_number = task_data.get("issue_number")

            # Generate prompt using template
            prompt = get_agent_prompt(task_type, description, issue_number)

            # Call opencode CLI
            result = self._call_opencode(prompt)

            # Commit changes
            commit_msg = (
                f"Fix #{issue_number}: {description[:50]}"
                if issue_number
                else f"Auto-fix: {description[:50]}"
            )
            commit_sha = self._commit_changes(commit_msg)

            # Push branch if requested
            if target_branch:
                gh = GitHubOps(REPO_DIR)
                # NOTE(review): nothing is pushed when the branch already
                # exists on the remote — confirm that this is intended.
                if gh.branch_exists_on_remote(target_branch):
                    logger.info("Branch already exists on remote", branch=target_branch)
                else:
                    push_success = gh.push_branch(target_branch)
                    logger.info("Pushed branch", branch=target_branch, success=push_success)

            # Update task result
            with guard:
                tasks_store[task_id]["status"] = "completed"
                tasks_store[task_id]["result"] = {
                    "commit_sha": commit_sha,
                    "branch": target_branch,
                    "summary": result.get("summary", ""),
                    "agent_type": self.agent_type,
                }
            logger.task(task_id, "completed", agent_type=self.agent_type)
        except Exception as e:
            # Top-level task boundary: record the failure on the task entry
            # rather than letting it escape to the caller.
            logger.error("Task failed", task_id=task_id, error=str(e))
            with guard:
                tasks_store[task_id]["status"] = "failed"
                tasks_store[task_id]["error"] = str(e)
        finally:
            # Always return to idle, whether the task succeeded or failed.
            self.status = "idle"
            self.current_task = None

    def _build_prompt(self, description: str, task_type: str, issue_number: Optional[int]) -> str:
        """Build a detailed prompt for opencode CLI.

        Args:
            description: Task description inserted after ``TASK:``.
            task_type: Developer role named in the prompt; also selects an
                optional type-specific paragraph appended at the end.
            issue_number: Issue reference, rendered as ``Not specified`` when
                falsy.

        Returns:
            The full prompt text.
        """
        base_prompt = f"""You are an expert {task_type} developer working on the OpenSIN-Code project.
TASK: {description}
Issue Number: {issue_number if issue_number else "Not specified"}
CRITICAL RULES:
1. Use ONLY opencode CLI for any tool/API calls. Never use direct git commands or external APIs.
2. Make minimal, surgical changes - fix the exact issue without refactoring unrelated code.
3. Follow existing code style and patterns in the repository.
4. Write or update tests if they are missing for the fix.
5. Ensure all imports are correct and dependencies are in package.json requirements.
6. If the fix requires new dependencies, add them to the appropriate files.
7. Do NOT create new branches - work on the existing branch.
8. After making changes, EXPLICITLY run tests if a test suite exists.
9. Output format: First a brief summary of what you changed, then the exact commands you ran (with --format json), then the results.
WORKFLOW:
1. First, explore the codebase to understand the structure.
2. Locate the files that need to be changed.
3. Apply the fix.
4. Run tests (if available).
5. Provide a summary of changes.
Begin by exploring the repository structure."""
        # Add type-specific guidance; unknown types get the base prompt only.
        type_guidance = {
            "plugin": "\n\nPLUGIN SPECIFIC: Ensure the plugin follows the OpenCode plugin architecture with proper manifest.json, entry points, and A2A integration.",
            "command": "\n\nCOMMAND SPECIFIC: Implement CLI commands following the OpenCode command pattern with proper argument parsing and help text.",
            "tool": "\n\nTOOL SPECIFIC: Create reusable tool functions with clear docstrings, type hints, and error handling.",
            "backend": "\n\nBACKEND SPECIFIC: Focus on API endpoints, database models, business logic, and integrations.",
            "frontend": "\n\nFRONTEND SPECIFIC: Implement UI components with proper accessibility, responsive design, and modern patterns.",
            "fullstack": "\n\nFULLSTACK SPECIFIC: Ensure both frontend and backend are consistent; update API contracts and UI together.",
        }
        return base_prompt + type_guidance.get(task_type, "")

    def _call_opencode(self, prompt: str) -> Dict[str, Any]:
        """Run ``opencode run`` with JSON output and collect its text events.

        The command runs in ``REPO_DIR`` with a 5 minute timeout.  Each stdout
        line is parsed as JSON; lines that are not JSON objects are skipped
        and the ``text`` parts of ``"text"`` events are joined with newlines.

        Args:
            prompt: Prompt passed to ``opencode run``.

        Returns:
            A dict with ``summary`` (joined text), ``raw_output`` (unparsed
            stdout), and ``commit_sha`` / ``files_changed``, which are not
            extracted from the event stream and are always ``None`` / ``[]``.

        Raises:
            RuntimeError: If opencode cannot be started, times out, or exits
                with a non-zero status.
        """
        try:
            # Run opencode with format=json to get structured output
            proc = subprocess.run(
                [
                    "opencode",
                    "run",
                    prompt,
                    "--format",
                    "json",
                    "--fallback",
                    "opencode/minimax-m2.5-free",
                ],
                capture_output=True,
                text=True,
                timeout=300,  # 5 minute timeout
                cwd=str(REPO_DIR),
            )
        except subprocess.TimeoutExpired as exc:
            raise RuntimeError("opencode call timed out after 5 minutes") from exc
        except (OSError, ValueError, subprocess.SubprocessError) as exc:
            raise RuntimeError(f"opencode execution failed: {exc}") from exc

        if proc.returncode != 0:
            raise RuntimeError(f"opencode failed: {proc.stderr}")

        # Parse streaming JSON responses
        text_parts = []
        for line in proc.stdout.splitlines():
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            # Valid JSON that is not an object (number, list, ...) carries no event.
            if not isinstance(event, dict) or event.get("type") != "text":
                continue
            part = event.get("part")
            text = part.get("text") if isinstance(part, dict) else None
            text_parts.append(text if isinstance(text, str) else "")

        return {
            "summary": "\n".join(text_parts),
            "commit_sha": None,
            "files_changed": [],
            "raw_output": proc.stdout,
        }

    def _commit_changes(self, commit_msg: str) -> Optional[str]:
        """Commit any pending changes to the repository.

        Stages everything under ``REPO_DIR`` and commits it with
        ``commit_msg``.

        Args:
            commit_msg: Commit message passed to ``git commit -m``.

        Returns:
            The SHA of the new commit as reported by ``git rev-parse HEAD``,
            or ``None`` when there is nothing to commit.

        Raises:
            RuntimeError: If a git command fails.
        """

        def git(*args: str, check: bool = True) -> "subprocess.CompletedProcess[str]":
            # Run one git subcommand inside the repository checkout.
            return subprocess.run(
                ["git", *args],
                capture_output=True,
                text=True,
                cwd=str(REPO_DIR),
                check=check,
            )

        try:
            # Check git status: empty porcelain output means a clean tree.
            if not git("status", "--porcelain").stdout.strip():
                return None

            # Add all changes
            git("add", ".")

            # Commit
            commit = git("commit", "-m", commit_msg, check=False)
            if commit.returncode != 0:
                # git reports "nothing to commit" on stdout, so check both streams.
                combined = f"{commit.stdout}\n{commit.stderr}".lower()
                if "nothing to commit" in combined:
                    return None
                raise RuntimeError(f"git commit failed: {commit.stderr or commit.stdout}")

            # The commit summary on stdout ends with change statistics, not
            # the SHA, so ask git for the new HEAD explicitly.
            return git("rev-parse", "HEAD").stdout.strip()
        except subprocess.CalledProcessError as e:
            raise RuntimeError(f"git operation failed: {e}") from e
|